Apache RocketMQ 是一种分布式消息队列系统,支持分布式事务消息,以确保在分布式系统中数据的一致性。它通过一种基于两阶段提交(2PC)的机制结合补偿逻辑来实现分布式事务的最终一致性。以下是对 RocketMQ 分布式事务的详细讲解,包括其核心概念、工作原理、流程、实现机制及注意事项。


一、分布式事务背景与问题

在分布式系统中,事务的执行往往涉及多个服务或数据库。例如,在电商场景中,用户下单可能需要同时更新订单状态、扣减库存、增加积分等操作,这些操作分布在不同的微服务和数据库中。由于网络延迟、服务宕机或事务回滚等原因,很难保证所有操作的原子性和一致性。RocketMQ 的事务消息机制通过将消息发送与本地事务绑定,解决了本地事务执行与消息发送的原子性问题,从而实现分布式系统的最终一致性。

关键问题

  • 如果先发送消息后执行本地事务,可能因本地事务失败导致数据不一致。
  • 如果先执行本地事务后发送消息,可能因服务宕机导致消息未发送。
  • 分布式系统中需要一种机制来确保消息发送和本地事务的原子性。

RocketMQ 的事务消息通过半消息(Half Message)事务状态检查机制解决了上述问题。


二、RocketMQ 分布式事务的核心概念

  1. 事务消息(Transactional Message)

    • RocketMQ 提供的一种高级消息类型,用于确保消息发送和本地事务的原子性。
    • 目标是实现分布式系统的最终一致性,即消息的生产和本地事务要么都成功,要么都不执行。
  2. 半消息(Half Message)

    • 半消息是指生产者发送到 RocketMQ Broker 的消息,初始状态下对消费者不可见。
    • 只有在事务提交(Commit)后,半消息才会变成正常消息,供消费者消费;如果事务回滚(Rollback),半消息会被丢弃。
  3. 事务状态检查(Message Checkback)

    • RocketMQ Broker 会定期检查未确定状态(Pending)的半消息,向生产者发起回调,查询本地事务的状态,以决定是提交(Commit)还是回滚(Rollback)。
  4. 两阶段提交(2PC)

    • RocketMQ 的事务消息基于 2PC 思想:
      • 第一阶段:发送半消息,标记为“暂时不可投递”。
      • 第二阶段:根据本地事务的执行结果,提交(Commit)或回滚(Rollback)消息。
  5. 操作消息(Op Message)

    • RocketMQ 使用 Op 消息来记录半消息的最终状态(Commit 或 Rollback)。
    • Op 消息用于标识事务消息是否已确定状态,避免重复处理。

三、RocketMQ 事务消息的工作原理与流程

RocketMQ 事务消息的工作流程可以分为正常消息发送与提交事务补偿两个部分。以下是详细的流程:

1. 正常事务消息发送与提交流程
  1. 生产者发送半消息

    • 生产者通过 TransactionMQProducer 发送一个事务消息(半消息)到 RocketMQ Broker。
    • Broker 接收到半消息后,将其存储在事务存储系统中,但不生成消息索引,因此对消费者不可见。
    • Broker 返回一个确认(ACK)给生产者,表示半消息已接收。
  2. 生产者执行本地事务

    • 生产者在发送半消息成功后,执行本地事务(如数据库操作)。
    • 本地事务的结果可能是成功(Commit)或失败(Rollback)。
  3. 生产者提交事务状态

    • 根据本地事务的结果,生产者向 Broker 发送第二次确认(ACK),通知事务状态:
      • Commit:Broker 将半消息标记为可投递,生成消息索引,消费者可以消费该消息。
      • Rollback:Broker 丢弃半消息,消费者不会看到该消息。
    • 如果是 Commit,Broker 会记录一个 Op 消息,标记该半消息已提交。
  4. 消费者消费消息

    • 如果半消息被提交,消费者可以从 Broker 获取并处理消息。
    • 如果半消息被回滚,消费者不会收到消息。
2. 事务补偿流程

