更多内容,前往IT-BLOG-CN
反应式编程的思想最近得到了广泛的流行。 在Java平台上有流行的反应式库RxJava和Reactor。反应式流规范的出发点是提供一个带非阻塞负压( non-blocking backpressure ) 的异步流处理规范。反应式流规范的核心接口已经添加到了Java9中的java.util.concurrent.Flow类中。Flow中包含了Flow.Publisher、Flow.Subscriber、Flow.Subscription和 Flow.Processor等 4 个核心接口。Java 9还提供了SubmissionPublisher作为Flow.Publisher的一个实现。RxJava 2和Reactor都可以很方便的与Flow类的核心接口进行互操作。 Reactive Streams API ( java.util.concurrent.Flow)实现了异步非阻塞的流处理方式。
Reactive Streams是一项非阻塞背压的异步流处理方式,因此他们有一组Publisher和Subscriber,Publisher将数据流push到Subscriber,Subscriber则将消费这些数据流,并通过backpressure(个人理解为回压)来反馈Subscriber消费时的压力,调节Publisher生产的速度。
Java 9 Flow API实现了Reactive Streams规范。Flow API是Iterator和Observer模式的组合。Iterator在pull模型上工作,其中应用程序从源中拉出数据,而Observer在push模型上工作,并在将数据从源推到应用程序时进行处理。
Java 9 Flow API订阅者可以在订阅发布者的同时请求N个项目。 然后将项目从Publisher推送到Subsriber,直到没有其他项目可推送或出现一些错误为止。
其中所有的方法都是void,因为所有的方法都是异步执行的。
Publisher函数式接口用于将数据流发送到Subscriber,publisher有两个方法用于发送数据,一个是submit,一个是offer。两个方法下面实际都是调用的doOffer方法,所以,offer方法提供了置顶延迟时间后丢弃的策略,而submit是offer的简单实现,是一致阻塞不丢弃。
// 绑定订阅者
public void subscribe(Subscriber super T> subscriber);
该接口在实现了publisher之外还实现了AutoCloseable接口,因此具有异步提交数据流到当前的订阅者中直到连接被关闭功能。所以可以直接用try块来进行资源的管理。发布者,通过使用Exceutor框架提交reactive stream数据到订阅者
代码演示:使用SubmissionPublisher作为发布者示例,看一下响应流实现的测试程序。
// Employee 用户自己定义的类
public static void main(String[] args) throws InterruptedException {//create publisherSubmissionPublisher publisher = new SubmissionPublisher<>();//register subscriberMySubscriber subscriber = new MySubscriber();publisher.subscribe(subscriber);List employees = xxxxx;//publish itemsemployees.forEach(publisher::submit);while (employees.size() != subscriber.getCount()) {TimeUnit.SECONDS.sleep(5);}publisher.close();System.out.println("exiting app");
}
Subscriber则为消费处理Publisher发送过来的数据流
public void onSubscribe(Subscription subscription);
public void onNext(T item);
public void onError(Throwable throwable);
public void onComplete();
【1】onSubscribe: 这是在Subscriber服务器订阅Publisher以接收消息时调用的第一种方法。 通常,我们调用subscription.request开始从处理器接收具体数量的数据流。
【2】onNext: 从Publisher处收到数据时,将调用此方法,这是我们在其中实现业务逻辑以处理data stream,然后从Publisher处请求更多数据的地方。
【3】onError: 当发生不可恢复的错误时,将调用此方法,我们可以使用此方法清理任务,例如关闭数据库连接。
【4】onComplete: 这类似于finally方法,并且在Publisher没有push任何其他data stream并且关闭Publisher时被调用。 我们可以使用它发送成功处理流的通知。
发布者和订阅者通过令牌来进行信息通信的约定。主要有:开始订阅、信息获取、信息推送、异常、结束、取消订阅。这用于在Publisher和Subscriber之间创建异步非阻塞连接。Subscriber调用其请求方法以向Publisher获取新的data stream。它还具有取消方法以取消订阅,即关闭Subscriber和Publisher之间的连接
代码演示:subscription变量需要保存其引用。以此在onNext方法中对Publisher进行请求数据流。count变量用于记录处理的次数,将在main thread中通知任务是否处理完成
// Employee 用户自己定义的类
public class MySubscriber implements Subscriber {private Subscription subscription;private int count = 0;@Overridepublic void onSubscribe(Subscription subscription) {System.out.println("Subscribed");this.subscription = subscription;this.subscription.request(1);System.out.println("onSubscribe requested 1 item");}@Overridepublic void onNext(Employee item) {System.out.println("Process Employee" + count);count++;try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}this.subscription.request(1);}@Overridepublic void onError(Throwable throwable) {System.out.println("some error happen" + throwable);}@Overridepublic void onComplete() {System.out.println("all process done");}public int getCount() {return count;}public void setCount(int count) {this.count = count;}
}
此接口同时扩展了Publisher和Subscriber,用于在Publisher和Subscriber之间转换消息。可以通过处理器连接发布者、订阅者以及其他处理器。Processor本身同时继承了Publisher与Subscriber接口,所以可以对元素进行处理转发。主要用于让数据从T转换为R。同时,由于Processor本身也可以接入Processor,所以Processor可以组成链来对数据进行处理。
当发布者以比订阅者消耗的速度快得多的速度生成消息时,就会形成回压。 Flow API没有提供任何机制来发出有关背压的信号或进行处理。 但是我们可以设计自己的策略来处理它,例如微调用户或降低消息产生率。 SubmissionPublisher中提供了一个buffer的机制,允许Subsriber最大处理的量,超过该数量将被阻塞。Flow中还包含了一个默认方法defaultBufferSize(),用于返回默认的令牌中的缓冲区大小,而默认的值为DEFAULT_BUFFER_SIZE = 256。
【1】订阅者向发布者发送订阅请求。
【2】发布者根据订阅请求生成令牌发送给订阅者。
【3】订阅者根据令牌向发布者发送请求N个数据。
【4】发送者根据订阅者的请求数量返回M(M<=N)个数据。
【5】重复3,4。
【6】数据发送完毕后由发布者发送给订阅者结束信号。
上一篇:苹果废纸篓清空了要复原怎么办?
下一篇:全宋词有几首九张机