核心概念解析

发布者确认机制的核心思想是:将消息投递的可靠性从“尽力而为”提升为“契约保证”。生产者不再是“发后不理”,而是与 Broker 建立一个双向的沟通渠道。

在 Spring AMQP 的封装下,这个机制主要由两个回调接口实现:

1. ConfirmCallback: 确认消息是否到达 Exchange

这是最核心的确认机制。它关注的是消息从生产者到 Broker 内的交换机 (Exchange) 这一段路程是否成功。

  • 触发时机:无论消息是否成功到达 Exchange,Broker 都会异步地调用生产者的这个回调函数。
  • 如何工作
    • 如果 Broker 成功接收消息并将其放入 Exchange,回调中的 ack 参数将为 true
    • 如果 Broker 因故(如内部错误、交换机不存在等)未能接收消息,ack 参数将为 false,同时 cause 参数会提供失败的原因描述。
  • 作用:它回答了问题:“Broker 收到我的消息了吗?

2. ReturnCallback: 确认消息是否路由到 Queue

这是一个补充机制,处理的是一个更细分的场景。它在 ConfirmCallback 返回成功 (ack=true) 的前提下才可能被触发。

  • 触发时机:当消息已成功到达 Exchange,但 Exchange 无法根据路由键 (Routing Key) 将消息路由到任何一个绑定的队列时,Broker 会将这条“无法投递”的消息退回给生产者,并调用此回调。
  • 如何工作
    • 如果消息被正常路由到一个或多个队列,ReturnCallback 不会被触发
    • 如果消息无法路由(例如,路由键写错,或者没有队列绑定这个路由键),回调函数将被调用,你可以从 ReturnedMessage 参数中获取到被退回的消息内容、路由信息和退回原因。
  • 作用:它回答了问题:“我发给 Exchange 的消息,有队列接收它吗?
  • 回调参数:回调函数中有⼀个参数: ReturnedMessage 包含以下属性
public class ReturnedMessage {//返回的消息对象,包含了消息体和消息属性private final Message message;//由Broker提供的回复码, 表⽰消息⽆法路由的原因. 通常是⼀个数字代码,每个数字代表不同的含义.private final int replyCode;//⼀个⽂本字符串, 提供了⽆法路由消息的额外信息或错误描述.private final String replyText;//消息被发送到的交换机名称private final String exchange;//消息的路由键,即发送消息时指定的键private final String routingKey;
}

工作流程图

下图清晰地展示了这两种回调机制在消息发送过程中的作用点:
在这里插入图片描述

demo演练

接下来,我们通过一个 Spring Boot 项目来演示如何实现发布者确认。

项目结构

一个简单的 Spring Boot 项目结构如下:

此处的com.example.ackdemo要改成读者自己的项目路径,包括后面相关代码的路径引入也需要进行对应修改

ack-demo
├── src
│   ├── main
│   │   ├── java
│   │   │   └── com
│   │   │       └── example
│   │   │           └── publishackdemo
│   │   │               │   ├── RabbitMQConfig.java
│   │   │               │   └── RabbitTemplateConfig.java
│   │   │               │   └── MessageController.java
│   │   │               └── AckDemoApplication.java
│   │   └── resources
│   │       └── application.yml
└── pom.xml

配置发布者确认模式

src/main/resources/application.yml 文件中,进行如下配置:

spring:application:name: ackDemorabbitmq:host: localhostport: 5672username: guestpassword: guestpublisher-confirm-type: correlated # 开启ConfirmCallback,correlated表示回调时会携带CorrelationDatapublisher-returns: true # 开启ReturnCallback

声明 Exchange 和 Queue

config/RabbitMQConfig.java 中声明我们需要的交换机和队列。

注意此处引入的包为org.springframework.amqp.core

package com.example.publishackdemo.config;import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration("RabbitMQConfigWithProductACK")
public class RabbitMQConfig {// 此处的常量提取到一个单独的静态类中更好,此处为了方便演式不单独提取public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";public static final String CONFIRM_QUEUE_NAME = "confirm.queue";public static final String CONFIRM_ROUTING_KEY = "key.confirm";@Beanpublic TopicExchange confirmExchange() {return ExchangeBuilder.topicExchange(CONFIRM_EXCHANGE_NAME).durable(true).build();}@Beanpublic Queue confirmQueue() {return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();}@Beanpublic Binding confirmBinding(Queue confirmQueue, TopicExchange confirmExchange) {return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY);}
}