如果由于网络中断或生产者宕机,导致 Broker 未收到第二次 ACK(事务状态),Broker 会启动事务状态检查机制:

  1. Broker 定期检查

    • Broker 每隔一段时间(如默认 60 秒)检查未确定状态的半消息。
    • Broker 向生产者发送回调请求,查询对应半消息的本地事务状态。
  2. 生产者实现回调接口

    • 生产者需要实现 TransactionListener 接口的 checkLocalTransaction 方法,用于响应 Broker 的状态查询。
    • 在该方法中,生产者检查本地事务的状态(如查询数据库),返回 Commit、Rollback 或 Unknown。
  3. Broker 处理回调结果

    • 如果返回 Commit,Broker 标记半消息为可投递。
    • 如果返回 Rollback,Broker 丢弃半消息。
    • 如果返回 Unknown 或无响应,Broker 会在下一次检查时继续查询,直到达到最大检查次数(默认 15 次)或超时,之后可能丢弃消息。
流程图

以下是 RocketMQ 事务消息的流程图:

生产者                       Broker                       消费者|                            |                            || 1. 发送半消息            |                            ||------------------------->| 2. 存储半消息(不可见)    ||                          |------------------------->|| 3. 收到ACK               |                            ||<-------------------------|                            || 4. 执行本地事务          |                            ||                          |                            || 5. 发送Commit/Rollback   |                            ||------------------------->| 6. 更新消息状态            ||                          |   - Commit: 生成索引       ||                          |   - Rollback: 丢弃消息     ||                          |------------------------->||                          | 7. 消费者拉取消息          ||                          |<-------------------------|
3. 事务补偿流程图
Broker                       生产者|                            || 1. 检查未确定状态的半消息  ||------------------------->|| 2. 查询本地事务状态       ||<-------------------------|| 3. 根据状态更新消息       ||   - Commit: 生成索引      ||   - Rollback: 丢弃消息    |

四、RocketMQ 事务消息的实现机制

  1. 半消息的存储与不可见性

    • RocketMQ 通过修改消息的 Topic 和 Queue 属性来实现半消息的不可见性。
    • 半消息存储在特殊的 Topic(如 RMQ_SYS_TRANS_OP_HALF_TOPIC)中,消费者无法直接访问。
    • 在提交(Commit)时,Broker 将消息的 Topic 和 Queue 恢复为原始值,并生成索引,使其对消费者可见。
  2. Op 消息的引入

    • RocketMQ 引入 Op 消息来标记半消息的最终状态(Commit 或 Rollback)。
    • Op 消息存储在 Broker 的独立队列中,用于记录事务消息的状态。
    • 如果半消息没有对应的 Op 消息,说明事务状态未确定,Broker 会触发状态检查。
  3. 事务状态检查的实现

    • Broker 维护一个事务消息检查定时任务,默认每 60 秒检查一次未确定状态的半消息。
    • 检查时,Broker 通过生产者的 Group ID 找到对应的生产者实例,调用其 checkLocalTransaction 方法。
    • 生产者需要实现该方法,返回事务状态。
  4. 异步刷盘的优化

    • RocketMQ 默认使用异步刷盘(Async Flush)来提高性能,但可能导致半消息未及时落盘。
    • 在高吞吐量场景中,RocketMQ 5.0 引入了批量 Op 消息优化,多个半消息可对应一个 Op 消息,减少写放大问题。

五、代码示例

以下是一个简单的 Java 代码示例,展示如何使用 RocketMQ 的事务消息:

