在分布式系统架构中,消息队列(MQ)作为解耦服务、削峰填谷、异步通信的核心组件,其消息投递的可靠性与延时消息的精准性直接影响业务系统的稳定性。本文结合实际业务场景,详细解析消息投递的全流程设计与延时消息的通用实现方案,提供可落地的代码思路,助力开发者解决高并发场景下的消息处理难题。

一、消息投递的核心目标与挑战

消息投递的本质是实现跨服务的异步通信,但其背后隐藏着两大核心挑战:

  • 可靠性:确保消息不丢失、不重复,且业务操作与消息发送保持一致性(即 “业务成功则消息必达,业务失败则消息不发”)。
  • 高效性:在高并发场景下,消息投递不能成为系统瓶颈,需兼顾吞吐量与实时性。

针对这些挑战,业界普遍采用 “本地消息表 + 事务同步 + 重试机制” 的方案,通过 “先存后发” 的思路确保消息可靠投递。

二、可靠消息投递:基于本地消息表的事务消息方案

事务消息是解决 “业务操作与消息发送原子性” 的关键技术,其核心思想是将消息发送纳入本地事务管理,通过本地消息表记录消息状态,再通过异步投递与补偿机制确保最终一致性。

1. 事务消息的核心流程

事务消息的执行遵循 “本地事务优先,消息异步跟进” 的原则,具体流程如下:

  1. 开启本地事务:在业务方法中开启数据库事务(如用户注册、订单创建等场景)。
  2. 执行业务逻辑:完成核心业务操作(如插入用户记录、创建订单)。
  3. 写入消息表:将待发送的消息(含消息体、状态、创建时间等)写入本地消息表,状态标记为 “待投递”。
  4. 提交本地事务:若业务逻辑无异常,提交事务;若异常,则回滚(消息表记录也会被回滚,确保消息不被发送)。
  5. 异步投递消息:事务提交后,异步将消息表中 “待投递” 的消息发送至 MQ。
  6. 状态更新与重试:若投递成功,更新消息状态为 “成功”;若失败,记录失败原因并触发重试机制。

2. 核心组件设计与实现

(1)本地消息表设计

消息表是事务消息的核心载体,需记录消息的全生命周期状态,表结构示例如下:

CREATE TABLE `t_msg` (`id` varchar(32) NOT NULL COMMENT '消息唯一ID',`body_json` text NOT NULL COMMENT '消息体(JSON格式)',`topic` varchar(100) NOT NULL COMMENT 'MQ主题',`status` tinyint NOT NULL DEFAULT 0 COMMENT '状态:0-待投递,1-投递成功,2-投递失败',`fail_msg` text COMMENT '失败原因(status=2时记录)',`fail_count` int NOT NULL DEFAULT 0 COMMENT '失败次数',`next_retry_time` datetime DEFAULT NULL COMMENT '下次重试时间',`create_time` datetime NOT NULL COMMENT '创建时间',`update_time` datetime NOT NULL COMMENT '更新时间',PRIMARY KEY (`id`),KEY `idx_status_retry` (`status`,`next_retry_time`) COMMENT '用于查询待重试消息') ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='本地消息表';
(2)消息发送核心接口与实现

定义消息发送接口IMsgSender,封装消息发送与重试逻辑,业务方通过接口调用即可完成消息投递:

public interface IMsgSender {/*** 发送单条消息* @param msg 消息体(任意可序列化对象)* @param topic MQ主题*/void send(Object msg, String topic);/*** 批量发送消息* @param msgList 消息列表* @param topic MQ主题*/void sendBatch(List<Object> msgList, String topic);/*** 重试发送消息* @param msgId 消息ID*/void retrySend(String msgId);}

其实现类DefaultMsgSender是核心,负责消息表写入、事务同步与 MQ 投递:


