文章目录
- 七、调度与线程模型
- 7.1 概述
- 7.2 Scheduler: Reactor 的线程调度器
- 7.3 两大核心操作符:subscribeOn vs publishOn
- 7.4 示例详解
- 7.4.1 subscribeOn()的全局影响
- 7.4.2 publishOn() 的局部切换
- 7.4.3 多个publishOn切换
- 7.4.4 线程切换时序图
- 7.5 核心调度器
- 7.5.1 BoundedElastic:IO 密集型任务首选
- 7.5.2 Parallel:CPU 密集型任务首选
- 7.5.3 Single:串行任务专用
- 7.5.4 Schedulers.immediate()
- 7.5.5 Schedulers.elastic()
- 7.5.6 Schedulers.fromExecutorService(ExecutorService)
- 7.5.7 Schedulers.new() 工厂方法
- 7.5.8 调度器使用最佳实践
- 7.6 线程模型实战: 典型场景
- 7.6.1 I/O密集型任务
- 7.6.2 场景 2:CPU 密集型任务
- 7.6.3 混合任务(I/O + CPU)
- 7.7 综合示例
- 7.8 高级特性
- 7.8.1 调用器生命周期管理
- 7.8.2 自定义线程命名
- 7.8.3 在操作符中使用调度器
- 7.9 最佳实践与陷阱
七、调度与线程模型
✅ 核心作用
- 线程抽象:将底层线程管理与响应式流解耦,提供统一的 API 控制执行上下文。
- 异步执行:支持非阻塞操作,避免阻塞主线程,提升系统吞吐量。
- 并发控制:通过不同类型的调度器,适配不同的并发场景(如 IO 密集型、CPU 密集型)。
🌺 关键概念
- 调度器(Scheduler):负责提供执行任务的线程,是 Reactor 中线程池的抽象。
- 调度器工作线程(Worker):
Scheduler
创建的轻量级工作单元,负责执行具体任务。 - publishOn () 与 subscribeOn ():用于切换执行上下文的操作符。
subscribeOn()
:指定订阅操作(包括上游数据生成)的执行线程。publishOn()
:指定下游操作符链的执行线程
7.1 概述
Reactor 与 RxJava 类似,可以被认为是并发无关的 。也就是说,它不强制执行并发模型。相反,它把控制权交给开发者自己。然而,这并不妨碍该库帮助你处理并发问题。
获得 Flux
或 Mono
并不一定意味着它在专用的 Thread
,大多数操作符会在前一个操作符执行的 Thread
中继续工作。除非另有说明,最顶层的操作符(源操作符)本身会在调用 subscribe()
Thread
中运行。以下示例在新线程中运行 Mono
:
public static void main(String[] args) throws InterruptedException {final Mono<String> mono = Mono.just("hello "); // 🥇 Mono<String> 在线程 main 中组装。Thread t = new Thread(() -> mono.map(msg -> msg + "thread ").subscribe(v -> // 🥈 它是在线程 Thread-0 中订阅的。System.out.println(v + Thread.currentThread().getName()) // map 和 onNext 回调实际上都在 Thread-0 中运行));t.start();t.join();}
7.2 Scheduler: Reactor 的线程调度器
Scheduler
是 Reactor 的线程抽象,类似于 Java 的 ExecutorService
,但专为响应式流设计。
✅ 核心作用:控制
Publisher
在哪个线程上执行。
Reactor 提供了多种内置 Scheduler
:
Scheduler | 用途 | 线程模型 |
---|---|---|
Schedulers.immediate() | 当前线程执行 | ❌ 不推荐用于生产 |
Schedulers.single() | 共享的单线程 | 1 个线程,复用 |
Schedulers.parallel() | CPU 密集型任务 | 固定线程数(CPU 核数) |
Schedulers.boundedElastic() | I/O 阻塞任务 | 弹性线程池(默认 10万线程上限) |
Schedulers.newXXX() | 自定义线程池 | 如 newParallel() |
7.3 两大核心操作符:subscribeOn vs publishOn
这是理解 Reactor 线程模型的重中之重!
🔑 核心区别
操作符 | 作用 | 影响范围 |
---|---|---|
subscribeOn() | 指定 Publisher 的创建和上游执行线程 | 影响整个链的上游(从源头到当前位置) |
publishOn() | 指定下游操作的执行线程 | 只影响其后的下游操作(当前位置到 subscribe ) |
🎯 记忆口诀:
- subscribeOn:从哪里开始(影响源头)
- publishOn:从哪里切换(影响后续)
7.4 示例详解
7.4.1 subscribeOn()的全局影响
package cn.tcmeta.scheduler;import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;import java.util.concurrent.TimeUnit;/*** @author: laoren* @description: subscribeOn的全局影响* @version: 1.0.0*/
public class SubscribeOnExample {public static void main(String[] args) {Flux.just("A", "B", "C").map(data -> {System.out.println("1️⃣ Map1 线程: " + Thread.currentThread().getName());return data + "-1";}).subscribeOn(Schedulers.parallel()).map(data -> {System.out.println("2️⃣ Map2 线程: " + Thread.currentThread().getName());return data + "-2";}).subscribe(data -> {System.out.println("📩 订阅线程: " + Thread.currentThread().getName() + ", 数据: " + data);});try {TimeUnit.MILLISECONDS.sleep(3000);}catch (InterruptedException e){e.printStackTrace();}}
}
1️⃣ Map1 线程: parallel-1
2️⃣ Map2 线程: parallel-1
📩 订阅线程: parallel-1, 数据: A-1-2
1️⃣ Map1 线程: parallel-1
2️⃣ Map2 线程: parallel-1
📩 订阅线程: parallel-1, 数据: B-1-2
1️⃣ Map1 线程: parallel-1
2️⃣ Map2 线程: parallel-1
📩 订阅线程: parallel-1, 数据: C-1-2
✅ 结论:subscribeOn(Schedulers.parallel())
即使放在中间,也使 just()
和两个 map()
都在 parallel
线程执行。
subscribeOn
影响范围:
7.4.2 publishOn() 的局部切换
package cn.tcmeta.scheduler;import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;/*** @author: laoren* @description: publishOn()局部切换* @version: 1.0.0*/
public class PublishOnExample {public static void main(String[] args) throws InterruptedException {Flux.just("A", "B").map(data -> {System.out.println("📍 上游 Map 线程: " + Thread.currentThread().getName());return data + "-up";})// ✅ publishOn 切换下游线程.publishOn(Schedulers.boundedElastic()).map(data -> {System.out.println("📍 下游 Map 线程: " + Thread.currentThread().getName());return data + "-down";}).subscribe(data ->System.out.println("📩 订阅线程: " + Thread.currentThread().getName() + ", 数据: " + data));Thread.sleep(1000);}
}
📍 上游 Map 线程: main
📍 上游 Map 线程: main
📍 下游 Map 线程: boundedElastic-1
📩 订阅线程: boundedElastic-1, 数据: A-up-down
📍 下游 Map 线程: boundedElastic-1
📩 订阅线程: boundedElastic-1, 数据: B-up-down
✅ 结论:publishOn
之后的所有操作(包括 subscribe
)都在 boundedElastic
线程执行。
publishOn() 影响范围:
🔴 红色部分(下游)在 elastic
线程执行,just
和 map1
在主线程。
7.4.3 多个publishOn切换
package cn.tcmeta.scheduler;import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;public class MultiPublishOnExample {public static void main(String[] args) throws InterruptedException {Flux.just("Hello").publishOn(Schedulers.parallel()) // 切到 parallel.map(s -> {System.out.println("ParallelGroup: " + Thread.currentThread().getName());return s + "-1";}).publishOn(Schedulers.boundedElastic()) // 再 切到 boundedElastic.map(s -> {System.out.println("ElasticGroup: " + Thread.currentThread().getName());return s + "-2";}).subscribe(data ->System.out.println("Final: " + Thread.currentThread().getName() + " => " + data));Thread.sleep(1000);}
}
✅ 每个 publishOn
都会切换其后操作的执行线程。
7.4.4 线程切换时序图
7.5 核心调度器
7.5.1 BoundedElastic:IO 密集型任务首选
- 设计背景:替代已过时的
ElasticScheduler
(无界线程池,可能导致 OOM),通过有界缓冲队列和动态线程数(空闲线程会回收)避免资源耗尽。 - 适用场景:数据库查询、HTTP 请求、文件 IO 等阻塞且耗时的操作(允许线程阻塞,通过动态扩缩容应对并发)
7.5.2 Parallel:CPU 密集型任务首选
- 线程特性:线程数固定为 CPU 核心数(
Runtime.getRuntime().availableProcessors()
),无空闲线程回收(保持计算能力)。 - 适用场景:数据计算、序列化 / 反序列化、复杂集合处理等非阻塞但耗 CPU的操作(充分利用多核性能)。
7.5.3 Single:串行任务专用
- 线程特性:全局唯一单线程(所有
Schedulers.single()
调用共享),任务按提交顺序执行。 - 注意:若需多个独立串行线程,使用
Schedulers.newSingle()
创建私有单线程调度器。
7.5.4 Schedulers.immediate()
- 特性:在当前线程直接执行,不开启新线程。
- 适用场景:测试或不需要异步执行的场景。
7.5.5 Schedulers.elastic()
- 特性:弹性线程池,按需创建线程,空闲线程会在 60s 后回收。
- 适用场景:IO 密集型任务(如网络调用、文件操作)。
- 注意:已被弃用,推荐使用
boundedElastic
。
7.5.6 Schedulers.fromExecutorService(ExecutorService)
- 特性:适配自定义的
ExecutorService
,灵活集成现有线程池。
7.5.7 Schedulers.new() 工厂方法
- 特性:创建独立的新调度器实例(如
newSingle()
、newParallel()
),避免共享资源。
7.5.8 调度器使用最佳实践
按任务类型选择调度器
- IO 密集型(数据库、网络、文件)→
boundedElastic
(允许阻塞,动态扩缩容); - CPU 密集型(计算、排序、序列化)→
parallel
(固定线程数,避免线程切换开销); - 串行任务(状态依赖操作)→
single
或newSingle()
(保证顺序执行); - 同步操作(无阻塞)→
immediate
(无需线程切换,减少开销)。
避免线程阻塞滥用
- 禁止在
parallel
线程中执行阻塞操作(会浪费 CPU 核心,降低计算效率); - 阻塞操作必须放在
boundedElastic
线程(其线程设计允许阻塞);
// 错误:在parallel线程执行阻塞操作
Flux.range(1, 10).publishOn(Schedulers.parallel()).doOnNext(num -> {Thread.sleep(1000); // 阻塞CPU线程,浪费计算资源});// 正确:阻塞操作放在boundedElastic
Flux.range(1, 10).publishOn(Schedulers.boundedElastic()).doOnNext(num -> Thread.sleep(1000)); // 安全
控制boundedElastic
的资源上限
默认配置可能不适合高并发场景,可通过系统属性调整:
// JVM启动参数:调整boundedElastic的线程和队列上限
-Dreactor.schedulers.boundedElastic.maxThreads=100
-Dreactor.schedulers.boundedElastic.queuesize=1024
减少不必要的线程切换
// 优化前:多次不必要的线程切换
flux.publishOn(A).map(...).publishOn(B).filter(...).publishOn(C)// 优化后:合并操作,减少切换
flux.map(...).filter(...).publishOn(C); // 一次切换即可
7.6 线程模型实战: 典型场景
7.6.1 I/O密集型任务
如数据库、HTTP 调用
// 假设这是调用外部 HTTP 服务
Mono<String> callExternalApi() {return Mono.fromCallable(() -> {// 模拟阻塞调用Thread.sleep(1000);return "API Result";}).subscribeOn(Schedulers.boundedElastic()); // ✅ 使用弹性线程池
}// 使用
callExternalApi().map(result -> processResult(result)) // 可在主线程或其他线程处理.subscribe(System.out::println);
✅ 原则:I/O 操作必须用 boundedElastic()
,防止阻塞 CPU线程。
7.6.2 场景 2:CPU 密集型任务
Flux.range(1, 1000).publishOn(Schedulers.parallel()) // ✅ 切到并行线程池.map(i -> heavyComputation(i)) // 耗时计算.subscribe(System.out::println);
✅ 原则:CPU 密集型用 parallel()
,避免创建过多线程。
7.6.3 混合任务(I/O + CPU)
externalServiceCall() // I/O: boundedElastic.publishOn(Schedulers.parallel()) // 切到 CPU 线程池.map(data -> compute(data)) // CPU 密集型计算.publishOn(Schedulers.boundedElastic()) // 再切回 I/O 线程.flatMap(result -> saveToDB(result)) // 再次 I/O 操作.subscribe();
✅ 原则:根据操作类型动态切换线程池。
7.7 综合示例
package cn.tcmeta.scheduler;import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;import java.time.Duration;
import java.util.concurrent.TimeUnit;public class SchedulerExamples {public static void main(String[] args) {SchedulerExamples examples = new SchedulerExamples();examples.schedulerTypes();System.out.println("-------------------------------------");examples.publishOnVsSubscribeOn();System.out.println("-------------------------------------");examples.parallelProcessing();System.out.println("-------------------------------------");examples.timeoutWithScheduler();}public void schedulerTypes() {// 1. 立即调度 (当前线程)Flux.just("A", "B", "C").subscribeOn(Schedulers.immediate()).subscribe(System.out::println);// 2. 单一线程调度Flux.range(1, 3).subscribeOn(Schedulers.single()).subscribe(i -> System.out.println(Thread.currentThread().getName() + ": " + i));// 3. 弹性线程池 (适合IO密集型任务)Flux.range(1, 3).subscribeOn(Schedulers.boundedElastic()).subscribe(i -> System.out.println(Thread.currentThread().getName() + ": " + i));// 4. 并行调度 (适合CPU密集型任务)Flux.range(1, 3).subscribeOn(Schedulers.parallel()).subscribe(i -> System.out.println(Thread.currentThread().getName() + ": " + i));}public void publishOnVsSubscribeOn() {// subscribeOn - 影响整个链的订阅上下文Mono.fromCallable(() -> {System.out.println("Callable on: " + Thread.currentThread().getName());return "Result";}).subscribeOn(Schedulers.boundedElastic()).subscribe(result ->System.out.println("Subscribe on: " + Thread.currentThread().getName()));// publishOn - 影响后续操作的执行上下文Flux.range(1, 3).map(i -> {System.out.println("Map1 on: " + Thread.currentThread().getName());return i * 2;}).publishOn(Schedulers.parallel()).map(i -> {System.out.println("Map2 on: " + Thread.currentThread().getName());return i + 1;}).subscribe();}public void parallelProcessing() {// 并行处理流Flux.range(1, 10).parallel(4) // 分成4个并行流.runOn(Schedulers.parallel()).map(i -> i * i).sequential() // 合并回顺序流.subscribe(System.out::println);}public void timeoutWithScheduler() {// 使用调度器实现超时Mono.delay(Duration.ofSeconds(3)).timeout(Duration.ofSeconds(1), Schedulers.parallel()).subscribe(System.out::println,error -> System.out.println("Timeout: " + error));try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {Thread.currentThread().interrupt();}}
}
7.8 高级特性
7.8.1 调用器生命周期管理
// 创建独立的调度器实例
Scheduler customScheduler = Schedulers.newBoundedElastic(10, 100, "custom");// 使用自定义调度器
Flux.just(1, 2, 3).subscribeOn(customScheduler).subscribe();// 使用完毕后关闭调度器(重要!避免资源泄漏)
customScheduler.dispose();
7.8.2 自定义线程命名
Scheduler namedScheduler = Schedulers.newParallel("my-thread", 4);
Flux.just("A", "B").subscribeOn(namedScheduler).subscribe(value -> {System.out.println("Running on: " + Thread.currentThread().getName());});// 输出:Running on: my-thread-1
7.8.3 在操作符中使用调度器
// 使用 subscribeOn 在 flatMap 中为每个内部流指定调度器
Flux.just(1, 2, 3).flatMap(num -> Mono.just(num * 2).subscribeOn(Schedulers.parallel()) // 为每个元素创建独立的执行上下文).subscribe();
7.9 最佳实践与陷阱
✅ 最佳实践
- I/O 操作 →
Schedulers.boundedElastic()
- CPU 计算 →
Schedulers.parallel()
- 避免在
map()
中阻塞 - 合理使用
publishOn
切换线程 subscribeOn
通常放在链的开头或中间,效果相同
❌ 常见陷阱
// ❌ 错误:在 parallel 线程中执行阻塞 I/O
Flux.range(1, 10).publishOn(Schedulers.parallel()).map(i -> blockingIoCall(i)) // 阻塞调用!会耗尽 parallel 线程池.subscribe();// ✅ 正确:使用 boundedElastic
Flux.range(1, 10).flatMap(i -> Mono.fromCallable(() -> blockingIoCall(i)).subscribeOn(Schedulers.boundedElastic())).subscribe();
概念 | 关键点 |
---|---|
Scheduler | 线程执行的“容器”,选择合适的类型至关重要 |
subscribeOn() | 影响上游,决定 Publisher 在哪个线程启动 |
publishOn() | 影响下游,用于在链中切换执行线程 |
线程选择 | I/O → boundedElastic ,CPU → parallel |
背压与线程 | 背压控制数据流,线程控制执行位置,二者协同工作 |
🚀 掌握调度,就掌握了 Reactor 的“方向盘”。合理使用 subscribeOn
和 publishOn
,结合正确的 Scheduler
,你就能构建出高效、稳定、可扩展的响应式系统。