Springboot仿抖音app开发之评论业务模块后端复盘及相关业务知识总结
Springboot仿抖音app开发之粉丝业务模块后端复盘及相关业务知识总结
Springboot仿抖音app开发之用短视频务模块后端复盘及相关业务知识总结
Springboot仿抖音app开发之用户业务模块后端复盘及相关业务知识总结
Springboot仿抖音app开发之消息业务模块后端复盘及相关业务知识总结
为什么需要接口解耦
1. 数据重要性分级处理
在实际业务系统中,数据通常被分为不同重要级别:
重要数据(关键业务数据):
- 用户账户信息、交易记录、订单数据
- 需要强一致性和ACID特性
- 通常存储在关系型数据库(MySQL、PostgreSQL等)
非重要数据(辅助业务数据):
- 用户行为日志、消息通知、统计数据
- 可以容忍最终一致性
- 适合存储在NoSQL数据库(MongoDB、Redis等)
2. 接口解耦的核心优势
故障隔离:
- 重要数据操作失败不影响非重要数据的处理
- MongoDB服务异常不会阻塞核心业务流程
- 提高系统整体可用性
为什么MongoDB服务异常会阻塞核心业务流程
问题场景分析
1. 未解耦的情况(会阻塞核心业务)
问题:
- 如果MongoDB服务异常,整个订单处理流程都会失败
- 核心的订单数据无法保存,影响业务连续性
- 一个非关键功能的故障导致关键业务无法进行
2. 故障隔离的好处
业务连续性保障:
- 核心业务(订单创建)不受MongoDB故障影响
- 用户可以正常下单,不会感知到系统部分组件的异常
系统健壮性提升:
- 不同重要级别的数据采用不同的处理策略
- 非关键功能的故障不会造成系统雪崩
运维友好:
- 可以独立维护和升级MongoDB服务
- MongoDB的性能调优不会影响核心业务
3. 实际案例
假设一个电商系统:
核心流程:用户下单 → 扣减库存 → 生成订单 → 扣款 辅助功能:记录用户行为 → 发送消息通知 → 更新推荐算法数据
如果没有解耦,MongoDB异常会导致:
- 用户无法下单
- 订单系统完全瘫痪
- 收入损失
解耦后的效果:
- 用户正常下单和支付
- 部分通知功能暂时不可用(用户基本无感知)
- 系统稳定运行,收入不受影响
工厂与批发商的故事
如果没有中间件(微信群)生产者给消费者发消息需要逐个去发送对应的消息,有了中间件之后只需要 统一发送就行,消费者去找自己对应的消息
RabbitMQ
1. 异步任务处理
- 场景:耗时操作(如发送邮件、生成报表、图片处理)不适合阻塞主流程。
- 实现:将任务放入消息队列,由消费者异步处理。
- 优势:
- 主程序快速响应(如用户注册后立即返回,邮件发送由队列处理)。
- 避免因任务失败导致主流程崩溃。
2. 系统提速
- 场景:高延迟操作(如数据库写入、第三方API调用)拖慢整体性能。
- 实现:主程序发布消息后立即返回,消费者逐步处理。
- 示例:
- 电商下单后,库存扣减和日志记录通过队列异步执行。
- 吞吐量提升:队列充当缓冲区,允许系统以最大承受速度处理任务。
3. 接口解耦
- 场景:系统间直接调用导致强依赖(如支付系统与物流系统)。
- 实现:通过消息队列间接通信,生产者无需知道消费者细节。
- 优势:
- 系统可独立扩展或升级(如新增一个消费者不会影响生产者)。
- 协议灵活性:不同语言/框架的系统可通过标准协议(如AMQP)交互。
4. 流量削峰(Peak Shaving)
- 场景:突发流量(如秒杀活动)可能压垮后端服务。
- 实现:将请求放入队列,消费者按固定速率处理。
- 关键点:
- 队列缓冲超载请求,避免服务崩溃。
- 配合限流策略(如设置队列最大长度),保证系统稳定。
核心组件及其关系
1. 生产者 (Producer)
- 作用: 消息的发送方,类似于写信的人
- 职责: 创建消息并发送到交换机
- 特点: 生产者不直接将消息发送给队列,而是发送给Exchange
2. 交换机 (Exchange)
- 作用: 消息的邮局/分拣中心
- 职责: 接收生产者的消息,根据路由规则决定消息发送到哪个队列
- 类型:
- Direct: 精确匹配路由键
- Topic: 支持通配符匹配 (
*
和#
) - Fanout: 广播到所有绑定的队列
- Headers: 根据消息头属性路由
3. 队列 (Queue)
- 作用: 消息的邮箱
- 职责: 存储消息,等待消费者来获取
- 特点: 先进先出(FIFO)的数据结构
4. 消费者 (Consumer)
- 作用: 消息的接收方,类似于收信的人
- 职责: 从队列中获取并处理消息
工作流程
生产者 → Exchange → Queue → 消费者↓ ↓ ↓ ↓写信 → 邮局 → 邮箱 → 收信人
绑定关系 (Binding)
绑定是连接Exchange和Queue的路由规则:
// 您代码中的绑定示例
.bind(queue) // 绑定队列
.to(exchange) // 到交换机
.with("sys.msg.*") // 路由键规则
这意味着:
- 当生产者发送路由键为
sys.msg.login
的消息时 → 会路由到queue_sys_msg
- 当发送路由键为
user.info.update
的消息时 → 不会路由到此队列
实际应用场景举例
场景: 系统消息通知
-
生产者: 用户服务
// 发送登录消息 rabbitTemplate.convertAndSend("exchange_msg", "sys.msg.login", "用户张三登录");
-
Exchange:
exchange_msg
(Topic类型)- 接收到路由键
sys.msg.login
的消息
- 接收到路由键
-
路由判断:
sys.msg.login
匹配sys.msg.*
规则 ✅- 消息被路由到
queue_sys_msg
-
队列:
queue_sys_msg
- 存储消息等待处理
-
消费者: 系统通知服务
@RabbitListener(queues = "queue_sys_msg") public void handleSysMessage(String message) {// 处理系统消息logger.info("收到系统消息: " + message); }
关键理解点
- 解耦性: 生产者不需要知道具体哪个消费者会处理消息
- 灵活性: 通过不同的Exchange类型和绑定规则,可以实现各种消息路由策略
- 可靠性: 消息持久化、队列持久化确保消息不会丢失
- 扩展性: 可以轻松添加新的队列和消费者
集成Rabbitmq - 引入配置和依赖
<!-- 引入 RabbitMQ 依赖 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
rabbitmq:host: 127.0.0.1port: 5672username: adminpassword: adminvirtual-host: imooc-red-book
集成Rabbitmq - 创建交换机和队列
我们来看完整代码
package com.imooc;import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {/*** 根据模型编写代码:* 1. 定义交换机* 2. 定义队列* 3. 创建交换机* 4. 创建队列* 5. 队列和交换机的绑定*/public static final String EXCHANGE_MSG = "exchange_msg";public static final String QUEUE_SYS_MSG = "queue_sys_msg";@Bean(EXCHANGE_MSG)public Exchange exchange() {return ExchangeBuilder // 构建交换机.topicExchange(EXCHANGE_MSG) // 使用topic类型,参考:https://www.rabbitmq.com/getstarted.html.durable(true) // 设置持久化,重启mq后依然存在.build();}@Bean(QUEUE_SYS_MSG)public Queue queue() {return new Queue(QUEUE_SYS_MSG);}@Beanpublic Binding binding(@Qualifier(EXCHANGE_MSG) Exchange exchange,@Qualifier(QUEUE_SYS_MSG) Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("sys.msg.*") // 定义路由规则(requestMapping).noargs();// FIXME: * 和 # 分别代表什么意思?}}
Spring Boot自动创建机制
当Spring容器启动时,会自动扫描所有标注了@Bean
的方法,并将返回值注册到Spring容器中。对于RabbitMQ组件,Spring AMQP会自动检测这些Bean并在RabbitMQ服务器上创建对应的实体。
代码执行流程
1. 创建交换机
@Bean(EXCHANGE_MSG)
public Exchange exchange() {return ExchangeBuilder.topicExchange(EXCHANGE_MSG) // 创建名为"exchange_msg"的Topic交换机.durable(true) // 持久化,服务器重启后不会丢失.build();
}
2. 创建队列
@Bean(QUEUE_SYS_MSG)
public Queue queue() {return new Queue(QUEUE_SYS_MSG); // 创建名为"queue_sys_msg"的队列
}
3. 建立绑定关系
@Bean
public Binding binding(@Qualifier(EXCHANGE_MSG) Exchange exchange,@Qualifier(QUEUE_SYS_MSG) Queue queue) {return BindingBuilder.bind(queue) // 绑定队列.to(exchange) // 到交换机.with("sys.msg.*") // 使用路由键模式.noargs();
}
Topic交换机的路由规则
关于您代码中的FIXME注释,Topic类型的通配符含义:
-
*
(星号): 匹配一个单词sys.msg.*
可以匹配:sys.msg.login
、sys.msg.logout
、sys.msg.error
- 但不能匹配:
sys.msg.user.login
(因为有两个单词)
-
#
(井号): 匹配零个或多个单词sys.msg.#
可以匹配:sys.msg
、sys.msg.login
、sys.msg.user.login.success
集成Rabbitmq - 创建生产者,配置路由规则
1. 添加依赖 (pom.xml)
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
2. 配置文件 (application.yml)
spring:rabbitmq:host: localhostport: 5672username: guestpassword: guestvirtual-host: /
3. RabbitMQ配置类 (已有)
@Configuration
public class RabbitMQConfig {public static final String EXCHANGE_MSG = "exchange_msg";public static final String QUEUE_SYS_MSG = "queue_sys_msg";// 创建Topic交换机@Bean(EXCHANGE_MSG)public Exchange exchange() {return ExchangeBuilder.topicExchange(EXCHANGE_MSG).durable(true).build();}// 创建队列@Bean(QUEUE_SYS_MSG)public Queue queue() {return new Queue(QUEUE_SYS_MSG);}// 绑定关系:队列绑定到交换机,使用路由规则@Beanpublic Binding binding(@Qualifier(EXCHANGE_MSG) Exchange exchange,@Qualifier(QUEUE_SYS_MSG) Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("sys.msg.*") // 关键:路由键规则.noargs();}
}
1. 核心常量定义
public static final String EXCHANGE_MSG = "exchange_msg";
public static final String QUEUE_SYS_MSG = "queue_sys_msg";
- 作用: 定义交换机和队列的名称常量
- 好处: 避免硬编码,便于维护和引用
2. Topic交换机配置
@Bean(EXCHANGE_MSG)
public Exchange exchange() {return ExchangeBuilder.topicExchange(EXCHANGE_MSG) // 创建Topic类型交换机.durable(true) // 持久化配置.build();
}
Topic交换机特点:
- 类型: Topic Exchange(主题交换机)
- 路由规则: 支持通配符匹配
*
:匹配一个单词#
:匹配零个或多个单词
- 持久化:
durable(true)
确保服务器重启后交换机不丢失
3. 队列配置
@Bean(QUEUE_SYS_MSG)
public Queue queue() {return new Queue(QUEUE_SYS_MSG);
}
- 队列名:
queue_sys_msg
- 用途: 存储系统消息
- 默认配置: 非持久化队列(可以改为持久化)
4. 绑定关系配置
@Bean
public Binding binding(@Qualifier(EXCHANGE_MSG) Exchange exchange,@Qualifier(QUEUE_SYS_MSG) Queue queue) {return BindingBuilder.bind(queue) // 绑定队列.to(exchange) // 到交换机.with("sys.msg.*") // 使用路由规则.noargs();
}
4. 生产者控制器
@RestController
public class ProducerController {@Autowiredprivate RabbitTemplate rabbitTemplate;@GetMapping("produce")public Object produce() throws Exception {// 发送消息到指定交换机,使用特定路由键rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_MSG, // 交换机名称"sys.msg.send", // 路由键"我发了一个消息\~\~" // 消息内容);return GraceJSONResult.ok();}
}
集成Rabbitmq - 消费者接受消息处理业务
package com.imooc;import com.imooc.enums.MessageEnum;
import com.imooc.exceptions.GraceException;
import com.imooc.grace.result.ResponseStatusEnum;
import com.imooc.mo.MessageMO;
import com.imooc.service.MsgService;
import com.imooc.utils.JsonUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;@Slf4j
@Component
public class RabbitMQConsumer {@Autowiredprivate MsgService msgService;@RabbitListener(queues = {RabbitMQConfig.QUEUE_SYS_MSG})public void watchQueue(String payload, Message message) {log.info(payload);String routingKey = message.getMessageProperties().getReceivedRoutingKey();log.info(routingKey);}}
1. 消费者组件定义
@Slf4j
@Component
public class RabbitMQConsumer {
@Component
: 将此类注册为Spring Bean@Slf4j
: 自动生成日志对象,用于记录日志
2. 队列监听
@RabbitListener(queues = {RabbitMQConfig.QUEUE_SYS_MSG})
public void watchQueue(String payload, Message message) {
@RabbitListener
: 监听指定队列的注解queues = {RabbitMQConfig.QUEUE_SYS_MSG}
: 监听名为queue_sys_msg
的队列String payload
: 接收消息的具体内容Message message
: 完整的消息对象,包含元数据
3. 消息处理逻辑
log.info(payload); // 打印消息内容
String routingKey = message.getMessageProperties().getReceivedRoutingKey();
log.info(routingKey); // 打印路由键
完整工作机制
监听机制
- 当应用启动时,Spring会自动扫描带有
@RabbitListener
的方法 - 为该方法创建一个消息监听容器
- 容器会持续监听
queue_sys_msg
队列
消息处理流程
- 接收消息: 当队列中有新消息时,自动触发
watchQueue
方法 - 解析内容: 获取消息的文本内容 (
payload
) - 提取路由键: 从消息属性中获取路由键信息
- 记录日志: 将消息内容和路由键打印到控制台
rabbitTemplate.convertAndSend
方法详解
这行代码是RabbitMQ异步消息发送的核心部分,让我逐个参数详细解析:
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_MSG,"sys.msg." + MessageEnum.FOLLOW_YOU.enValue,JsonUtils.objectToJson(messageMO)
);
参数解析
参数1: 交换机名称
RabbitMQConfig.EXCHANGE_MSG
- 作用: 指定消息要发送到哪个交换机
- 实际值: 通常是
"exchange_msg"
(常量值) - 工作方式: 交换机接收所有消息,并根据路由规则分发
参数2: 路由键
"sys.msg." + MessageEnum.FOLLOW_YOU.enValue
- 拼接结果:
"sys.msg.follow"
- 作用: 决定消息如何被路由到目标队列
- 匹配规则: 与交换机绑定时定义的模式进行匹配
- 如:
sys.msg.*
会匹配此路由键
- 如:
参数3: 消息内容
JsonUtils.objectToJson(messageMO)
- 输入:
messageMO
(消息对象) - 转换过程: 对象 → JSON字符串
- 输出示例:
{"fromUserId": "用户123","toUserId": "博主456","msgContent": null
}
- 传输形式: 字节数组形式在网络上传输
异步解耦 - 系统消息入库保存
阶段一:关注操作(生产者)
@Transactional
@Override
public void doFollow(String myId, String vlogerId) {// 1. 核心业务逻辑(同步)String fid = sid.nextShort();Fans fans = new Fans();// ... 设置粉丝关系fansMapper.insert(fans); // 插入粉丝关系到数据库// 2. 构建消息对象MessageMO messageMO = new MessageMO();messageMO.setFromUserId(myId); // 关注者messageMO.setToUserId(vlogerId); // 被关注者// 3. 异步发送消息(关键点)rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_MSG,"sys.msg." + MessageEnum.FOLLOW_YOU.enValue, // 路由键:sys.msg.followJsonUtils.objectToJson(messageMO));// 事务提交,关注操作立即完成!
}
阶段二:消息消费(消费者)
@RabbitListener(queues = {RabbitMQConfig.QUEUE_SYS_MSG})
public void watchQueue(String payload, Message message) {// 1. 解析JSON消息MessageMO messageMO = JsonUtils.jsonToPojo(payload, MessageMO.class);String routingKey = message.getMessageProperties().getReceivedRoutingKey();// 2. 根据路由键判断消息类型并处理if (routingKey.equalsIgnoreCase("sys.msg." + MessageEnum.FOLLOW_YOU.enValue)) {// 异步执行系统消息入库msgService.createMsg(messageMO.getFromUserId(), // 关注者IDmessageMO.getToUserId(), // 被关注者ID MessageEnum.FOLLOW_YOU.type, // 消息类型:关注(1)null // 无额外内容);}// ... 处理其他消息类型
}
异步解耦的核心优势
1. 事务分离
// 主事务:关注业务
@Transactional
public void doFollow() {fansMapper.insert(fans); // 核心业务rabbitTemplate.convertAndSend(); // 发送MQ(非阻塞)
} // 事务立即提交// 独立处理:消息入库
@RabbitListener
public void watchQueue() {msgService.createMsg(); // 在独立的事务中处理
}
2. 时序对比
传统同步方式:
用户点击关注↓
[开始事务]├── 插入粉丝关系 (50ms)├── 更新互关状态 (30ms) └── 创建系统消息 (100ms) ← 可能慢
[提交事务] (180ms总耗时)↓
返回成功给用户
MQ异步方式:
用户点击关注↓
[开始事务]├── 插入粉丝关系 (50ms)├── 更新互关状态 (30ms)└── 发送MQ消息 (5ms) ← 超快
[提交事务] (85ms总耗时)↓
返回成功给用户 ← 用户立即看到结果[后台异步]└── 创建系统消息 (100ms) ← 后台处理
3. 系统消息入库的异步处理
消息流转过程:
// 发送的JSON消息
{"fromUserId": "user123","toUserId": "vlogger456"
}// 路由键
"sys.msg.follow"// 最终入库的系统消息
INSERT INTO sys_msg (from_user_id = 'user123',to_user_id = 'vlogger456', msg_type = 1, -- FOLLOW_YOU类型msg_content = null,create_time = now()
);
容错和可靠性保障
1. 消息持久化
// RabbitMQ配置
@Bean
public Queue queue() {return QueueBuilder.durable("queue_sys_msg") // 持久化队列.build();
}
2. 失败重试机制
@RabbitListener(queues = {RabbitMQConfig.QUEUE_SYS_MSG})
public void watchQueue(String payload, Message message) {try {// 处理消息msgService.createMsg(...);} catch (Exception e) {log.error("处理消息失败: {}", e.getMessage());// 消息会自动重新入队重试throw e; // 抛出异常触发重试}
}
其他相关的操作也同样进行异步解耦即可,我们已经在消费者模型中做了if判断处理
package com.imooc;import com.imooc.base.RabbitMQConfig;
import com.imooc.enums.MessageEnum;
import com.imooc.exceptions.GraceException;
import com.imooc.grace.result.ResponseStatusEnum;
import com.imooc.mo.MessageMO;
import com.imooc.service.MsgService;
import com.imooc.utils.JsonUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;@Slf4j
@Component
public class RabbitMQConsumer {@Autowiredprivate MsgService msgService;@RabbitListener(queues = {RabbitMQConfig.QUEUE_SYS_MSG})public void watchQueue(String payload, Message message) {log.info(payload);MessageMO messageMO = JsonUtils.jsonToPojo(payload, MessageMO.class);String routingKey = message.getMessageProperties().getReceivedRoutingKey();log.info(routingKey);if (routingKey.equalsIgnoreCase("sys.msg." + MessageEnum.FOLLOW_YOU.enValue)) {msgService.createMsg(messageMO.getFromUserId(),messageMO.getToUserId(),MessageEnum.FOLLOW_YOU.type,null);} else if (routingKey.equalsIgnoreCase("sys.msg." + MessageEnum.LIKE_VLOG.enValue)) {msgService.createMsg(messageMO.getFromUserId(),messageMO.getToUserId(),MessageEnum.FOLLOW_YOU.type,messageMO.getMsgContent());} else if (routingKey.equalsIgnoreCase("sys.msg." + MessageEnum.COMMENT_VLOG.enValue)) {msgService.createMsg(messageMO.getFromUserId(),messageMO.getToUserId(),MessageEnum.COMMENT_VLOG.type,messageMO.getMsgContent());} else if (routingKey.equalsIgnoreCase("sys.msg." + MessageEnum.REPLY_YOU.enValue)) {msgService.createMsg(messageMO.getFromUserId(),messageMO.getToUserId(),MessageEnum.REPLY_YOU.type,messageMO.getMsgContent());} else if (routingKey.equalsIgnoreCase("sys.msg." + MessageEnum.LIKE_COMMENT.enValue)) {msgService.createMsg(messageMO.getFromUserId(),messageMO.getToUserId(),MessageEnum.LIKE_COMMENT.type,messageMO.getMsgContent());} else {GraceException.display(ResponseStatusEnum.SYSTEM_OPERATION_ERROR);}}}
消息流转的完整过程
1. 发送端(生产者)
// 关注操作中发送消息
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_MSG, // 交换机:exchange_msg"sys.msg." + MessageEnum.FOLLOW_YOU.enValue, // 路由键:sys.msg.followJsonUtils.objectToJson(messageMO) // JSON消息
);
2. RabbitMQ路由过程
消息发送到交换机 "exchange_msg"↓
交换机根据路由键 "sys.msg.follow" 进行路由判断↓
匹配绑定规则:queue_sys_msg 绑定了 "sys.msg.*"↓
"sys.msg.follow" 匹配 "sys.msg.*" ✅↓
消息被路由到队列 "queue_sys_msg"↓
消息在队列中等待被消费
3. 消费端(监听器)
@RabbitListener(queues = {RabbitMQConfig.QUEUE_SYS_MSG})
public void watchQueue(String payload, Message message) {// 自动监听队列,有消息就触发此方法
}
监听机制的工作原理
@RabbitListener 注解的作用
@RabbitListener(queues = {RabbitMQConfig.QUEUE_SYS_MSG})
Spring Boot启动时:
- 扫描注解: Spring扫描到
@RabbitListener
注解 - 创建监听容器: 为该方法创建一个
MessageListenerContainer
- 建立连接: 连接到RabbitMQ服务器
- 监听队列: 持续监听
queue_sys_msg
队列 - 等待消息: 进入阻塞状态,等待队列中有新消息
消息到达时:
[队列中有新消息]↓
[监听容器检测到消息]↓
[自动调用 watchQueue 方法]↓
[传入消息内容和元数据]
消息处理的具体流程
参数接收
public void watchQueue(String payload, Message message) {// payload: JSON字符串内容// message: 完整的AMQP消息对象(包含属性、路由键等)
}
消息解析
// 1. 打印消息内容
log.info(payload); // 输出:{"fromUserId":"user123","toUserId":"vlogger456"}// 2. 解析JSON为对象
MessageMO messageMO = JsonUtils.jsonToPojo(payload, MessageMO.class);// 3. 获取路由键
String routingKey = message.getMessageProperties().getReceivedRoutingKey();
log.info(routingKey); // 输出:sys.msg.follow
业务逻辑分发
// 根据路由键判断消息类型
if (routingKey.equalsIgnoreCase("sys.msg." + MessageEnum.FOLLOW_YOU.enValue)) {// 处理关注消息:sys.msg.followmsgService.createMsg(messageMO.getFromUserId(), // 关注者messageMO.getToUserId(), // 被关注者MessageEnum.FOLLOW_YOU.type, // 消息类型:1null // 无额外内容);
}