这是一个 Apache Ignite 中非常核心的组件 —— GridClosureProcessor
,它是 分布式闭包(Closure)执行的调度中枢,负责在集群节点上异步执行用户提交的任务(如 Runnable
、Closure
)。
我们来逐层深入理解它的设计思想、关键机制和代码逻辑。
🧱 一、类概览:GridClosureProcessor
public class GridClosureProcessor extends GridProcessorAdapter
- 职责:处理所有基于闭包(函数式)的远程执行请求
- 常见用途:
compute().run(Runnable)
compute().call(Closure)
compute().broadcast(Closure)
cache().affinity().run(...)
- 它是
ComputeTask
的底层支撑模块
🔩 二、关键字段解析
字段 | 类型 | 作用 |
---|---|---|
pools | PoolProcessor | 线程池管理器,用于获取执行任务的线程池 |
busyLock | GridSpinReadWriteLock | 控制组件在 停止期间不接受新任务 |
stopping | boolean | 标记当前处理器是否正在停止 |
⚠️ 这三个字段共同实现了 “优雅关闭” 的核心逻辑。
🔒 三、busyLock
:优雅关闭的关键机制
1. 什么是 GridSpinReadWriteLock
?
- Ignite 自定义的 自旋读写锁
- 特点:
- 读锁可重入、允许多个线程同时持有
- 写锁独占,用于“停止”阶段
- 使用 自旋 + sleep 避免线程频繁阻塞唤醒
2. 读锁(readLock()
):
- 所有任务提交方法(
runAsync
,callAsync
,broadcast
)都先获取读锁 - 表示:“我正在使用这个处理器”
- 允许多个线程并发提交任务
3. 写锁(tryWriteLock(...)
):
- 在
onKernalStop(...)
中使用 - 目的:阻止任何新任务提交,并标记为“停止中”
🛑 四、onKernalStop(...)
:优雅关闭流程
@Override
public void onKernalStop(boolean cancel) {boolean interrupted = false;while (true) {try {if (busyLock.tryWriteLock(200, TimeUnit.MILLISECONDS))break;elseThread.sleep(200);}catch (InterruptedException ignore) {interrupted = true;}}try {if (interrupted)Thread.currentThread().interrupt();stopping = true; // 标记为停止状态}finally {busyLock.writeUnlock();}
}
🔍 流程详解:
-
尝试获取写锁:
tryWriteLock(200ms)
:尝试在 200ms 内获取写锁- 如果有线程持有读锁(即正在提交任务),则失败
- 失败后
Thread.sleep(200)
,然后重试
-
为什么是“Busy Wait”?
- 注解
@SuppressWarnings("BusyWait")
表示这是有意为之的忙等待 - 目的:尽快完成关闭,避免长时间阻塞
- 每 200ms 尝试一次,不会过度消耗 CPU
- 注解
-
处理中断:
- 如果等待期间被中断,记录
interrupted = true
- 最后恢复中断状态(线程安全最佳实践)
- 如果等待期间被中断,记录
-
设置
stopping = true
- 获取写锁后,设置标志位
- 之后所有
runAsync
等调用都会被拒绝
-
释放写锁
- 即使发生异常,也确保释放锁
✅ 这是一个典型的 “关闭守卫”模式:先阻止新请求,再清理资源。
🚀 五、任务提交方法分析
所有任务提交方法都遵循统一模式:
busyLock.readLock();
try {if (stopping) reject();// 提交任务
} finally {busyLock.readUnlock();
}
我们以 runAsync(...)
为例:
✅ runAsync(...)
:运行一批 Runnable
public ComputeTaskInternalFuture<?> runAsync(...) {assert mode != null;assert !F.isEmpty(jobs);busyLock.readLock(); // 获取读锁try {if (stopping) {return finishedFuture(new IgniteCheckedException("Closure processor cannot be used on stopped grid"));}if (F.isEmpty(nodes))return finishedFuture(U.emptyTopologyException());ctx.task().setThreadContext(TC_SUBGRID, nodes);return ctx.task().execute(new T1(mode, jobs), null, sys, execName);}finally {busyLock.readUnlock(); // 释放读锁}
}
关键点:
stopping
检查:如果正在停止,直接返回失败 futurenodes
检查:拓扑为空则返回空拓扑异常ctx.task().execute(...)
:交给TaskProcessor
执行(T1 是一个内部任务类型)- 使用
sys
参数决定使用 系统线程池 还是 公共线程池
✅ callAsync(...)
:远程调用 Closure
public <T, R> ComputeTaskInternalFuture<R> callAsync(IgniteClosure<T, R> job, T arg, ...)
- 执行一个带返回值的函数(
Closure<T,R>
) - 返回
ComputeTaskInternalFuture<R>
,可获取结果
✅ broadcast(...)
:广播到所有节点
public <T, R> IgniteInternalFuture<Collection<R>> broadcast(...)
- 在
nodes
列表中的每个节点上执行job
- 返回一个
Future<Collection<R>>
,包含所有节点的返回值
✅ affinityRun(...)
:基于数据亲和性执行
public ComputeTaskInternalFuture<?> affinityRun(...)
- 关键用途:将任务发送到 特定缓存分区(partition)的主节点
- 流程:
- 获取当前拓扑版本
readyAffinityVersion()
- 使用
ctx.affinity().mapPartitionToNode(...)
找到负责该分区的节点 - 只在那个节点上执行任务
- 获取当前拓扑版本
- 优势:本地化执行,避免数据移动,性能极高
💡 这是 Ignite 实现“移动计算而非数据”的核心机制之一。
🧩 六、T1
, T8
, T11
, T4
是什么?
这些是 内部任务类(定义在 GridTaskInternalFuture
或内部类中),用于包装用户任务:
任务类 | 包装的任务类型 |
---|---|
T1 | GridClosureCallMode + Collection<Runnable> |
T8 | IgniteClosure<T,R> |
T11 | Broadcast 任务 |
T4 | Affinity 任务 |
它们都继承自 ComputeTaskAdapter
,由 TaskProcessor
调度执行。
🎯 七、整体架构图(简化)
+---------------------+
| User Code |
| compute().run(...) |
+----------+----------+|v
+---------------------+
| GridClosureProcessor|
| - busyLock |
| - stopping |
+----------+----------+|v
+---------------------+
| TaskProcessor |
| execute(Task) |
+----------+----------+|v
+---------------------+
| PoolProcessor |
| 系统/公共线程池 |
+---------------------+
✅ 八、设计亮点总结
特性 | 说明 |
---|---|
读写锁控制关闭 | 读锁允许多任务并发提交,写锁确保关闭时原子性 |
优雅拒绝新任务 | stopping 标志 + finishedFuture 快速失败 |
支持多种执行模式 | 单节点、广播、亲和性执行 |
与 Task 子系统集成 | 复用 TaskProcessor 的调度能力 |
线程安全 | 所有提交路径都受锁保护 |
可观测性 | 调试日志、异常信息清晰 |
📌 九、一句话总结
GridClosureProcessor
是 Ignite 的 分布式任务调度入口,它通过 读写锁机制 实现了 高并发提交 + 优雅关闭,并支持 普通执行、广播、数据亲和性执行 等多种模式,是Compute
子系统的核心引擎。
💡 十、你可以借鉴的设计模式
1. 关闭守卫模式(Shutdown Guard)
private final ReadWriteLock shutdownLock = new ReentrantReadWriteLock();
private volatile boolean shuttingDown = false;public void submit(Runnable task) {shutdownLock.readLock().lock();try {if (shuttingDown) throw new RejectedExecutionException();// 执行任务} finally {shutdownLock.readLock().unlock();}
}public void shutdown() {shutdownLock.writeLock().lock();try {shuttingDown = true;} finally {shutdownLock.writeLock().unlock();}
}
2. 快速失败(Fail-Fast)
- 不让任务进入队列,而是在入口就拒绝
- 返回一个“已完成的失败 Future”,避免资源浪费
🏁 结语
GridClosureProcessor
虽然代码量不大,但它体现了分布式系统中 资源管理、并发控制、生命周期管理 的最佳实践。理解它,有助于你设计自己的 高可用、可扩展的任务调度系统。