Spring Boot整合RabbitMQ实现消息可靠投递全解析

在分布式系统中,消息中间件是解耦、异步、流量削峰的核心组件。RabbitMQ作为高可靠、易扩展的AMQP协议实现,被广泛应用于企业级场景。但消息传递过程中可能因网络波动、服务宕机等问题导致消息丢失,因此消息的可靠投递是RabbitMQ使用的核心课题。本文将基于Spring Boot 3.x版本,详细讲解生产者(Producer)和消费者(Consumer)两端的可靠投递实现方案。


一、环境准备与基础配置

1.1 依赖引入

pom.xml中添加Spring Boot RabbitMQ Starter依赖,自动整合AmqpTemplate和RabbitTemplate:

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

1.2 连接配置

application.yml中配置RabbitMQ连接信息及关键可靠投递参数:

spring:rabbitmq:host: 127.0.0.1       # RabbitMQ服务地址port: 5672            # 默认AMQP端口username: guest       # 默认用户名(生产环境需替换)password: guest       # 默认密码(生产环境需替换)virtual-host: /       # 默认虚拟主机# 生产者确认与回退配置publisher-confirm-type: correlated  # 关键:开启消息确认模式publisher-returns: true             # 开启消息回退模式# 消费者确认配置listener:simple:acknowledge-mode: manual        # 手动确认(默认auto自动确认)prefetch: 10                    # 消费者单次拉取最大消息数(防雪崩)

1.3 核心组件初始化

通过配置类初始化RabbitMQ连接工厂、消息模板及队列/交换器声明:

@Configuration
public class RabbitMQConfig {// 声明测试用交换器和队列(根据业务场景调整)public static final String TEST_EXCHANGE = "test.exchange";public static final String TEST_QUEUE = "test.queue";public static final String TEST_ROUTING_KEY = "test.key";@Beanpublic DirectExchange testExchange() {// 声明直连交换器(持久化)return new DirectExchange(TEST_EXCHANGE, true, false);}@Beanpublic Queue testQueue() {// 声明持久化队列(durable=true)return new Queue(TEST_QUEUE, true, false, false);}@Beanpublic Binding testBinding() {// 绑定队列与交换器return BindingBuilder.bind(testQueue()).to(testExchange()).with(TEST_ROUTING_KEY);}@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate template = new RabbitTemplate(connectionFactory);// 必须设置为true,否则ReturnCallback不会触发(仅当消息无法路由到队列时回调)template.setMandatory(true);return template;}
}

二、生产者可靠投递:确认模式与回退模式

生产者的可靠投递需解决两个核心问题:

  1. 消息是否成功到达交换器(Exchange)?
  2. 消息从交换器到队列(Queue)是否失败?

Spring Boot通过ConfirmCallback(确认模式)和ReturnCallback(回退模式)分别解决这两个问题。

2.1 确认模式(ConfirmCallback):消息到交换器的确认

作用:当消息被交换器接收时触发回调(无论是否路由到队列),用于确认消息已到达交换器。

2.1.1 配置与实现

通过RabbitTemplatesetConfirmCallback方法注册回调:

