Reactor线程隔离

Posted by keys961 on March 5, 2019

1. Scheduler

Project Reactor的给operator操作提供的线程调度器,内部可简单理解为一个线程池(当然实际作用更多)。创建它一般使用Schedulers工厂类,如:

  • Schedulers.elastic():创建以缓存线程池为基础的调度器
  • Schedulers.parallel():创建固定线程数的调度器
  • Schedulers.fromExector():从已有线程池中创建调度器

2. publishOn & subscribeOn及其区别

2.1. subscribeOn

文档:对publisher执行subscribeonSubscribe以及request时,再指定的Scheduler运行,因此,当使用subscribeOn时,不论它在什么为止,它会从开头/源头影响执行onNext/onError/onComplete的线程上下文,直到下一个publishOn出现

使用范围:当publisher生产数据很慢的时候,如Blocking IO,下面的例子使用JDBC技术,就是适合的例子:

1
2
3
4
public Flux<POJO> fetchData() {
	return  Flux.defer(() -> Flux.fromIterable(repository.findAll())) // JPA(Using JDBC), Blocking IO, Slow publisher
    	.subscribeOn(repositoryScheduler);
}

2.2. publishOn

文档:指定对publisheronNext/onError/onComplete操作的Scheduler。从它会影响其所在位置的后面,直到下一个publishOn出现的线程上下文。

使用范围:当publisher生产数据很快的时候,但消费者处理慢的适合,下面的例子将数据存入数据库,就是适合的例子(与subscribeOn相反):

1
2
3
4
5
public void storeData(Flux<POJO> data) {
    data.publishOn(repositoryScheduler)
        .doOnNext(repository::save) // JPA(Using JDBC), Blocking IO, Slow consumer
        .then();
}

当无法将publisher推入到下一个scheduler时的情景(即scheduler底层线程池出现拒绝情况,以抛异常为例),那么线程上下文就是之前的线程决定的,并进入doOnError等处。

此外,每个操作符内部的线程不会变化,包括生成publisher的操作符。

2.3. 区别和用途

区别:上面已经说的很清楚,特性以及应用场景都有说明。

用途:在反应式编程中进行线程池隔离,一定程度上避免了会导致线程阻塞的程序执行影响到程序执行效率。

3. 例子说明

1
2
3
4
5
6
7
8
9
10
11
12
Flux.create(emitter -> {
    // In generator-scheduler
    IntStream.range(0, 100).forEach(emitter::next); 
    emitter.complete();
}).subscribeOn(generatorScheduler)
    .map(i -> i + 1) // In generator-scheduler
    .publishOn(consumerScheduler)
    .map(i -> -i) // In consumer-scheduler
    .subscribe(System.out::println, // In consumer-scheduler 
               System.err::println,  // Detemined by the last valid scheduler/thread
               () -> System.out.println("Complete") // In consumer-scheduler 
              );