首页 > 编程知识 正文

流式编程 响应式编程,响应式编程 优点

时间:2023-05-05 17:11:58 阅读:166239 作者:2085

转载: https://www.cn blogs.com/Li xinjie/p/a-reactive-streams-on-JVM-is-reactor.html

响应编程

作为响应编程方向的第一步,微软在. NET生态系统中创建了Rx库(Reactive Extensions )。 RxJava是在JVM上的实现。

响应编程是一种异步编程范式,通常出现在面向对象的语言中,作为观察者模型的扩展。

关注数据的流动、变化的传播。 这意味着可以很容易地用编程语言表示静态数据流(如数组)和动态数据流(如事件源)。

响应式流程

随着时间的推移,出现了Java专用的标准化。 这是定义用于JVM平台上响应库的接口和交互规则的规范。

在响应流(Reactive Streams )中,这些接口已经集成到Java 9中,并且位于名为java.util.concurrent.Flow的父类中。

响应表达式的流程与迭代器类似,但迭代器基于拉动,而响应表达式的流程基于推送。

迭代器的使用其实是一种指令式编程,因为它决定了开发者何时调用next ()来获取下一个元素。

在应答式流程中,与上面等价的是发信者-订阅者。 但是,如果有新的可用元素,发布者会将其推送给读者。 这个“按下”是响应式的关键。

另外,对于被按下的要素的操作也是宣言性地进行的,程序员只要表现出要做什么就可以了,不需要管理怎么做。

发布者使用onNext方法将新元素推送到订户,使用onError方法通知错误,并使用onComplete方法通知退出。

您可以看到错误处理和完成(退出)也是用很好的方法处理的。 错误和结束都可以结束序列。

这种方式很灵活。 对于包含无限数组(0/1/n (多)个元素)的滴答钟表,此模式支持这些情况。

Reactor粉墨登场

Reactor是第四代响应库,是基于响应流规范在JVM平台上构建无阻塞异步APP同步的响应编程范式的实现。

大大实现了JVM上的响应流规范65http://www.reactive-streams.org /。

它是完整的无阻塞响应编程的基础,通过管理“压力”提供了高效的需求管理。

直接集成基于Java函数的API,特别是CompletableFuture、Stream和Duration。

它支持使用reactor-netty工程的无阻塞进程间通信,适用于微服务体系结构,并支持包括Websockets在内的HTTP、TCP和UDP。

注: Reactor要求Java 8讲这么多,我们先考虑一下为什么需要这样一个异步响应库?

屏蔽是徒劳的

现代APP应用可以达到非常多的并发用户,即使现代硬件能力持续改善,现代软件的性能仍然是一个重要的关注点。

要提高程序的性能,大致有两种方法。

1、使用更多线程和更多硬件资源的并行化

2、提高效率,在当前资源使用情况下寻求更高的效率

通常,Java开发人员使用块代码编写程序。 在遇到性能瓶颈之前,这种实践性很好。

此时将引入其他线程,并执行相似的块代码。 但是,这种扩展方法在资源利用方面引起争议,引起同时问题。

更糟糕的是,它会阻止资源的浪费。 仔细看,如果程序包含延迟(特别是数据库请求或网络调用等I/O ),线程当前将处于空闲状态并等待数据,从而导致资源浪费。

所以并行化方式不是银弹。 我们需要让硬件发挥完全的力量,但关于资源浪费的影响和原因也非常复杂。

异步营救

上文提到的第二种方法是寻求更有效率的方法,作为资源浪费问题的解决办法。

通过编写异步和异步块代码,可以执行切换到其他活动的任务,然后使用相同的基本资源返回到当前处理。

但是,如何在JVM中生成异步代码呢? Java有两种异步编程模型。

1、Callbacks,异步方法没有返回值,但带有回调,并在结果可用时调用回调。

2、Futures,异步方法立即返回Future。 异步处理进程计算t值,并使用Future对象包装访问。 这个值不能马上使用。 可以轮询对象以查看是否有可用的t值。

两种技术都足够好吗? 并非所有情况都是这样,这两种方式都有局限性。

回调很难组合,很快代码就变得难以阅读,维护变得困难。 它被称为“回调地狱”。

查看回调示例,显示一个用户的前五个收藏夹。 否则,我推荐五个人给他:

这么简单的功能需要这么多代码,而且嵌套多难懂。

使用Reactor的同等例子如下所示。

代码的数量、写法不是已经很清楚了吗?

与回调相比,Futures好一点,但还在组合方面做得不好。 可以组合多个Futures对象

的但是并不容易。

Future也有其它问题,很容易因为调用了get()方法造成了另一个阻塞。

另外,它不支持延迟计算,缺乏对多个值的支持,缺乏高级错误处理。

从命令式到响应式编程

像Reactor这样的响应式库的目标就是解决在JVM上“传统”异步方式的弊端,同时也关注一些额外方面:

可组合性和可读性

数据作为流,被丰富的操作符操作

什么都不会发生,直到你订阅

后压,消费者通知生产者发射的速率太快了

高级别而不是高数值抽象

可组合性和可读性可组合性,其实就是编排多个异步任务的能力,使前一个任务的结果作为后续任务的输入,或以fork-join(分叉-合并)的方式执行若干个任务,或在更高的级别重复利用这些异步任务。

