RabbitMQ 是一款基于 AMQP(Advanced Message Queuing Protocol)协议的开源消息中间件,主要用于实现分布式系统中的消息传递,支持异步通信、系统解耦、流量削峰等场景。在 Java 生态中,RabbitMQ 被广泛应用,其 Java 客户端提供了简洁的 API,方便开发者快速集成。
AMQP 协议
核心概念
1. 消息模型
- 生产者(Producer):发送消息的应用
- 消费者(Consumer):接收消息的应用
- 消息中间件(Broker):负责接收、存储和转发消息
2. 核心组件
AMQP(Advanced Message Queuing Protocol)是一种开放标准的应用层协议,专为消息队列设计。它定义了客户端与消息中间件之间的通信规范,确保不同厂商的实现可以互操作。
+----------+ +---------+ +----------+
| Producer | -> | Exchange| -> | Queue | -> Consumer
+----------+ +---------+ +----------+|v+---------+| Binding |+---------+
- Exchange(交换器)
接收生产者的消息
根据规则(Binding)将消息路由到队列
类型包括:Direct、Topic、Fanout、Headers
- Queue(队列)
存储消息直到被消费
支持多个消费者竞争消费
消息可持久化存储
- Binding(绑定)
定义 Exchange 与 Queue 之间的关联
通过 Binding Key(绑定键)和 Routing Key(路由键)匹配
工作流程
1.生产者发送消息
指定消息的 Routing Key
将消息发送到特定的 Exchange
2.Exchange 路由逻辑
Direct Exchange:按 Routing Key 精确匹配
Topic Exchange:按 Routing Key 的模式匹配(支持*和#通配符)
Fanout Exchange:将消息广播到所有绑定的队列
Headers Exchange:按消息头部属性匹配
3.消费者接收消息
从队列中拉取或订阅消息
处理完成后发送确认(ACK)
RabbitMQ 核心概念
组件 | 作用 |
生产者(Producer) | 消息的发送方,负责创建并发送消息到 RabbitMQ 服务器。 |
消费者(Consumer) | 消息的接收方,监听队列并处理接收到的消息。 |
队列(Queue) | 消息的存储容器,位于 RabbitMQ 服务器中,消息最终会被投递到队列中等待消费。 |
交换机(Exchange) | 接收生产者发送的消息,并根据绑定规则(Binding)将消息路由到对应的队列。 |
绑定(Binding) | 定义交换机与队列之间的关联关系,包含路由键(Routing Key)和匹配规则。 |
路由键(Routing Key) | 生产者发送消息时指定的键,交换机根据该键和绑定规则路由消息。 |
RabbitMQ 消息流转流程
- 生产者发送消息时,需指定交换机名称和路由键;
- 交换机根据自身类型(如 Direct、Topic 等)和绑定规则,将消息转发到匹配的队列;
- 消费者监听队列,获取并处理消息。
Java 操作 RabbitMQ 基础示例
1. 连接 RabbitMQ 服务器
所有操作的前提是建立与 RabbitMQ 的连接,需指定服务器地址、端口、账号密码(默认账号guest仅允许本地连接,远程连接需配置新用户)。
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class RabbitMQConnection {// RabbitMQ连接配置private static final String HOST = "localhost"; // 服务器地址private static final int PORT = 5672; // 默认端口private static final String USERNAME = "guest";private static final String PASSWORD = "guest";// 获取连接public static Connection getConnection() throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost(HOST);factory.setPort(PORT);factory.setUsername(USERNAME);factory.setPassword(PASSWORD);return factory.newConnection();}
}
2. 生产者发送消息
- 创建连接和通道(Channel);
- 声明交换机(可选,若使用默认交换机则无需声明);
- 声明队列(指定队列名称、是否持久化等);
- 绑定交换机与队列(若使用自定义交换机);
- 发送消息(指定交换机、路由键、消息内容)。
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;public class Producer {// 队列名称(需与消费者一致)private static final String QUEUE_NAME = "java_rabbitmq_queue";public static void main(String[] args) throws Exception {// 1. 获取连接Connection connection = RabbitMQConnection.getConnection();// 2. 创建通道(RabbitMQ的操作大多通过通道完成)Channel channel = connection.createChannel();// 3. 声明队列(参数:队列名、是否持久化、是否排他、是否自动删除、附加参数)channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 4. 消息内容String message = "Hello, RabbitMQ from Java!";// 5. 发送消息(参数:交换机名、路由键、消息属性、消息字节数组)// 此处使用默认交换机(""),路由键需与队列名一致channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println("生产者发送消息:" + message);// 6. 关闭资源channel.close();connection.close();}
}
3. 消费者接收消息
- 创建连接和通道;
- 声明队列(需与生产者队列名一致);
- 定义消息处理逻辑(通过DefaultConsumer回调);
- 开启消费(指定队列、是否自动确认消息)。
import com.rabbitmq.client.*;
import java.io.IOException;public class Consumer {private static final String QUEUE_NAME = "java_rabbitmq_queue";public static void main(String[] args) throws Exception {// 1. 获取连接Connection connection = RabbitMQConnection.getConnection();// 2. 创建通道Channel channel = connection.createChannel();// 3. 声明队列(需与生产者一致,重复声明不会报错)channel.queueDeclare(QUEUE_NAME, false, false, false, null);System.out.println("消费者已启动,等待接收消息...");// 4. 定义消息处理回调DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println("消费者接收消息:" + message);};// 5. 开启消费(参数:队列名、是否自动确认、消息接收回调、取消消费回调)// 自动确认(autoAck=true):消息被接收后自动从队列删除;false则需手动确认channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});}
}
Spring AMQP简化 RabbitMQ
1.引入依赖:
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.配置application.yml:
spring:rabbitmq:host: localhostport: 5673username: guestpassword: guest
3. 生产者(使用RabbitTemplate):
@Autowired
private RabbitTemplate rabbitTemplate;public void sendMessage(String message) {rabbitTemplate.convertAndSend("queue_name", message);
}
4.消费者(使用@RabbitListener注解):
@RabbitListener(queues = "queue_name")
public void receiveMessage(String message) {System.out.println("接收消息:" + message);
}
交换机类型及 Java 实现
1. Direct 交换机(精确匹配)
- 路由规则:消息的路由键与绑定的路由键完全一致时,消息被路由到对应队列。
- 适用场景:一对一通信(如订单通知)。
// 生产者声明Direct交换机并绑定队列
String EXCHANGE_NAME = "direct_exchange";
String ROUTING_KEY = "order.notify";
// 声明Direct交换机(参数:交换机名、类型、是否持久化)
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, false);
// 绑定交换机与队列(参数:队列名、交换机名、路由键)
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
// 发送消息(指定交换机和路由键)
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, message.getBytes());
2. Topic 交换机(模糊匹配)
- 路由规则:路由键支持通配符(*匹配一个单词,#匹配多个单词,单词以.分隔)。
- 适用场景:多规则匹配(如日志分类:log.error、log.warn)。
// 生产者声明Topic交换机 String EXCHANGE_NAME = "topic_exchange"; // 路由键为"log.error"(匹配绑定键"log.*"或"log.#") String ROUTING_KEY = "log.error"; channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC, false); // 绑定队列到交换机,绑定键为"log.#"(匹配所有以log.开头的路由键) channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "log.#");
3. Fanout 交换机(广播)
- 路由规则:忽略路由键,将消息路由到所有绑定的队列。
- 适用场景:一对多通信(如广播通知)。
// 生产者声明Fanout交换机
String EXCHANGE_NAME = "fanout_exchange";
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT, false);
// 绑定多个队列到交换机(无需指定路由键)
channel.queueBind(QUEUE1, EXCHANGE_NAME, "");
channel.queueBind(QUEUE2, EXCHANGE_NAME, "");
// 发送消息(路由键无效,可设为空)
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
RabbitMQ 应用场景
- 异步通信:如用户注册后异步发送邮件 / 短信通知;
- 系统解耦:订单系统与库存系统通过消息通信,避免直接依赖;
- 流量削峰:秒杀场景中,通过队列缓冲请求,避免服务器过载;
- 日志收集:多服务日志通过 Fanout 交换机广播到日志处理服务。