📋 目录

🚀 RocketMQ简介

什么是RocketMQ?

核心概念

🏗️ 基础架构组件

📝 重要概念解释

🔧 环境搭建

1. RocketMQ服务端安装

Docker方式(推荐初学者)

手动安装方式

2. 验证安装

🏗️ Spring Boot集成配置

1. 添加依赖

2. 配置文件

application.yml

application.properties(可选)

📨 基础消息收发

1. 简单消息发送

创建生产者服务

创建消息实体类

2. 简单消息消费

创建消费者

3. 测试控制器

创建测试API

🎯 消息类型详解

1. 同步消息

特点

实现代码

2. 异步消息

特点

实现代码

3. 单向消息

特点

实现代码

4. 延时消息

特点

延时级别说明

实现代码

🚀 高级特性

1. 顺序消息

特点和使用场景

实现代码

2. 事务消息

特点和使用场景

实现代码

3. 消息过滤

使用SQL过滤

📊 监控和管理

1. 集成监控

添加监控依赖

监控配置

自定义健康检查

2. 消息追踪

消息追踪服务

🎯 实战案例

1. 电商订单系统

订单流程设计

2. 用户行为分析系统

用户行为追踪

💡 最佳实践

1. 消息设计原则

消息格式设计

消息发送封装

2. 错误处理和重试

消费重试配置

死信队列处理

3. 性能优化

生产者优化配置

消费者性能配置

❓ 常见问题解决

1. 连接问题

无法连接到NameServer

消息发送失败

2. 消息消费问题

消息重复消费

3. 性能问题

消费速度慢

4. 监控告警

消息积压监控

📚 总结

RocketMQ的优势

适用场景

学习路径建议


🚀 RocketMQ简介

什么是RocketMQ?

RocketMQ是阿里巴巴开源的分布式消息中间件,具有高性能、高可靠、高实时、分布式的特点。

核心概念

🏗️ 基础架构组件
Producer(生产者):负责发送消息
Consumer(消费者):负责接收和处理消息
NameServer:路由信息的注册中心
Broker:消息存储和转发的核心组件
📝 重要概念解释
  • Topic(主题):消息的逻辑分类,类似于"频道"
  • Tag(标签):消息的二级分类,用于更细粒度的过滤
  • Message Queue(消息队列):Topic下的物理存储单元
  • Producer Group(生产者组):同一类生产者的集合
  • Consumer Group(消费者组):同一类消费者的集合

🔧 环境搭建

1. RocketMQ服务端安装

Docker方式(推荐初学者)
# 1. 拉取RocketMQ镜像
docker pull apache/rocketmq:4.9.4# 2. 启动NameServer
docker run -d \--name rmqnamesrv \-p 9876:9876 \apache/rocketmq:4.9.4 \sh mqnamesrv# 3. 启动Broker
docker run -d \--name rmqbroker \--link rmqnamesrv:namesrv \-p 10911:10911 \-p 10909:10909 \-e "NAMESRV_ADDR=namesrv:9876" \apache/rocketmq:4.9.4 \sh mqbroker -c /opt/rocketmq-4.9.4/conf/broker.conf
手动安装方式
# 1. 下载RocketMQ
wget https://archive.apache.org/dist/rocketmq/4.9.4/rocketmq-all-4.9.4-bin-release.zip# 2. 解压
unzip rocketmq-all-4.9.4-bin-release.zip# 3. 设置环境变量
export NAMESRV_ADDR=localhost:9876# 4. 启动NameServer
cd rocketmq-all-4.9.4-bin-release
nohup sh bin/mqnamesrv &# 5. 启动Broker
nohup sh bin/mqbroker -n localhost:9876 &

2. 验证安装

# 检查进程
jps | grep -E "(NamesrvStartup|BrokerStartup)"# 测试发送消息
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer# 测试接收消息
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

🏗️ Spring Boot集成配置

1. 添加依赖

<dependencies><!-- Spring Boot Starter --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- RocketMQ Spring Boot Starter --><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.3</version></dependency><!-- JSON处理 --><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.83</version></dependency>
</dependencies>

2. 配置文件