配置 RabbitTemplate 回调

这是核心步骤。我们创建一个配置类,专门用于定制 RabbitTemplate,并为其设置回调。

package com.example.publishackdemo.config;import lombok.extern.slf4j.Slf4j;  
import org.springframework.amqp.rabbit.connection.ConnectionFactory;  
import org.springframework.amqp.rabbit.core.RabbitTemplate;  
import org.springframework.context.annotation.Bean;  
import org.springframework.context.annotation.Configuration;  @Slf4j  
@Configuration("RabbitMQConfigWithPublisherAck")  
public class RabbitTemplateConfig {  @Bean("rabbitTemplate")  public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {  return new RabbitTemplate(connectionFactory);  }  @Bean("confirmRabbitTemplate") // 给这个新的Bean起一个唯一的名字  public RabbitTemplate confirmRabbitTemplate(ConnectionFactory connectionFactory) {  RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);  // 关键:只为这个新的实例设置回调  rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {  String id = (correlationData != null) ? correlationData.getId() : "";  if (ack) {  log.info("ConfirmCallback: 消息发送成功!ID: {}", id);  } else {  log.error("ConfirmCallback: 消息发送失败!ID: {}, 原因: {}", id, cause);  }  });  // 同样可以设置 ReturnsCallback//rabbitTemplate.setMandatory(true);   rabbitTemplate.setReturnsCallback(returnedMessage -> {  log.warn("ReturnsCallback: 消息被退回! Message: {}, ReplyCode: {}, ReplyText: {}, Exchange: {}, RoutingKey: {}",  new String(returnedMessage.getMessage().getBody()),  returnedMessage.getReplyCode(),  returnedMessage.getReplyText(),  returnedMessage.getExchange(),  returnedMessage.getRoutingKey());  // 此处可以记录无法路由的消息,用于后续分析或处理  });  return rabbitTemplate;  }  
}

此处配置两个RabbitTemplate的原因?

  • setConfirmCallback只能被设置一次,如果直接在Controller层里面调用的时候声明,那么每次请求都会调用一次设置的代码,会导致第二次之后的请求都会报错,所以需要提取到Config里面
  • 在设置RabbitTemplatesetReturnsCallback或者setConfirmCallback设置之后会全局生效,如果并不需要进行发送确认的生产者也使用了这个Template那么会导致性能下降等问题,所以创建两个不同的Bean就是为了在不同情况选择不同的Bean对象

生产者代码 (Publisher)

修改 MessageController,增加几个测试接口来模拟不同场景。

package com.example.ackdemo.controller;import com.example.ackdemo.config.RabbitMQConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;import java.util.UUID;@Slf4j
@RestController
public class MessageController {@Resource(name = "confirmRabbitTemplate")private RabbitTemplate rabbitTemplate;// 1. 测试正常发送@GetMapping("/send/ok")public String sendOkMessage() {String id = UUID.randomUUID().toString();String message = "A correct message.";log.info("Sending message with ID: {}", id);rabbitTemplate.convertAndSend(RabbitMQConfig.CONFIRM_EXCHANGE_NAME, RabbitMQConfig.CONFIRM_ROUTING_KEY, message, new CorrelationData(id));return "Message sent (OK). Check logs for callback.";}// 2. 测试发送到不存在的 Exchange@GetMapping("/send/bad-exchange")public String sendToBadExchange() {String id = UUID.randomUUID().toString();String message = "Message to a non-existent exchange.";log.info("Sending message with ID: {} to a bad exchange", id);rabbitTemplate.convertAndSend("non-existent-exchange", RabbitMQConfig.CONFIRM_ROUTING_KEY, message, new CorrelationData(id));return "Message sent (Bad Exchange). Check logs for callback.";}// 3. 测试发送到正确的 Exchange,但错误的 Routing Key@GetMapping("/send/bad-routing")public String sendWithBadRoutingKey() {String id = UUID.randomUUID().toString();String message = "Message with a bad routing key.";log.info("Sending message with ID: {} with a bad routing key", id);rabbitTemplate.convertAndSend(RabbitMQConfig.CONFIRM_EXCHANGE_NAME, "wrong.key.123", message, new CorrelationData(id));return "Message sent (Bad Routing). Check logs for callback.";}
}

