返利app的消息队列架构:基于RabbitMQ的异步通信与解耦实践

大家好,我是阿可,微赚淘客系统及省赚客APP创始人,是个冬天不穿秋裤,天冷也要风度的程序猿!

在返利app的业务流程中,用户下单、返利计算、佣金到账、消息通知等环节存在强依赖关系——传统同步调用模式下,若“返利计算服务”响应延迟,会导致整个下单流程卡顿,甚至引发连锁故障。为解决这一问题,我们引入RabbitMQ消息队列,基于“生产者-交换机-队列-消费者”架构,实现服务间异步通信与业务解耦,将下单流程响应时间从500ms缩短至150ms,系统峰值吞吐量提升2倍。以下从架构设计、核心组件实现、业务场景落地三方面展开,附完整代码示例。
返利app

一、返利app RabbitMQ架构设计

1.1 架构分层与组件职责

针对返利app的业务特性,设计三层消息通信架构,各组件职责如下:

  • 生产者层:各微服务(订单服务、用户服务、返利服务)作为生产者,将业务事件(如“订单创建”“返利生成”)封装为消息发送至RabbitMQ;
  • 中间件层:RabbitMQ通过交换机(Exchange)与队列(Queue)的绑定关系,实现消息路由——采用Topic交换机支持按规则匹配路由,Fanout交换机实现广播通知;
  • 消费者层:下游服务(如通知服务、统计服务)作为消费者,监听指定队列,异步处理消息,避免与上游服务强耦合。

1.2 核心业务消息流转路径

以“用户下单”场景为例,消息流转路径为:

  1. 订单服务(生产者)创建“订单创建”消息,发送至order-exchange交换机;
  2. 交换机按路由键order.created,将消息路由至order-confirm-queue(商家确认队列)与rebate-calculate-queue(返利计算队列);
  3. 商家服务监听order-confirm-queue,异步处理订单确认;返利服务监听rebate-calculate-queue,异步计算返利金额;
  4. 返利服务计算完成后,作为生产者发送“返利生成”消息至rebate-exchange,由通知服务监听队列并发送用户到账通知。

二、RabbitMQ核心组件代码实现

2.1 消息生产者封装(通用发送组件)

基于Spring AMQP封装通用消息发送组件,支持指定交换机、路由键与消息属性,代码如下:

