跳至主要內容

响应式流

Vingkin...大约 4 分钟

概念

Reactive Streams为我们提供了Java中的Reactive Programming的通用API。传统异步编程的写法,不同任务分别在不同的线程中执行,协调这些线程执行的先后顺序、线程间的依赖顺序是一件非常麻烦的事情,而Reactive Streams就是为了解决该问题。Reactive Streams API中仅仅包含了如下四个接口:

//发布者
public  interface  Publisher < T > {
	public  void  subscribe(Subscriber <super  T >  s);
}

//订阅者
public  interface  Subscriber < T > {
	public  void  onSubscribe(Subscription  s);
	public  void  onNext(T  t);
	public  void  onError(Throwable  t);
	public  void  onComplete();
}

//表示Subscriber消费Publisher发布的一个消息的生命周期
public interface Subscription {
	public void request(long n);
	public void cancel();
}

//处理器,表示一个处理阶段,它既是订阅者也是发布者,并且遵守两者的契约
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
	
}

示例1

通过Publisher,Subscriber和Subscription完成一个发布-订阅模式

public class ReactiveStreamDemo {
    public static void main(String[] args) throws InterruptedException {

        // 1、定义发布者,发布的数据类型是Integer
        SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();

        // 2、定义订阅者
        Subscriber<Integer> subscriber = new Subscriber<>() {

            private Subscription subscription;

            @Override
            public void onSubscribe(Subscription subscription) {
                // 保存订阅关系,需要用它来给发布者响应
                this.subscription = subscription;
                // 请求一个数据
                this.subscription.request(1);
            }

            @Override
            public void onNext(Integer item) {
                // 接收到一个数据,处理
                System.out.println("接收到数据:" + item);
                // 处理完调用request再请求一个数据
                this.subscription.request(1);
                // 或者已经达到目标,调用cancel告诉发布者不在调用数据
                // this.subscription.cancel();
            }

            @Override
            public void onError(Throwable throwable) {
                // 出现了异常
                throwable.printStackTrace();
                // 告诉发布者后面不接受数据了
                this.subscription.cancel();
            }

            @Override
            public void onComplete() {
                // 全部数据处理完成(发布者关闭了)
                System.out.println("处理完了");
            }
        };

        // 3、发布者订阅者之间建立订阅关系
        publisher.subscribe(subscriber);

        // 4、生产数据并发布
        Stream.of(1, 2, 3, 4, 5, 6, 7, 8)
                .forEach(i -> {
                    publisher.submit(i);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                });

        // 5、结束后 关闭发布者
        // 正式环境应放在finally或者try-resource中确保关闭
        publisher.close();

        Thread.currentThread().join(1000);

    }
}

示例2

在示例1的基础上,添加了processor进行中间处理

public class MyProcessor extends SubmissionPublisher<String>
        implements Processor<Integer, String> {
    private static final String[] NUMBER_TO_CHINESE = {"零", "壹", "贰", "叁", "肆", "伍", "陆", "柒", "捌", "玖"};

    private Subscription subscription;

    @Override
    public void onSubscribe(Subscription subscription) {
        // 保存订阅关系,需要用它来发布响应
        this.subscription = subscription;
        // 请求一个数据
        this.subscription.request(1);
    }

    @Override
    public void onNext(Integer item) {
        // 接收到一个数据,进行处理
        System.out.println("处理器接收到的数据:" + item);
        // 过滤掉小于0的,发布出去
        if (item >= 0) {
            this.submit(NUMBER_TO_CHINESE[item]);
        }
        // 处理完调用request再请求一个数据
        this.subscription.request(1);
        // 或者已经达到目标,调用cancel告诉发布者不在调用数据
        // this.subscription.cancel();
    }

    @Override
    public void onError(Throwable throwable) {
        // 出现了异常
        throwable.printStackTrace();
        // 告诉发布者后面不接受数据了
        this.subscription.cancel();
    }

    @Override
    public void onComplete() {
        // 全部数据处理完成(发布者关闭了)
        System.out.println("处理器处理完了");
    }
}
public class ReactiveStreamDemo2 {
    public static void main(String[] args) throws InterruptedException {
        // 1、定义发布者,发布的数据类型是Integer
        SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();

        // 2、定义处理器,对数据进行过滤,并转换成String类型
        MyProcessor processor = new MyProcessor();

        // 3、发布者和处理器之间建立订阅关系
        publisher.subscribe(processor);

        // 4、定义最终订阅者,消费String类型数据
        Subscriber<String> subscriber = new Subscriber<>() {
            private Subscription subscription;

            @Override
            public void onSubscribe(Subscription subscription) {
                this.subscription = subscription;
                this.subscription.request(1);
            }

            @Override
            public void onNext(String item) {
                System.out.println("接收到处理后的数据:" + item);
                this.subscription.request(1);
            }

            @Override
            public void onError(Throwable throwable) {
                // 出现了异常
                throwable.printStackTrace();
                // 告诉发布者后面不接受数据了
                this.subscription.cancel();
            }

            @Override
            public void onComplete() {
                // 全部数据处理完成(发布者关闭了)
                System.out.println("订阅者处理完了");
            }
        };

        // 5、处理器和最终订阅者建立订阅关系
        processor.subscribe(subscriber);

        // 6、生产数据并发布
        // submit是一个阻塞式方法
        Stream.of(-3, -2, -1, 0, 1, 2, 3, 4, 5)
                .forEach(i -> {
                    publisher.submit(i);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                });

        // 7、结束后 关闭发布者
        // 正式环境应放在finally或者try-resource中确保关闭
        publisher.close();

        Thread.currentThread().join(1000);
    }
}

背压

背压可以理解为订阅者能和发布者交互(通过代码里面的调用request和cancel方法交互),可以调节发布者发布数据的速率,解决把订阅者压垮的问题。关键在于上面例子里面的订阅关系Subscription这个接口,他有request和cancel 2个方法,用于通知发布者需要数据和通知发布者不再接受数据。

在JDK9zhong发布者Publisher的实现类SubmissionPublisher的submit方法是block方法。订阅者会有一个缓冲池,默认为Flow.defaultBufferSize() = 256。当订阅者的缓冲池满了之后,发布者调用submit方法发布数据就会被阻塞,发布者就会停(慢)下来;订阅者消费了数据之后(调用Subscription.request方法),缓冲池有位置了,submit方法就会继续执行下去,就是通过这样的机制,实现了调节发布者发布数据的速率,消费得快,生成就快,消费得慢,发布者就会被阻塞,当然就会慢下来了。

评论
  • 按正序
  • 按倒序
  • 按热度
Powered by Waline v2.15.8