Apache RocketMQ 是一个高性能、高可靠的分布式消息中间件,广泛应用于异步通信、事件驱动架构和分布式系统中。本文深入探讨 RocketMQ 的消息可靠性、顺序性和幂等处理机制,结合 Redisson 分布式锁实现幂等消费,提供详细的代码示例和实践建议,帮助开发者构建健壮的消息系统。

一、RocketMQ 概述

Apache RocketMQ 由阿里巴巴开源,现为 Apache 顶级项目,支持发布/订阅和点对点消息模型,提供普通消息、定时消息、事务消息等多种类型。其核心组件包括:

  • NameServer:管理 Broker 元数据,提供服务发现和路由。
  • Broker:负责消息存储、转发和持久化。
  • Producer:消息生产者,发送消息到 Broker。
  • Consumer:消息消费者,从 Broker 订阅消息。

RocketMQ 的高性能和灵活性使其成为企业级应用的理想选择,尤其在需要保证消息可靠性、顺序性和幂等性的场景中。以下逐一分析这三方面的实现机制。


二、消息可靠性

消息可靠性确保消息从生产者到消费者的整个流程中不丢失、不重复且正确传递。RocketMQ 从生产者、Broker 和消费者三个层面提供保障。

1. 生产者端可靠性

RocketMQ 支持三种发送模式:

  • 同步发送:等待 Broker 确认,确保消息成功存储。
  • 异步发送:通过回调确认结果,适合高吞吐场景。
  • 单向发送:无确认机制,适用于低可靠性场景(如日志收集)。

生产者内置重试机制(默认重试 2 次),可通过 setRetryTimesWhenSendFailed 配置。

代码示例(同步发送)

DefaultMQProducer producer = new DefaultMQProducer("ProducerGroup");
producer.setNamesrvAddr("localhost:9876");
producer.start();
Message msg = new Message("TopicTest", "TagA", "Hello RocketMQ".getBytes());
SendResult sendResult = producer.send(msg);
if (sendResult.getSendStatus() == SendStatus.SEND_OK) {System.out.println("Message sent successfully: " + sendResult.getMsgId());
}
producer.shutdown();

2. Broker 端可靠性

Broker 通过持久化存储消息到磁盘(commitlog),支持两种刷盘模式:

  • 同步刷盘flushDiskType = SYNC_FLUSH):消息写入磁盘后返回,适合高可靠性场景。
  • 异步刷盘flushDiskType = ASYNC_FLUSH):消息先写入内存,定期刷盘,性能更高但有少量丢失风险。

配置示例

flushDiskType=SYNC_FLUSH

3. 消费者端可靠性

消费者通过 Push 或 Pull 模式消费消息,RocketMQ 提供以下机制:

  • 消息确认:Push 模式下,消费者需显式确认消息处理状态。
  • 消费重试:消费失败时,消息进入重试队列(%RETRY%ConsumerGroup),按时间间隔重试(默认 16 次)。
  • 死信队列:重试失败后,消息进入死信队列(%DLQ%ConsumerGroup),便于人工处理。

代码示例(消费者)

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroup");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {for (MessageExt msg : msgs) {System.out.println("Received message: " + new String(msg.getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();

4. 事务消息

事务消息用于分布式事务场景,确保消息发送与本地事务一致。例如,在电商订单系统中,只有数据库更新成功后,消息才会被提交。

事务消息流程

  1. 发送半消息(Half Message)到 Broker。
  2. 执行本地事务。
  3. 根据事务结果提交或回滚消息。

代码示例