运行与验证

  1. 启动应用。
  2. 验证成功场景
    • 访问 http://localhost:8080/send/ok
    • 日志打印:你会看到两条日志,说明回调已经正确设置。
2025-07-11T19:59:56.682+08:00  INFO 11868 --- [mqDemo] [nio-8080-exec-1] c.d.m.p.PublisherController              : Sending message with ID: 679ddb2f-0bfe-4ec6-b77e-4dd44c1612e6
2025-07-11T19:59:56.980+08:00  INFO 11868 --- [mqDemo] [nectionFactory2] c.d.m.p.RabbitTemplateConfig             : ConfirmCallback: 消息发送成功!ID: 679ddb2f-0bfe-4ec6-b77e-4dd44c1612e6
  • 观察 RabbitMQ 管理界面confirm.queue 中会有一条消息。
  1. 验证交换机失败场景
    • 访问 http://localhost:8080/send/bad-exchange
    • 日志打印:只会触发 ConfirmCallback 的失败回调。
reply-text=NOT_FOUND - no exchange 'non-existent-exchange' in vhost 'demo', class-id=60, method-id=40)
2025-07-11T20:02:41.784+08:00 ERROR 40760 --- [mqDemo] [nectionFactory3] c.d.m.p.RabbitTemplateConfig             : ConfirmCallback: 消息发送失败!ID: bf93981e-f0a7-485d-bddf-cd5aec3e299f, 原因: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'non-existent-exchange' in vhost 'demo', class-id=60, method-id=40)
  1. 验证路由失败场景
    • 访问 http://localhost:8080/send/bad-routing
    • 日志打印:你会看到先触发成功ConfirmCallback(因为消息确实到达了 Exchange)
    • 但是并没有触发ReturnsCallback,为什么?
      • 因为需要在其方法前添加一行rabbitTemplate.setMandatory(true);来开启此功能
2025-07-11T20:13:00.788+08:00  INFO 10852 --- [mqDemo] [nio-8080-exec-1] c.d.m.p.PublisherController              : Sending message with ID: 24753c11-7341-4836-8c74-79a0deae8f3b with a bad routing key
2025-07-11T20:13:01.023+08:00  WARN 10852 --- [mqDemo] [nectionFactory2] c.d.m.p.RabbitTemplateConfig             : ReturnsCallback: 消息被退回! Message: Message with a bad routing key., ReplyCode: 312, ReplyText: NO_ROUTE, Exchange: confirm.exchange, RoutingKey: wrong.key.123
2025-07-11T20:13:01.023+08:00  INFO 10852 --- [mqDemo] [nectionFactory3] c.d.m.p.RabbitTemplateConfig             : ConfirmCallback: 消息发送成功!ID: 24753c11-7341-4836-8c74-79a0deae8f3b

生产环境注意事项

  1. 为失败做好准备:收到 nackreturn 回调后,必须有相应的补偿机制。常见的策略包括:

    • 有限重试:对于网络抖动等临时性故障,可以进行几次延时重试。
    • 记录日志与告警:对于持续失败或逻辑错误(如错误的Exchange/RoutingKey),应详细记录日志,并触发告警通知开发人员介入。
    • 消息入库:将发送失败的消息存入数据库或本地文件,通过定时任务进行重发,这是最可靠的补偿方式。
  2. 善用 CorrelationData:在异步高并发场景下,CorrelationData 是你识别哪条消息得到确认的唯一凭证。它的 ID 应该具有业务唯一性(如订单ID、业务流水号),以便于追踪和排错。

  3. 性能权衡:开启发布者确认会增加网络开销和 Broker 的 CPU 负担,从而降低消息发送的吞吐量。对于可以容忍少量丢失的非核心业务(如打点日志),可以关闭此功能以追求性能。

  4. 全局回调 vs. 单次发送回调:我们演示的是全局配置 RabbitTemplate 的回调。RabbitTemplate 也支持为单次 send 操作指定一个临时的 CorrelationData,它内部可以包含更丰富的回调逻辑,适用于需要对特定消息进行特殊处理的场景。

  5. 构建完整的可靠性链路:切记,发布者确认只是可靠性拼图的一部分。一个完整的可靠性方案必须是:发布者确认 + 持久化(交换机、队列、消息)+ 消费者确认。三者结合,才能最大限度地保证消息在整个生命周期内的安全。

