📋 目录
🚀 RocketMQ简介
什么是RocketMQ?
核心概念
🏗️ 基础架构组件
📝 重要概念解释
🔧 环境搭建
1. RocketMQ服务端安装
Docker方式(推荐初学者)
手动安装方式
2. 验证安装
🏗️ Spring Boot集成配置
1. 添加依赖
2. 配置文件
application.yml
application.properties(可选)
📨 基础消息收发
1. 简单消息发送
创建生产者服务
创建消息实体类
2. 简单消息消费
创建消费者
3. 测试控制器
创建测试API
🎯 消息类型详解
1. 同步消息
特点
实现代码
2. 异步消息
特点
实现代码
3. 单向消息
特点
实现代码
4. 延时消息
特点
延时级别说明
实现代码
🚀 高级特性
1. 顺序消息
特点和使用场景
实现代码
2. 事务消息
特点和使用场景
实现代码
3. 消息过滤
使用SQL过滤
📊 监控和管理
1. 集成监控
添加监控依赖
监控配置
自定义健康检查
2. 消息追踪
消息追踪服务
🎯 实战案例
1. 电商订单系统
订单流程设计
2. 用户行为分析系统
用户行为追踪
💡 最佳实践
1. 消息设计原则
消息格式设计
消息发送封装
2. 错误处理和重试
消费重试配置
死信队列处理
3. 性能优化
生产者优化配置
消费者性能配置
❓ 常见问题解决
1. 连接问题
无法连接到NameServer
消息发送失败
2. 消息消费问题
消息重复消费
3. 性能问题
消费速度慢
4. 监控告警
消息积压监控
📚 总结
RocketMQ的优势
适用场景
学习路径建议
🚀 RocketMQ简介
什么是RocketMQ?
RocketMQ是阿里巴巴开源的分布式消息中间件,具有高性能、高可靠、高实时、分布式的特点。
核心概念
🏗️ 基础架构组件
Producer(生产者):负责发送消息
Consumer(消费者):负责接收和处理消息
NameServer:路由信息的注册中心
Broker:消息存储和转发的核心组件
📝 重要概念解释
- Topic(主题):消息的逻辑分类,类似于"频道"
- Tag(标签):消息的二级分类,用于更细粒度的过滤
- Message Queue(消息队列):Topic下的物理存储单元
- Producer Group(生产者组):同一类生产者的集合
- Consumer Group(消费者组):同一类消费者的集合
🔧 环境搭建
1. RocketMQ服务端安装
Docker方式(推荐初学者)
# 1. 拉取RocketMQ镜像
docker pull apache/rocketmq:4.9.4# 2. 启动NameServer
docker run -d \--name rmqnamesrv \-p 9876:9876 \apache/rocketmq:4.9.4 \sh mqnamesrv# 3. 启动Broker
docker run -d \--name rmqbroker \--link rmqnamesrv:namesrv \-p 10911:10911 \-p 10909:10909 \-e "NAMESRV_ADDR=namesrv:9876" \apache/rocketmq:4.9.4 \sh mqbroker -c /opt/rocketmq-4.9.4/conf/broker.conf
手动安装方式
# 1. 下载RocketMQ
wget https://archive.apache.org/dist/rocketmq/4.9.4/rocketmq-all-4.9.4-bin-release.zip# 2. 解压
unzip rocketmq-all-4.9.4-bin-release.zip# 3. 设置环境变量
export NAMESRV_ADDR=localhost:9876# 4. 启动NameServer
cd rocketmq-all-4.9.4-bin-release
nohup sh bin/mqnamesrv &# 5. 启动Broker
nohup sh bin/mqbroker -n localhost:9876 &
2. 验证安装
# 检查进程
jps | grep -E "(NamesrvStartup|BrokerStartup)"# 测试发送消息
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer# 测试接收消息
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
🏗️ Spring Boot集成配置
1. 添加依赖
<dependencies><!-- Spring Boot Starter --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- RocketMQ Spring Boot Starter --><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.3</version></dependency><!-- JSON处理 --><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.83</version></dependency>
</dependencies>
2. 配置文件
application.yml
# RocketMQ配置
rocketmq:name-server: 127.0.0.1:9876 # NameServer地址producer:group: producer-group # 生产者组名send-message-timeout: 3000 # 发送消息超时时间(毫秒)retry-times-when-send-failed: 2 # 发送失败重试次数max-message-size: 4194304 # 最大消息大小(4MB)consumer:group: consumer-group # 消费者组名# 应用配置
spring:application:name: rocketmq-demo
server:port: 8080# 日志配置
logging:level:org.apache.rocketmq: INFOcom.example: DEBUG
application.properties(可选)
# RocketMQ配置
rocketmq.name-server=127.0.0.1:9876
rocketmq.producer.group=producer-group
rocketmq.producer.send-message-timeout=3000
rocketmq.producer.retry-times-when-send-failed=2
rocketmq.consumer.group=consumer-group
📨 基础消息收发
1. 简单消息发送
创建生产者服务
package com.example.producer;import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;@Service
public class SimpleProducer {@Autowiredprivate RocketMQTemplate rocketMQTemplate;/*** 发送简单文本消息*/public void sendSimpleMessage(String topic, String message) {try {// 最简单的发送方式rocketMQTemplate.convertAndSend(topic, message);System.out.println("✅ 消息发送成功: " + message);} catch (Exception e) {System.err.println("❌ 消息发送失败: " + e.getMessage());}}/*** 发送带标签的消息*/public void sendTaggedMessage(String topic, String tag, String message) {try {String destination = topic + ":" + tag;rocketMQTemplate.convertAndSend(destination, message);System.out.println("✅ 带标签消息发送成功: " + message);} catch (Exception e) {System.err.println("❌ 消息发送失败: " + e.getMessage());}}/*** 发送对象消息*/public void sendObjectMessage(String topic, Object obj) {try {rocketMQTemplate.convertAndSend(topic, obj);System.out.println("✅ 对象消息发送成功: " + obj.toString());} catch (Exception e) {System.err.println("❌ 消息发送失败: " + e.getMessage());}}
}
创建消息实体类
package com.example.model;import java.io.Serializable;
import java.time.LocalDateTime;public class UserMessage implements Serializable {private Long userId;private String username;private String action;private LocalDateTime timestamp;// 构造函数public UserMessage() {}public UserMessage(Long userId, String username, String action) {this.userId = userId;this.username = username;this.action = action;this.timestamp = LocalDateTime.now();}// Getter和Setter方法public Long getUserId() { return userId; }public void setUserId(Long userId) { this.userId = userId; }public String getUsername() { return username; }public void setUsername(String username) { this.username = username; }public String getAction() { return action; }public void setAction(String action) { this.action = action; }public LocalDateTime getTimestamp() { return timestamp; }public void setTimestamp(LocalDateTime timestamp) { this.timestamp = timestamp; }@Overridepublic String toString() {return "UserMessage{" +"userId=" + userId +", username='" + username + '\'' +", action='" + action + '\'' +", timestamp=" + timestamp +'}';}
}
2. 简单消息消费
创建消费者
package com.example.consumer;import com.example.model.UserMessage;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;/*** 简单消息消费者*/
@Component
@RocketMQMessageListener(topic = "simple-topic", // 监听的主题consumerGroup = "simple-consumer-group" // 消费者组
)
public class SimpleConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {System.out.println("📨 接收到简单消息: " + message);// 处理业务逻辑try {processMessage(message);System.out.println("✅ 消息处理成功");} catch (Exception e) {System.err.println("❌ 消息处理失败: " + e.getMessage());// 注意:如果处理失败,消息会重新投递}}private void processMessage(String message) {// 模拟业务处理System.out.println("🔄 正在处理消息: " + message);// 模拟耗时操作try {Thread.sleep(1000);} catch (InterruptedException e) {Thread.currentThread().interrupt();}}
}/*** 带标签的消息消费者*/
@Component
@RocketMQMessageListener(topic = "tagged-topic",consumerGroup = "tagged-consumer-group",selectorExpression = "user-action" // 只消费特定标签的消息
)
public class TaggedConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {System.out.println("📨 接收到用户行为消息: " + message);// 处理用户行为相关的业务逻辑}
}/*** 对象消息消费者*/
@Component
@RocketMQMessageListener(topic = "user-topic",consumerGroup = "user-consumer-group"
)
public class UserMessageConsumer implements RocketMQListener<UserMessage> {@Overridepublic void onMessage(UserMessage userMessage) {System.out.println("📨 接收到用户消息: " + userMessage);// 根据不同的用户行为进行处理switch (userMessage.getAction()) {case "login":handleUserLogin(userMessage);break;case "logout":handleUserLogout(userMessage);break;case "purchase":handleUserPurchase(userMessage);break;default:System.out.println("🤔 未知的用户行为: " + userMessage.getAction());}}private void handleUserLogin(UserMessage message) {System.out.println("👤 用户登录: " + message.getUsername());// 处理用户登录逻辑}private void handleUserLogout(UserMessage message) {System.out.println("👋 用户登出: " + message.getUsername());// 处理用户登出逻辑}private void handleUserPurchase(UserMessage message) {System.out.println("💰 用户购买: " + message.getUsername());// 处理用户购买逻辑}
}
3. 测试控制器
创建测试API
package com.example.controller;import com.example.model.UserMessage;
import com.example.producer.SimpleProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;@RestController
@RequestMapping("/api/message")
public class MessageController {@Autowiredprivate SimpleProducer simpleProducer;/*** 发送简单文本消息*/@PostMapping("/simple")public String sendSimpleMessage(@RequestParam String message) {simpleProducer.sendSimpleMessage("simple-topic", message);return "✅ 简单消息发送成功: " + message;}/*** 发送带标签的消息*/@PostMapping("/tagged")public String sendTaggedMessage(@RequestParam String message, @RequestParam String tag) {simpleProducer.sendTaggedMessage("tagged-topic", tag, message);return "✅ 带标签消息发送成功: " + message + " (tag: " + tag + ")";}/*** 发送用户消息*/@PostMapping("/user")public String sendUserMessage(@RequestParam Long userId, @RequestParam String username,@RequestParam String action) {UserMessage userMessage = new UserMessage(userId, username, action);simpleProducer.sendObjectMessage("user-topic", userMessage);return "✅ 用户消息发送成功: " + userMessage.toString();}/*** 批量发送消息*/@PostMapping("/batch")public String sendBatchMessages(@RequestParam int count) {for (int i = 1; i <= count; i++) {String message = "批量消息 #" + i;simpleProducer.sendSimpleMessage("simple-topic", message);}return "✅ 批量发送 " + count + " 条消息成功";}
}
🎯 消息类型详解
1. 同步消息
特点
- 发送方等待消息发送结果
- 可靠性高,适合重要业务
- 性能相对较低
实现代码
@Service
public class SyncMessageProducer {@Autowiredprivate RocketMQTemplate rocketMQTemplate;/*** 发送同步消息*/public void sendSyncMessage(String topic, String message) {try {// 同步发送,会等待结果SendResult sendResult = rocketMQTemplate.syncSend(topic, message);System.out.println("📤 同步消息发送结果:");System.out.println(" 消息ID: " + sendResult.getMsgId());System.out.println(" 发送状态: " + sendResult.getSendStatus());System.out.println(" 消息队列: " + sendResult.getMessageQueue());System.out.println(" 队列偏移量: " + sendResult.getQueueOffset());} catch (Exception e) {System.err.println("❌ 同步消息发送失败: " + e.getMessage());throw new RuntimeException("消息发送失败", e);}}/*** 发送同步消息(带超时时间)*/public void sendSyncMessageWithTimeout(String topic, String message, long timeout) {try {SendResult sendResult = rocketMQTemplate.syncSend(topic, message, timeout);System.out.println("✅ 同步消息发送成功: " + sendResult.getMsgId());} catch (Exceptio