第 4 章 RabbitMQ 进阶

mandatory 参数 Returning | RabbitMQ

         当 mandatory 参数设为 true 时,交换器无法根据自身的类型和路由键找到一个符合条件的队列,那么RabbitMQ 会调用 Basic.Return 命令将消息返回给生产者,通过调用channel.addReturnListener 来添加 ReturnListener 监听器实现。当 mandatory 参数设置为 false 时,出现上述情形,则消息直接被丢弃。

        channel.basicPublish(EXCHANGE_NAME, "", true, MessageProperties.PERSISTENT_TEXT_PLAIN, "mandatory test".getBytes());channel.addReturnListener(new ReturnListener() {public void handleReturn(int replyCode, String replyText,String exchange, String routingKey,AMQP.BasicProperties basicProperties,byte[] body) throws IOException {String message = new String(body);System.out.println("Basic.Return返回的结果是:" + message);}});

备份交换器 Alternate Exchanges | RabbitMQ

        // 创建备份交换机、队列、声明绑定关系String myAeExchange = "myAe";channel.exchangeDeclare(myAeExchange, "fanout", true, false, null);channel.queueDeclare("unroutedQueue", true, false, false, null);channel.queueBind("unroutedQueue", myAeExchange, "");// 创建普通交换机, 并设置 alternate-exchange,队列、声明绑定关系Map<String, Object> args = new HashMap<String, Object>();args.put("alternate-exchange", myAeExchange);channel.exchangeDeclare("normalExchange", "direct", true, false, args);channel.queueDeclare("normalQueue", true, false, false, null);channel.queueBind("normalQueue", "normalExchange", "normalKey");

        如果备份交换器和 mandatory 参数一起使用,那么 mandatory 参数无效。 

4.2过期时间 TTL Time-To-Live and Expiration | RabbitMQ

        第一种方法是通过队列属性设置,队列中所有消息都有相同的过期时间。channel.queueDeclare 方法中加入x-message-ttl 参数实现的,这个参数的单位是毫秒。

        第二种方法是对消息本身进行单独设置

每条消息的TTL 可以不同。如果两种方法一起使用,则消息的 TTL 以两者之间较小的那个数值为准。消息在队列中的生存时间一旦超过设置的 TTL 值时,就会变成“死信”(Dead Message),消费者将无法再收到该消息(这点不是绝对的,可以参考 4.3 节)。 

            // 声明带TTL的队列Map<String, Object> queueArgs = new HashMap<>();queueArgs.put("x-message-ttl", 30000);channel.queueDeclare("ttl.queue", true, false, false, queueArgs);// 发送带TTL属性的消息AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().expiration("5000").build();channel.basicPublish("", "ttl.queue", props, "Test TTL Message".getBytes())

4.3 死信&延时队列

        DLX,全称为 Dead-Letter-Exchange,可以称之为死信交换器,也有人称之为死信邮箱。当消息在一个队列中变成死信(dead message)之后,它能被重新被发送到另一个交换器中,这个交换器就是DLX,绑定DLX 的队列就称之为死信队列。

出现的场景

  • 消息被拒绝(Basic.Reject/Basic.Nack),并且设置 requeue 参数为 false; 
  • 消息过期; 
  • 队列达到最大长度。
