一、消息确认机制

生产者发送的消息,可能有以下两种情况:

1> 消息消费成功

2> 消息消费失败

为了保证消息可靠的到达消费者(!!!注意:消息确认机制和前面的工作模式中的publisher confirms模式有很大区别,消息确认保证的是消息可靠的到达消费者,而publisher confirms保证的是消息可靠的到达RabbitMQServer),RabbitMQ引入了消息确认机制:

消费者在消费消息时,可以指定autoAck参数,对应着两种确认方式

(1)自动确认:消息只要到达消费者就会自动确认,不会考虑消费者是否正确消费了这些消息,直接从 内存/磁盘 中删除消息;

(2)手动确认:消息到达消费者,不会自动确认,会等待消费者调用Basic.Ack命令,才会从内存/磁盘 移除这条消息。

DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, 
AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息: " + new String(body));}
};
channel.basicConsume(Constants.TOPIC_QUEUE_NAME1, true, consumer);

如果将basicConsume中的true改为false,对于RabbitMQ服务器来说,消息分成两个部分:

1>  未发送给消费者的消息;

2>  已经发送给消费者,但还没有被确认的消息。

对应管理界面:


二、手动确认方法

RabbitMQ提供了两种手动确认,分别是肯定确认和否定确认,对应着3个方法:

(1)肯定确认:Channel.basicAck(long deliveryTag, boolean multiple)

    其中,deliveryTag为消息的唯一表示,在每一个channel中都是唯一的,相当于TCP协议中的序号的作用,用来确认哪条消息已经收到,multiple为true表示是否批量确认,同样与TCP中确认序号的作用类似,表示在这个deliveryTag前的消息都已收到,为false则表示一次只确认一条消息。

(2)否定确认: Channel.basicReject(long deliveryTag, boolean requeue)

   如果消息到达消费者后,消费者未正确处理这条消息(如发生异常),就可以通过这个方法进行否定确认,其中,参数requeue表示这条消息是否需要重新入队,这个方法一次只能确认一条消息。

(3)否定确认:Channel.basicNack(long deliveryTag, boolean multiple, boolean requeue)

   与basicAck一样,这个方法可以通过multiple来实现批量确认


三、使用Spring Boot演示消息确认机制

  演示之前,需要先了解Spring-AMQP的确认机制,它和RabbitMQ JDK Client库的确认机制有些许不同:

1. AcknowledgeMode.NONE

  和RabbitMQ JDK Client库的自动确认机制一样,只要消息到达消费者,这条消息就会从队列中移除。

2. AcknowledgeMode.AUTO(和JDK Client库的区别)

  消息到达消费者且处理成功,才会自动确认,如果消息在处理过程中发生了异常,则不会自动确认。

3. AcknowledgeMode.MANUAL

  和JDK Client库一样,属于手动确认机制,同样分为肯定确认和否定确认。

接下来,进行准备工作,创建一个Spring Boot项目,添加RabbitMQ依赖:

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

添加RabbitMQ相关配置:

spring:rabbitmq:addresses: amqp://study:study@110.41.17.130:5672/extensionlistener:simple:acknowledge-mode: none #表示当前确认机制未AcknowledgeMode.NONE

添加下图的包,创建对应的类:

 


3.1  AcknowledgeMode.NONE

(1)编写常量类(由于只是验证消息确认机制,可以随便使用一种工作模式)

public class Constants {public static final String ACK_QUEUE = "ack.queue";public static final String ACK_EXCHANGE = "ack.exchange";
}

(2)配置声明队列、交换机、交换机与队列绑定关系

@Configuration
public class RabbitMQConfig {//1.声明队列@Bean("ackQueue")public Queue ackQueue(){return QueueBuilder.durable(Constants.ACK_QUEUE).build();}//2.声明交换机@Bean("ackExchange")public FanoutExchange ackExchange(){return ExchangeBuilder.fanoutExchange(Constants.ACK_EXCHANGE).build();}//3.声明队列与交换机的绑定关系@Beanpublic Binding ackBinding(@Qualifier("ackExchange") FanoutExchange fanoutExchange,@Qualifier("ackQueue") Queue queue){return BindingBuilder.bind(queue).to(fanoutExchange);}
}

(3)编写生产者代码

@RestController
@RequestMapping("/producer")
public class ProducerController {@Resourceprivate RabbitTemplate rabbitTemplate;@RequestMapping("/ack")public String ack(){rabbitTemplate.convertAndSend(Constants.ACK_EXCHANGE,"","consumer ack mode test...");return "发送消息成功";}
}

