这是一个 Apache Ignite 中非常核心的组件 —— GridClosureProcessor,它是 分布式闭包(Closure)执行的调度中枢,负责在集群节点上异步执行用户提交的任务(如 RunnableClosure)。

我们来逐层深入理解它的设计思想、关键机制和代码逻辑。


🧱 一、类概览:GridClosureProcessor

public class GridClosureProcessor extends GridProcessorAdapter
  • 职责:处理所有基于闭包(函数式)的远程执行请求
  • 常见用途:
    • compute().run(Runnable)
    • compute().call(Closure)
    • compute().broadcast(Closure)
    • cache().affinity().run(...)
  • 它是 ComputeTask 的底层支撑模块

🔩 二、关键字段解析

字段类型作用
poolsPoolProcessor线程池管理器,用于获取执行任务的线程池
busyLockGridSpinReadWriteLock控制组件在 停止期间不接受新任务
stoppingboolean标记当前处理器是否正在停止

⚠️ 这三个字段共同实现了 “优雅关闭” 的核心逻辑。


🔒 三、busyLock:优雅关闭的关键机制

1. 什么是 GridSpinReadWriteLock

  • Ignite 自定义的 自旋读写锁
  • 特点:
    • 读锁可重入、允许多个线程同时持有
    • 写锁独占,用于“停止”阶段
    • 使用 自旋 + sleep 避免线程频繁阻塞唤醒

2. 读锁(readLock()):

  • 所有任务提交方法(runAsync, callAsync, broadcast)都先获取读锁
  • 表示:“我正在使用这个处理器”
  • 允许多个线程并发提交任务

3. 写锁(tryWriteLock(...)):

  • onKernalStop(...) 中使用
  • 目的:阻止任何新任务提交,并标记为“停止中”

🛑 四、onKernalStop(...):优雅关闭流程