package cn.juwatech.rebate.mq.producer;import cn.juwatech.rebate.mq.config.RabbitMqConfig;
import lombok.RequiredArgsConstructor;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import java.util.UUID;/*** RabbitMQ通用消息生产者*/
@Component
@RequiredArgsConstructor
public class RabbitMqProducer {private final RabbitTemplate rabbitTemplate;/*** 发送消息(带确认机制,确保消息可靠投递)* @param exchange 交换机名称* @param routingKey 路由键* @param message 消息体*/public void sendMessage(String exchange, String routingKey, Object message) {// 1. 生成消息唯一ID(用于消息确认与追踪)String messageId = UUID.randomUUID().toString().replace("-", "");CorrelationData correlationData = new CorrelationData(messageId);// 2. 设置消息确认回调(确保消息到达交换机)rabbitTemplate.setConfirmCallback((correlationData1, ack, cause) -> {if (ack) {// 消息成功到达交换机System.out.printf("消息[%s]已到达交换机,exchange:%s%n", messageId, exchange);} else {// 消息投递失败,可记录日志并触发重试System.err.printf("消息[%s]投递交换机失败,原因:%s%n", messageId, cause);}});// 3. 设置消息返回回调(交换机无法路由时触发)rabbitTemplate.setReturnsCallback(returned -> {System.err.printf("消息[%s]路由失败,routingKey:%s,原因:%s%n",messageId, returned.getRoutingKey(), returned.getReplyText());// 路由失败处理:如发送至死信队列handleReturnedMessage(returned, message);});// 4. 发送消息rabbitTemplate.convertAndSend(exchange, routingKey, message, correlationData);}/*** 处理路由失败的消息(发送至死信队列)*/private void handleReturnedMessage(org.springframework.amqp.core.ReturnedMessage returned, Object message) {String deadExchange = RabbitMqConfig.DEAD_LETTER_EXCHANGE;String deadRoutingKey = RabbitMqConfig.DEAD_LETTER_ROUTING_KEY;rabbitTemplate.convertAndSend(deadExchange, deadRoutingKey, message, new CorrelationData(UUID.randomUUID().toString()));}
}

2.2 RabbitMQ配置类(交换机、队列、绑定关系)

通过配置类定义业务所需的交换机、队列及绑定规则,包含死信队列配置以处理失败消息,代码如下:

package cn.juwatech.rebate.mq.config;import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** RabbitMQ交换机、队列、绑定关系配置*/
@Configuration
public class RabbitMqConfig {// 1. 订单相关配置public static final String ORDER_EXCHANGE = "order-exchange";public static final String ORDER_CONFIRM_QUEUE = "order-confirm-queue";public static final String REBATE_CALCULATE_QUEUE = "rebate-calculate-queue";public static final String ROUTING_KEY_ORDER_CREATED = "order.created";// 2. 返利相关配置public static final String REBATE_EXCHANGE = "rebate-exchange";public static final String REBATE_NOTIFY_QUEUE = "rebate-notify-queue";public static final String ROUTING_KEY_REBATE_GENERATED = "rebate.generated";// 3. 死信队列配置public static final String DEAD_LETTER_EXCHANGE = "dead-letter-exchange";public static final String DEAD_LETTER_QUEUE = "dead-letter-queue";public static final String DEAD_LETTER_ROUTING_KEY = "dead.letter";/*** 1. 声明死信交换机与死信队列(处理失败消息)*/@Beanpublic DirectExchange deadLetterExchange() {// Direct交换机:精确匹配路由键return ExchangeBuilder.directExchange(DEAD_LETTER_EXCHANGE).durable(true).build();}@Beanpublic Queue deadLetterQueue() {// 持久化队列,避免消息丢失return QueueBuilder.durable(DEAD_LETTER_QUEUE).build();}@Beanpublic Binding deadLetterBinding() {// 绑定死信交换机与队列return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with(DEAD_LETTER_ROUTING_KEY);}/*** 2. 声明订单交换机(Topic类型,支持模糊匹配路由键)*/@Beanpublic TopicExchange orderExchange() {return ExchangeBuilder.topicExchange(ORDER_EXCHANGE).durable(true).build();}/*** 声明订单确认队列(绑定死信交换机,消息消费失败时转发)*/@Beanpublic Queue orderConfirmQueue() {return QueueBuilder.durable(ORDER_CONFIRM_QUEUE)// 配置死信交换机.withArgument("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE)// 配置死信路由键.withArgument("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY)// 配置消息过期时间(30分钟).withArgument("x-message-ttl", 1800000).build();}/*** 声明返利计算队列*/@Beanpublic Queue rebateCalculateQueue() {return QueueBuilder.durable(REBATE_CALCULATE_QUEUE).withArgument("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE).withArgument("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY).withArgument("x-message-ttl", 1800000).build();}/*** 绑定订单交换机与订单确认队列*/@Beanpublic Binding orderConfirmBinding() {return BindingBuilder.bind(orderConfirmQueue()).to(orderExchange()).with(ROUTING_KEY_ORDER_CREATED);}/*** 绑定订单交换机与返利计算队列*/@Beanpublic Binding rebateCalculateBinding() {return BindingBuilder.bind(rebateCalculateQueue()).to(orderExchange()).with(ROUTING_KEY_ORDER_CREATED);}/*** 3. 声明返利交换机与通知队列*/@Beanpublic TopicExchange rebateExchange() {return ExchangeBuilder.topicExchange(REBATE_EXCHANGE).durable(true).build();}@Beanpublic Queue rebateNotifyQueue() {return QueueBuilder.durable(REBATE_NOTIFY_QUEUE).withArgument("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE).withArgument("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY).build();}@Beanpublic Binding rebateNotifyBinding() {return BindingBuilder.bind(rebateNotifyQueue()).to(rebateExchange()).with(ROUTING_KEY_REBATE_GENERATED);}
}

2.3 消息消费者实现(业务处理)

以“返利计算消费者”为例,监听rebate-calculate-queue队列,异步处理订单返利计算,代码如下:

package cn.juwatech.rebate.mq.consumer;import cn.juwatech.rebate.dto.OrderDTO;
import cn.juwatech.rebate.mq.config.RabbitMqConfig;
import cn.juwatech.rebate.service.RebateCalculateService;
import lombok.RequiredArgsConstructor;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** 返利计算消息消费者*/
@Component
@RequiredArgsConstructor
public class RebateCalculateConsumer {private final RebateCalculateService rebateCalculateService;/*** 监听返利计算队列,处理订单返利* @param orderDTO 订单数据(消息体)*/@RabbitListener(queues = RabbitMqConfig.REBATE_CALCULATE_QUEUE)public void handleRebateCalculate(OrderDTO orderDTO) {try {System.out.printf("开始处理订单返利,订单ID:%s,用户ID:%s%n", orderDTO.getOrderId(), orderDTO.getUserId());// 调用返利计算服务(核心业务逻辑)rebateCalculateService.calculateRebate(orderDTO);// 手动确认消息(默认AUTO模式,此处显式确认确保业务处理完成)// 注:若使用AUTO模式,方法无异常则自动确认,抛出异常则拒绝并重回队列} catch (Exception e) {System.err.printf("订单返利处理失败,订单ID:%s,原因:%s%n", orderDTO.getOrderId(), e.getMessage());// 消费失败处理:可记录日志,触发告警,避免消息重复重试throw new RuntimeException("返利计算失败,消息将转发至死信队列", e);}}
}

2.4 业务层消息发送示例(订单服务)

订单服务创建订单后,通过生产者发送“订单创建”消息,触发后续异步流程,代码如下:

package cn.juwatech.rebate.service.impl;import cn.juwatech.rebate.dto.OrderDTO;
import cn.juwatech.rebate.mq.config.RabbitMqConfig;
import cn.juwatech.rebate.mq.producer.RabbitMqProducer;
import cn.juwatech.rebate.service.OrderService;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;/*** 订单服务实现类*/
@Service
@RequiredArgsConstructor
public class OrderServiceImpl implements OrderService {private final RabbitMqProducer rabbitMqProducer;// 省略订单DAO层依赖...@Override@Transactional(rollbackFor = Exception.class)public void createOrder(OrderDTO orderDTO) {// 1. 保存订单数据(本地事务)saveOrder(orderDTO);// 2. 发送订单创建消息(异步触发商家确认与返利计算)rabbitMqProducer.sendMessage(RabbitMqConfig.ORDER_EXCHANGE,RabbitMqConfig.ROUTING_KEY_ORDER_CREATED,orderDTO);System.out.printf("订单创建成功,订单ID:%s,消息已发送%n", orderDTO.getOrderId());}// 省略订单保存逻辑...
}

三、消息可靠性保障与性能优化

3.1 可靠性保障措施