@Service
public class ProducerService {@Autowiredprivate RabbitTemplate rabbitTemplate;@PostConstructpublic void init() {// 注册确认回调rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {if (ack) {// 消息成功到达交换器log.info("消息确认成功,ID:{}", correlationData.getId());} else {// 消息未到达交换器(如交换器不存在、权限不足)log.error("消息确认失败,ID:{},原因:{}", correlationData.getId(), cause);// 这里可触发重试逻辑(需结合correlationData存储原始消息)}});}public void sendMessage(String message) {// 构造CorrelationData(用于关联消息ID,需全局唯一)CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());rabbitTemplate.convertAndSend(RabbitMQConfig.TEST_EXCHANGE,RabbitMQConfig.TEST_ROUTING_KEY,message,correlationData);}
}
2.1.2 参数与注意事项
  • publisher-confirm-type
    • none(默认):禁用确认模式;
    • correlated:启用关联确认(推荐),通过CorrelationData传递消息元数据;
    • simple:简化模式(兼容老版本),仅支持同步确认。
  • CorrelationData:必须显式传递,否则回调中无法获取消息ID等元数据;
  • 异步特性:确认回调是异步触发的,生产环境需结合本地消息表或Redis记录消息状态,避免丢失。

2.2 回退模式(ReturnCallback):交换器到队列的失败处理

作用:当消息成功到达交换器,但无法路由到任何队列时触发回调(如路由键错误、队列未绑定)。

2.2.1 配置与实现

通过RabbitTemplatesetReturnCallback方法注册回退回调(Spring Boot 2.1+推荐使用setReturnsCallback):

@PostConstruct
public void init() {// 回退回调(Spring Boot 2.1+推荐使用ReturnsCallback)rabbitTemplate.setReturnsCallback(returned -> {Message message = returned.getMessage();String exchange = returned.getExchange();String routingKey = returned.getRoutingKey();int replyCode = returned.getReplyCode();String replyText = returned.getReplyText();log.error("消息回退,交换器:{},路由键:{},错误码:{},原因:{},消息内容:{}",exchange, routingKey, replyCode, replyText, new String(message.getBody()));// 这里可触发补偿逻辑(如修改路由键重发)});
}
2.2.2 参数与注意事项
  • mandatory:必须设置为true(通过rabbitTemplate.setMandatory(true)),否则RabbitMQ会静默丢弃无法路由的消息;
  • 触发条件:仅当消息无法路由到任何队列时触发(若交换器绑定了多个队列,只要有一个队列匹配就不会触发);
  • 与确认模式的关系:确认模式(ConfirmCallback)先于回退模式触发,因为交换器接收消息后才会尝试路由。

三、消费者可靠投递:自动确认与手动确认

消费者的可靠投递核心是消息确认(ACK)机制,确保消息被成功处理后再确认,避免因处理失败导致消息丢失。

3.1 自动确认(AUTO):简单但高风险

原理:消息一旦被消费者接收,RabbitMQ立即标记为已确认并删除。若消费者处理失败(如抛出异常),消息已丢失。

3.1.1 配置与实现

application.yml中设置acknowledge-mode: auto(默认值),消费者无需手动处理ACK:

@Component
public class AutoAckConsumer {@RabbitListener(queues = RabbitMQConfig.TEST_QUEUE)public void handleMessage(String message) {try {// 模拟业务处理log.info("自动确认模式-消费消息:{}", message);// 若处理成功,RabbitMQ自动ACK} catch (Exception e) {log.error("消息处理失败:{}", message, e);// 无补救措施,消息已丢失!}}
}
3.1.2 适用场景与风险
  • 适用场景:消息处理逻辑简单、无失败可能(如日志记录);
  • 风险:消息处理失败时无法重试,可能导致数据丢失;
  • 生产环境不推荐,除非能接受消息丢失。

3.2 手动确认(MANUAL):精准控制,生产首选

原理:消费者显式调用channel.basicAck(确认)或channel.basicNack(拒绝),RabbitMQ根据ACK状态决定是否重新入队。

3.2.1 配置与实现
  1. application.yml中设置acknowledge-mode: manual
  2. 消费者方法中注入ChannelMessage对象,手动处理ACK:
@Component
public class ManualAckConsumer {@RabbitListener(queues = RabbitMQConfig.TEST_QUEUE)public void handleMessage(Message message, Channel channel) throws IOException {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {String msgContent = new String(message.getBody(), StandardCharsets.UTF_8);log.info("手动确认模式-消费消息:{}", msgContent);// 模拟业务处理(可能失败)businessProcess(msgContent);// 处理成功:确认消息(multiple=false表示仅确认当前消息)channel.basicAck(deliveryTag, false);} catch (Exception e) {log.error("消息处理失败,准备重试或丢弃:{}", message, e);// 处理失败:拒绝消息(requeue=true表示重新入队,false表示丢弃或进入死信队列)channel.basicNack(deliveryTag, false, true); // 或使用basicReject(仅拒绝单条消息):// channel.basicReject(deliveryTag, true);}}private void businessProcess(String message) {// 模拟可能失败的业务逻辑if (message.contains("error")) {throw new RuntimeException("模拟业务处理失败");}}
}
3.2.2 关键方法与参数解释
  • channel.basicAck(deliveryTag, multiple)
    • deliveryTag:消息的唯一标识(由RabbitMQ生成);
    • multiple:是否批量确认(true表示确认所有小于deliveryTag的未确认消息);
  • channel.basicNack(deliveryTag, multiple, requeue)
    • requeuetrue表示消息重新入队(可能被同一消费者重复消费),false表示丢弃或进入死信队列;
  • channel.basicReject(deliveryTag, requeue):与basicNack类似,但仅支持单条消息拒绝。
3.2.3 生产环境注意事项
  • 幂等性处理:消息可能因requeue=true被重复消费,业务逻辑需保证幂等(如通过数据库唯一索引、Redis分布式锁);
  • 异常捕获范围:必须在try-catch中包裹完整的业务逻辑,避免未捕获异常导致ACK未发送,消息被无限阻塞;
  • 批量确认优化:若处理大量消息,可结合multiple=true批量确认提升性能(需确保批量消息均处理成功);
  • 死信队列(DLX):建议将requeue=false的消息路由到死信队列,避免无限重试消耗资源(需提前声明死信交换器和队列)。

四、总结与最佳实践

4.1 生产者侧关键要点

