一、RabbitMQ 延时队列实现方式

  1. 基于 TTL(Time-To-Live)+ 死信队列(Dead Letter Queue)
    这是最常用的实现方式,核心思路是:
    (1)消息设置过期时间(TTL)
    (2)消息过期后进入绑定的死信队列
    (3)消费者监听死信队列,实现延时消费
// 1. 配置交换机和队列
@Configuration
public class DelayQueueConfig {// 普通交换机(用于接收原始消息)public static final String NORMAL_EXCHANGE = "normal_exchange";// 死信交换机public static final String DEAD_EXCHANGE = "dead_exchange";// 普通队列(消息过期后会进入死信队列)public static final String NORMAL_QUEUE = "normal_queue";// 死信队列(实际消费的队列)public static final String DEAD_QUEUE = "dead_queue";// 声明普通交换机@Beanpublic DirectExchange normalExchange() {return new DirectExchange(NORMAL_EXCHANGE);}// 声明死信交换机@Beanpublic DirectExchange deadExchange() {return new DirectExchange(DEAD_EXCHANGE);}// 声明普通队列(设置死信相关参数)@Beanpublic Queue normalQueue() {Map<String, Object> args = new HashMap<>();// 设置死信交换机args.put("x-dead-letter-exchange", DEAD_EXCHANGE);// 设置死信路由键args.put("x-dead-letter-routing-key", "dead_routing_key");// 队列消息统一过期时间(可选,也可在发送消息时单独设置)// args.put("x-message-ttl", 10000); return QueueBuilder.durable(NORMAL_QUEUE).withArguments(args).build();}// 声明死信队列@Beanpublic Queue deadQueue() {return QueueBuilder.durable(DEAD_QUEUE).build();}// 绑定普通队列和普通交换机@Beanpublic Binding normalBinding() {return BindingBuilder.bind(normalQueue()).to(normalExchange()).with("normal_routing_key");}// 绑定死信队列和死信交换机@Beanpublic Binding deadBinding() {return BindingBuilder.bind(deadQueue()).to(deadExchange()).with("dead_routing_key");}
}// 2. 发送延时消息(设置消息级别的TTL)
@Service
public class DelayMessageSender {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendDelayMessage(String message, long delayMillis) {// 设置消息过期时间rabbitTemplate.convertAndSend(DelayQueueConfig.NORMAL_EXCHANGE,"normal_routing_key",message,correlationData -> {correlationData.getMessageProperties().setExpiration(String.valueOf(delayMillis));return correlationData;});}
}// 3. 消费死信队列消息(延时后的消息)
@Service
public class DelayMessageReceiver {@RabbitListener(queues = DelayQueueConfig.DEAD_QUEUE)public void receiveDelayMessage(String message) {System.out.println("收到延时消息:" + message + ",时间:" + new Date());}
}
  1. 基于 RabbitMQ 插件(rabbitmq_delayed_message_exchange)
    更推荐的方式,需先安装插件:
    (1)下载对应版本的 rabbitmq_delayed_message_exchange 插件
    (2)启用插件:rabbitmq-plugins enable rabbitmq_delayed_message_exchange
@Configuration
public class DelayedExchangeConfig {// 延时交换机(类型必须是 x-delayed-message)@Beanpublic CustomExchange delayedExchange() {Map<String, Object> args = new HashMap<>();args.put("x-delayed-type", "direct"); // 指定转发类型return new CustomExchange("delayed_exchange", "x-delayed-message", true, false, args);}@Beanpublic Queue delayedQueue() {return QueueBuilder.durable("delayed_queue").build();}@Beanpublic Binding delayedBinding() {return BindingBuilder.bind(delayedQueue()).to(delayedExchange()).with("delayed_routing_key").noargs();}
}// 发送延时消息
@Service
public class DelayedMessageSender {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendDelayedMessage(String message, long delayMillis) {rabbitTemplate.convertAndSend("delayed_exchange","delayed_routing_key",message,correlationData -> {// 设置延时时间(毫秒)correlationData.getMessageProperties().setHeader("x-delay", delayMillis);return correlationData;});}
}

二、保证消息幂等性的方案
消息幂等性指:同一条消息被多次消费时,结果是一致的,不会重复处理。常见实现方式:

  1. 基于唯一 ID + Redis / 数据库去重
    (1)发送消息时生成唯一 ID(如 UUID)
    (2)消费前检查该 ID 是否已处理
    (3)处理完成后标记该 ID 为已处理
@Service
public class IdempotentMessageReceiver {@Autowiredprivate StringRedisTemplate redisTemplate;@RabbitListener(queues = "delayed_queue")public void receiveMessage(Message message) {// 1. 获取消息唯一ID(假设放在消息头)String messageId = message.getMessageProperties().getMessageId();if (StringUtils.isEmpty(messageId)) {// 非法消息,直接拒绝throw new AmqpRejectAndDontRequeueException("消息ID为空");}// 2. 检查是否已处理(Redis分布式锁保证原子性)String key = "message:processed:" + messageId;Boolean isFirst = redisTemplate.opsForValue().setIfAbsent(key, "1", 24, TimeUnit.HOURS);if (Boolean.FALSE.equals(isFirst)) {// 已处理过,直接返回System.out.println("消息已处理,ID:" + messageId);return;}// 3. 处理消息业务逻辑String content = new String(message.getBody(), StandardCharsets.UTF_8);System.out.println("处理消息:" + content);}
}
  1. 基于业务唯一标识去重
    如果消息没有全局 ID,可使用业务字段组合作为唯一标识(如订单号):
// 例如处理订单支付消息,用订单号作为唯一标识
String orderNo = extractOrderNo(content); // 从消息中提取订单号
String key = "order:processed:" + orderNo;
// 后续逻辑同上(检查Redis -> 处理业务)
  1. 数据库唯一约束
    通过数据库唯一索引实现幂等:
@Transactional
public void processOrder(String orderNo) {// 插入记录前检查,或直接插入(利用唯一索引报错)try {orderMapper.insert(new Order(orderNo)); // 假设orderNo有唯一索引// 处理订单逻辑} catch (DuplicateKeyException e) {// 已处理过,忽略log.info("订单已处理:{}", orderNo);}
}

总结
1.延时队列实现:
(1)简单场景用 TTL + 死信队列
(2)生产环境推荐用 rabbitmq_delayed_message_exchange 插件(更可靠)
2.幂等性保证核心:
(1)为消息生成唯一标识(全局 ID 或业务唯一键)
(2)消费前检查标识是否已处理(Redis / 数据库)
(3)确保检查和标记操作的原子性(分布式锁 / 事务)
这两种机制结合,可实现可靠的延时任务处理。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/web/95041.shtml
繁体地址,请注明出处:http://hk.pswp.cn/web/95041.shtml
英文地址,请注明出处:http://en.pswp.cn/web/95041.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

前沿技术观察:从AI 时代到量子计算的下一站

前沿技术观察&#xff1a;从AI 时代到量子计算的下一站&#x1f680; 技术的浪潮一波接一波&#xff0c;从 人工智能 到 区块链&#xff0c;再到 边缘计算、元宇宙、量子计算&#xff0c;这些前沿技术正在深刻影响我们的生活与产业格局。 对于开发者和技术爱好者来说&#xff0…

通过Kubernetes安装mysql5服务

以下是清晰、结构化的操作流程优化说明&#xff0c;按步骤梳理从部署到配置持久化、暴露服务的完整过程&#xff1a;一、基础部署&#xff1a;快速验证 MySQL 可用性创建有状态工作负载进入 KubeSphere 项目 → 工作负载 → 有状态副本集 → 创建&#xff0c;选择 通过镜像创建…

【mysql】SQL 中 IS 与 = 的区别:一个 NULL 值引发的思考

SQL 中 IS 与 的区别&#xff1a;一个 NULL 值引发的思考为什么查询结果总是少一条数据&#xff1f;可能是 NULL 在捣鬼在 SQL 查询中&#xff0c;很多开发者都曾遇到过这样的困惑&#xff1a;明明看起来正确的查询语句&#xff0c;返回的结果却总是与预期不符。这往往是因为没…

openGauss笔记

1、安装 直接用docker安装 2、国产化 符合国产化要求 3、客户端 3.1 dbeaver 社区版本&#xff08;25.1.4&#xff09;即可&#xff0c;驱动建议用离线版本&#xff0c;在官网下载最新的&#xff0c;然后在驱动管理里面进行添加本地的jar 3.1.1 驱动配置3.1.2 依赖 需要java版本…

SQL语言增删改查之C与R

本节通关要求1、掌握 SQL 语句对数据库进行的创建 Create 和读取 Retireve 操作的指令&#xff1b;2、多练习&#x1f3ae;说明&#xff1a;操作对象是数据表中的数据行&#xff0c;也就是表中的记录。请明确操作对象&#xff0c;不要误伤友军。背景&#xff1a;create table i…

栈溢出问题

brpc 的 bthread 默认协程栈大小是 128KB&#xff08;非 pthread 模式&#xff09;。如果在一个bthread中&#xff0c;它执行的函数内定义了一个局部变量map&#xff0c;有很多个元素&#xff0c;map的大小超过了128KB&#xff0c;协程会自动申请新的栈空间吗&#xff1f;这里要…

Android之穿山甲广告接入

文章目录前言一、效果图二、实现步骤1.引入库2.build.gradle依赖3.Application初始化3.开屏广告4.插屏广告5.懒人做法总结前言 项目接入广告已经是常见的现象了&#xff0c;但是还有很多朋友或者初学者没有接触过&#xff0c;或者没有接触过穿山甲&#xff0c;今天就来看一下&…

Web开发工具一套式部署Maven/Nvm/Mysql/Redis

前言&#xff1a; 对于一个纯小白且电脑没有任何环境的计算机学生&#xff0c;如何快速跑通Java前后端项目呢&#xff1f; 先附上百度网盘 地址&#xff1a; Web开发工具 。 以下链接来自不同作者&#xff0c;如有侵犯&#xff0c;请联系我删除。 1.Jdk 部署地址&#xff1a…

Deepseek法务提示指令收集

参考网络资料&#xff0c;收集一些法务提示指令&#xff0c;可用于Agent LLM、以及LLM法律相关开发。 https://zhuanlan.zhihu.com/p/22588251815 1 基础指令 1) 身份认证模块 【身份与版本声明】 您是由DeepSeek研发的法律智能辅助系统V4.2版&#xff0c;内核经司法部《生成…

Tiptrans转运 | 免费5国转运地址

Tiptrans 是一家总部位于捷克的国际包裹转运与虚拟地址服务平台&#xff0c;主要提供全球虚拟收货地址&#xff08;英国、德国、香港、美国等&#xff09;&#xff0c;让用户在当地网店购物&#xff0c;再由 Tiptrans 转运到海外。除了物流服务&#xff0c;Tiptrans 也提供虚拟…

STM32手动移植FreeRTOS

&#x1f4e6; 准备工作 获取FreeRTOS源码: 访问 FreeRTOS官网 或其 GitHub仓库 下载最新版内核源码。 你也可以使用Git克隆&#xff08;注意要包含子模块&#xff09;&#xff1a;git clone https://github.com/FreeRTOS/FreeRTOS.git --recurse-submodules。 准备STM32基础…

C5仅支持20MHZ带宽,如果路由器5Gwifi处于40MHZ带宽信道时,会出现配网失败

是的&#xff0c;这会导致“怎么都连不上”。结论先说&#xff1a;如果路由器把 5 GHz 固定在 40 MHz&#xff08;或以上&#xff09;带宽&#xff0c;而你的 C5 只支持 5 GHz 的 20 MHz 带宽&#xff0c;那么 STA 连接一定会失败。固件里不可能“把 40 MHz AP 连成 20 MHz”&a…

坚鹏请教DEEPSEEK:请问中国领先的AI智能体服务商有哪些?知行学

坚鹏请教DEEPSEEK&#xff1a;请问中国领先的AI智能体服务商有哪些&#xff1f;深圳知行学教育科技公司名列榜首根据2025年8月底多家权威机构发布的榜单和报告&#xff0c;比如德本咨询&#xff08;DBC&#xff09;的“2025企业级AI Agent应用TOP50”榜单、IDC的《中国AI AGENT…

【开题答辩全过程】以 投票系统为例,包含答辩的问题和答案

个人简介一名14年经验的资深毕设内行人&#xff0c;语言擅长Java、php、微信小程序、Python、Golang、安卓Android等开发项目包括大数据、深度学习、网站、小程序、安卓、算法。平常会做一些项目定制化开发、代码讲解、答辩教学、文档编写、也懂一些降重方面的技巧。感谢大家的…

C++异常处理指南:构建健壮程序的错误处理机制

在程序开发的世界里&#xff0c;“错误” 是绕不开的话题。你可能写过一个简单的计算器&#xff0c;却因为用户输入 “50” 而崩溃&#xff1b;也可能在操作数据库时&#xff0c;因为权限不足导致数据读取失败&#xff1b;甚至在申请内存时&#xff0c;因为系统资源耗尽而无法继…

comfUI背后的技术——VAE

第一次知道VAE可能还是许嵩。当然&#xff0c;这里的VAE指的是变分自编码器&#xff08;Variational Autoencoder, VAE&#xff09; Seq2Seq 在 Seq2Seq 框架提出之前&#xff0c;深度神经网络在图像分类等问题上取得了非常好的效果。在其擅长解决的问题中&#xff0c;输入和…

【序列晋升】21 Spring Cloud Gateway 云原生网关演进之路

Spring Cloud Gateway作为Spring生态系统中的核心组件&#xff0c;已成为微服务架构中的首选API网关解决方案。它基于响应式编程模型&#xff0c;提供高性能、可扩展的路由管理和跨领域功能&#xff0c;解决了传统微服务架构中的接口聚合、安全管控和流量控制等核心问题。与此同…

“HEU-AUTO”无线上网使用指南

本文针对笔记本电脑 笔者电脑型号为&#xff1a;2025联想拯救者Y9000p 5060步骤1&#xff1a;点击开始菜单&#xff0c;点击设置&#xff0c;如图步骤2&#xff1a;在Windows设置菜单中&#xff0c;点击“网络和Internet”选项&#xff0c;如下图&#xff1a;步骤3&#xff1a;…

微信小程序中蓝牙打印机中文编码处理:使用iconv-lite库

在微信小程序开发中,集成蓝牙打印机实现中文打印是常见需求,但中文文本常因编码不匹配(如UTF-8与GBK冲突)导致乱码问题。本文详细解释如何利用iconv-lite库高效处理中文编码转换,确保打印内容正确显示。文章结构清晰,逐步引导您解决问题,代码示例基于实际项目验证。 1. …

GraphRAG——v0.3.6版本使用详细教程、GraphRAG数据写入Neo4j图数据库、GraphRAG与Dify集成

GraphRAG——v0.3.6版本使用详细教程、GraphRAG数据写入Neo4j图数据库、GraphRAG与Dify集成理论部分安装知识图谱生成测试将数据导入到Neo4j图数据库可视化将GraphRAG与Dify集成理论部分 https://guoqingru.blog.csdn.net/article/details/150771388?spm1011.2415.3001.5331安…