import org.apache.rocketmq.client.producer.*;
import org.apache.rocketmq.common.message.Message;public class TransactionProducer {public static void main(String[] args) throws Exception {// 初始化事务消息生产者TransactionMQProducer producer = new TransactionMQProducer("TransactionProducerGroup");producer.setNamesrvAddr("localhost:9876");// 设置事务监听器producer.setTransactionListener(new TransactionListener() {@Overridepublic LocalTransactionState executeLocalTransaction(Message msg, Object arg) {// 执行本地事务try {// 模拟数据库操作System.out.println("Executing local transaction for message: " + msg);// 假设事务成功return LocalTransactionState.COMMIT_MESSAGE;} catch (Exception e) {// 事务失败return LocalTransactionState.ROLLBACK_MESSAGE;}}@Overridepublic LocalTransactionState checkLocalTransaction(MessageExt msg) {// 事务状态检查// 检查本地事务状态(如查询数据库)System.out.println("Checking transaction status for message: " + msg);return LocalTransactionState.COMMIT_MESSAGE; // 或 ROLLBACK_MESSAGE}});// 启动生产者producer.start();// 发送事务消息Message msg = new Message("TopicTest", "TagA", "Hello RocketMQ".getBytes());SendResult sendResult = producer.sendMessageInTransaction(msg, null);System.out.println("Send result: " + sendResult);// 关闭生产者producer.shutdown();}
}

六、事务消息的优缺点

优点
  • 原子性保证:确保本地事务和消息发送的原子性,解决了分布式事务中的一致性问题。
  • 最终一致性:通过事务状态检查机制,保障消息的可靠投递。
  • 高性能:RocketMQ 的事务消息机制基于异步刷盘和高可用架构,适合高并发场景。
  • 易用性:生产者只需实现 TransactionListener 接口,简化分布式事务开发。
缺点
  • 复杂性:需要实现事务状态检查逻辑,增加了开发复杂度。
  • 性能开销:事务消息的两次提交和状态检查会增加一定的性能开销。
  • 最终一致性:不保证强一致性,仅提供最终一致性,适合对实时性要求不高的场景。
  • 局限性:消费者端的事务一致性需自行处理(如通过重试机制)。

七、应用场景

RocketMQ 的事务消息广泛应用于需要分布式事务的场景,例如:

  • 电商系统:用户下单后,订单系统更新订单状态并发送消息通知库存、积分、物流系统。
  • 金融系统:转账操作需要同时扣款和通知目标账户,确保一致性。
  • 微服务架构:在多个微服务之间通过消息传递实现异步协作。

示例:在电商场景中,用户支付订单后:

  1. 订单服务发送半消息到 RocketMQ,通知积分服务增加积分。
  2. 订单服务执行本地数据库更新(如订单状态从“未支付”改为“已支付”)。
  3. 如果数据库更新成功,提交消息;否则,回滚消息。
  4. 积分服务消费消息,更新用户积分。

八、注意事项

  1. 事务消息的隔离性

    • 事务消息不保证隔离性,消费者可能需要处理重复消息(通过幂等性设计)。
  2. Group ID 的唯一性

    • 事务消息的 Group ID 不能与其他类型的消息共享,Broker 通过 Group ID 定位生产者进行状态检查。
  3. 超时与重试

    • 配置合理的检查间隔(默认 60 秒)和最大检查次数(默认 15 次)。
    • 过多的检查可能增加 Broker 负载,过少可能导致消息丢失。
  4. 本地事务的幂等性

    • 确保本地事务的 checkLocalTransaction 方法具有幂等性,以应对重复检查。
  5. 高可用性

    • RocketMQ 支持主从复制和 Raft 协议(如 DLedger),确保事务消息在 Broker 故障时的高可用性。

九、与其他分布式事务方案的对比

方案描述优点缺点
2PC基于 XA 协议的同步两阶段提交,事务管理器协调所有参与者的提交或回滚。强一致性高延迟,阻塞式,单点故障风险
3PC2PC 的改进,增加预提交阶段以减少阻塞时间。减少阻塞时间复杂性高,性能开销大
TCC应用层事务,Try-Confirm-Cancel 模式,需手动实现补偿逻辑。灵活性高,适合复杂业务开发复杂,需手动实现补偿逻辑
RocketMQ 事务消息基于消息队列的异步事务,结合 2PC 和补偿逻辑实现最终一致性。异步高性能,易于微服务集成最终一致性,非强一致性
Saga将事务拆分为多个本地事务,通过事件驱动执行后续操作。高吞吐量,易扩展复杂补偿逻辑,需处理回滚失败

RocketMQ 事务消息的优势

  • 相比 2PC/3PC,RocketMQ 事务消息异步执行,性能更高,适合高并发场景。
  • 相比 TCC,RocketMQ 的事务消息机制更简单,无需手动实现 Confirm/Cancel 逻辑。
  • 相比 Saga,RocketMQ 的事务消息通过 Broker 的状态检查机制,减少了补偿逻辑的开发量。

十、总结

RocketMQ 的分布式事务消息通过两阶段提交和事务状态检查机制,有效解决了分布式系统中本地事务与消息发送的原子性问题。其核心在于半消息的不可见性和 Broker 的事务状态检查,确保消息的可靠投递和最终一致性。事务消息适用于电商、金融、微服务等场景,能够简化分布式事务的开发复杂度,同时提供高性能和高可用性。

关键点

  • 使用半消息和 Op 消息实现事务的原子性。
  • 通过定期检查未确定状态的半消息,确保事务的最终一致性。
  • 适合需要异步处理和最终一致性的分布式系统。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/pingmian/91167.shtml
繁体地址,请注明出处:http://hk.pswp.cn/pingmian/91167.shtml
英文地址,请注明出处:http://en.pswp.cn/pingmian/91167.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

具身智能 自动驾驶相关岗位的技术栈与能力地图

一、硬技能技术栈&#xff08;优先级排序&#xff09; 1. 核心领域技术&#xff08;★★★★★&#xff09;技术方向具体技能学习建议大模型实战- VLA架构&#xff08;RT-2、PaLM-E&#xff09;开发/微调- 多模态对齐&#xff08;CLIP、Flamingo&#xff09;- 生成式策略&#…

实现了加载 正向 碰撞 雅可比 仿真

""" # 此示例从 URDF 文件中加载一个 UR10 机械臂的模型 # 随后演示 Pinocchio 库的基本功能,如正向运动学计算 # 雅可比矩阵计算、碰撞检测以及动力学仿真 """ # 导入 meshcat 的几何模块,用于创建和管理可视化的几何对象 import meshcat.geo…

【0基础PS】PS工具详解--画笔工具

目录前言一、画笔工具的位置与快捷键​二、画笔工具选项栏设置​三、画笔工具的进阶应用​四、常见问题及解决方法​总结前言 在 Photoshop 的众多工具中&#xff0c;画笔工具无疑是极具创造力和实用性的工具之一。无论是进行图像绘制、照片修饰&#xff0c;还是特效制作&…

window10和ubuntu22.04双系统之卸载ubuntu系统

window10和ubuntu22.04双系统之卸载ubuntu系统&#xff09;1. 删除Ubuntu系统占用的磁盘分区&#xff08;在Windows下操作&#xff09;2. 删除ubuntu开机引导项1. winr出来终端提示框后输入2. 然后会在命令行中显示电脑的硬盘列表&#xff0c;输入命令选择安装Windows的那个硬盘…

(C++)C++类和类的方法(基础教程)(与Python类的区别)

前言&#xff1a; 本篇博客建议搭配&#xff1a;&#xff08;Python&#xff09;类和类的方法&#xff08;基础教程介绍&#xff09;&#xff08;Python基础教程&#xff09;-CSDN博客 一起学习使用&#xff1b; 源代码&#xff1a; #include <iostream> #include &…

【NLP舆情分析】基于python微博舆情分析可视化系统(flask+pandas+echarts) 视频教程 - 微博文章数据可视化分析-文章分类下拉框实现

大家好&#xff0c;我是java1234_小锋老师&#xff0c;最近写了一套【NLP舆情分析】基于python微博舆情分析可视化系统(flaskpandasecharts)视频教程&#xff0c;持续更新中&#xff0c;计划月底更新完&#xff0c;感谢支持。今天讲解微博文章数据可视化分析-文章分类下拉框实现…

Git命令保姆级教程

Git 入门网站 https://learngitbranching.js.org/?localezh_CN Git 命令 git init // 在本地目录内部会生成.git文件夹 git initgit clone // 从git服务器拉取代码 // 代码下载完成后在当前文件夹中会有一个 shop 的目录&#xff0c;通过 cd shop 命令进入目录。 git clone ht…

Java Ai For循环 (day07)

循环结构 for&#xff1a;循环语句的作用&#xff1a;可以将一段代码重复的执行很多次for 循环语句格式&#xff1a;执行流程&#xff1a; 初始化语句执行条件判断语句&#xff0c;看结果是 true&#xff0c;还是 false false结束&#xff0c;true继续执行循环体语句执行条件控…

Directory Opus 使用优化

自定义快捷键 Directory Opus 移动标签到另一栏 设置快捷键&#xff1a;ctrl←/→ 设置步骤&#xff1a; 打开【设置】—>选择【自定义工具栏和快捷键】 选择【新建】—>【新建窗口快捷键】 输入快捷键命令 Go TABMOVEother此时可以点击运行进行测试&#xff0c;…

Qt知识点2『Ubuntu24.04.2安装Qt5.12.9各种报错』

问题1&#xff1a;Qt安装完毕后&#xff0c;新建一个最简单的测试程序&#xff0c;但是QtCreator左侧构建的三个按钮呈现灰色&#xff0c;无法进行构建操作答&#xff1a;进入QtCreator的Kits界面&#xff08;工具-选项&#xff09;&#xff0c;点击"自动检测"下的De…

TS面试题

1.TS有哪些类型&#xff08;对比与js&#xff09;&#xff1f;关键字/语法用途示例any关闭类型检查let a: any 4unknown类型安全的 anylet u: unknown 4; if (typeof u number) …never永不存在的值function err(): never { throw 0; }void无返回值function f(): void {}enu…

借助Early Hints和HarperDB改善网页性能

对电商网站来说&#xff0c;糟糕的页面性能可能会增加交易放弃率。一直以来&#xff0c;人们会使用CDN进行缓存从而缩短页面加载时间&#xff0c;但即便实施了强大的缓存&#xff0c;消费者在通过移动网络访问这些网站时可能仍然会需要频繁等待。最近诞生了一种名为“早期提示”…

MEMS陀螺如何成为无人机稳定飞行的核心?

在无人机自主翱翔、灵活机动并适应多变环境的背后&#xff0c;对其运动状态——尤其是姿态——的精确感知是基石。作为飞行控制系统&#xff08;飞控&#xff09;的“内耳”&#xff0c;陀螺仪实时捕捉机体绕X、Y、Z三轴的旋转角速度。这一核心数据是飞控进行姿态解算和维持飞行…

腾讯云拉取docker镜像失败怎么办

ps:我直接按照步骤1和2就解决了 以下内容来自豆包 在腾讯云服务器上拉取 Docker 镜像失败&#xff0c;可以按照以下步骤排查和解决&#xff1a; 一、检查网络连接 确认服务器网络正常 bash ping www.baidu.com # 测试公网连通性如果无法 ping 通&#xff0c;检查服务器防火墙…

Apache FOP实践——pdf模板引擎

文章目录 基本概念设计思想具体实践完整应用 基本概念 Apache FOP&#xff08;Formatting Objects Processor&#xff09;是一个基于Java的开源工具&#xff0c;用于将 XSL-FO&#xff08;XSL Formatting Objects&#xff09; 文档转换为PDF、图像等格式。 设计思想 将内容&…

WebRTC核心组件技术解析:架构、作用与协同机制

引言&#xff1a;WebRTC的技术定位与价值 WebRTC&#xff08;Web Real-Time Communication&#xff09;作为一项开源实时通信标准&#xff0c;已成为浏览器原生音视频交互、P2P数据传输的技术基石。自2011年开源以来&#xff0c;其标准化进程由W3C&#xff08;API层&#xff0…

OmniParser:提升工作效率的视觉界面解析工具

OmniParser&#xff1a;基于视觉的用户界面解析工具在现代软件开发中&#xff0c;用户界面的自动化处理变得愈发重要。OmniParser 是一个强大的工具&#xff0c;旨在将用户界面的截图解析为结构化的、易于理解的元素&#xff0c;从而显著提升了大型语言模型&#xff08;如GPT-4…

C#程序员计算器

使用C#语言编写程序员计算器&#xff0c;使其能够进行加减乘除和与或非等逻辑运算。 calculator.cs 代码如下 using System; using System.Numerics; using System.Globalization;namespace Calculator1 {public enum CalcBase { Bin 2, Oct 8, Dec 10, Hex 16 }public en…

国产音频DA转换芯片DP7361支持192K六通道24位DA转换器

产品概述 DP7361 是一款立体声六通道线性输出的数模转换器&#xff0c;内含插值滤波器、Multi-Bit 数模转换 器、模拟输出滤波器&#xff0c;支持主流的音频数据格式。 DP7361 片上集成线性低通模拟滤波器和四阶 Multi-Bit Δ-∑调制器&#xff0c;能自动检测信号频率和主时钟频…

【C51单片机四个按键控制流水灯】2022-9-30

缘由C51&#xff0c;四个按键控制流水灯-嵌入式-CSDN问答 #include "REG52.h" sbit k1P3^0; sbit k2P3^1; sbit k3P3^2; sbit k4P3^3; unsigned char code lsd[]{127,191,223,239,247,251,253,254};//跑马灯 void jsys(unsigned char y,unsigned char s){unsigned c…