@Override
public void onKernalStop(boolean cancel) {boolean interrupted = false;while (true) {try {if (busyLock.tryWriteLock(200, TimeUnit.MILLISECONDS))break;elseThread.sleep(200);}catch (InterruptedException ignore) {interrupted = true;}}try {if (interrupted)Thread.currentThread().interrupt();stopping = true; // 标记为停止状态}finally {busyLock.writeUnlock();}
}

🔍 流程详解:

  1. 尝试获取写锁

    • tryWriteLock(200ms):尝试在 200ms 内获取写锁
    • 如果有线程持有读锁(即正在提交任务),则失败
    • 失败后 Thread.sleep(200),然后重试
  2. 为什么是“Busy Wait”?

    • 注解 @SuppressWarnings("BusyWait") 表示这是有意为之的忙等待
    • 目的:尽快完成关闭,避免长时间阻塞
    • 每 200ms 尝试一次,不会过度消耗 CPU
  3. 处理中断

    • 如果等待期间被中断,记录 interrupted = true
    • 最后恢复中断状态(线程安全最佳实践)
  4. 设置 stopping = true

    • 获取写锁后,设置标志位
    • 之后所有 runAsync 等调用都会被拒绝
  5. 释放写锁

    • 即使发生异常,也确保释放锁

✅ 这是一个典型的 “关闭守卫”模式:先阻止新请求,再清理资源。


🚀 五、任务提交方法分析

所有任务提交方法都遵循统一模式:

busyLock.readLock();
try {if (stopping) reject();// 提交任务
} finally {busyLock.readUnlock();
}

我们以 runAsync(...) 为例:

runAsync(...):运行一批 Runnable

public ComputeTaskInternalFuture<?> runAsync(...) {assert mode != null;assert !F.isEmpty(jobs);busyLock.readLock(); // 获取读锁try {if (stopping) {return finishedFuture(new IgniteCheckedException("Closure processor cannot be used on stopped grid"));}if (F.isEmpty(nodes))return finishedFuture(U.emptyTopologyException());ctx.task().setThreadContext(TC_SUBGRID, nodes);return ctx.task().execute(new T1(mode, jobs), null, sys, execName);}finally {busyLock.readUnlock(); // 释放读锁}
}
关键点:
  • stopping 检查:如果正在停止,直接返回失败 future
  • nodes 检查:拓扑为空则返回空拓扑异常
  • ctx.task().execute(...):交给 TaskProcessor 执行(T1 是一个内部任务类型)
  • 使用 sys 参数决定使用 系统线程池 还是 公共线程池

callAsync(...):远程调用 Closure

public <T, R> ComputeTaskInternalFuture<R> callAsync(IgniteClosure<T, R> job, T arg, ...)
  • 执行一个带返回值的函数(Closure<T,R>
  • 返回 ComputeTaskInternalFuture<R>,可获取结果

broadcast(...):广播到所有节点

public <T, R> IgniteInternalFuture<Collection<R>> broadcast(...)
  • nodes 列表中的每个节点上执行 job
  • 返回一个 Future<Collection<R>>,包含所有节点的返回值

affinityRun(...):基于数据亲和性执行

public ComputeTaskInternalFuture<?> affinityRun(...)
  • 关键用途:将任务发送到 特定缓存分区(partition)的主节点
  • 流程:
    1. 获取当前拓扑版本 readyAffinityVersion()
    2. 使用 ctx.affinity().mapPartitionToNode(...) 找到负责该分区的节点
    3. 只在那个节点上执行任务
  • 优势:本地化执行,避免数据移动,性能极高

💡 这是 Ignite 实现“移动计算而非数据”的核心机制之一。


🧩 六、T1, T8, T11, T4 是什么?

这些是 内部任务类(定义在 GridTaskInternalFuture 或内部类中),用于包装用户任务:

任务类包装的任务类型
T1GridClosureCallMode + Collection<Runnable>
T8IgniteClosure<T,R>
T11Broadcast 任务
T4Affinity 任务

它们都继承自 ComputeTaskAdapter,由 TaskProcessor 调度执行。


🎯 七、整体架构图(简化)

+---------------------+
|  User Code          |
|  compute().run(...) |
+----------+----------+|v
+---------------------+
| GridClosureProcessor|
| - busyLock          |
| - stopping          |
+----------+----------+|v
+---------------------+
|  TaskProcessor      |
|  execute(Task)      |
+----------+----------+|v
+---------------------+
|  PoolProcessor      |
|  系统/公共线程池     |
+---------------------+

✅ 八、设计亮点总结

特性说明
读写锁控制关闭读锁允许多任务并发提交,写锁确保关闭时原子性
优雅拒绝新任务stopping 标志 + finishedFuture 快速失败
支持多种执行模式单节点、广播、亲和性执行
与 Task 子系统集成复用 TaskProcessor 的调度能力
线程安全所有提交路径都受锁保护
可观测性调试日志、异常信息清晰

📌 九、一句话总结

GridClosureProcessor 是 Ignite 的 分布式任务调度入口,它通过 读写锁机制 实现了 高并发提交 + 优雅关闭,并支持 普通执行、广播、数据亲和性执行 等多种模式,是 Compute 子系统的核心引擎。


💡 十、你可以借鉴的设计模式

1. 关闭守卫模式(Shutdown Guard)

private final ReadWriteLock shutdownLock = new ReentrantReadWriteLock();
private volatile boolean shuttingDown = false;public void submit(Runnable task) {shutdownLock.readLock().lock();try {if (shuttingDown) throw new RejectedExecutionException();// 执行任务} finally {shutdownLock.readLock().unlock();}
}public void shutdown() {shutdownLock.writeLock().lock();try {shuttingDown = true;} finally {shutdownLock.writeLock().unlock();}
}

2. 快速失败(Fail-Fast)

  • 不让任务进入队列,而是在入口就拒绝
  • 返回一个“已完成的失败 Future”,避免资源浪费

🏁 结语

GridClosureProcessor 虽然代码量不大,但它体现了分布式系统中 资源管理、并发控制、生命周期管理 的最佳实践。理解它,有助于你设计自己的 高可用、可扩展的任务调度系统

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/pingmian/92661.shtml
繁体地址,请注明出处:http://hk.pswp.cn/pingmian/92661.shtml
英文地址,请注明出处:http://en.pswp.cn/pingmian/92661.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

for循环详解与实战技巧

目录 一、for循环语法 二、for循环执行流程 流程图表示&#xff1a; 三、for循环实践示例 示例&#xff1a;在屏幕上打印1~10的值 四、while循环与for循环对比 for循环和while循环都包含三个关键部分&#xff1a; 两者的主要区别在于代码组织方式&#xff1a; 五、练习…

winform中的listbox实现拖拽功能

文章目录前言一、实现前言 winform中的listBox实现拖拽&#xff01; 一、实现 winform中的listbox实现拖拽只需要实现四个事件 1、准备两个listbox控件 其中listtarget&#xff0c;AllowDrop属性设置为True。 2、实现四个事件 2.1MouseDown //在 MouseDown 事件期间&#x…

用 Docker 安装并启动 Redis:从入门到实战

用 Docker 安装并启动 Redis&#xff1a;从入门到实战Redis 作为一款高性能的键值对数据库&#xff0c;在缓存、会话存储、消息队列等场景中被广泛应用。本文将详细介绍如何使用 Docker 快速安装和启动 Redis&#xff0c;包括基础配置、数据持久化以及容器管理等核心操作&#…

ansible学习第一天

一&#xff1a;ansible基础知识1.1 ansible的定义与工作原理简述ansible是一个自动化运维工具&#xff0c;用于执行自动化任务&#xff0c;包括像配置管理&#xff0c;应用部署&#xff0c;任务执行等等&#xff0c;本质上来说也是基础设施及代码工具&#xff0c;通过可读性较强…

Vue原理与高级开发技巧详解

Vue 的底层原理、高级用法、性能优化和生态整合 文章目录Vue 的底层原理、高级用法、性能优化和生态整合一、Vue 双向绑定原理深度剖析1. Vue 2 实现原理&#xff08;Object.defineProperty&#xff09;2. Vue 3 实现原理&#xff08;Proxy&#xff09;3. v-model 高级用法二、…

axios的封装

axios的封装 在src目录下新建文件夹utils工具类&#xff0c;文件夹里面新建http.js文件&#xff0c;如果项目涉及到多个基地址可以新建http2.js文件。 import axios from axios;/*** 后端*/// 创建axios实例 const http axios.create({// 1.接口基地址baseURL: http://192.168…

MariaDB 数据库管理与web服务器

MariaDB 数据库管理与WEB 服务器 介绍 MariaDB 数据库介绍 **数据库&#xff0c;是一个存放计算机数据的仓库。**这个仓库是按照一定的数据结构来对数据进行组织和存储的&#xff0c;我们可以通过数据库提供的多种方法来管理其中的数据。 数据结构&#xff0c;是指数据的组织形…

分治-归并-912.排序数组-力扣(LeetCode)

一、题目解析1、将数组排升序2、在不使用任何内置函数的情况下解决问题二、算法原理分治-归并合并两个有序数组1、双指针遍历两个合并数组2、将比较后的较小值放到新开数组中3、防止有指针未遍历完&#xff0c;特殊处理4、将nums中的元素还原三、代码示例vector<int> tmp…

网络安全初学者学习心得

看到你对网络安全学习的兴趣&#xff0c;我感到非常振奋&#xff01;这个领域既充满挑战又回报丰厚&#xff0c;作为初学者&#xff0c;理清学习内容和方向确实至关重要。下面我将结合多年的行业观察和指导经验&#xff0c;为你详细拆解网络安全初学者的学习内容并分享一些核心…

防火墙笔记优化版

一、防火墙的核心定义防火墙是一种基于预设安全策略&#xff0c;用于隔离内网与外网、控制网络流量的安全系统&#xff08;可分为软件系统或硬件系统&#xff09;。其核心作用包括&#xff1a;流量隔离&#xff1a;物理或逻辑分隔内网、外网及 DMZ 区域&#xff08;DMZ 为内网与…

vue3前端项目cursor rule

cursor rule是什么&#xff0c;以及怎么定义&#xff0c;看这个文章&#xff1a; cursor中定义cursor rules_cursor rules如何编写-CSDN博客 针对现有一个vue3的前端项目&#xff0c;写了一份cursor rule&#xff0c;可以作为参考&#xff0c;内容如下&#xff08;仅作为参考&…

基于51单片机红外遥控定时开关智能家电插座设计

1. 功能介绍 本设计是一款基于 STC8C52 单片机 的智能家电插座系统&#xff0c;集 红外遥控控制、定时开关控制、自动与手动模式切换、掉电数据保存、液晶显示、蜂鸣器提示 于一体&#xff0c;能够方便用户对家用电器进行精准的定时控制与远程操作。系统广泛适用于家用电器、办…

下一代防火墙组网方案

知识回顾&#xff1a;1.传统防火墙包括包过滤防火墙、应用网关防火墙、状态检测防火墙。2.包过滤防火墙工作在3、4层。3.包过滤防火墙特点&#xff1a;4.应用网关防火墙主要作用&#xff1a;①截取用户初始化连接请求&#xff0c;对用户进行认证&#xff1b;②通过ALG能让多通道…

WEB开发-第二十七天(PHP篇)

DW PHPStorm PhpStudy Navicat Premium DW : HTML&JS&CSS开发 PHPStorm : 专业PHP开发IDE PhpStudy &#xff1a;Apache MYSQL环境 Navicat Premium: 全能数据库管理工 变量覆盖安全&#xff1a; $GLOBALS&#xff1a;这种全局变量用于在PHP脚本中的任意位置访…

Lwip深度阅读-网络架构

LWIP网络协议栈详细介绍 本文的内容基本基于野火的LWIP手册&#xff0c;和LWIP源码撰写。 网络协议栈概述 从图片可以看出&#xff0c;网络协议栈采用分层架构&#xff0c;每一层都有特定的功能和协议。 TCP/IP协议分层模型数据封装过程MAC数据包 我使用wireShark抓包的时候&am…

嵌入式系统学习Day16(C语言中的位运算)

位运算二进制位的运算嵌入式:通过位运算 控制 硬件运算: 运算规则 & 与 一假则假 | 或 一真则真 ~ 非 真假相对 ^ 异或 相同为假 不同为真 << 左移 表示二进制位的移动 >> 右移 eg:int a 0x55; int b 0x33;0101 0101 //0x55 &am…

Endnote下载,导入曼大 harvard_manchester格式

下载endnote 并激活中国农业科技文献与信息服务平台&#xff0c;点击下载 下载harvard_manchester 格式 Harvard Manchester - Referencing guide at the University of Manchester - Subject guides at University of Manchester 双击打开第二步下载的安装包&#xff08;使用…

【Docker进阶实战】从多容器编排到集群部署

Docker进阶实战&#xff1a;从多容器编排到集群部署 当你已经熟悉Docker的基本操作后&#xff0c;面对的下一个挑战往往是&#xff1a;如何管理多个容器的协作&#xff1f;如何实现容器的集群化部署与扩展&#xff1f;如何保证服务的高可用&#xff1f; 一、Docker Compose&…

【Linux文件操作】文件操作系统调用

目录 一、文件创建&#xff08;creat 系统调用&#xff09;​ 1.1 函数原型 1.2 参数说明​ 1.3 返回值​ 1.4 使用示例 二、文件打开&#xff08;open 系统调用&#xff09;​ 2.1 函数原型 2.2 参数说明​ 2.3 返回值​ 2.4 使用示例 三、文件读写&#xff08;read 和 write …

FreeRTOS源码分析六:vTaskDelay vs xTaskDelayUntil任务延时

系列文章目录 FreeRTOS源码分析一&#xff1a;task创建&#xff08;RISCV架构&#xff09; FreeRTOS源码分析二&#xff1a;task启动&#xff08;RISCV架构&#xff09; FreeRTOS源码分析三&#xff1a;列表数据结构 FreeRTOS源码分析四&#xff1a;时钟中断处理响应流程 Free…