TransactionMQProducer producer = new TransactionMQProducer("ProducerGroup");
producer.setNamesrvAddr("localhost:9876");
producer.setTransactionListener(new TransactionListener() {@Overridepublic LocalTransactionState executeLocalTransaction(Message msg, Object arg) {// 执行本地事务return LocalTransactionState.COMMIT_MESSAGE;}@Overridepublic LocalTransactionState checkLocalTransaction(MessageExt msg) {// 检查事务状态return LocalTransactionState.COMMIT_MESSAGE;}
});
producer.start();
Message msg = new Message("TopicTest", "TagA", "Transaction Message".getBytes());
producer.sendMessageInTransaction(msg, null);

三、消息顺序性

顺序消息确保消息按照发送顺序被消费,适用于订单状态流转、日志处理等场景。RocketMQ 通过分区顺序和单线程消费实现。

1. 顺序消息机制

  • 全局顺序:所有消息发送到一个队列,消费者单线程消费,性能较低。
  • 分区顺序:按业务分区(如订单 ID)将消息发送到不同队列,同一分区的消息保持顺序,性能较高。

RocketMQ 使用 MessageQueueSelector 确保同一业务的消息发送到同一队列,消费者通过 MessageListenerOrderly 实现单线程消费。

2. MessageListenerOrderly 的工作原理

MessageListenerOrderly 通过以下机制保障顺序消费:

  • 队列锁:Broker 为每个消息队列分配锁,确保同一队列只被一个消费者线程处理。
  • 单线程消费:每个队列由单一线程按序处理消息,未完成当前消息前不会拉取下一条。
  • 消费进度管理:只有消息消费成功后,Offset 才会更新。
  • 负载均衡:队列重新分配时,消费者从上次 Offset 继续消费,避免乱序。

代码示例(生产者)

DefaultMQProducer producer = new DefaultMQProducer("ProducerGroup");
producer.setNamesrvAddr("localhost:9876");
producer.start();
for (int i = 0; i < 10; i++) {String orderId = "order" + (i % 3);Message msg = new Message("OrderTopic", "TagA", orderId, ("Order Step " + i).getBytes());SendResult sendResult = producer.send(msg, (mqs, msg1, arg) -> {String id = (String) arg;int index = Math.abs(id.hashCode() % mqs.size());return mqs.get(index);}, orderId);
}
producer.shutdown();

代码示例(顺序消费者)

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("OrderlyConsumerGroup");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("OrderTopic", "*");
consumer.registerMessageListener((MessageListenerOrderly) (msgs, context) -> {for (MessageExt msg : msgs) {System.out.printf("Thread: %s, QueueId: %d, Message: %s%n", Thread.currentThread().getName(), msg.getQueueId(), new String(msg.getBody()));}try {Thread.sleep(100); // 模拟处理耗时return ConsumeOrderlyStatus.SUCCESS;} catch (Exception e) {return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;}
});
consumer.start();

四、消息幂等处理(基于 Redisson)

幂等性确保重复消费同一消息不会导致状态不一致,例如避免重复扣款。RocketMQ 本身不提供内置幂等机制,但可以通过 Redisson 的分布式锁实现。

1. 幂等处理原理

  • 唯一标识:使用消息的 MessageId 或业务 ID 作为去重依据。
  • 分布式锁:通过 Redisson 获取基于消息 ID 的锁,锁获取成功则处理消息,失败则跳过。
  • 状态记录:可选地将消费状态存入 Redis 或数据库,进一步防止重复消费。
  • 锁的 TTL:设置锁过期时间,避免异常导致锁无法释放。

2. Redisson 配置

配置 Redisson 客户端连接 Redis:

import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;public class RedissonConfig {public static RedissonClient getRedissonClient() {Config config = new Config();config.useSingleServer().setAddress("redis://127.0.0.1:6379").setDatabase(0);return Redisson.create(config);}
}

3. 幂等消费者实现

以下是使用 Redisson 分布式锁的消费者代码:

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;import java.util.List;
import java.util.concurrent.TimeUnit;public class IdempotentConsumer {public static void main(String[] args) throws Exception {RedissonClient redissonClient = RedissonConfig.getRedissonClient();DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("IdempotentConsumerGroup");consumer.setNamesrvAddr("localhost:9876");consumer.subscribe("TopicTest", "*");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs) {String msgId = msg.getMsgId();String lockKey = "rocketmq:msg:" + msgId;RLock lock = redissonClient.getLock(lockKey);boolean acquired = false;try {acquired = lock.tryLock(1, 10, TimeUnit.SECONDS);if (acquired) {System.out.println("Processing message: " + new String(msg.getBody()) + ", MsgId: " + msgId);Thread.sleep(100); // 模拟业务处理return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;} else {System.out.println("Duplicate message skipped: " + msgId);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}} catch (Exception e) {System.err.println("Error processing message: " + msgId + ", error: " + e.getMessage());return ConsumeConcurrentlyStatus.RECONSUME_LATER;} finally {if (acquired) {lock.unlock();}}}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.out.println("Consumer Started.");}
}

4. 结合顺序消费的幂等处理

对于顺序消费场景,使用 MessageListenerOrderly 实现幂等处理:

consumer.registerMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {for (MessageExt msg : msgs) {String msgId = msg.getMsgId();String lockKey = "rocketmq:msg:" + msgId;RLock lock = redissonClient.getLock(lockKey);boolean acquired = false;try {acquired = lock.tryLock(1, 10, TimeUnit.SECONDS);if (acquired) {System.out.println("Processing message: " + new String(msg.getBody()) + ", MsgId: " + msgId);Thread.sleep(100);return ConsumeOrderlyStatus.SUCCESS;} else {System.out.println("Duplicate message skipped: " + msgId);return ConsumeOrderlyStatus.SUCCESS;}} catch (Exception e) {System.err.println("Error processing message: " + msgId + ", error: " + e.getMessage());return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;} finally {if (acquired) {lock.unlock();}}}return ConsumeOrderlyStatus.SUCCESS;}
});

五、应用场景与注意事项

1. 应用场景

  • 消息可靠性:电商订单、支付通知,确保消息不丢失。
  • 消息顺序性:订单状态流转(创建 -> 支付 -> 发货),保证处理顺序。
  • 消息幂等性:支付扣款、库存更新,防止重复处理。

2. 注意事项

  • 可靠性
    • 使用同步刷盘和事务消息确保高可靠性场景。
    • 配置合理的重试次数和死信队列处理失败消息。
  • 顺序性
    • 生产者需确保同一业务消息发送到同一队列。
    • MessageListenerOrderly 牺牲部分性能,适合低吞吐场景。
  • 幂等性
    • 确保 Redis 高可用,避免单点故障。
    • 锁的 TTL 需大于业务处理时间,但不宜过长。
    • 可结合数据库唯一约束作为兜底去重机制。
  • 性能优化
    • 调整队列数量以平衡吞吐量和顺序性。
    • 批量消费时,优化锁粒度或使用 Redisson 的 MultiLock

六、总结

Apache RocketMQ 通过同步发送、刷盘机制和事务消息保证消息可靠性;通过分区顺序和 MessageListenerOrderly 实现消息顺序性;通过 Redisson 分布式锁实现高效的幂等处理。开发者可根据业务需求选择合适的机制:

  • 高可靠性场景:启用同步刷盘和事务消息。
  • 顺序消费场景:使用 MessageQueueSelectorMessageListenerOrderly
  • 幂等性场景:结合 Redisson 分布式锁和状态记录。

通过合理配置和代码实现,RocketMQ 可以满足复杂分布式系统中的消息处理需求。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/diannao/95141.shtml
繁体地址,请注明出处:http://hk.pswp.cn/diannao/95141.shtml
英文地址,请注明出处:http://en.pswp.cn/diannao/95141.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

无服务器日志分析由 Elasticsearch 提供支持,推出新的低价层

作者&#xff1a;来自 Elastic Log Analytics Elastic Observability Logs Essentials 在 Elastic Cloud Serverless 上提供成本效益高、无麻烦的日志分析。 SREs 可以摄取、搜索、丰富、分析、存储和处理日志&#xff0c;而无需管理部署的运营开销。[](https://www.elastic.co…

(Arxiv-2025)Phantom-Data:迈向通用的主体一致性视频生成数据集

Phantom-Data&#xff1a;迈向通用的主体一致性视频生成数据集 paper是字节发布在Arxiv2025的工作 paper title&#xff1a;Phantom-Data: Towards a General Subject-Consistent Video Generation Dataset Code&#xff1a;链接 Abstract 近年来&#xff0c;主体到视频&#…

如何解决pip安装报错ModuleNotFoundError: No module named ‘mlflow’问题

【Python系列Bug修复PyCharm控制台pip install报错】如何解决pip安装报错ModuleNotFoundError: No module named ‘mlflow’问题 摘要 在Python开发中&#xff0c;pip install 报错是一种常见问题&#xff0c;尤其是在使用集成开发环境&#xff08;IDE&#xff09;如PyCharm时…

2020/12 JLPT听力原文 问题一 3番

3番&#xff1a;会社で女の人と男の人が話しています。女の人は倉庫に入るとき、どの順番で入口のボタンを押さなければなりませんか。 女&#xff1a;すみません。地下の倉庫に行って、資料を取ってきたいんですが、入口の開け方がわからなくて… 男&#xff1a;ああ、最近、管…

C#/.NET/.NET Core技术前沿周刊 | 第 49 期(2025年8.1-8.10)

前言 C#/.NET/.NET Core技术前沿周刊&#xff0c;你的每周技术指南针&#xff01;记录、追踪C#/.NET/.NET Core领域、生态的每周最新、最实用、最有价值的技术文章、社区动态、优质项目和学习资源等。让你时刻站在技术前沿&#xff0c;助力技术成长与视野拓宽。 欢迎投稿、推荐…

基于强化学习的目标跟踪 研究初探

强化学习 目标跟踪Visual tracking by means of deep reinforcement learning and an expert demonstratorYOLO 检测下基于 ETC-DDPG 算法的无人机视觉跟踪基于特征与深度强化学习方法的机器人视觉伺服技术研究高性能可拓展视频目标跟踪算法研究基于目标运动与外观特征的多目标…

排序与查找,简略版

数组的排序 排序的基本介绍 排序是将一组数据&#xff0c;按照一定顺序进行排列的过程 排序的分类&#xff1a; 内部排序&#xff1a; 一次性适用数据量小的情况 将需要处理的数据都加载到内部存储器中进行排序。包括交换式排序&#xff0c;选择式排序&#xff0c;插入式排序 外…

打靶日常-XSS(反射型和存储型)

目录 小皮: 1. 2.这里需要登录,我们之前爆破出账号密码在这里就可以用​编辑 登录之后:​编辑 使用工具: 先输入正确字符进行测试:aaa 进行测试: 3.换种控制台显示 结果:(使用f12大法) DVWA: 反射型XSS: 低: ​编辑 中:大小写绕过: ​编辑 也可以双写绕过: ​编…

二叉搜索树深度解析:从原理实现到算法应用----《Hello C++ Wrold!》(18)--(C/C++)

文章目录前言二叉搜索树&#xff08;二叉排序树或二叉查找树&#xff09;二叉搜索树的模拟实现二叉搜索树和有序数组二分查找的比较两个搜索模型作业部分前言 二叉搜索树&#xff08;Binary Search Tree&#xff0c;简称 BST&#xff09;作为一种重要的树形数据结构&#xff0…

牛客.空调遥控二分查找牛客.kotori和气球(数学问题)力扣.二叉树的最大路径和牛客.主持人调度(二)

目录 牛客.空调遥控 二分查找 牛客.kotori和气球&#xff08;数学问题) 力扣.二叉树的最大路径和 牛客.主持人调度(二) 牛客.空调遥控 枚举n个空调之后&#xff0c;使数组有序&#xff0c;左右下标&#xff0c;用二分查找&#xff0c;然后一个求 长度就好 二分查找 /二分理…

《嵌入式Linux应用编程(二):标准IO高级操作与文件流定位实战》

今日学习内容1. 行输入函数安全实践(1) fgets vs gets函数安全特性换行符处理缓冲区保护fgets指定读取长度&#xff08;size-1&#xff09;保留\n并添加\0安全&#xff08;防溢出&#xff09;gets无长度限制将\n替换为\0危险2. Linux标准文件流文件流符号设备 标准输入stdin键盘…

Springboot2+vue2+uniapp 小程序端实现搜索联想自动补全功能

目录 一、实现目标 1.1 需求 1.2 实现示例图: 二、实现步骤 2.1 实现方法简述 2.2 简单科普 2.3 实现步骤及代码 一、实现目标 1.1 需求 搜索联想——自动补全 &#xff08;1&#xff09;实现搜索输入框&#xff0c;用户输入时能显示模糊匹配结果 &am…

极简 5 步:Ubuntu+RTX4090 源码编译 vLLM

极简 5 步&#xff1a;UbuntuRTX4090 源码编译 vLLM1. 系统依赖&#xff08;一次性&#xff09;2. 进入源码目录 & 激活环境3. 启用 ccache 自动并行度4. 拉代码 编译&#xff08;2 行搞定&#xff09;5. 更新 flash-attn&#xff08;与 vLLM 配套&#xff09;6. 启动 4 …

生产工具革命:定制开发开源AI智能名片S2B2C商城小程序重构商业生态的范式研究

摘要互联网作为信息工具已深刻改变商业生态&#xff0c;但其本质仍停留在效率优化层面。本文提出&#xff0c;基于定制开发开源AI智能名片与S2B2C商城小程序的深度融合&#xff0c;正在引发生产工具层面的革命性变革。该技术架构通过重构"人-货-场"关系&#xff0c;实…

Transformer前传:Seq2Seq与注意力机制Attention

前言 参考了以下大佬的博客 https://blog.csdn.net/v_july_v/article/details/127411638 https://blog.csdn.net/andy_shenzl/article/details/140146699 https://blog.csdn.net/weixin_42475060/article/details/121101749 https://blog.csdn.net/weixin_43334693/article/det…

企业架构工具篇之ArchiMate的HelloWorld(2)

本文通过ArchiMate做一个员工报销流程设计的小demo,按照步骤都可以做出来,在做这个demo之前先简单认识下Archimate的开发界面: 模型树(Models)窗口:通常位于左上方,以树形结构展示一个或多个 ArchiMate 模型。用户可在此浏览模型的整体结构,快速定位到特定的模型元素,…

Docker 详解(保姆级安装+配置+使用教程)

文章目录一、初识 Docker二、Docker 命令1、安装2、配置镜像加速器检查配置是否生效3、服务相关命令4、镜像相关命令5、容器相关命令三、Docker 容器数据卷1、数据卷概念2、数据卷作用3、配置数据卷4、配置数据卷容器四、Docker 应用部署五、备份与迁移六、Dockerfile七、Docke…

做调度作业提交过程简单介绍一下

✅作业提交与执行流程前文提到在 Linux 的 HPC 或超算环境中&#xff0c;可以只在共享存储上安装一次应用程序&#xff0c;然后所有计算节点通过挂载共享目录来访问和执行这些程序&#xff0c;那么作业提交及执行过程是怎么样的流程呢&#xff1f;结构说明&#xff1a;第一行是…

【Altium designer】解决报错“Access violation at address...“

问题现象如上AD9原理图工程所示报错&#xff0c;当我关闭这个“CMM-WEIER-VA”原理图工程以及其他不相关的原理图工程出现报错&#xff1a;Access violation at address 0832A5EC in module WorkspaceManager.DLL. Read of address 00000061 at 0832A5EC&#xff0c;任务管理器…

小杰python高级(three day)——numpy库

1.numpy数组的操作&#xff08;1&#xff09;数组的连接stack该函数可以实现多个数组的堆叠(连接)&#xff0c;会创建新的轴&#xff0c;用于沿着新的轴连接一系列数组&#xff0c;所有数组必须具有相同的形状。可以增加数组的维度。假设输入的每个数组都是 n 维数组&#xff0…