// 创建一个持久化、非排他的、非自动删除的队列channel.exchangeDeclare("exchange.dlx", "direct", true);channel.queueDeclare("queue.dlx", true, false, false, null);channel.queueBind("queue.dlx", "exchange.dlx", "routingkey");// 创建一个不用队列 并设置 TTL 以及 DLX、DLXroutingkeychannel.exchangeDeclare("exchange.normal", "fanout", true);Map<String, Object> args = new HashMap<>();args.put("x-message-ttl", 10000);args.put("x-dead-letter-exchange", "exchange.dlx");args.put("x-dead-letter-routing-key", "routingkey");channel.queueDeclare("queue.normal", true, false, false, args);channel.queueBind("queue.normal", "exchange.normal", "");channel.basicPublish("exchange.normal", "rk",MessageProperties.PERSISTENT_TEXT_PLAIN, "dlx".getBytes());

        由Web 管理页面(图4-3)可以看出,两个队列都被标记了“D”,这个是durable 的缩写,即设置了队列持久化。queue.normal 这个队列还配置了 TTL、DLX 和 DLK,其中 DLX 指的是x-dead-letter-routing-key 这个属性。 

        在 AMQP 协议中,或者 RabbitMQ 本身没有直接支持延迟队列的功能,但是可以通过前面所介绍的 DLX 和 TTL 模拟出延迟队列的功能。 

4.7 持久化

        RabbitMQ的持久化分为三个部分:交换器的持久化、队列的持久化和消息的持久化。 

deliveryMode(2)  // 设置为持久化

        将交换器、队列、消息都设置了持久化之后就能百分之百保证数据不丢失了吗?答案是否定的。 
        首先从消费者来说,如果在订阅消费队列时将 autoAck 参数设置为true,那么当消费者接收到相关消息之后,还没来得及处理就宕机了,这样也算数据丢失。这种情况很好解决,将autoAck 参数设置为 false,并进行手动确认,详细可以参考3.5 节。 

        其次,在持久化的消息正确存入 RabbitMQ 之后,还需要有一段时间(虽然很短,但是不可忽视)才能存入磁盘之中。RabbitMQ 并不会为每条消息都进行同步存盘(调用内核的fsync1方法)的处理,可能仅仅保存到操作系统缓存之中而不是物理磁盘之中。如果在这段时间内
RabbitMQ 服务节点发生了宕机、重启等异常情况,消息保存还没来得及落盘,那么这些消息将丢失。RabbitMQ 在运行时会根据统计的消息传送速度定期计算一个当前内存中能够保存的最大消息数量(target_ram_count),如果alpha 状态的消息数量大于此值时,就会引起消息的状态转换会丢失。 这里可以引入 RabbitMQ 的镜像队列机制(详细参考 9.4 节)

4.8 生产者确认 

        生产者如何知道消息有没有正确地到达服务器

4.8.1 事务机制

  • 通过事务机制实现; 
  • 通过发送方确认(publisher confirm)机制实现。