  • 启用correlated确认模式,结合CorrelationData记录消息ID;
  • 启用回退模式(mandatory=true),捕获无法路由的消息;
  • 确认回调中实现消息重试(需避免无限重试,可结合指数退避策略);
  • 消息持久化:设置交换器、队列、消息本身为持久化(durable=true),防止RabbitMQ重启导致消息丢失。

4.2 消费者侧关键要点

  • 优先选择手动确认模式(manual),精确控制消息状态;
  • 处理逻辑必须保证幂等性,避免重复消费问题;
  • 合理设置prefetch参数(如prefetch=10),防止消费者负载过高;
  • 失败消息路由到死信队列,避免阻塞正常消息处理。

4.3 完整可靠投递链路

通过“生产者确认+回退模式+消费者手动确认+消息持久化+死信队列”的组合,可构建覆盖全链路的可靠消息传递体系,满足绝大多数企业级场景的需求。

后续我将会对死信队列进行详细讲解,欢迎关注。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/pingmian/94071.shtml
繁体地址,请注明出处:http://hk.pswp.cn/pingmian/94071.shtml
英文地址,请注明出处:http://en.pswp.cn/pingmian/94071.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

STAR-CCM+|K-epsilon湍流模型溯源

【1】引言 三维CFD仿真经典软件很多&#xff0c;我接触过的有Ansys和STAR-CCM两种。因为一些机缘&#xff0c;我使用STAR-CCM更多&#xff0c;今天就来回顾一下STAR-CCM中K-epsilon湍流模型的基本定义。 【2】学习地址介绍 点击链接User Guide可以到达网页版本的STAR-CCM 24…

osgEarth 图像融合正片叠底

* 需求&#xff1a;* 高程渲染图 RGB.tif、 山体阴影图 AMP.tif** 高程渲染图 rgb波段分别 乘以 山体阴影图r波段&#xff0c; 然后除以255(AI说 读取的纹理就已经归一化到了 0~1 范围&#xff0c;不用除以 255)。本人遥感知识匮乏。问了AI,以上 需求在许多商业软件上已实现。…

Java接口响应速度优化

在 Java 开发中&#xff0c;接口响应速度直接影响用户体验和系统吞吐量。优化接口性能需要从代码、数据库、缓存、架构等多个维度综合考量&#xff0c;以下是具体方案及详细解析&#xff1a;一、代码层面优化代码是接口性能的基础&#xff0c;低效的代码会直接导致响应缓慢。1.…

A Large Scale Synthetic Graph Dataset Generation Framework的学习笔记

文章的简介 作者提出了一个可扩展的合成图生成框架&#xff0c;能够从真实图中学习结构和特征分布&#xff0c;并生成任意规模的图数据集&#xff0c;支持&#xff1a; 节点和边的结构生成节点和边的特征生成特征与结构的对齐&#xff08;Aligner&#xff09; 它区别于GraphWor…

Android12 Framework读写prop属性selinux报错解决

文章目录问题描述解决过程相关文章问题描述 Android读prop值时&#xff0c;就算是system应用&#xff0c; 也需要selinux权限&#xff0c;否则会报错。 java代码如下 SystemProperties.get("ro.input.resampling", "")selinux报错如下 2025-06-28 17:57:…

【图文版】AIOT 小智 AI 聊天机器人 ESP32 项目源码图解

前言 小智 AI 聊天机器人是最近一个很火的开源项目&#xff0c;它借助LLM大模型以及TTS等AI的能力&#xff0c;通过自然语言来与其对话实现交互。它可以回答任何问题、播放音乐、背诵古诗&#xff0c;颇有未来AI机器人的雏形。 因为最近工作上的需要对其进行了研究&#xff0c;…

250821-RHEL9.4上Docker及Docker-Compose的离线安装

在 离线环境下 在 RHEL (Red Hat Enterprise Linux) 系统上安装 Docker 和 Docker Compose&#xff0c;需要提前在有网络的环境中下载相关 RPM 包及依赖&#xff0c;然后在目标机器上进行安装。以下是比较完整的步骤&#xff1a; 1. Docker及Docker-Compose离线安装 在 RHEL 9.…

react相关知识

1.类组件和函数组件&#xff08;1&#xff09;类组件import React, { Component } from react;class UserProfile extends Component {constructor(props) {super(props);this.state {userData: null,isLoading: true,};this.timerId null;}componentDidMount() {// 模拟 API…

算法第五十五天:图论part05(第十一章)

并查集理论基础并查集主要有两个功能&#xff1a;将两个元素添加到一个集合中。判断两个元素在不在同一个集合class UnionFind:def __init__(self, n):"""初始化并查集"""self.n nself.father list(range(n)) # 每个节点自己是根self.rank […

雨雾天气漏检率骤降80%!陌讯多模态车牌识别方案实战解析

一、行业痛点&#xff1a;车牌识别的天气敏感性据《智慧交通系统检测白皮书》统计&#xff0c;雨雾环境下传统车牌识别漏检率高达42.7%&#xff08;2024年数据&#xff09;。主要存在三大技术瓶颈&#xff1a;1.​​水膜干扰​​&#xff1a;挡风玻璃水渍导致车牌区域纹理模糊2…

PostgreSQL15——查询详解

PostgreSQL15查询详解一、简单查询1.1、单表查询1.2、无表查询1.3、消除重复结果1.4、使用注释二、查询条件2.1、WHERE子句2.2、模式匹配2.3、空值判断2.4、复杂条件三、排序显示3.1、单列排序3.2、多列排序3.3、空值排序四、限定结果数量4.1、Top-N查询4.2、分页查询4.3、注意…

03-容器数据卷

卷就是目录或文件&#xff0c;存在于一个或多个容器中&#xff0c;由 docker 挂载到容器&#xff0c;但不属于联合文件系统&#xff0c;因此能够绕过 UnionFS&#xff0c;提供一些用于持续存储或共享数据。 特性&#xff1a;卷设计的目的就是数据的持久化&#xff0c;完全独立于…

Linux内核进程管理子系统有什么第三十三回 —— 进程主结构详解(29)

接前一篇文章&#xff1a;Linux内核进程管理子系统有什么第三十二回 —— 进程主结构详解&#xff08;28&#xff09; 本文内容参考&#xff1a; Linux内核进程管理专题报告_linux rseq-CSDN博客 《趣谈Linux操作系统 核心原理篇&#xff1a;第三部分 进程管理》—— 刘超 《…

从代码学习深度强化学习 - 目标导向的强化学习-HER算法 PyTorch版

文章目录 1. 前言:当一个任务有多个目标 2. 目标导向的强化学习 (GoRL) 简介 3. HER算法:化失败为成功的智慧 4. 代码实践:用PyTorch实现HER+DDPG 4.1 自定义环境 (WorldEnv) 4.2 智能体与算法 (DDPG) 4.3 HER的核心:轨迹经验回放 4.4 主流程与训练 5. 训练结果与分析 6. 总…

前端 H5分片上传 vue实现大文件

用uniapp开发APP上传视频文件&#xff0c;大文件可以上传成功&#xff0c;但是一旦打包为H5的代码&#xff0c;就会一提示链接超时&#xff0c;我的代码中是实现的上传到阿里云 如果需要看全文的私信我 官方开发文档地址 前端&#xff1a;使用分片上传的方式上传大文件_对象…

Linux服务器Systemctl命令详细使用指南

目录 1. 基本语法 2. 基础命令速查表 3. 常用示例 3.1 部署新服务后&#xff0c;设置开机自启并启动 3.2 检查系统中所有失败的服务并尝试修复 3.3 查看系统中所有开机自启的服务 4. 总结 以下是 systemctl 使用指南&#xff0c;涵盖服务管理、单元操作、运行级别控制、…

【JVM内存结构系列】二、线程私有区域详解:程序计数器、虚拟机栈、本地方法栈——搞懂栈溢出与线程隔离

上一篇文章我们搭建了JVM内存结构的整体框架,知道程序计数器、虚拟机栈、本地方法栈属于“线程私有区域”——每个线程启动时会单独分配内存,线程结束后内存直接释放,无需GC参与。这三个区域看似“小众”,却是理解线程执行逻辑、排查栈溢出异常的关键,也是面试中高频被问的…

红帽认证升级华为openEuler证书活动!

如果您有红帽证书&#xff0c;可以升级以下相应的证书&#xff1a;&#x1f447; 有RHCSA证书&#xff0c;可以99元升级openEuler HCIA 有RHCE证书&#xff0c;可以99元升级openEuler HCIP 有RHCA证书&#xff0c;可以2100元升级openEuler HCIE 现金激励&#xff1a;&#x1f4…

迭代器模式与几个经典的C++实现

迭代器模式详解1. 定义与意图迭代器模式&#xff08;Iterator Pattern&#xff09; 是一种行为设计模式&#xff0c;它提供一种方法顺序访问一个聚合对象中的各个元素&#xff0c;而又不暴露该对象的内部表示。主要意图&#xff1a;为不同的聚合结构提供统一的遍历接口。将遍历…

epoll 陷阱:隧道中的高级负担

上周提到了 tun/tap 转发框架的数据通道结构和优化 tun/tap 转发性能优化&#xff0c;涉及 RingBuffer&#xff0c;packetization 等核心话题。我也给出了一定的数据结构以及处理逻辑&#xff0c;但竟然没有高尚的 epoll&#xff0c;本文说说它&#xff0c;因为它不适合。 epo…