总结

RabbitMQ 的发布者确认机制,通过 ConfirmCallbackReturnCallback 两个强大的工具,为我们弥补了消息从生产者到 Broker 这一段路程中的可靠性盲区。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/pingmian/91175.shtml
繁体地址,请注明出处:http://hk.pswp.cn/pingmian/91175.shtml
英文地址,请注明出处:http://en.pswp.cn/pingmian/91175.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

KONG API Gateway中的核心概念

在使用Kong API Gateway&#xff08;API网关&#xff09;时&#xff0c;理解其核心概念是掌握其工作原理的基础。这些概念既体现了Kong的设计哲学&#xff0c;也决定了它如何适配复杂的API管理场景&#xff08;如微服务、多团队协作等&#xff09;。本文将系统梳理Kong的核心概…

如何解决pip安装报错ModuleNotFoundError: No module named ‘jupyterlab’问题

【Python系列Bug修复PyCharm控制台pip install报错】如何解决pip安装报错ModuleNotFoundError: No module named ‘jupyterlab’问题 摘要 在开发过程中&#xff0c;我们经常会遇到各种模块安装的问题&#xff0c;尤其是在使用PyCharm时&#xff0c;经常会遇到pip install时的…

3 运算符与表达式

运算符&#xff1a;对字面量或者变量进行操作的符号 表达式&#xff1a;用运算符把字面量或者变量连接起来符合java语法的式子就可以称作表达式不同运算符连接的表达式体现的是不同类型的表达式int a 10; int b 20; int c a b;&#xff1a;运算符&#xff0c;并且是算术运算…

MySQL的单行函数:

目录 函数的理解&#xff1a; MySQL的内置函数及分类&#xff1a; 单行函数&#xff1a; 数值函数&#xff1a; 基本函数&#xff1a; 角度与弧度互换函数&#xff1a; 三角函数&#xff1a; 指数与对数&#xff1a; 进制转换&#xff1a; 字符串函数&#xff1a; 日…

设计模式(二十一)行为型:状态模式详解

设计模式&#xff08;二十一&#xff09;行为型&#xff1a;状态模式详解状态模式&#xff08;State Pattern&#xff09;是 GoF 23 种设计模式中的行为型模式之一&#xff0c;其核心价值在于允许一个对象在其内部状态改变时改变其行为&#xff0c;使得对象看起来像是修改了它的…

深入理解 Doris Compaction:提升查询性能的幕后功臣

在 Doris 的数据存储与查询体系里&#xff0c;Compaction 是保障查询效率、优化存储结构的关键机制。如果你好奇 Doris 如何在高频写入后仍能高效响应查询&#xff0c;或是想解决数据版本膨胀带来的性能问题&#xff0c;这篇关于 Compaction 的深度解析值得收藏 &#x1f447; …

css 实现虚线效果的多种方式