try { channel.txSelect(); channel.basicPublish(exchange, routingKey,  MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes()); int result = 1 / 0; channel.txCommit(); 
} catch (Exception e) { e.printStackTrace(); channel.txRollback(); 
} 

    4.8.2 发送方确认机制

      采用事务机制实现会严重降低 RabbitMQ 的消息吞吐量,这里就引入了一种轻量级的方式——发送方确认(publisher confirm)机制。 

            生产者通过调用 channel.confirmSelect 方法(即 Confirm.Select 命令)将信道设置为 confirm 模式,之后 RabbitMQ 会返回 Confirm.Select-Ok 命令表示同意生产者将当前信道设置为 confirm 模式。所有被发送的后续消息都被 ack 或者 nack 一次,不会出现一条消
    息既被 ack 又被 nack 的情况,并且 RabbitMQ 也并没有对消息被 confirm 的快慢做任何保证

            try {channel.confirmSelect();//将信道置为publisher confirm模式//之后正常发送消息channel.basicPublish("exchange", "routingKey", null,"publisher confirm test".getBytes());if (!channel.waitForConfirms()) {// 当消息发送不成功时候进入 if 代码块 do something else....System.out.println("send message failed");}} catch (InterruptedException e) {e.printStackTrace();}

    注意要点: 
    (1)事务机制和 publisher confirm 机制两者是互斥的,不能共存。如果企图将已开启事务模式的信道再设置为 publisher confirm 模式,RabbitMQ 会报错:{amqp_error, precondition_ failed, "cannot switch from tx to confirm mode", 'confirm.select'};或者如果企图将已开启 publisher confirm 模式的信道再设置为事务模式,RabbitMQ 也会报错:{amqp_error, precondition_failed, "cannot switch from confirm to tx 
    mode", 'tx.select' }。 
    (2)事务机制和 publisher confirm 机制确保的是消息能够正确地发送至 RabbitMQ,这里的“发送至 RabbitMQ”的含义是指消息被正确地发往至 RabbitMQ 的交换器,如果此交换器没有匹配的队列,那么消息也会丢失。所以在使用这两种机制的时候要确保所涉及的交换器能够有匹配的队列。更进一步地讲,发送方要配合 mandatory 参数或者备份交换器一起使用来提高消息传输的可靠性。 

            publisher confirm 的优势在于并不一定需要同步确认。这里我们改进了一下使用方式,总结有如下两种:

    • 批量 confirm 方法:每发送一批消息后,调用 channel.waitForConfirms 方法,等待服务器的确认返回。 

            相比于前面示例中的普通 confirm 方法,批量极大地提升了 confirm 的效率,但是问题在于出现返回 Basic.Nack 或者超时情况时,客户端需要将这一批次的消息全部重发,这会带来明显的重复消息数量,并且当消息经常丢失时,批量 confirm 的性能应该是不升反降的。 

    channel.confirmSelect(); // 开启确认模式
    for(int i=0; i<100; i++){channel.basicPublish("", "queue", null, message.getBytes());
    }
    // 批量确认所有未确认消息
    channel.waitForConfirmsOrDie(5000); // 超时5秒// 缺点:简单但会阻塞生产者线程,批量失败需重发全部消息
    ConcurrentSkipListMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();
    channel.addConfirmListener((sequenceNumber, multiple) -> {if(multiple) {outstandingConfirms.headMap(sequenceNumber, true).clear();} else {outstandingConfirms.remove(sequenceNumber);}
    }, (sequenceNumber, multiple) -> {// NACK处理逻辑
    });// 批量发送100条消息for (int i = 0; i < 100; i++) {String message = "Msg-" + i;long seqNo = channel.getNextPublishSeqNo();outstandingConfirms.put(seqNo, message);channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());}

    4.9 消费端要点介绍

            消费者客户端可以通过推模式或者拉模式(推荐方式)的方式来获取并消费消息,当消费者处理完业务逻辑需要手动确认消息已被接收,这样 RabbitMQ才能把当前消息从队列中标记清除。当然如果消费者由于某些原因无法处理当前接收到的消息,可以通过 channel.basicNack 或者 channel.basicReject 来拒绝掉。 

    • 消息分发; 
    • 消息顺序性; 
    • 弃用 QueueingConsumer。 

    消息顺序

    在RabbitMQ中保证消息顺序性需结合队列特性和业务设计,以下是核心方案:

    一、基础保障机制

    1. 单队列单消费者模式

      • 利用队列FIFO特性,仅允许一个消费者处理队列,避免并发消费导致乱序
      • 缺点:吞吐量受限,需配合消息持久化和手动ACK确保可靠性
    2. 分区消费策略

      • 通过路由键将关联消息(如相同订单ID)固定路由到同一队列,每个队列对应独立消费者
      • 示例:使用Direct交换机按业务ID路由,实现"局部顺序性"

    二、增强控制手

    1. 消息序列化标记

      • 在消息体中嵌入序列号,消费者端通过缓存排序实现逻辑顺序控制
      • 需配合幂等处理避免重复消息干扰
    2. 单活消费者模式

      • 通过x-single-active-consumer参数确保队列同一时间仅有一个活跃消费者,故障时自动切换
      // Spring AMQP配置示例 
      Map<String, Object> args = new HashMap<>(); 
      args.put("x-single-active-consumer", true); 
      new Queue("seq_queue", true, false, false, args);

    三、高级方案

    1. 事务与发布确认

      • 生产者启用事务或发布确认机制,确保消息按发送顺序持久化到队列
      • 事务适用于批量消息,确认机制适合单条消息
    2. 死信队列重试

      • 对处理失败的消息进入死信队列延时重试,避免立即重入破坏顺序

      4.10 消息传输保障

      第九章 RabbitMQ 高阶

      9.1 存储机制 

              不管是持久化的消息还是非持久化的消息都可以被写入到磁盘。这两种类型的消息的落盘处理都在RabbitMQ 的“持久层”中完成。 

      • 持久化的消息也会在内存中保存一份备份,这样可以提高一定的性能,当内存吃紧的时候会从内存中清除。
      • 非持久化的消息一般只保存在内存中,在内存吃紧的时候会被换入到磁盘中,以节省内存空间。

      持久层

      • 队列索引(rabbit_queue_index):负责维护队列中落盘消息的信息,包括消息的存储地点、是否已被交付给消费者、是否已被消费者ack 等。每个队列都有与之对应的一个rabbit_queue_index
      • 消息存储(rabbit_msg_store):以键值对的形式存储消息,它被所有队列共享,在每个节点中有且只有一个。从技术层面上来说,rabbit_msg_store 具体还可以分为msg_store_persistent 和 msg_store_transient
      •         msg_store_persistent 负责持久化消息的持久化,重启后消息不会丢失;
      •         msg_store_transient 负责非持久化消息的持久化,重启后消息会丢失。

      结构查看  /opt/rabbitmq/var/lib/rabbitmq/mnesia/rabbit@node1  

       队列的结构

      • rabbit_amqqueue_process 负责协议相关的消息处理,即接收生产者发布的消息、向消费者交付消息、处理消息的确认(包括生产端的 confirm 和消费端的 ack)等。
      • backing_queue 是消息存储的具体形式和引擎,并向rabbit_amqqueue_process提供相关的接口以供调用。

      队列消息状态

      • alpha:消息内容(包括消息体、属性和 headers)和消息索引都存储在内存中。 (alpha状态最耗内存,但很少消耗CPU)
      • beta:消息内容保存在磁盘中,消息索引保存在内存中(只需要一次I/O 操作就可以读取到消息(从 rabbit_msg_store 中))。 
      • gamma:消息内容保存在磁盘中,消息索引在磁盘和内存中都有(只需要一次I/O 操作就可以读取到消息(从 rabbit_msg_store 中))。 
      • delta:消息内容和索引都在磁盘中。(状态基本不消耗内存,但是需要消耗更多的 CPU 和磁盘 I/O 操作,delta 状态需要执行两次I/O 操作才能读取到消息,一次是读消息索引(从 rabbit_queue_index 中),一次是读消息
        内容(从 rabbit_msg_store 中))

      本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
      如若转载,请注明出处:http://www.pswp.cn/web/94185.shtml
      繁体地址,请注明出处:http://hk.pswp.cn/web/94185.shtml
      英文地址,请注明出处:http://en.pswp.cn/web/94185.shtml

      如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

      相关文章

      BEVDet4D

      1. BEVDet4D算法动机及开创性思路 1&#xff09;BEVDet算法概述输入输出&#xff1a;输入为6视角图像&#xff08;NuScenes数据集&#xff09;&#xff0c;输出为3D检测结果核心模块&#xff1a; 图像编码器&#xff1a;由Backbone网络和多尺度特征融合网络组成&#xff0c;处理…

      当 AI 学会 “理解” 人类:自然语言处理的进化与伦理边界

      大家可以去我的资源看看&#xff0c;有很多关于AI的免费资源可以下载&#xff0c;不下载也可以看看&#xff0c;真的对你有用引言&#xff1a;从 “对话” 到 “理解”——AI 语言能力的时代跃迁现实锚点&#xff1a;以日常场景切入&#xff08;如 ChatGPT 流畅回应复杂问题、A…

      WPF控件随窗体大宽度高度改变而改变

      前台控件中&#xff1a;Width"{Binding RelativeSource{RelativeSource AncestorTypeWindow}, PathWidth}"后台代码&#xff1a;定义在加载事件里面this.SizeChanged ProductData_SizeChanged;private void ProductData_SizeChanged(object sender, SizeChangedEven…

      E10 通过RPC实现账号批量锁定与解锁

      需求背景&#xff1a;账号信息由三方系统管理&#xff0c;包含账号状态&#xff0c;所以需要通过提供给三方的 Rest 接口中&#xff0c;实现账号锁定与解锁。参考基线版本&#xff1a;10.0.2506.01&#xff0c;过低的版本可能无法使用。 锁定分为两种&#xff1a; &#xff08;…

      什么是AI宠物

      什么是AI宠物AI宠物是由AI大脑驱动的生命体AI产品。它能主动产生情绪和意图&#xff0c;并通过情绪和意图去驱动自己的动作和行为。它根据自己的意愿和用户互动&#xff0c;不受用户控制。从一定意义上讲&#xff0c;它拥有了人工生命和自由意志。它有自己的行为逻辑&#xff0…

      简单AI:搜狐公司旗下AI绘画产品

      本文转载自&#xff1a;简单AI&#xff1a;搜狐公司旗下AI绘画产品 - Hello123工具导航 ** 一、平台定位与技术特性 搜狐简单 AI 是搜狐推出的多模态 AI 创作平台&#xff0c;基于自研大模型提供文生图、文生文等能力。它专注于零门槛内容生成&#xff0c;用户无需专业技能即…

      vue3 3d饼图

      完整3D饼图项目下载 https://download.csdn.net/download/weixin_54645059/91716476 只有一个vue文件 直接下滑到完整代码就阔以 本文介绍了如何使用ECharts和ECharts-GL插件实现3D饼图效果&#xff0c;并提出了数值显示未解决的问题。主要包含以下内容&#xff1a; 安装所需…

      全球电商业财一体化趋势加速,巨益科技助力品牌出海精细化运营

      行业背景&#xff1a;跨境电商进入品牌化发展新阶段随着国内电商市场竞争日趋激烈&#xff0c;跨境电商已成为中国品牌寻求增长突破的重要赛道&#xff0c;在TikTok、Temu等平台出海浪潮推动下&#xff0c;越来越多的中国品牌开始布局全球市场。然而&#xff0c;从单一市场的铺…

      【序列晋升】13 Spring Cloud Bus微服务架构中的消息总线

      Spring Cloud Bus作为微服务架构中的关键组件&#xff0c;通过消息代理实现分布式系统中各节点的事件广播与状态同步&#xff0c;解决了传统微服务架构中配置刷新效率低下、系统级事件传播复杂等问题。它本质上是一个轻量级的事件总线&#xff0c;将Spring Boot Actuator的端点…

      [激光原理与应用-314]:光学设计 - 光学系统设计与电子电路设计的相似或相同点

      光学系统设计与电子电路设计虽分属不同工程领域&#xff0c;但在设计理念、方法论和工程实践中存在诸多相似或相同点。这些共性源于两者均需解决复杂系统的优化问题&#xff0c;并遵循工程设计的通用规律。以下是具体分析&#xff1a;一、设计流程的相似性需求分析与规格定义光…

      Linux学习:信号的保存

      目录1. 进程的异常终止与core dump标志位1.1 进程终止的方式1.2 core方案的作用与使用方式2. 信号的保存2.1 信号的阻塞2.2 操作系统中的sigset_t信号集类型2.3 进程PCB中修改block表的系统调用接口2.4 信号阻塞的相关问题验证1. 进程的异常终止与core dump标志位 1.1 进程终止…

      数据分析编程第二步: 最简单的数据分析尝试

      2.1 数据介绍有某公司的销售数据表 sales.csv 如下:第一行是标题&#xff0c;解释每一列存了什么东西。第二行开始每一行是一条数据&#xff0c;对应一个订单。这种数据有个专业的术语&#xff0c;叫结构化数据。这是现代数据处理中最常见的数据类型。整个表格的数据统称为一个…

      UDP报文的数据结构

      主要内容参照https://doc.embedfire.com/net/lwip/zh/latest/doc/chapter14/chapter14.html#id6&#xff0c;整理出来自用。 1. UDP 报文首部结构体&#xff08;udp_hdr&#xff09; 为清晰定义 UDP 报文首部的各个字段&#xff0c;LwIP 设计了udp_hdr结构体&#xff0c;其包含…

      图论与最短路学习笔记

      图论与最短路在数学建模中的应用 一、图论模型图 G(V,E)G(V,E)G(V,E) VVV&#xff1a;顶点集合EEE&#xff1a;边集合每条边 (u,v)(u,v)(u,v) 赋予权值 w(u,v)w(u,v)w(u,v)&#xff0c;可用 邻接矩阵 或 邻接表 表示。二、最短路问题的数学形式 目标&#xff1a;寻找从源点 sss…

      第九节 Spring 基于构造函数的依赖注入

      当容器调用带有一组参数的类构造函数时&#xff0c;基于构造函数的 DI 就完成了&#xff0c;其中每个参数代表一个对其他类的依赖。接下来&#xff0c;我们将通过示例来理解 Spring 基于构造函数的依赖注入。示例&#xff1a;下面的例子显示了一个类 TextEditor&#xff0c;只能…

      【数据库】PostgreSQL详解:企业级关系型数据库

      文章目录什么是PostgreSQL&#xff1f;核心特性1. 标准兼容性2. 扩展性3. 高级功能4. 可靠性数据类型1. 基本数据类型2. 高级数据类型基本操作1. 数据库操作2. 表操作3. 数据操作高级查询1. 连接查询2. 子查询3. 窗口函数JSON操作1. JSON数据类型2. JSON查询3. JSON索引全文搜索…

      FFMPEG相关解密,打水印,合并,推流,

      1&#xff1a;ffmepg进行打水印解密 前提ffmepg安装利用静态版就可以这个什么都有&#xff0c;不用再配置其他信息&#xff1a;&#xff08;这个利用ffmpeg终端命令是没问题的&#xff0c;但是如果要是再C中调用ffmpeg库那么还需要从新编译安装下&#xff09; 各个版本 Inde…

      MySql知识梳理之DML语句

      注意: 插入数据时&#xff0c;指定的字段顺序需要与值的顺序是一一对应的。 字符串和日期型数据应该包含在引号中。 插入的数据大小&#xff0c;应该在字段的规定范围内注意:修改语句的条件可以有&#xff0c;也可以没有&#xff0c;如果没有条件&#xff0c;则会修改整张表的所…

      GaussDB GaussDB 数据库架构师修炼(十八)SQL引擎-SQL执行流程

      1 SQL执行流程查询解析&#xff1a;词法分析、语法分析、 语义分析 查询重写&#xff1a;视图和规则展开、基于规则的查询优化 计划生成&#xff1a;路径搜索和枚举、选出最优执行计划 查询执行&#xff1a;基于优化器生成的物理执行计划对数据进行获取和计算2 解析器和优化器S…

      grpc 1.45.2 在ubuntu中的编译

      要在 Ubuntu 上编译 gRPC 1.45.2&#xff0c;需要按照以下步骤操作。以下指南基于 gRPC 官方文档和相关资源&#xff0c;确保环境配置正确并成功编译。请确保你有管理员权限&#xff08;sudo&#xff09;以安装依赖项和执行相关命令。 1. 准备环境 确保你的 Ubuntu 系统已安装…