1. Scheduler
Project Reactor的给operator操作提供的线程调度器,内部可简单理解为一个线程池(当然实际作用更多)。创建它一般使用Schedulers
工厂类,如:
Schedulers.elastic()
:创建以缓存线程池为基础的调度器Schedulers.parallel()
:创建固定线程数的调度器Schedulers.fromExector()
:从已有线程池中创建调度器- …
2. publishOn
& subscribeOn
及其区别
2.1. subscribeOn
文档:对publisher
执行subscribe
、onSubscribe
以及request
时,再指定的Scheduler
运行,因此,当使用subscribeOn
时,不论它在什么为止,它会从开头/源头影响执行onNext/onError/onComplete
的线程上下文,直到下一个publishOn
出现。
使用范围:当publisher
生产数据很慢的时候,如Blocking IO,下面的例子使用JDBC技术,就是适合的例子:
1
2
3
4
public Flux<POJO> fetchData() {
return Flux.defer(() -> Flux.fromIterable(repository.findAll())) // JPA(Using JDBC), Blocking IO, Slow publisher
.subscribeOn(repositoryScheduler);
}
2.2. publishOn
文档:指定对publisher
的onNext/onError/onComplete
操作的Scheduler
。从它会影响其所在位置的后面,直到下一个publishOn
出现的线程上下文。
使用范围:当publisher
生产数据很快的时候,但消费者处理慢的适合,下面的例子将数据存入数据库,就是适合的例子(与subscribeOn
相反):
1
2
3
4
5
public void storeData(Flux<POJO> data) {
data.publishOn(repositoryScheduler)
.doOnNext(repository::save) // JPA(Using JDBC), Blocking IO, Slow consumer
.then();
}
当无法将
publisher
推入到下一个scheduler
时的情景(即scheduler
底层线程池出现拒绝情况,以抛异常为例),那么线程上下文就是之前的线程决定的,并进入doOnError
等处。此外,每个操作符内部的线程不会变化,包括生成
publisher
的操作符。
2.3. 区别和用途
区别:上面已经说的很清楚,特性以及应用场景都有说明。
用途:在反应式编程中进行线程池隔离,一定程度上避免了会导致线程阻塞的程序执行影响到程序执行效率。
3. 例子说明
1
2
3
4
5
6
7
8
9
10
11
12
Flux.create(emitter -> {
// In generator-scheduler
IntStream.range(0, 100).forEach(emitter::next);
emitter.complete();
}).subscribeOn(generatorScheduler)
.map(i -> i + 1) // In generator-scheduler
.publishOn(consumerScheduler)
.map(i -> -i) // In consumer-scheduler
.subscribe(System.out::println, // In consumer-scheduler
System.err::println, // Detemined by the last valid scheduler/thread
() -> System.out.println("Complete") // In consumer-scheduler
);