application.yml
# RocketMQ配置
rocketmq:name-server: 127.0.0.1:9876  # NameServer地址producer:group: producer-group        # 生产者组名send-message-timeout: 3000   # 发送消息超时时间(毫秒)retry-times-when-send-failed: 2  # 发送失败重试次数max-message-size: 4194304    # 最大消息大小(4MB)consumer:group: consumer-group        # 消费者组名# 应用配置
spring:application:name: rocketmq-demo
server:port: 8080# 日志配置
logging:level:org.apache.rocketmq: INFOcom.example: DEBUG
application.properties(可选)
# RocketMQ配置
rocketmq.name-server=127.0.0.1:9876
rocketmq.producer.group=producer-group
rocketmq.producer.send-message-timeout=3000
rocketmq.producer.retry-times-when-send-failed=2
rocketmq.consumer.group=consumer-group

📨 基础消息收发

1. 简单消息发送

创建生产者服务
package com.example.producer;import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;@Service
public class SimpleProducer {@Autowiredprivate RocketMQTemplate rocketMQTemplate;/*** 发送简单文本消息*/public void sendSimpleMessage(String topic, String message) {try {// 最简单的发送方式rocketMQTemplate.convertAndSend(topic, message);System.out.println("✅ 消息发送成功: " + message);} catch (Exception e) {System.err.println("❌ 消息发送失败: " + e.getMessage());}}/*** 发送带标签的消息*/public void sendTaggedMessage(String topic, String tag, String message) {try {String destination = topic + ":" + tag;rocketMQTemplate.convertAndSend(destination, message);System.out.println("✅ 带标签消息发送成功: " + message);} catch (Exception e) {System.err.println("❌ 消息发送失败: " + e.getMessage());}}/*** 发送对象消息*/public void sendObjectMessage(String topic, Object obj) {try {rocketMQTemplate.convertAndSend(topic, obj);System.out.println("✅ 对象消息发送成功: " + obj.toString());} catch (Exception e) {System.err.println("❌ 消息发送失败: " + e.getMessage());}}
}
创建消息实体类
package com.example.model;import java.io.Serializable;
import java.time.LocalDateTime;public class UserMessage implements Serializable {private Long userId;private String username;private String action;private LocalDateTime timestamp;// 构造函数public UserMessage() {}public UserMessage(Long userId, String username, String action) {this.userId = userId;this.username = username;this.action = action;this.timestamp = LocalDateTime.now();}// Getter和Setter方法public Long getUserId() { return userId; }public void setUserId(Long userId) { this.userId = userId; }public String getUsername() { return username; }public void setUsername(String username) { this.username = username; }public String getAction() { return action; }public void setAction(String action) { this.action = action; }public LocalDateTime getTimestamp() { return timestamp; }public void setTimestamp(LocalDateTime timestamp) { this.timestamp = timestamp; }@Overridepublic String toString() {return "UserMessage{" +"userId=" + userId +", username='" + username + '\'' +", action='" + action + '\'' +", timestamp=" + timestamp +'}';}
}

2. 简单消息消费

创建消费者
package com.example.consumer;import com.example.model.UserMessage;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;/*** 简单消息消费者*/
@Component
@RocketMQMessageListener(topic = "simple-topic",           // 监听的主题consumerGroup = "simple-consumer-group"  // 消费者组
)
public class SimpleConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {System.out.println("📨 接收到简单消息: " + message);// 处理业务逻辑try {processMessage(message);System.out.println("✅ 消息处理成功");} catch (Exception e) {System.err.println("❌ 消息处理失败: " + e.getMessage());// 注意:如果处理失败,消息会重新投递}}private void processMessage(String message) {// 模拟业务处理System.out.println("🔄 正在处理消息: " + message);// 模拟耗时操作try {Thread.sleep(1000);} catch (InterruptedException e) {Thread.currentThread().interrupt();}}
}/*** 带标签的消息消费者*/
@Component
@RocketMQMessageListener(topic = "tagged-topic",consumerGroup = "tagged-consumer-group",selectorExpression = "user-action"  // 只消费特定标签的消息
)
public class TaggedConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {System.out.println("📨 接收到用户行为消息: " + message);// 处理用户行为相关的业务逻辑}
}/*** 对象消息消费者*/
@Component
@RocketMQMessageListener(topic = "user-topic",consumerGroup = "user-consumer-group"
)
public class UserMessageConsumer implements RocketMQListener<UserMessage> {@Overridepublic void onMessage(UserMessage userMessage) {System.out.println("📨 接收到用户消息: " + userMessage);// 根据不同的用户行为进行处理switch (userMessage.getAction()) {case "login":handleUserLogin(userMessage);break;case "logout":handleUserLogout(userMessage);break;case "purchase":handleUserPurchase(userMessage);break;default:System.out.println("🤔 未知的用户行为: " + userMessage.getAction());}}private void handleUserLogin(UserMessage message) {System.out.println("👤 用户登录: " + message.getUsername());// 处理用户登录逻辑}private void handleUserLogout(UserMessage message) {System.out.println("👋 用户登出: " + message.getUsername());// 处理用户登出逻辑}private void handleUserPurchase(UserMessage message) {System.out.println("💰 用户购买: " + message.getUsername());// 处理用户购买逻辑}
}

3. 测试控制器

创建测试API
package com.example.controller;import com.example.model.UserMessage;
import com.example.producer.SimpleProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;@RestController
@RequestMapping("/api/message")
public class MessageController {@Autowiredprivate SimpleProducer simpleProducer;/*** 发送简单文本消息*/@PostMapping("/simple")public String sendSimpleMessage(@RequestParam String message) {simpleProducer.sendSimpleMessage("simple-topic", message);return "✅ 简单消息发送成功: " + message;}/*** 发送带标签的消息*/@PostMapping("/tagged")public String sendTaggedMessage(@RequestParam String message, @RequestParam String tag) {simpleProducer.sendTaggedMessage("tagged-topic", tag, message);return "✅ 带标签消息发送成功: " + message + " (tag: " + tag + ")";}/*** 发送用户消息*/@PostMapping("/user")public String sendUserMessage(@RequestParam Long userId, @RequestParam String username,@RequestParam String action) {UserMessage userMessage = new UserMessage(userId, username, action);simpleProducer.sendObjectMessage("user-topic", userMessage);return "✅ 用户消息发送成功: " + userMessage.toString();}/*** 批量发送消息*/@PostMapping("/batch")public String sendBatchMessages(@RequestParam int count) {for (int i = 1; i <= count; i++) {String message = "批量消息 #" + i;simpleProducer.sendSimpleMessage("simple-topic", message);}return "✅ 批量发送 " + count + " 条消息成功";}
}

🎯 消息类型详解

1. 同步消息

特点
  • 发送方等待消息发送结果
  • 可靠性高,适合重要业务
  • 性能相对较低
实现代码
@Service
public class SyncMessageProducer {@Autowiredprivate RocketMQTemplate rocketMQTemplate;/*** 发送同步消息*/public void sendSyncMessage(String topic, String message) {try {// 同步发送,会等待结果SendResult sendResult = rocketMQTemplate.syncSend(topic, message);System.out.println("📤 同步消息发送结果:");System.out.println("   消息ID: " + sendResult.getMsgId());System.out.println("   发送状态: " + sendResult.getSendStatus());System.out.println("   消息队列: " + sendResult.getMessageQueue());System.out.println("   队列偏移量: " + sendResult.getQueueOffset());} catch (Exception e) {System.err.println("❌ 同步消息发送失败: " + e.getMessage());throw new RuntimeException("消息发送失败", e);}}/*** 发送同步消息(带超时时间)*/public void sendSyncMessageWithTimeout(String topic, String message, long timeout) {try {SendResult sendResult = rocketMQTemplate.syncSend(topic, message, timeout);System.out.println("✅ 同步消息发送成功: " + sendResult.getMsgId());} catch (Exceptio

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/bicheng/88044.shtml
繁体地址,请注明出处:http://hk.pswp.cn/bicheng/88044.shtml
英文地址,请注明出处:http://en.pswp.cn/bicheng/88044.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

基于Java+Springboot的医院档案管理系统

源码编号&#xff1a;S597源码名称&#xff1a;基于Springboot的医院档案管理系统用户类型&#xff1a;多角色&#xff0c;用户、医护人员、管理员数据库表数量&#xff1a;11 张表主要技术&#xff1a;Java、Vue、ElementUl 、SpringBoot、Maven运行环境&#xff1a;Windows/M…

Pandas 学习教程

目录 定义 基本操作 一维数组操作 二维数组操作 数据选择过滤 数据处理 数据清洗 数据转换 数据分析 排序 分组聚合 数据透视表 高级操作 合并数据 时间序列处理 自定义函数调用 数据可视化集成 数据导出和导入 大数据分块处理 定义 全称&#xff1a; panel da…

QueryWrapper 类的作用与示例详解

通俗易懂的解释想象一下你去图书馆找书&#xff1a;QueryWrapper 就像是一个智能的图书管理员你告诉管理员你的需求&#xff1a;"我要找计算机类、2020年后出版的、作者是张三的书"管理员会根据你的要求组合查询条件&#xff0c;然后去书库帮你找书在编程中&#xff…

【PyTorch】PyTorch中torch.nn模块的循环层

PyTorch深度学习总结 第九章 PyTorch中torch.nn模块的循环层 文章目录PyTorch深度学习总结前言一、循环层1. 简单循环层&#xff08;RNN&#xff09;2. 长短期记忆网络&#xff08;LSTM&#xff09;3. 门控循环单元&#xff08;GRU&#xff09;4. 双向循环层二、循环层参数1. …

Ubuntu 24.04 LTS 服务器配置:安装 JDK、Nginx、Redis。

Ubuntu 24.04 LTS 服务器配置&#xff1a;安装 JDK、Nginx、Redis。新建用来放置软件安装包的目录 mkdir /home/software 配置目录所有者为 ubuntu 用户&#xff1a; chown ubuntu /home/software将软件安装包上传到 /home/software配置 JDK-8 新建 jdk 安装目录 mkdir /usr/ja…

工作中用到过哪些设计模式?是怎么实现的?

1. 单例模式&#xff08;结合 Spring Component&#xff09;场景&#xff1a;配置中心、全局状态管理 Spring 实现&#xff1a;java// 自动注册为Spring Bean&#xff08;默认单例&#xff09; Component public class AppConfig {Value("${server.port}")private in…

Leetcode 3609. Minimum Moves to Reach Target in Grid

Leetcode 3609. Minimum Moves to Reach Target in Grid 1. 解题思路2. 代码实现 题目链接&#xff1a;3609. Minimum Moves to Reach Target in Grid 1. 解题思路 这一题我一开始走岔了&#xff0c;走了一个正向遍历走法的思路&#xff0c;无论怎么剪枝都一直超时。后来看了…

工作流引擎:IDEA没有actiBPMN插件怎么办?

文章目录一、问题描述二、替代方案一、问题描述 我们在学习activiti7工作流引擎的时候&#xff0c;需要设计流程图。 一般推荐的就是使用IDEA插件actiBPMN进行开发。 但是&#xff0c;这个插件在IDEA2019后的版本都不在支持。 也就是搜不到 那么&#xff0c;怎么办了&#x…

Android音视频探索之旅 | CMake基础语法 创建支持Ffmpeg的Android项目

一.CMake语法 CMake语法非常多&#xff0c;我们知道如何导入静态库和动态库以及最基础的使用&#xff0c;目前是够用的。其它方面则根据实际项目同步学习。 1.1.基础语法-常用 cmake_minimum_required&#xff1a;指定cmake最小版本include_directories&#xff1a;引入&#x…

React Native 初始化项目和模拟器运行

中文官方文档&#xff1a;https://reactnative.cn/docs/environment-setup 英文官方文档&#xff1a;https://reactnative.dev/docs/getting-started-without-a-framework#step-1-creating-a-new-application 创建新项目 1、初始化 # 如果你之前全局安装过旧的react-native-cli…

20250706-5-Docker 快速入门(上)-创建容器常用选项_笔记

一、创建容器常用选项&#xfeff;&#xfeff;1. 创建容器常用选项&#xfeff;1&#xff09;常用选项创建容器常用选项&#xfeff;交互式选项&#xff1a;-i&#xff1a;保持标准输入打开&#xff0c;允许交互式操作-t&#xff1a;分配伪终端&#xff0c;使容器像传统终端一…

插值与拟合(3):B样条曲线

在路径规划问题中&#xff0c;通常会用到B样条来平滑路径&#xff0c;本文实现并封装了三次准均匀开放B样条曲线&#xff0c;供大学学习使用。作者提供了三套代码方案。可以用于不同平台&#xff1a;方案1&#xff1a;MATLAB&#xff1b;方案2&#xff1a;标准C&#xff1b;方案…

[免费]基于Python豆瓣电影数据分析及可视化系统(Flask+echarts+pandas)【论文+源码+SQL脚本】

大家好&#xff0c;我是java1234_小锋老师&#xff0c;看到一个不错的于Python豆瓣电影数据分析及可视化系统(Flaskechartpandas)【论文源码SQL脚本】&#xff0c;分享下哈。项目介绍随着如今电影越来越多&#xff0c;各种各样的烂片和捞钱的商业片也层出不穷&#xff0c;而有意…

SQL127 月总刷题数和日均刷题数

SQL127 月总刷题数和日均刷题数 withtemp as (selectDATE_FORMAT(submit_time, "%Y%m") as submit_month,count(question_id) as month_q_cnt,round(count(question_id) / day(last_day(max(submit_time))),3) as avg_day_q_cntfrompractice_recordwhereyear(submit…

unity luban接入

1.找到luban官网并下载他的例子和.net8.0的sdk安装 官网地址如下 快速上手 | Luban 参考大佬教程如下 Luban新版本接入教程_哔哩哔哩_bilibili 2.找到他的luban_examples-main示例下的两个文件MiniTemplate和tool 3.MiniTemplate这个文件复制一份到项目工程下&#xff0c;自…

Django服务开发镜像构建

最后完整的项目目录结构1、安装依赖pip install django django-tables2 django-filter2、创建项目和主应用django-admin startproject configcd configpython manage.py startapp dynamic_models3、配置settings.py将项目模块dynamic_models加入进来&#xff0c;django_tables2…

20250706-3-Docker 快速入门(上)-常用镜像管理命令_笔记

一、配置加速器&#xfeff;1. Docker Hub简介与地址&#xfeff;公共镜像仓库: 由Docker公司维护的公共镜像仓库&#xff0c;包含大量容器镜像默认下载源: Docker工具默认从这个公共镜像库下载镜像访问地址: https://hub.docker.com镜像搜索功能: 可通过浏览器访问图形化管理系…

【unity游戏开发——优化篇】使用Occlusion Culling遮挡剔除,只渲染相机视野内的游戏物体提升游戏性能

注意&#xff1a;考虑到优化的内容比较多&#xff0c;我将该内容分开&#xff0c;并全部整合放在【unity游戏开发——优化篇】专栏里&#xff0c;感兴趣的小伙伴可以前往逐一查看学习。 文章目录 前言实战1、确保所有静止的3D物体都标记为Occluder Static静态遮挡体和Occludee …

通用业务编号生成工具类(MyBatis-Plus + Spring Boot)详解 + 3种调用方式

在企业应用开发中&#xff0c;我们经常需要生成类似 BZ -240704-0001 这种“业务编号”&#xff0c;它通常具有以下特点&#xff1a;前缀&#xff1a;代表业务类型&#xff0c;如 BZ 表示包装日期&#xff1a;年月日格式&#xff0c;通常为 yyMMdd序列号&#xff1a;当天内递增…

前端相关性能优化笔记

1.打开速度怎么变快 - 首屏加载优化2.再次打开速度怎么变快 - 缓存优化了3.操作怎么才顺滑 - 渲染优化4.动画怎么保证流畅 - 长任务拆分2.1 首屏加载指标细化:1.FP(First Paint 首次绘制) 2.FCP(First contentful Paint 首次内容绘制)&#xff0c;FP 到 FCP 中间其实主要是 SPA…