任务编排的能力和代码的可读性和可维护性紧密地耦合在一起。随着异步处理在数量和复杂度上的增加,组合和阅读代码变得更加困难。

就像我们看到的,回调模型虽然简单,但是当回调里嵌套回调,达到多层时就会变成回调地狱。

Reactor提供丰富的组合选项,使嵌套级别最小,让代码的组织结构能反映出在进行什么样的抽象处理,且通常保持在同级别上。

装配线类比你可以认为响应式应用处理数据就像通过一个装配(生产)线。Reactor既是传送带又是工作站。

原材料从一个源(原始发布者)持续不断地获取,以一个完成的产品被推送给消费者(订阅者)结束。

原材料可以经过许多不同的转换,如其它的中间步骤,或者是一个更大装配线的一部分。

如果在某个地方出现一个小故障或阻塞了,出问题的工作站可以向上游发出通知来限制原材料的流动(速率)。

操作符在Reactor里,操作符就是装配线类比中的工作站。每一个操作符都向一个发布者添加某些行为,把上一步的发布者包装到一个新的实例里。整个链就是这样被链接起来的。

所以数据一开始从第一个发布者出来,然后沿着链往下游移动,且被每一个链接转换。最后,一个订阅者结束了这个处理。

响应式流规范并没有明确规定操作符,不过Reactor就提供了丰富的操作符,它们涉及到很多方面,从简单的转换、过滤到复杂的编排、错误处理。

只要不订阅,就什么都不发生rrdj写一个发布者链时,默认,数据是不会开始进入链中的。相反,你只是创建了异步处理的一个抽象描述。

通过订阅这个行为(动作),才把发布者和订阅者连接起来,然后才会触发数据在链里流动。

这是在内部实现好的,通过来自于订阅者的request信号往上游传播,一路逆流而上直到最开始的发布者那里。

Reactor核心特性

Reactor引入可组合响应式的类型,实现了发布者接口,但也提供了丰富的操作符,就是asjdy和Mono。

asjdy,流动,表示0到N个元素。

Mono,单个,表示0或1个元素。

它们之间的不同主要在语义上,表示异步处理的粗略基数。

如一个http请求只会产生一个响应,把它表示为Mono显然更有意义,且它只提供相对于0/1这样上下文的操作符,因为此时count操作显然没有太大意义。

操作符可以改变处理的最大基数,也会切换到相关类型上。如count操作符虽然存在于asjdy上,但它的返回值却是一个Mono。

asjdy一个asjdy是一个标准的Publisher,表示一个异步序列,可以发射0到N个元素,可以通过一个完成信号或错误信号终止。

就像在响应式流规范里那样,这3种类型的信号转化为对一个下游订阅者的onNext,onComplete,onError3个方法的调用。

这3个方法也可以理解为事件/回调,且它们都是可选的。

如没有onNext但有onComplete,表示一个空的有限序列。既没有onNext也没有onComplete,表示一个空的无限序列(没有什么实际用途,可用于测试)。

无限序列也没有必要是空的,如asjdy.interval(Duration)产生一个asjdy ,它是无限的,从钟表里发射出的规则的“嘀嗒”。

Mono一个Mono是一个特殊的Publisher,最多发射一个元素,可以使用onComplete信号或onError信号来终止。

它提供的操作符只是asjdy提供的一个子集,同样,一些操作符(如把Mono和Publisher结合起来)可以把它切换到一个asjdy。

如Mono#concatWith(Publisher)返回一个asjdy,然而Mono#then(Mono)返回的是另一个Mono。

Mono可以用于表示没有返回值的异步处理(与Runnable相似),用Mono表示。

创建asjdy或Mono,并订阅它们最容易的方式就是使用它们各自的工厂方法:

asjdy seq1 = asjdy.just("foo", "bar", "foobar");

List iterable = Arrays.asList("foo", "bar", "foobar");

asjdy seq2 = asjdy.fromIterable(iterable);

asjdy numbersFromFiveToSeven = asjdy.range(5, 3);

Mono noData = Mono.empty();

Mono data = Mono.just("foo");

当谈到订阅时,可以使用Java 8的lambda表达式,订阅方法有多种不同的变体,带有不同的回调。

下面是方法签名:

//订阅并触发序列

subscribe();

//可以对每一个产生的值进行处理

subscribe(Consumer super T> consumer);

//还可以响应一个错误

subscribe(Consumer super T> consumer,

Consumer super Throwable> errorConsumer);

//还可以在成功结束后执行一些代码

subscribe(Consumer super T> consumer,

Consumer super Throwable> errorConsumer,

Runnable completeConsumer);

//还可以对Subscription执行一些操作

subscribe(Consumer super T> consumer,

Consumer super Throwable> errorConsumer,

Runnable completeConsumer,

Consumer super Subscription> subscriptionConsumer);

使用Disposable取消订阅

这些基于lambda的订阅方法都返回一个Disposable类型,通过调用它的dispose()来取消这个订阅。

对于asjdy和Mono,取消就是一个信号,表明源应该停止生产元素。然而,不保证立即生效,一些源可能生产元素非常快,以致于还没有收到取消信号就已经生产完了。

版权声明:该文观点仅代表作者本人。处理文章:请发送邮件至 三1五14八八95#扣扣.com 举报,一经查实,本站将立刻删除。