  1. 消息持久化:交换机、队列均配置为durable=true,消息发送时设置deliveryMode=2(持久化),避免RabbitMQ重启丢失消息;
  2. 生产者确认:通过ConfirmCallback确保消息到达交换机,ReturnsCallback处理路由失败消息,转发至死信队列;
  3. 消费者确认:采用手动确认模式(或AUTO模式结合异常处理),确保业务逻辑执行完成后再确认消息,避免消息丢失;
  4. 死信队列:配置消息过期时间(TTL)与死信路由,消费失败的消息最终进入死信队列,避免无限重试导致系统资源浪费。

3.2 性能优化策略

  1. 消息批量发送:对高频低时延要求的场景(如用户行为日志),采用rabbitTemplate.convertAndSend批量发送,减少网络请求次数;
  2. 消费者线程池配置:通过spring.rabbitmq.listener.simple.concurrencymax-concurrency设置消费者线程池大小,默认1-10,根据业务调整为5-20;
  3. 队列分片:对高流量队列(如rebate-calculate-queue),按用户ID哈希拆分多个队列(如rebate-calculate-queue-1-4),分散消费压力;
  4. 消息压缩:对大体积消息(如包含商品详情的订单数据),发送前通过Gzip压缩,接收后解压,减少网络传输与存储开销。

本文著作权归聚娃科技省赚客app开发者团队,转载请注明出处!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/web/96717.shtml
繁体地址,请注明出处:http://hk.pswp.cn/web/96717.shtml
英文地址,请注明出处:http://en.pswp.cn/web/96717.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Vue3 响应式失效 debug:Proxy 陷阱导致数据更新异常的深度排查

人们眼中的天才之所以卓越非凡,并非天资超人一等而是付出了持续不断的努力。1万小时的锤炼是任何人从平凡变成超凡的必要条件。———— 马尔科姆格拉德威尔 🌟 Hello,我是Xxtaoaooo! 🌈 “代码是逻辑的诗篇&#xff0…

【贪心算法】day10

📝前言说明: 本专栏主要记录本人的贪心算法学习以及LeetCode刷题记录,按专题划分每题主要记录:(1)本人解法 本人屎山代码;(2)优质解法 优质代码;&#xff…

LeetCode算法日记 - Day 42: 岛屿数量、岛屿的最大面积

目录 1. 岛屿数量 1.1 题目解析 1.2 解法 1.3 代码实现 2. 岛屿的最大面积 2.1 题目解析 2.2 解法 2.3 代码实现 1. 岛屿数量 https://leetcode.cn/problems/number-of-islands/ 给你一个由 1(陆地)和 0(水)组成的的二维…

短波红外相机在机器视觉检测方向的应用

短波红外相机在机器视觉检测方向的应用短波红外相机:机器视觉的“低成本突破者”一、打破成本困局:短波红外的“平民化”革新二、核心技术:有机材料的“硬核创新”1. 材料革命:有机感光层的优势2. 工艺兼容:嫁接成熟CM…

【数据结构与算法】图 Floyd算法

相关题目: 1334. 阈值距离内邻居最少的城市 - 力扣(LeetCode) 资料 : Floyd算法原理及公式推导 - 知乎 Floyd 算法是一种经典的动态规划算法,用与求解图中所有顶点之间的最短短路路径。它由Robert Floyd 于1962…

卫星通信天线的指向精度,含义、测量和计算

卫星通信天线的指向精度,含义、测量和计算我们在卫星通信天线的技术规格书中,都会看到天线指向精度这个指标。一般来说,技术规格书上的天线指向精度的参数是这么写的:“天线指向精度≤1/10半功率波束带宽”今天这个文章&#xff0…

基于LSTM与3秒级Tick数据的金融时间序列预测实现

数据加载模块解析 def load_data(filepath):df pd.read_csv(filepath)return df该函数承担基础数据采集职责,通过Pandas库读取CSV格式的高频交易数据(典型如股票分笔成交明细)。输入参数为文件路径字符串,输出结构化DataFrame对象…

C# --- Field and Property

C# --- Field and Property字段 (Field) vs. 属性 (Property)Property的声明初始化方法单例类property错误初始化导致线程泄漏字段 (Field) vs. 属性 (Property) 字段 (Field) - 数据的存储容器 字段是直接在类或结构中声明的变量。它是存储数据的地方,是对象状态的…

【Python】实现一个文件夹快照与比较工具

1. 工具简介 在日常开发、项目管理或备份场景中,我们经常需要知道某个文件夹中的文件是否发生变化,例如: 项目源码是否新增或修改文件?数据集是否被不小心删除或篡改?备份文件夹是否和上次一致? 本教程将教…

LINUX913 shell:set ip [lindex $argv 0],\r,send_user,spawn ssh root@ip “cat “

问题 获取公钥 [codesamba ~]$ cat pub.sh #!/bin/usr/expect set ip "$1" set password 123456 set timeout 20 spawn ssh root192.168.235.100:cat ~/.ssh/id_rsa.pub expect { "yes/no" {send "yes/r";exp_continue} "password:" {…

Acwing算法基础课--链表

一、单链表 AcWing 826. 单链表 代码 N 100010 idx 0 e [0] * N ne [0] * N head -1def init():global idx,headidx 0head -1def add_head(x):global idx,heade[idx] xne[idx] headhead idxidx 1def delete(k):ne[k] ne[ne[k]]def add_k(k,x):global idxe[idx] …

AI表征了西方的有界,AI+体现了东方的无界

AI表征了西方的有界,AI体现了东方的无界,试图通过文化差异的视角来对比传统AI(AI)与增强型或融合型AI(AI)的特征。一、“AI表征了西方的有界”西方的“有界”可以理解为:1、逻辑清晰、结构严谨&…

LabVIEW泵轮检测

​在现代制造业蓬勃发展的浪潮下,汽车行业也迎来了高速发展期。液力变矩器作为实现车辆自动变速的关键零件产品,在汽车动力系统中扮演着不可或缺的角色。泵轮作为液力变矩器的核心组成部分,其生产质量直接影响着液力变矩器的性能。因此&#…

RT-DETRv2 中的坐标回归机制深度解析:为什么用 `sigmoid(inv_sigmoid(ref) + delta)` 而不是除以图像尺寸?

引言:一个看似简单的公式,背后藏着工业级设计智慧 在阅读 RT-DETRv2(Real-Time DETR v2)源码时,我曾被一行代码深深震撼: inter_ref_bbox F.sigmoid(bbox_head[i](output) inverse_sigmoid(ref_points_de…

简单了解一下GraphRAG

传统RAG的缺点 当我们将一段文本信息以句子分割后,存入到向量数据库中。用户提问“老王喜欢吃什么”,这个问题会与向量数据库中的许多句子关联性比较强,能返回准确且具体的信息。 但是,若是问题换成“出现了几次西瓜”&#xff0c…

HTTP 状态码背后的逻辑:从请求到响应的完整流程解析(含完整流程图)

在日常的 Web 开发与 API 调试中,我们经常会遇到各种 HTTP 状态码 ——404 Not Found、401 Unauthorized、500 Internal Server Error... 这些数字背后并非随机出现,而是服务器处理请求过程中不同阶段的 "反馈信号"。理解这些状态码的触发逻辑…

Vue:下拉框多选影响行高

目录 一、 出现场景二、 解决方案 一、 出现场景 在使用el-select增加multiple属性进行多选时&#xff0c;会出现高度塌陷的情况 二、 解决方案 首先需要在el-select中增加collapse-tags属性&#xff0c;并在style中增加如下样式 方案一 <style scoped> ::v-deep .e…

如何在高通跃龙QCS6490 Arm架构上使用Windows 11 IoT企业版?

1.简介研华已将高通跃龙QCS6490 技术应用于嵌入式模块、单板电脑和AI摄像头等各种规格的嵌入式硬件中。QCS6490平台支持全面的操作系统生态系统&#xff0c;包括Windows、Ubuntu、Yocto和 Android。Windows 11 IoT企业版是微软新一代的物联网操作系统&#xff0c;具有更强的安全…

阿里云国际代理:如何利用RDS构建高可用、可扩展的数据库架构

讲下云数据库RDS案例解析&#xff0c;若在上云或用云过程中有不懂的&#xff0c;可寻云枢国际yunshuguoji助力免卡上云用云。1、RDS MySQL数据库代理支持读写分离、连接保持、就近访问、事务拆分、连接池、SSL加密等功能&#xff0c;能够降低主实例负载&#xff0c;提高实例可用…

C++之特殊类设计

文章目录前言一、 设计一个不能被拷贝的类1. C98 实现方式2. C11 实现方式二、设计一个只能在堆上创建对象的类1. 方法一&#xff1a;析构函数私有&#xff0c;提供destory接口释放资源2. 方法二&#xff1a;构造函数私有三、 设计一个只能在栈上创建对象的类1. 实现方式四、设…