RabbitMQ 是一个 消息队列中间件(Message Broker),实现了 AMQP 协议,常用于服务之间解耦、异步处理、流量削峰等场景。
我帮你分成两个部分来讲:核心原理 + 常见用法


🧩 一、核心原理

RabbitMQ 的核心是 生产者(Producer) → 交换机(Exchange) → 队列(Queue) → 消费者(Consumer) 这一条链路。

1. 基础概念

  • Producer:消息的发送方。
  • Consumer:消息的接收方。
  • Queue:存储消息的队列。
  • Exchange(交换机):接收生产者的消息,按照规则路由到队列。
  • Binding:交换机和队列之间的绑定规则。
  • Routing Key:消息的“路由标识”,交换机根据它决定发给哪个队列。
  • Broker:RabbitMQ 服务本身。

2. 交换机类型(Exchange Types)

RabbitMQ 支持几种路由模式:

  1. direct:按 routing key 精确匹配,把消息路由到对应队列。
  2. fanout:广播模式,消息发送到所有绑定的队列。
  3. topic:模糊匹配路由(如 order.*),支持通配符。
  4. headers:根据消息头部的键值对匹配(较少用)。

3. ACK & 消息可靠性

  • ACK:消费者处理完消息后确认(acknowledge)。
  • 未 ACK:RabbitMQ 会重新投递消息(保证至少一次投递)。
  • 持久化:队列持久化 + 消息持久化,确保 RabbitMQ 重启后消息不丢失。
  • 死信队列(DLX):处理无法投递或过期的消息。

4. 工作模式

  • 简单队列(Simple Queue):一个生产者 → 一个消费者。
  • 工作队列(Work Queue):一个生产者 → 多个消费者(任务分摊)。
  • 发布订阅(Pub/Sub):广播消息(fanout exchange)。
  • 路由模式(Routing):按 key 精确路由(direct exchange)。
  • 主题模式(Topic):模糊匹配路由(topic exchange)。

🛠️ 二、常见用法(Python 示例)

安装依赖:

pip install pika

1. 简单队列

生产者:

import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()channel.queue_declare(queue='hello')channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()

消费者:

import pikadef callback(ch, method, properties, body):print(f" [x] Received {body}")connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()channel.queue_declare(queue='hello')channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

2. 工作队列(任务分摊)

  • 一个生产者发送很多任务。
  • 多个消费者并发消费,每个消息只会被一个消费者处理。
  • 常用在 后台任务处理

设置 prefetch_count=1 可以让 RabbitMQ 公平分发任务:

channel.basic_qos(prefetch_count=1)

3. 发布/订阅(fanout)

生产者:

channel.exchange_declare(exchange='logs', exchange_type='fanout')
channel.basic_publish(exchange='logs', routing_key='', body='log message')

消费者:

channel.exchange_declare(exchange='logs', exchange_type='fanout')
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queuechannel.queue_bind(exchange='logs', queue=queue_name)

这样消费者会自动生成临时队列,接收广播。


4. 路由模式(direct)

生产者:

channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
channel.basic_publish(exchange='direct_logs', routing_key='error', body='Error log')

消费者:

channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
queue_name = channel.queue_declare(queue='', exclusive=True).method.queue
channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key='error')

🎯 总结

  • RabbitMQ = 生产者 → 交换机 → 队列 → 消费者
  • 交换机类型决定了 消息的路由方式(direct、fanout、topic、headers)。
  • 可靠性依赖 ACK + 持久化 + 死信队列
  • 常见场景:异步任务、削峰填谷、解耦微服务、日志广播。

好问题 👍,这几个是 RabbitMQ 保证消息可靠性 的关键机制。我帮你逐个拆开讲:


🟢 1. ACK(消息确认机制)

  • 默认行为:消费者从队列里拿到消息后,RabbitMQ 就认为它“已消费”,会立即从队列里删除。
  • 风险:如果消费者拿到消息后宕机/异常,消息就丢了。

👉 ACK 就是解决这个问题的机制

  • 自动 ACK (auto_ack=True)

    • 一旦消费者收到消息,就立刻确认,哪怕还没处理完。
    • 风险:消费者挂了,消息丢失。
  • 手动 ACK (auto_ack=False)(推荐)

    • 消费者处理完任务后,再调用 channel.basic_ack() 确认。
    • 如果消费者挂了,RabbitMQ 会把消息重新投递给别的消费者。