使用边框实现虚线 通过设置元素的边框样式来实现虚线效果。以下为示例代码: .dashed {border: 1px dashed black; }使用 CSS 伪元素实现虚线 使用伪元素来模拟虚线的效果。以下为示例代码: .dashed::before {content: "";display: block;height: 1px;border-bo…

深入剖析 RocketMQ 分布式事务:原理、流程与实践

Apache RocketMQ 是一种分布式消息队列系统&#xff0c;支持分布式事务消息&#xff0c;以确保在分布式系统中数据的一致性。它通过一种基于两阶段提交(2PC)的机制结合补偿逻辑来实现分布式事务的最终一致性。以下是对 RocketMQ 分布式事务的详细讲解&#xff0c;包括其核心概念…

具身智能 自动驾驶相关岗位的技术栈与能力地图

一、硬技能技术栈&#xff08;优先级排序&#xff09; 1. 核心领域技术&#xff08;★★★★★&#xff09;技术方向具体技能学习建议大模型实战- VLA架构&#xff08;RT-2、PaLM-E&#xff09;开发/微调- 多模态对齐&#xff08;CLIP、Flamingo&#xff09;- 生成式策略&#…

实现了加载 正向 碰撞 雅可比 仿真

""" # 此示例从 URDF 文件中加载一个 UR10 机械臂的模型 # 随后演示 Pinocchio 库的基本功能,如正向运动学计算 # 雅可比矩阵计算、碰撞检测以及动力学仿真 """ # 导入 meshcat 的几何模块,用于创建和管理可视化的几何对象 import meshcat.geo…

【0基础PS】PS工具详解--画笔工具

目录前言一、画笔工具的位置与快捷键​二、画笔工具选项栏设置​三、画笔工具的进阶应用​四、常见问题及解决方法​总结前言 在 Photoshop 的众多工具中&#xff0c;画笔工具无疑是极具创造力和实用性的工具之一。无论是进行图像绘制、照片修饰&#xff0c;还是特效制作&…

window10和ubuntu22.04双系统之卸载ubuntu系统

window10和ubuntu22.04双系统之卸载ubuntu系统&#xff09;1. 删除Ubuntu系统占用的磁盘分区&#xff08;在Windows下操作&#xff09;2. 删除ubuntu开机引导项1. winr出来终端提示框后输入2. 然后会在命令行中显示电脑的硬盘列表&#xff0c;输入命令选择安装Windows的那个硬盘…

(C++)C++类和类的方法(基础教程)(与Python类的区别)

前言&#xff1a; 本篇博客建议搭配&#xff1a;&#xff08;Python&#xff09;类和类的方法&#xff08;基础教程介绍&#xff09;&#xff08;Python基础教程&#xff09;-CSDN博客 一起学习使用&#xff1b; 源代码&#xff1a; #include <iostream> #include &…

【NLP舆情分析】基于python微博舆情分析可视化系统(flask+pandas+echarts) 视频教程 - 微博文章数据可视化分析-文章分类下拉框实现

大家好&#xff0c;我是java1234_小锋老师&#xff0c;最近写了一套【NLP舆情分析】基于python微博舆情分析可视化系统(flaskpandasecharts)视频教程&#xff0c;持续更新中&#xff0c;计划月底更新完&#xff0c;感谢支持。今天讲解微博文章数据可视化分析-文章分类下拉框实现…

Git命令保姆级教程

Git 入门网站 https://learngitbranching.js.org/?localezh_CN Git 命令 git init // 在本地目录内部会生成.git文件夹 git initgit clone // 从git服务器拉取代码 // 代码下载完成后在当前文件夹中会有一个 shop 的目录&#xff0c;通过 cd shop 命令进入目录。 git clone ht…

Java Ai For循环 (day07)

循环结构 for&#xff1a;循环语句的作用&#xff1a;可以将一段代码重复的执行很多次for 循环语句格式&#xff1a;执行流程&#xff1a; 初始化语句执行条件判断语句&#xff0c;看结果是 true&#xff0c;还是 false false结束&#xff0c;true继续执行循环体语句执行条件控…

Directory Opus 使用优化

自定义快捷键 Directory Opus 移动标签到另一栏 设置快捷键&#xff1a;ctrl←/→ 设置步骤&#xff1a; 打开【设置】—>选择【自定义工具栏和快捷键】 选择【新建】—>【新建窗口快捷键】 输入快捷键命令 Go TABMOVEother此时可以点击运行进行测试&#xff0c;…

Qt知识点2『Ubuntu24.04.2安装Qt5.12.9各种报错』

问题1&#xff1a;Qt安装完毕后&#xff0c;新建一个最简单的测试程序&#xff0c;但是QtCreator左侧构建的三个按钮呈现灰色&#xff0c;无法进行构建操作答&#xff1a;进入QtCreator的Kits界面&#xff08;工具-选项&#xff09;&#xff0c;点击"自动检测"下的De…

TS面试题

1.TS有哪些类型&#xff08;对比与js&#xff09;&#xff1f;关键字/语法用途示例any关闭类型检查let a: any 4unknown类型安全的 anylet u: unknown 4; if (typeof u number) …never永不存在的值function err(): never { throw 0; }void无返回值function f(): void {}enu…

借助Early Hints和HarperDB改善网页性能

对电商网站来说&#xff0c;糟糕的页面性能可能会增加交易放弃率。一直以来&#xff0c;人们会使用CDN进行缓存从而缩短页面加载时间&#xff0c;但即便实施了强大的缓存&#xff0c;消费者在通过移动网络访问这些网站时可能仍然会需要频繁等待。最近诞生了一种名为“早期提示”…