在并发编程中,除了CountDownLatch和Semaphore,CyclicBarrier和Phaser也是实现多线程协作的重要工具。它们在处理多阶段任务同步、动态调整参与线程等场景中展现出独特价值。本文作为并发工具类系列的第二篇,将深入解析CyclicBarrier和Phaser的核心机制、实战案例及适用场景,帮助开发者构建更灵活的线程协作模型。
一、CyclicBarrier:循环屏障的多阶段协作
CyclicBarrier(循环屏障)的设计初衷是让一组线程在到达某个屏障点时暂停,直至所有线程都到达后再共同继续执行。与CountDownLatch的一次性使用不同,CyclicBarrier的计数器可通过reset()方法重置,支持多轮次的线程同步,这也是其 “循环” 特性的由来。
1.1 核心原理与方法解析
CyclicBarrier基于 “屏障点 + 集体唤醒” 机制实现,核心方法如下:
方法 | 功能描述 |
CyclicBarrier(int parties) | 构造方法,指定参与同步的线程数量(parties) |
CyclicBarrier(int parties, Runnable barrierAction) | 带屏障动作的构造方法,所有线程到达后先执行该动作 |
int await() | 线程到达屏障点后阻塞等待,返回当前线程的到达顺序(0~parties-1) |
int await(long timeout, TimeUnit unit) | 带超时的等待,超时后屏障被打破,抛出TimeoutException |
void reset() | 重置屏障至初始状态,所有等待线程将收到BrokenBarrierException |
int getNumberWaiting() | 返回当前正在屏障点等待的线程数 |
boolean isBroken() | 判断屏障是否被打破(如线程中断、超时等) |
关键特性:CyclicBarrier的核心是 “所有线程必须同时到达屏障点”,适用于多阶段任务中各阶段的同步,且支持重复使用。
1.2 典型场景:分阶段数据处理
在数据处理流程中,常需将任务分为多个阶段(如数据采集→清洗→分析→存储),每个阶段需所有线程完成当前工作后才能进入下一阶段。CyclicBarrier能完美管控这种分阶段协作。
实战案例:
public class DataProcessDemo {// 3个线程参与处理,所有线程到达后执行阶段总结动作private static final CyclicBarrier barrier = new CyclicBarrier(3, () -> System.out.println("\n=== 所有线程完成当前阶段,进入下一阶段 ==="));public static void main(String[] args) {// 启动3个数据处理线程for (int i = 0; i < 3; i++) {new Thread(new DataProcessor(i), "处理线程-" + i).start();}}static class DataProcessor implements Runnable {private int threadId;public DataProcessor(int threadId) {this.threadId = threadId;}@Overridepublic void run() {try {// 第一阶段:数据采集System.out.println("线程" + threadId + ":开始数据采集");Thread.sleep((long) (Math.random() * 1000));System.out.println("线程" + threadId + ":数据采集完成,等待其他线程");barrier.await();// 第二阶段:数据清洗System.out.println("线程" + threadId + ":开始数据清洗");Thread.sleep((long) (Math.random() * 1000));System.out.println("线程" + threadId + ":数据清洗完成,等待其他线程");barrier.await();// 第三阶段:数据存储System.out.println("线程" + threadId + ":开始数据存储");Thread.sleep((long) (Math.random() * 1000));System.out.println("线程" + threadId + ":数据存储完成,等待其他线程");barrier.await();System.out.println("线程" + threadId + ":所有阶段处理完成");} catch (InterruptedException | BrokenBarrierException e) {System.out.println("线程" + threadId + ":屏障被打破,异常信息:" + e.getMessage());}}}
}
运行结果片段:
线程0:开始数据采集线程1:开始数据采集线程2:开始数据采集线程1:数据采集完成,等待其他线程线程0:数据采集完成,等待其他线程线程2:数据采集完成,等待其他线程=== 所有线程完成当前阶段,进入下一阶段 ===线程0:开始数据清洗线程1:开始数据清洗线程2:开始数据清洗...
案例解析:
- 每个线程完成阶段任务后调用barrier.await(),阻塞等待其他线程;
- 当 3 个线程均到达屏障点时,先执行屏障动作(打印阶段总结),再唤醒所有线程进入下一阶段;
- 若任何线程在等待过程中被中断或超时,屏障会被标记为 “打破”,所有线程将收到BrokenBarrierException,避免部分线程无限等待。
1.3 与 CountDownLatch 的核心差异
尽管两者都能实现线程同步,但适用场景截然不同:
维度 | CyclicBarrier | CountDownLatch |
复用性 | 可通过reset()重置,支持多轮同步 | 计数器归 0 后不可复用,一次性使用 |
同步逻辑 | 所有线程相互等待(线程→线程) | 一组线程等待另一组线程(线程组→线程组) |
核心动作 | 线程到达屏障点后阻塞,需集体唤醒 | 线程完成任务后递减计数器,无需等待 |
典型场景 | 分阶段任务的各阶段同步 | 初始化等待、事件通知 |
示例对比:用CountDownLatch实现上述分阶段任务需为每个阶段创建新的计数器,而CyclicBarrier可通过同一实例完成所有阶段同步,代码更简洁。
二、Phaser:动态调整的阶段同步器
Phaser是 Java 7 引入的高级同步工具,兼具CountDownLatch和CyclicBarrier的功能,且支持动态调整参与线程数量(注册 / 注销),适用于线程数量动态变化的多阶段任务。
2.1 核心原理与方法解析
Phaser通过 “阶段(phase)” 和 “参与者(party)” 概念实现同步,核心方法如下:
方法 | 功能描述 |
Phaser(int parties) | 构造方法,指定初始参与者数量 |
int register() | 注册一个参与者,返回当前阶段号 |
boolean deregister() | 注销一个参与者,返回是否为最后一个参与者 |
int arriveAndAwaitAdvance() | 当前参与者到达阶段终点,等待其他参与者后进入下一阶段 |
int arriveAndDeregister() | 到达阶段终点并注销,适用于完成所有任务的参与者 |
int getPhase() | 返回当前阶段号(从 0 开始,溢出后重置为 0) |
int getRegisteredParties() | 返回当前注册的参与者数量 |
关键特性:Phaser的阶段号随所有参与者到达而递增,支持动态增减参与者,且可通过重写onAdvance(int phase, int registeredParties)方法自定义阶段切换逻辑。
2.2 典型场景:动态线程的多阶段任务
在分布式计算或并行处理中,线程可能因任务完成而退出,或因新任务加入而新增,Phaser能灵活应对这种动态变化。
实战案例:
public class DynamicTaskDemo {public static void main(String[] args) throws InterruptedException {// 初始3个参与者,重写阶段切换逻辑Phaser phaser = new Phaser(3) {@Overrideprotected boolean onAdvance(int phase, int registeredParties) {System.out.println("\n=== 阶段" + phase + "完成,当前参与者:" + registeredParties + " ===");// 当参与者为0或完成3个阶段时终止return registeredParties == 0 || phase >= 2;}};// 启动3个初始任务线程for (int i = 0; i < 3; i++) {new Thread(new DynamicWorker(phaser, i), "初始线程-" + i).start();}// 主线程等待所有阶段完成while (!phaser.isTerminated()) {Thread.sleep(100);}System.out.println("\n所有阶段完成,Phaser终止");}static class DynamicWorker implements Runnable {private Phaser phaser;private int workerId;public DynamicWorker(Phaser phaser, int workerId) {this.phaser = phaser;this.workerId = workerId;}@Overridepublic void run() {try {// 阶段0:数据准备System.out.println("线程" + workerId + ":阶段0准备中...");Thread.sleep((long) (Math.random() * 1000));phaser.arriveAndAwaitAdvance(); // 等待进入阶段1// 线程0在阶段1后注册新参与者if (workerId == 0 && phaser.getPhase() == 1) {phaser.register();new Thread(new DynamicWorker(phaser, 3), "新增线程-3").start();System.out.println("线程0:注册新参与者,当前参与者数:" + phaser.getRegisteredParties());}// 阶段1:数据处理System.out.println("线程" + workerId + ":阶段1处理中...");Thread.sleep((long) (Math.random() * 1000));phaser.arriveAndAwaitAdvance(); // 等待进入阶段2// 线程1在阶段2后注销if (workerId == 1) {phaser.deregister();System.out.println("线程1:已注销,当前参与者数:" + phaser.getRegisteredParties());return; // 线程1完成任务退出}// 阶段2:结果汇总System.out.println("线程" + workerId + ":阶段2汇总中...");Thread.sleep((long) (Math.random() * 1000));phaser.arriveAndDeregister(); // 完成后注销} catch (InterruptedException e) {Thread.currentThread().interrupt();}}}
}
运行结果片段:
线程0:阶段0准备中...线程1:阶段0准备中...线程2:阶段0准备中...=== 阶段0完成,当前参与者:3 ===线程0:阶段1处理中...线程1:阶段1处理中...线程2:阶段1处理中...线程0:注册新参与者,当前参与者数:4新增线程-3:阶段1处理中...=== 阶段1完成,当前参与者:4 ===线程0:阶段2汇总中...线程2:阶段2汇总中...线程1:已注销,当前参与者数:3新增线程-3:阶段2汇总中...=== 阶段2完成,当前参与者:3 ===所有阶段完成,Phaser终止
案例解析:
- Phaser初始注册 3 个参与者,阶段 0 完成后进入阶段 1;
- 线程 0 在阶段 1 动态注册新参与者(线程 3),参与者数变为 4;
- 线程 1 在阶段 1 完成后注销,参与者数减为 3;
- 所有参与者完成阶段 2 后,onAdvance()返回true,Phaser终止;
- 动态注册 / 注销功能使Phaser能适应线程数量变化,比CyclicBarrier更灵活。
2.3 高级特性:分层 Phaser
对于复杂任务,可通过Phaser的分层机制(父 Phaser 管理子 Phaser)减少单个 Phaser 的竞争压力。例如,将 1000 个线程分为 10 组,每组由一个子 Phaser 管理,子 Phaser 再注册到父 Phaser,实现 “局部同步→全局同步” 的层级协作。
代码示例:
public class HierarchicalPhaserDemo {public static void main(String[] args) {// 父Phaser,初始0个参与者Phaser root = new Phaser(0) {@Overrideprotected boolean onAdvance(int phase, int parties) {System.out.println("全局阶段" + phase + "完成,参与组:" + parties);return phase >= 1; // 完成2个全局阶段后终止}};// 创建3个子Phaser,父Phaser为rootPhaser[] children = new Phaser[3];for (int i = 0; i < 3; i++) {children[i] = new Phaser(root, 2); // 每个子Phaser管理2个线程}// 启动6个线程(3组×2)for (int i = 0; i < 3; i++) {int groupId = i;for (int j = 0; j < 2; j++) {new Thread(() -> {for (int phase = 0; phase < 2; phase++) {System.out.println("组" + groupId + "线程" + Thread.currentThread().getId() + ":完成局部阶段" + phase);children[groupId].arriveAndAwaitAdvance(); // 等待组内同步}children[groupId].arriveAndDeregister(); // 完成后注销}).start();}}}
}
核心价值:分层机制降低了单个 Phaser 的竞争频率,提升高并发场景下的性能。
三、四大工具类的综合对比与选型
工具类 | 核心能力 | 灵活性 | 典型场景 | 适用线程数 |
CountDownLatch | 等待多线程完成 | 低(一次性) | 初始化、事件通知 | 固定 |
Semaphore | 控制资源并发数 | 中(动态许可) | 资源池、限流 | 不固定 |
CyclicBarrier | 多阶段线程同步 | 中(可重置) | 分阶段任务 | 固定 |
Phaser | 动态阶段同步 | 高(动态注册) | 动态线程任务、分层同步 | 动态变化 |
选型建议:
- 简单等待场景用CountDownLatch;
- 资源限流场景用Semaphore;
- 固定线程的多阶段任务用CyclicBarrier;
- 动态线程或复杂分层任务用Phaser。
总结
CyclicBarrier和Phaser为多线程协作提供了更灵活的解决方案:CyclicBarrier通过循环屏障实现固定线程的多阶段同步,适合分步骤协同工作;Phaser则支持动态调整参与者,能应对线程数量变化的复杂场景。
结合上一篇的CountDownLatch和Semaphore,这四类工具类基本覆盖了常见的线程协作需求。在实际开发中,需根据线程数量是否固定、是否多阶段任务、是否需要动态调整等因素选择合适的工具,以实现高效、可靠的并发控制。
掌握这些工具类的核心原理和适用场景,不仅能简化并发代码的编写,更能提升系统在高并发场景下的稳定性和性能。