例子:

def callback(ch, method, properties, body):print("处理消息:", body)# 处理完成后手动确认ch.basic_ack(delivery_tag=method.delivery_tag)channel.basic_consume(queue='task_queue', on_message_callback=callback, auto_ack=False)

🔑 作用:确保消息至少被处理一次,不会因为消费者挂掉而丢失。


🟢 2. 持久化(Persistence)

RabbitMQ 的数据默认存在内存里,服务一旦重启,消息就没了。
👉 持久化保证 RabbitMQ 重启后消息不丢

持久化分三层:

  1. 队列持久化(声明时加 durable=True):

    channel.queue_declare(queue='task_queue', durable=True)
    

    → RabbitMQ 重启后,这个队列还在。

  2. 消息持久化(生产者发送时设置 delivery_mode=2):

    channel.basic_publish(exchange='',routing_key='task_queue',body='Hello',properties=pika.BasicProperties(delivery_mode=2,  # 2 表示持久化消息))
    

    → RabbitMQ 重启后,消息仍然在队列里。

  3. 交换机持久化(声明时加 durable=True)。

🔑 作用:保证即使 RabbitMQ 崩溃或重启,消息不会丢失。


🟢 3. 死信队列(Dead Letter Queue, DLQ)

当某些消息 无法被正常消费 时,RabbitMQ 可以把它们转移到另一个队列里(死信队列),避免消息丢失。

死信队列触发的几种情况:

  1. 消费者 拒绝消息(nack/reject)requeue=False
  2. 消息在队列里 过期(TTL 超时)
  3. 队列满了,无法再接收新消息。

👉 配置死信队列的方法:

args = {'x-dead-letter-exchange': 'dlx_exchange',  # 指定死信交换机'x-dead-letter-routing-key': 'dlx_key'     # 指定路由 key
}
channel.queue_declare(queue='task_queue', durable=True, arguments=args)

然后消息会被转发到 死信队列,便于后续人工排查或重试。

🔑 作用:防止消息丢失 & 提供兜底处理机制。


🎯 总结

  • ACK:保证消费者挂掉时消息不会丢(至少投递一次)。
  • 持久化:保证 RabbitMQ 崩溃/重启时消息不会丢。
  • 死信队列:保证异常消息有去处(过期/拒绝/无法投递)。

这三个机制配合起来,RabbitMQ 就能实现 高可靠消息传递

好问题 👍!RabbitMQ 里的 队列满了(或者说消息堆积过多)是一个常见的情况,处理思路分两类:


🟢 1. 队列为什么会满?

队列本质上是内存+磁盘结构,如果消费者消费不过来,就会导致消息积压。几种常见原因:

  • 消费者处理能力不足(速度比不上生产者)。
  • 没有限制队列长度,消息无限堆积。
  • 消费者挂掉了,没人消费。
  • 某些消息过大,占满内存/磁盘。

🟢 2. RabbitMQ 的应对机制

(1) 设置队列最大长度/容量(防止无限堆积)

channel.queue_declare(queue='task_queue',durable=True,arguments={'x-max-length': 1000,            # 最大消息数'x-max-length-bytes': 10485760   # 最大字节数 (10MB)}
)

超过限制后,旧消息会被丢弃(FIFO),或者转发到死信队列(推荐)。


(2) 配置死信队列(DLQ)

当队列满了时,新来的消息可以自动进入死信队列:

channel.queue_declare(queue='task_queue',durable=True,arguments={'x-max-length': 1000,'x-dead-letter-exchange': 'dlx_exchange','x-dead-letter-routing-key': 'dlx_key'}
)

👉 新消息进不来时,直接进入 DLQ,避免消息丢失。


(3) 限流(QoS)

消费者可以设置一次最多处理多少条消息,避免被“压垮”:

channel.basic_qos(prefetch_count=1)  # 一次只取 1 条,处理完再取

这样 RabbitMQ 会 公平调度,不会把大量消息推给一个消费者。


(4) 水平扩展消费者

如果是消费能力不足,最直接的办法就是:多开几个消费者
RabbitMQ 会按照 Round Robin(轮询)公平分发 把消息分配下去。


(5) 生产端限流 / 拒绝

RabbitMQ 本身不对生产者限流,但你可以在应用层做:

  • 使用 发布确认(Publisher Confirms),如果消息积压,可以选择暂停生产。
  • 消息速率控制(Rate Limit),比如令牌桶算法,减缓生产速度。

🟢 3. 总结

当队列满了,可以这样处理:

  1. 预防堆积 → 设置 x-max-length / x-max-length-bytes
  2. 兜底方案 → 配置死信队列,把溢出的消息转移出来。
  3. 消费优化basic_qos + 增加消费者实例。
  4. 生产端调节 → 启用发布确认,动态调整生产速度。

👉 最佳实践:

  • 设置合理的队列长度 + 消息 TTL。
  • 配死信队列,确保不会无声丢失。
  • 消费端横向扩展,必要时加缓存层(Kafka 更适合高吞吐)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/news/921329.shtml
繁体地址,请注明出处:http://hk.pswp.cn/news/921329.shtml
英文地址,请注明出处:http://en.pswp.cn/news/921329.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

点控云智能客服:以AI重塑服务体验,登顶行业第一的革新之路

在数字化浪潮席卷全球的今天,客户服务已成为企业核心竞争力之一。智能客服作为连接企业与客户的重要桥梁,其效能与体验直接关系到企业的品牌形象与市场口碑。近日,权威机构发布的《中国智能客服市场竞争力报告》显示,点控云智能客…

9.5 IO-线程day5

信号量打印ABC#include <stdio.h> #include <string.h> #include <stdlib.h> #include <25061head.h> sem_t sem[1]; void *callback(void *arg) {while(1){sem_wait(&sem[0]);printf("A\n");sleep(1);sem_post(&sem[1]);}pthread_e…

老师如何高效收集学生学籍信息,完成收集工作?

开学的时光总是忙碌而充实&#xff0c;除了要热情地迎接新生、用心地备课&#xff0c;还有一件让人头疼不已的事情——学生学籍信息的收集。上学期开学&#xff0c;我承担起了收集班级新生信息的重任&#xff0c;满心以为提前准备好的纸质表格&#xff0c;在新生报到那天发给家…

JAVA层的权限与SELinux的关系

Java 层权限是应用程序级别的“门禁卡”&#xff0c;而 SELinux 是系统级别的“防火墙规则和强制访问控制”。即使你拥有进入大楼的“门禁卡”&#xff08;Java 权限&#xff09;&#xff0c;如果“防火墙规则”&#xff08;SELinux 策略&#xff09;不允许你的进程与目标服务或…

Screen 三步上手

好的&#xff0c;这是给同事的简洁版说明&#xff1a;Screen 三步上手 开新窗口&#xff1a;干活前先开个带名字的窗口&#xff0c;不怕断连。 screen -S 任务名看所有窗口&#xff1a;随时查看都有哪些任务在后台跑。 screen -ls重回窗口&#xff1a;断连后重新登录&#xff0…

flink 伪代码

import java.util.*; import java.util.concurrent.*;// 核心接口定义 interface StreamOperator {void open();void processElement(Object element);void close(); }interface SourceFunction extends StreamOperator {void run(SourceContext ctx); }interface SinkFunction…

一招快速识别你的电脑是机械硬盘还是固态硬盘

你是否经常觉得电脑开机慢、软件打开卡顿&#xff1f;其中一个关键原因&#xff0c;可能就在于你使用的是机械硬盘&#xff08;HDD&#xff09;还是固态硬盘&#xff08;SSD&#xff09;。固态硬盘读写速度快&#xff0c;能显著提升系统响应速度&#xff1b;而机械硬盘虽然容量…

52核心52线程,Intel下一代CPU憋了个大的

被逼急了的 Intel&#xff0c;可能正在憋大招&#xff01;如大伙儿所见&#xff0c;Intel 这两年日子已经不能用「惨」来形容。其过去引以为傲的 PC 处理器&#xff0c;特别是高性能桌面处理器领域&#xff0c;如今算是彻底被 AMD 打懵了。无他&#xff0c;己方产品是连年摆烂&…

【LeetCode 热题 100】1. 两数之和——(解法二)哈希表

Problem: 1. 两数之和 文章目录整体思路完整代码时空复杂度时间复杂度&#xff1a;O(N)空间复杂度&#xff1a;O(N)整体思路 这段代码旨在高效地解决 “两数之和” 问题。与 O(N^2) 的暴力枚举法相比&#xff0c;此版本采用了一种经典的 “空间换时间” 策略&#xff0c;利用 …

MySQL主从同步--主从复制进阶

MySQL支持一台主库同时向多台从库进行复制&#xff0c;从库同时也可以作为其他从服务器的主库&#xff0c;实现链状复制。1、MySQL支持的binlog二进制日志复制类型- 基于语句&#xff08;statement&#xff09;的复制在主服务器上执行SQL语句&#xff0c;在从服务器上执行同样的…

WPF外部打开html文件

注意&#xff1a;这是一份提供WPF外部浏览器打开html的方法&#xff0c;而不是WPF内部嵌入html 需要通过浏览器打开&#xff0c;否则无法使用地址栏拼接参数的形式操作html 下面是打开html的方法↓string localHtmlPath "C:\Users\pangb\Downloads\Help\帮助文档 - 副本.…

Go初级之十:错误处理与程序健壮性

Go初级之十&#xff1a;错误处理与程序健壮性为什么选这个主题&#xff1f; 错误处理是 Go 语言中一个非常独特且重要的设计哲学。它体现了 Go 的“显式错误处理”思想&#xff0c;与其它语言&#xff08;如 Java/Python&#xff09;的异常机制不同。在实际开发中&#xff0c;几…

Xsens解码人形机器人训练的语言

随着人形机器人在现实世界的应用中变得越来越普遍&#xff0c;了解实现其类似人类运动的技术至关重要。在Xsens我们满怀热情地探索这一领域&#xff0c;致力于为人形机器人训练开发最佳的动作捕捉解决方案。为了帮助您更好地理解所遇到的术语&#xff0c;我们创建了一份概述&am…

25年下载chromedriver.140

前提&#xff1a; 因为我需要用seleium模拟浏览器获取数据&#xff0c;需要用到这个chromedriver 驱动。 1.chrome浏览器版本号 先检查你的chrome 的版本号是多少&#xff0c;就下载对应的 chromedriver 【三个点】--->【帮助】------>【关于 Google chrome 】 我的版本…

深度学习玩游戏, 模型玩游戏,大模型+游戏 llm+game, 机器学习玩游戏,人工智能游戏陪伴,模型陪玩游戏

1. 论文地址 Think in Games: Learning to Reason in Games via Reinforcement Learning with Large Language Models 2. 中文&#xff1a; Think in Games&#xff1a;做一个在王者荣耀中会玩和思考的Agent 3. 我记得几年前&#xff0c;相关文章还是使用dqn算法。玩雅利达小…

并查集|栈

lc1668不能直接跳class Solution { public:int maxRepeating(string sequence, string word) {int k 0, n sequence.size(), wn word.size(), t 0;for (int i 0; i < n - wn; i) {if (sequence.substr(i, wn) word) {t 1;int j i wn;while (j wn < n &&…

问题三ai思路

好的&#xff0c;我把“路线A&#xff1a;分类建模择时”的代码按功能分段给出&#xff0c;并为每段配上简明解释。你可以将这些段落依次粘贴到已完成清洗后的 df 变量之后直接运行。 0. 依赖导入&#xff08;一次即可&#xff09; 作用&#xff1a;导入所需库&#xff1b;后续…

Java第十四幕集合啦啦啦啦啦啦啦啦啦啦啦啦啦啦啦啦啦啦啦

集合1 Collection接口1.1 集合概述集合是一个装对象的容器。集合中只能存放引用数据类型的对象。集合中有一些大小是固定的&#xff0c;有一些是不固定的。有一些是有序的&#xff0c;有些是无序的。有些可以有重复元素&#xff0c;有一些不可以有重复元素1.2 集合常用方法publ…

硬件基础:串口通信

数据传输方式&#xff08;按位传输方式&#xff09;并行通信通过多条数据线同时传输多个数据位&#xff0c;速度较快但成本高&#xff0c;抗干扰能力弱&#xff0c;适用于短距离通信&#xff0c;如早期的打印机接口。串行通信通过单条或少数数据线逐位传输数据&#xff0c;线路…

从Java全栈到云原生:一场技术深度对话

从Java全栈到云原生&#xff1a;一场技术深度对话 面试官与应聘者互动记录 面试官&#xff1a;你好&#xff0c;欢迎来到我们的面试。先简单介绍一下你自己吧。 应聘者&#xff1a;您好&#xff0c;我叫李明&#xff0c;28岁&#xff0c;硕士学历&#xff0c;有5年Java全栈开发…