@Servicepublic class DefaultMsgSender implements IMsgSender {@Autowiredprivate MsgMapper msgMapper;@Autowiredprivate MQTemplate mqTemplate; // 封装MQ客户端的发送工具@Autowiredprivate RetryStrategy retryStrategy; // 重试策略@Overridepublic void send(Object msg, String topic) {// 1. 生成消息记录MsgPO msgPO = buildMsgPO(msg, topic);// 2. 写入消息表(与业务事务同享一个事务)msgMapper.insert(msgPO);// 3. 注册事务同步器,事务提交后异步发送消息registerTransactionSync(msgPO);}private MsgPO buildMsgPO(Object msg, String topic) {MsgPO po = new MsgPO();po.setId(UUID.randomUUID().toString().replaceAll("-", ""));po.setBodyJson(JSON.toJSONString(msg));po.setTopic(topic);po.setStatus(0); // 待投递po.setCreateTime(new Date());po.setUpdateTime(new Date());return po;}private void registerTransactionSync(MsgPO msgPO) {if (TransactionSynchronizationManager.isSynchronizationActive()) {// 若存在活跃事务,注册同步器TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {@Overridepublic void afterCompletion(int status) {if (status == TransactionSynchronization.STATUS_COMMITTED) {// 事务提交成功,异步发送消息CompletableFuture.runAsync(() -> sendToMQ(msgPO));}}});} else {// 无事务环境,直接发送sendToMQ(msgPO);}}// 实际发送消息到MQprivate void sendToMQ(MsgPO msgPO) {try {// 发送消息到MQmqTemplate.send(msgPO.getTopic(), msgPO.getBodyJson());// 发送成功,更新状态msgMapper.updateStatusSuccess(msgPO.getId(), new Date());} catch (Exception e) {// 发送失败,计算重试时间int newFailCount = msgPO.getFailCount() + 1;Date nextRetryTime = retryStrategy.calculateNextRetryTime(newFailCount);boolean needRetry = retryStrategy.needRetry(newFailCount);// 更新失败状态msgMapper.updateStatusFail(msgPO.getId(),e.getMessage(),newFailCount,needRetry ? nextRetryTime : null,new Date());}}@Overridepublic void sendBatch(List<Object> msgList, String topic) {// 批量处理逻辑,类似单条发送,省略...}@Overridepublic void retrySend(String msgId) {MsgPO msgPO = msgMapper.selectById(msgId);if (msgPO == null || msgPO.getStatus() != 2) {return;}sendToMQ(msgPO); // 复用发送逻辑}}
(3)重试策略与补偿定时任务

为避免消息因网络波动等临时问题丢失,需设计重试机制。采用衰减式重试策略(失败次数越多,重试间隔越长),示例如下:


public class DecayRetryStrategy implements RetryStrategy {private static final int MAX_RETRY_COUNT = 5; // 最大重试次数// 重试间隔(秒):第1次失败后10s,第2次30s,第3次60s,以此类推private static final int[] INTERVALS = {10, 30, 60, 120, 300};@Overridepublic Date calculateNextRetryTime(int failCount) {if (failCount >= MAX_RETRY_COUNT) {return null; // 超过最大次数,不再重试}int interval = INTERVALS[Math.min(failCount, INTERVALS.length - 1)];return new Date(System.currentTimeMillis() + interval * 1000L);}@Overridepublic boolean needRetry(int failCount) {return failCount < MAX_RETRY_COUNT;}}

同时,通过定时任务(Job)扫描待重试消息,触发重试:


@Componentpublic class MsgRetryJob {@Autowiredprivate MsgMapper msgMapper;@Autowiredprivate IMsgSender msgSender;@Scheduled(fixedRate = 60000) // 每分钟执行一次public void retryFailedMsgs() {// 查询状态为失败且到达重试时间的消息List<MsgPO> needRetryMsgs = msgMapper.selectNeedRetryMsgs(new Date());for (MsgPO msg : needRetryMsgs) {msgSender.retrySend(msg.getId());}}}

3. 业务方使用示例

在业务方法中,只需注入IMsgSender并调用send方法,即可完成事务消息的发送:


@Servicepublic class UserService {@Autowiredprivate UserMapper userMapper;@Autowiredprivate IMsgSender msgSender;@Transactional(rollbackFor = Exception.class)public void register(UserRegisterDTO dto) {// 1. 执行业务逻辑:创建用户UserPO user = new UserPO();user.setId(UUID.randomUUID().toString());user.setUsername(dto.getUsername());userMapper.insert(user);// 2. 发送用户注册消息(事务提交后自动发送)UserRegisterMsg msg = new UserRegisterMsg(user.getId(), user.getUsername());msgSender.send(msg, "user-register-topic");}}

三、延时消息:通用实现方案与场景落地

延时消息指消息发送后,并不立即投递到 MQ,而是延迟指定时间后再被消费,典型场景包括:订单 15 分钟未支付自动取消、超时任务提醒、失败操作重试等。

1. 延时消息的实现方案对比

常见的延时消息实现方案各有优劣,需根据业务场景选择:

方案

实现方式

优点

缺点

数据库定时轮询

消息表记录expect_send_time,定时任务扫描并发送

实现简单,不依赖特定 MQ

轮询间隔影响实时性,高频率轮询压力大

MQ 自带延时队列

如 RabbitMQ 的 TTL + 死信队列、RocketMQ 的延时等级

依赖 MQ 原生能力,性能好

受限于 MQ 支持的延时等级,灵活性差

内存延迟队列 + 持久化

结合 Java DelayQueue 与数据库,定时预加载消息

实时性高,支持任意延时

需处理服务重启后内存数据丢失问题

本文推荐 **“数据库 + DelayQueue + 定时预加载”** 方案,兼顾可靠性与灵活性。

2. 延时消息的核心实现

(1)消息表扩展

在原有消息表基础上增加延时相关字段:

ALTER TABLE `t_msg` ADD COLUMN `expect_send_time` datetime NOT NULL COMMENT '期望发送时间';

ALTER TABLE `t_msg` ADD COLUMN `is_delay` tinyint NOT NULL DEFAULT 0 COMMENT '是否延时消息:0-否,1-是';

(2)延时消息发送接口

扩展IMsgSender接口,支持发送延时消息:


public interface IMsgSender {// 发送延时消息(指定延迟时间)void sendDelay(Object msg, String topic, long delay, TimeUnit unit);// 发送延时消息(指定期望发送时间)void sendDelayAt(Object msg, String topic, Date expectSendTime);}
(3)基于 DelayQueue 的内存延迟队列

利用 Java 的DelayQueue(阻塞队列,支持按延时时间排序)实现内存级延时消息管理,结合定时任务预加载消息:

@Componentpublic class DelayMsgManager {private final DelayQueue<DelayMsgTask> delayQueue = new DelayQueue<>();@Autowiredprivate MsgMapper msgMapper;@Autowiredprivate IMsgSender msgSender;// 初始化时启动消费者线程@PostConstructpublic void startConsumer() {new Thread(() -> {while (true) {try {// 阻塞获取到期的消息任务DelayMsgTask task = delayQueue.take();// 发送消息msgSender.retrySend(task.getMsgId());} catch (Exception e) {// 异常处理}}}, "delay-msg-consumer").start();}// 定时预加载近期需要发送的延时消息@Scheduled(fixedRate = 60000) // 每分钟执行一次public void preloadDelayMsgs() {Date now = new Date();Date nextHour = new Date(now.getTime() + 3600 * 1000); // 加载未来1小时内的消息List<MsgPO> delayMsgs = msgMapper.selectDelayMsgs(now, nextHour);for (MsgPO msg : delayMsgs) {long delayMs = msg.getExpectSendTime().getTime() - now.getTime();if (delayMs > 0) {delayQueue.put(new DelayMsgTask(msg.getId(), delayMs));}}}// 延时任务封装static class DelayMsgTask implements Delayed {private final String msgId;private final long triggerTime; // 触发时间(毫秒)public DelayMsgTask(String msgId, long delayMs) {this.msgId = msgId;this.triggerTime = System.currentTimeMillis() + delayMs;}@Overridepublic long getDelay(TimeUnit unit) {return unit.convert(triggerTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);}@Overridepublic int compareTo(Delayed o) {return Long.compare(this.triggerTime, ((DelayMsgTask) o).triggerTime);}// getterpublic String getMsgId() { return msgId; }}}
(4)延时消息发送实现

在DefaultMsgSender中实现延时消息发送逻辑:


@Overridepublic void sendDelay(Object msg, String topic, long delay, TimeUnit unit) {Date expectSendTime = new Date(System.currentTimeMillis() + unit.toMillis(delay));sendDelayAt(msg, topic, expectSendTime);}@Overridepublic void sendDelayAt(Object msg, String topic, Date expectSendTime) {MsgPO msgPO = buildMsgPO(msg, topic);msgPO.setIsDelay(1);msgPO.setExpectSendTime(expectSendTime);msgMapper.insert(msgPO); // 写入消息表// 若期望时间在近期(如1小时内),直接加入内存延迟队列long now = System.currentTimeMillis();long delayMs = expectSendTime.getTime() - now;if (delayMs > 0 && delayMs <= 3600 * 1000) {delayMsgManager.getDelayQueue().put(new DelayMsgTask(msgPO.getId(), delayMs));}}

3. 延时消息的业务场景示例

以订单超时取消为例,演示延时消息的使用:


@Servicepublic class OrderService {@Autowiredprivate OrderMapper orderMapper;@Autowiredprivate IMsgSender msgSender;@Transactional(rollbackFor = Exception.class)public String createOrder(OrderCreateDTO dto) {// 1. 创建订单OrderPO order = new OrderPO();order.setId(UUID.randomUUID().toString());order.setGoodsId(dto.getGoodsId());order.setStatus(0); // 未支付orderMapper.insert(order);// 2. 发送15分钟后执行的延时消息(订单超时取消)OrderTimeoutMsg msg = new OrderTimeoutMsg(order.getId());msgSender.sendDelay(msg, "order-timeout-topic", 15, TimeUnit.MINUTES);return order.getId();}}// 订单超时消息消费者@Componentpublic class OrderTimeoutConsumer {@Autowiredprivate OrderMapper orderMapper;@Consumer(topic = "order-timeout-topic")public void handle(OrderTimeoutMsg msg) {OrderPO order = orderMapper.selectById(msg.getOrderId());if (order != null && order.getStatus() == 0) { // 仍未支付// 取消订单逻辑(如更新状态、释放库存等)orderMapper.updateStatus(msg.getOrderId(), 2); // 2-已取消}}}

四、可靠性与性能优化建议

  1. 分布式锁防重复:在消息发送与重试时,通过分布式锁(如 Redis 锁)避免集群环境下的重复投递。
  2. 消息表归档:定期将已成功或超过最大重试次数的消息迁移至历史表,提升查询性能。
  3. 批量操作优化:消息写入与查询采用批量处理,减少数据库交互次数。
  4. 线程池隔离:消息发送与业务线程池隔离,避免相互影响。
  5. 监控告警:对消息发送成功率、重试次数、延时消息触发时效等指标进行监控,异常时及时告警。

五、总结

消息投递的可靠性与延时消息的精准性是分布式系统的重要基石。本文提出的 “本地消息表 + 事务同步” 方案确保了消息与业务的原子性,而 “数据库 + DelayQueue” 的组合则实现了通用、灵活的延时消息功能。

这些方案不依赖特定 MQ 中间件,可适配各种消息队列(如 RabbitMQ、Kafka、RocketMQ),且代码模块化程度高,易于集成到现有系统中。在实际应用中,需根据业务量级与实时性要求,灵活调整重试策略与延时消息的预加载频率,以达到可靠性与性能的最佳平衡。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/news/919474.shtml
繁体地址,请注明出处:http://hk.pswp.cn/news/919474.shtml
英文地址,请注明出处:http://en.pswp.cn/news/919474.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Java 学习笔记(基础篇6)

面向对象基础1. 类和对象(1) 示例&#xff1a;public class Student {String name "张三";int age 23;public void study() {System.out.println("学习 Java");}public void eat() {System.out.println("吃饭");} }public class Test {public …

光学件加工厂倚光科技:陪跑光学未来力量

在光学创新的漫漫长路上&#xff0c;总有一些看似 “不划算” 的坚持&#xff0c;却在悄然改写行业的未来。倚光科技的故事&#xff0c;就始于这样一种选择 —— 明知光学打样利润微薄&#xff0c;明知上百个项目中能走到量产的寥寥无几&#xff0c;仍愿意投入全球顶尖的设备与…

RabbitMQ:生产者可靠性(生产者重连、生产者确认)

目录一、生产者重连二、生产者确认一、生产者重连 当网络不稳定的时候&#xff0c;利用重试机制可以有效提高消息发送的成功率。不过SpringAMQP提供的重试机制是阻塞式的重试&#xff0c;也就是说多次重试过程中&#xff0c;当前线程是被阻塞的&#xff0c;会影响业务性能。 …

【深度学习新浪潮】空天地数据融合技术在城市三维重建中的应用

空天地数据融合技术在城市三维重建中的应用已取得显著进展,尤其在提升精度以满足具身智能机器人仿真训练需求方面,研究和产品均呈现多样化发展。以下是关键研究进展、产品方案及精度要求的详细分析: 一、研究进展与技术路径 1. 多源数据融合的技术突破 时空基准统一:通过…

Selenium自动化测试入门:cookie处理

&#x1f345; 点击文末小卡片&#xff0c;免费获取软件测试全套资料&#xff0c;资料在手&#xff0c;涨薪更快driver.get_cookies() # 获得cookie 信息driver.get_cookies(name) # 获得对应name的cookie信息add_cookie(cookie_dict) # 向cookie 添加会话信息delete_cookie(na…

快解析如何让远程访问更安全?

一、勒索病毒攻击服务器的途径很多用户服务器对外开放&#xff0c;实现外网访问&#xff0c;担心服务器被勒索病毒攻击&#xff01;勒索病毒攻击服务器的途径之一是通过路由器开放的端口进行扫描攻击&#xff0c;所以尽量不要在服务器的路由器和防火墙中开放端口二、快解析如何…

Linux下编译ARPACK

本文记录Linux下编译ARPACK的流程。 零、环境 操作系统Ubuntu 22.04.4 LTSVS Code1.92.1Git2.34.1GCC11.4.0CMake3.22.1oneAPI2024.2.1 一、依赖 1.1 安装oneAPI 参见&#xff1a;Get the Intel oneAPI Base Toolkit , Get the Intel oneAPI HPC Toolkit 二、编译ARPACK …

芋道RBAC实现介绍

说明&#xff1a;之前写过一篇博客&#xff0c;介绍如何搭建一个基于角色的权限验证框架 搭建一个基于角色的权限验证框架 本文介绍在非常受欢迎的开源框架——芋道中是如何实现 RBAC 的&#xff0c;芋道的部署参考下面这篇文章&#xff1a; 芋道微服务代码部署 介绍 一般…

Docker部署Jellyfin,没有公网IP如何使用内网穿透远程访问?

Jellyfin是一款完全开源、免费的媒体服务器&#xff0c;可帮助你快速搭建属于自己的私人流媒体平台&#xff1a;电影、剧集、音乐、照片统统收纳&#xff0c;跨设备随点随播。本文将以最简洁的步骤&#xff0c;演示如何在Docker容器中部署Jellyfin&#xff0c;并通过贝锐花生壳…

Podman:Mysql(使用卷)

下载镜像hpphcomp:~$ podman pull docker.1ms.run/mysql:latest Trying to pull docker.1ms.run/mysql:latest... Getting image source signatures Copying blob c81e70a25040 done | Copying blob 31f7d8dc4024 done | Copying blob b9916866e45f done | Copying blob …

2025年渗透测试面试题总结-21(题目+回答)

安全领域各种资源&#xff0c;学习文档&#xff0c;以及工具分享、前沿信息分享、POC、EXP分享。不定期分享各种好玩的项目及好用的工具&#xff0c;欢迎关注。 目录 一、文件上传绕过方式&#xff08;Top 5&#xff09; 二、文件包含高危函数&#xff08;PHP为例&#xff0…

像海绵一样吸收技术书籍的高效学习方法

像海绵一样吸收技术书籍的高效学习方法前言六步高效阅读法步骤1&#xff1a;快速浏览章节步骤2&#xff1a;先读章末测验步骤3&#xff1a;只读粗体字步骤4&#xff1a;只读每段的首句和末句步骤5&#xff1a;通读整章步骤6&#xff1a;复习与重复高效学习技术书籍的实用技巧1.…

Day60--图论--94. 城市间货物运输 I(卡码网),95. 城市间货物运输 II(卡码网),96. 城市间货物运输 III(卡码网)

Day60–图论–94. 城市间货物运输 I&#xff08;卡码网&#xff09;&#xff0c;95. 城市间货物运输 II&#xff08;卡码网&#xff09;&#xff0c;96. 城市间货物运输 III&#xff08;卡码网&#xff09; 今天是Bellman_ford专场。带你从普通的Bellman_ford&#xff0c;到队列…

Jenkins服务器SSH公钥配置步骤

步骤1. 在Jenkins服务器上生成SSH密钥在Jenkins服务器上执行以下命令&#xff1a;# 1. 生成SSH密钥对 ssh-keygen -t rsa -b 4096 -f ~/.ssh/id_rsa -N ""# 2. 设置正确的权限 chmod 700 ~/.ssh chmod 600 ~/.ssh/id_rsa chmod 644 ~/.ssh/id_rsa.pub# 3. 查看公钥内…

数据链路层-网络层-传输层

文章目录深入浅出理解网络核心&#xff1a;从交换机到TCP/UDP一、数据链路层&#xff1a;交换机的"地盘"1. 数据链路层的核心功能2. 以太网的发展历程3. 以太网中的MAC地址4. 以太网帧格式&#xff1a;数据的"快递包装"5. 交换机的工作原理&#xff1a;高效…

专题:2025跨境电商市场布局、供应链与产业带赋能报告 |附130+份报告PDF、原数据表汇总下载

原文链接&#xff1a;https://tecdat.cn/?p43616 2025年&#xff0c;跨境圈的老板们集体焦虑&#xff1a;美国关税飙到145%&#xff0c;亚马逊封号潮卷土重来&#xff0c;而东南亚却悄悄涨了246%&#xff01;这不是危言耸听——66%的美国消费者说&#xff0c;海外货涨10%就换本…

LINUX 818 shell:random;for for

问题 [rootweb ~]# a$(echo $[$RANDOM%10]) 您在 /var/spool/mail/root 中有邮件 [rootweb ~]# echo $a 3 [rootweb ~]# echo 139$a$a$a$a$a$a$a$a 13933333333 您在 /var/spool/mail/root 中有邮件 [rootweb ~]# echo 139 $a 139 3 [rootweb ~]# echo $a 3 [rootweb ~]# echo …

JavaScript 原型机制详解:从概念到实战(附个人学习方法)

原型是 JavaScript 实现继承与代码复用的核心机制,也是面试高频考点。本文结合个人学习经验、核心概念解析与实战案例,帮你彻底搞懂原型、prototype、__proto__ 及相关知识点,同时分享高效的学习方法。 一、个人学习方法:高效掌握复杂知识点 复杂概念(如原型)的学习,关…

【人工智能】2025年AI代理失控危机:构建安全壁垒,守护智能未来

还在为高昂的AI开发成本发愁?这本书教你如何在个人电脑上引爆DeepSeek的澎湃算力! 在2025年,AI代理(AI Agents)已成为日常生活和企业运营的核心组成部分,它们能够自主决策、执行任务并与环境互动。然而,随着AI代理能力的指数级提升,其安全隐患也日益凸显,包括数据泄露…

从噪声到动作:Diffusion Policy 如何改变机器人学习?

从噪声到动作&#xff1a;Diffusion Policy 如何改变机器人学习&#xff1f; 引言 在机器人手臂操作方面一直存在诸多挑战。我们熟悉的工业场景中的组装机械臂&#xff0c;往往依赖于写死的程序指令进行控制&#xff0c;具有高度规范化与高精度的特点。而当机械臂需要在复杂、…