(4)编写消费者代码

@Component
public class AckListener {@RabbitListener(queues = Constants.ACK_QUEUE)public void ackListener(Message message, Channel channel) throws UnsupportedEncodingException {long deliveryTag = message.getMessageProperties().getDeliveryTag();System.out.printf("接收到消息:%s,deliveryTag: %d \n",new String(message.getBody(),"UTF-8"),message.getMessageProperties().getDeliveryTag());//业务逻辑处理System.out.println("业务逻辑处理");
//            int num = 10/0;System.out.println("业务处理完成");}
}

(5)测试正常消费消息的情况


(6)测试消息消费异常情况(将消费者代码中得 int num  = 10/0  注解打开)


(7)总结

  经过确认,可以发现AcknowledgeMode.NONE机制,无论消费者是否正确消费消息,都会自动确认,不会保留异常消费的消息


3.2 AcknowledgeMode.AUTO

(1) 修改配置文件中的 acknowledge-mode: none 为 acknowledge-mode: auto

(2)演示消息正常消费的情况(将 int num = 10/0 注释)

  @RabbitListener(queues = Constants.ACK_QUEUE)public void ackListener(Message message, Channel channel) throws UnsupportedEncodingException {long deliveryTag = message.getMessageProperties().getDeliveryTag();System.out.printf("接收到消息:%s,deliveryTag: %d \n",new String(message.getBody(),"UTF-8"),message.getMessageProperties().getDeliveryTag());//业务逻辑处理System.out.println("业务逻辑处理");
//            int num = 10/0;System.out.println("业务处理完成");}

运行程序,访问 producer/ack 接口发送消息:


(3)演示消息处理异常情况(取消 int num = 10/0 的注释)

 @RabbitListener(queues = Constants.ACK_QUEUE)public void ackListener(Message message, Channel channel) throws UnsupportedEncodingException {long deliveryTag = message.getMessageProperties().getDeliveryTag();System.out.printf("接收到消息:%s,deliveryTag: %d \n",new String(message.getBody(),"UTF-8"),message.getMessageProperties().getDeliveryTag());//业务逻辑处理System.out.println("业务逻辑处理");int num = 10/0;System.out.println("业务处理完成");}

运行程序,访问生产者端口:

消息没有成功消费,如果此时我们修改代码(将 int num = 10/0 注释,使程序不再发生异常),再次运行程序,队列中保存的消息就会被正常消费:


3.3 AcknowledgeMode.MANUAL

  (1) 修改配置文件中 acknowledge-mode: auto 为 acknowledge-mode: manul

(2)修改消费者代码(由于是手动确认,需要在代码中添加正常消费时的 “肯定确认” 和 消费异常时的 “否定确认”)

    @RabbitListener(queues = Constants.ACK_QUEUE)public void ackListener(Message message, Channel channel) throws Exception {long deliveryTag = message.getMessageProperties().getDeliveryTag();try{System.out.printf("接收到消息:%s,deliveryTag: %d \n",new String(message.getBody(),"UTF-8"),message.getMessageProperties().getDeliveryTag());//业务逻辑处理System.out.println("业务逻辑处理");
//                int num = 10/0;System.out.println("业务处理完成");//消息正常消费,肯定确认channel.basicAck(deliveryTag,false);//单条确认}catch (Exception e){//消息消费异常,否定确认channel.basicNack(deliveryTag,false,true);//单条否定,异常后重新入队}}

(3)消息正常消费时的情况(注释 int num = 10/0)

接下来注释掉手动肯定确认的这行代码,看看会发生什么情况:

 //消息正常消费,肯定确认//channel.basicAck(deliveryTag,false);//单条确认

再次运行程序并通过接口 producer/ack 发送消息:

可以看到,在AcknowledgeMode.MANUAL机制下,如果不手动确认,队列不会移除已经被正常消费的消息:


(3)消息消费异常的情况下(取消 int num = 10/0 的注释)

接下来注释掉channel.basicNack,运行程序,访问接口发送消息:

如果将参数requeue置为false,会怎么样?

channel.basicNack(deliveryTag,false,false);//单条否定,重新发送消息

运行程序:


(4)总结

1> 无论是否正常处理消息都要进行手动确认;

2> 正常处理消息但未手动确认,管理界面中的队列会有 一条/多条 Unacked 的消息(重新启动程序后会重新消费);

3> 异常处理消息且未手动确认,也会有 一条/多条 Unacked 的消息(重启程序同样重新消费);

4> 如果异常处理,将requeue置为false,队列不会保存 这条/多条 异常消费的消息

                                               置为true,队列会不断重新发送 这条/多条 消息

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/bicheng/82333.shtml
繁体地址,请注明出处:http://hk.pswp.cn/bicheng/82333.shtml
英文地址,请注明出处:http://en.pswp.cn/bicheng/82333.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C++异步(1)

什么是异步? 异步就是多个线程是同时执行的&#xff0c;与之相对的就是线程同步&#xff0c;二者都应用在并发的场景上。 异步的特点 异步执行的任务无需等待其他任务完成&#xff0c;其本身是通过非阻塞的方式执行的&#xff0c;不依赖前驱任务&#xff0c;通常用于IO密集…

向量数据库Milvus03-高级功能与性能调优

Milvus高级功能与性能调优 目录 高级特性详解性能调优技巧生产环境部署最佳实践总结与展望 1. 高级特性详解 1.1 多索引兼容 Milvus 支持多种索引类型&#xff08;如 HNSW、IVF_PQ、IVF_FLAT&#xff09;的混合使用&#xff0c;以适应不同场景的需求。 HNSW&#xff08;Hier…

5月24日day35打卡

模型可视化与推理 知识点回顾&#xff1a; 三种不同的模型可视化方法&#xff1a;推荐torchinfo打印summary权重分布可视化进度条功能&#xff1a;手动和自动写法&#xff0c;让打印结果更加美观推理的写法&#xff1a;评估模式 作业&#xff1a;调整模型定义时的超参数&#x…

野火鲁班猫(arrch64架构debian)从零实现用MobileFaceNet算法进行实时人脸识别(三)用yolov5-face算法实现人脸检测

环境直接使用第一篇中安装好的环境即可 先clone yolov5-face项目 git clone https://github.com/deepcam-cn/yolov5-face.git 并下载预训练权重文件yolov5n-face.pt 网盘链接: https://pan.baidu.com/s/1xsYns6cyB84aPDgXB7sNDQ 提取码: lw9j &#xff08;野火官方提供&am…

R语言科研编程-柱状图

R语言简介 R语言是一种开源的统计计算和图形绘制编程语言&#xff0c;广泛应用于数据分析、机器学习、数据可视化等领域。它由Ross Ihaka和Robert Gentleman于1993年开发&#xff0c;具有丰富的统计函数库和图形功能&#xff0c;尤其适合数据科学研究和可视化任务。 使用R语言…

Android-Handler学习总结

​​面试官​&#xff1a;你好&#xff01;我看你简历里提到熟悉 Android 的 Handler 机制&#xff0c;能简单说一下它的作用吗&#xff1f; ​候选人​&#xff1a; Handler 是 Android 中用来做线程间通信的工具。比如Android 应用的 UI 线程&#xff08;也叫主线程…

【iOS】分类、扩展、关联对象

分类、扩展、关联对象 前言分类扩展扩展和分类的区别关联对象key的几种用法流程 总结 前言 最近的学习中笔者发现自己对于分类、扩展相关知识并不是很熟悉&#xff0c;刚好看源码类的加载过程中发现有类扩展与关联对象详解。本篇我们来探索一下这部分相关知识&#xff0c;首先…

30.第二阶段x64游戏实战-认识网络数据包发送流程

免责声明&#xff1a;内容仅供学习参考&#xff0c;请合法利用知识&#xff0c;禁止进行违法犯罪活动&#xff01; 内容参考于&#xff1a;图灵Python学院 上一个内容&#xff1a;29.第二阶段x64游戏实战-技能冷却 发送数据包的方式&#xff08;函数&#xff09;操作系统提供…

【每日一题】【前缀和优化】【前/后缀最值】牛客练习赛139 B/C题 大卫的密码 (Hard Version) C++

牛客练习赛139 B题 大卫的密码 (Easy Version) 牛客练习赛139 C题 大卫的密码 (Hard Version) 大卫的密码 题目背景 牛客练习赛139 题目描述 给定一个 n m n\times m nm的网格图&#xff0c;我们使用 ( i , j ) (i,j) (i,j)表示网格中从上往下数第 i i i行和从左往右数第…

文件夹图像批处理教程

前言 因为经常对图像要做数据清洗&#xff0c;又很费时间去重新写一个&#xff0c;我一直在想能不能写一个通用的脚本或者制作一个可视化的界面对文件夹图像做批量的修改图像大小、重命名、划分数据训练和验证集等等。这里我先介绍一下我因为写过的一些脚本&#xff0c;然后我…

【Unity实战笔记】第二十四 · 使用 SMB+Animator 实现基础战斗系统

转载请注明出处&#xff1a;&#x1f517;https://blog.csdn.net/weixin_44013533/article/details/146409453 作者&#xff1a;CSDN|Ringleader| 1 结构 1.1 状态机 1.2 SMB 2 代码实现 2.1 核心控制 Player_Base_SMB 继承 StateMachineBehaviour &#xff0c;控制变量初始…

Python虚拟环境再PyCharm中自由切换使用方法

Python开发中的环境隔离是必不可少的步骤,通过使用虚拟环境可以有效地管理不同项目间的依赖,避免包冲突和环境污染。虚拟环境是Python官方提供的一种独立运行环境,每个项目可以拥有自己单独的环境,不同项目之间的环境互不影响。在日常开发中,结合PyCharm这样强大的IDE进行…

大模型智能体入门扫盲——基于camel的概述

前言 本篇博客想带读者进行一个智能体入门扫盲&#xff0c;了解基础知识&#xff0c;为什么用camel呢&#xff0c;因为小洛发现它们文档对这种智能体的基本组件介绍得很全面深入。 基础概念 agent 一个典型的agent智能体包含三个核心部分&#xff1a; 感知模块&#xff1…

目标检测 RT-DETR(2023)详细解读

文章目录 主干网络&#xff1a;Encoder&#xff1a;不确定性最小Query选择Decoder网络&#xff1a; 将DETR扩展到实时场景&#xff0c;提高了模型的检测速度。网络架构分为三部分组成&#xff1a;主干网络、混合编码器、带有辅助预测头的变换器编码器。具体来说&#xff0c;先利…

DeepSeek 赋能数字农业:从智慧种植到产业升级的全链条革新

目录 一、数字农业的现状与挑战二、DeepSeek 技术解析2.1 DeepSeek 的技术原理与优势2.2 DeepSeek 在人工智能领域的地位与影响力 三、DeepSeek 在数字农业中的应用场景3.1 精准种植决策3.2 病虫害监测与防治3.3 智能灌溉与施肥管理3.4 农产品质量追溯与品牌建设 四、DeepSeek …

<uniapp><vuex><状态管理>在uniapp中,如何使用vuex实现数据共享与传递?

前言 本专栏是基于uniapp实现手机端各种小功能的程序&#xff0c;并且基于各种通讯协议如http、websocekt等&#xff0c;实现手机端作为客户端&#xff08;或者是手持机、PDA等&#xff09;&#xff0c;与服务端进行数据通讯的实例开发。 发文平台 CSDN 环境配置 系统&…

高速串行差分信号仿真分析及技术发展挑战续

7.3 3.125Gbps 差分串行信号设计实例仿真分析 7.3.1 设计用例说明 介绍完 Cadence 系统本身所具有的高速差分信号的仿真分析功能之后&#xff0c;我们以一个实例来说明 3.125Gbps 以下的高速差分系统的仿真分析方法。 在网上下载的设计文件“Booksi_Demo_Allegro160_Finishe…

【Golang】部分语法格式和规则

1、时间字符串和时间戳的相互转换 func main() {t1 : int64(1546926630) // 外部传入的时间戳&#xff08;秒为单位&#xff09;&#xff0c;必须为int64类型t2 : "2019-01-08 13:50:30" // 外部传入的时间字符串//时间转换的模板&#xff0c;golang里面只能是 &quo…

第十六章:数据治理之数据架构:数据模型和数据流转关系

本章我们说一下数据架构&#xff0c;说到数据架构&#xff0c;就很自然的想到企业架构、业务架构、软件架构&#xff0c;因为个人并没有对这些内容进行深入了解&#xff0c;所以这里不做比对是否有相似或者共通的地方&#xff0c;仅仅来说一下我理解的数据架构。 1、什么是架构…

Day126 | 灵神 | 二叉树 | 层数最深的叶子结点的和

Day126 | 灵神 | 二叉树 | 层数最深的叶子结点的和 1302.层数最深的叶子结点的和 1302. 层数最深叶子节点的和 - 力扣&#xff08;LeetCode&#xff09; 思路&#xff1a; 这道题用层序遍历的思路比较好想&#xff0c;就把每层的都算一下&#xff0c;然后返回最后一层的和就…