系统设计

架构图

+----------------+       +-----------------+       +----------------+
|                |       |                 |       |                |
|  生产者        |------>| Redis ZSet      |------>| 定时任务消费者  |
|  (添加延迟任务) |       | (延迟队列存储)  |       | (扫描并处理任务)|
|                |       |                 |       |                |
+----------------+       +-----------------+       +----------------+↑                                              ||                                              ↓|                                   +---------------------++-----------------------------------|     任务处理器       || (执行具体业务逻辑)   |+---------------------+

核心流程

  1. 生产者将任务添加到Redis ZSet中,score为任务执行时间戳

  2. 定时任务定期扫描ZSet,找出score小于当前时间的任务

  3. 消费者线程池处理到期的任务

  4. 任务处理完成后从ZSet中移除

实现步骤

步骤一:添加依赖(pom.xml)

<dependencies><!-- Spring Boot Starter --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- Redis集成 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency><!-- 定时任务 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-quartz</artifactId></dependency><!-- JSON处理 --><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId></dependency><!-- Lombok简化代码 --><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency>
</dependencies>

步骤二:配置Redis(application.yml)

spring:redis:host: localhostport: 6379password: database: 0lettuce:pool:max-active: 20max-idle: 10min-idle: 2max-wait: 10000ms# 自定义延迟队列配置
delay:queue:key: "delay_queue"  # Redis ZSet键名batch-size: 10      # 每次处理任务数量interval: 5000      # 定时任务执行间隔(ms)thread-pool-size: 5 # 消费者线程池大小

步骤三:创建任务模型(DelayTask.java)

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;@Data
@NoArgsConstructor
@AllArgsConstructor
public class DelayTask {/*** 任务类型枚举*/public enum TaskType {ORDER_TIMEOUT,   // 订单超时处理EMAIL_REMINDER,  // 邮件提醒TASK_EXECUTION   // 定时任务执行}private TaskType type;    // 任务类型private String taskId;    // 任务唯一IDprivate String content;   // 任务内容private long createTime;  // 任务创建时间private long executeTime; // 任务执行时间// 重写toString方法用于序列化@Overridepublic String toString() {return "DelayTask{" +"type=" + type +", taskId='" + taskId + '\'' +", content='" + content + '\'' +", createTime=" + createTime +", executeTime=" + executeTime +'}';}
}

步骤四:创建Redis配置类(RedisConfig.java)

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;@Configuration
public class RedisConfig {@Beanpublic RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {RedisTemplate<String, Object> template = new RedisTemplate<>();template.setConnectionFactory(factory);// 使用Jackson序列化GenericJackson2JsonRedisSerializer jacksonSerializer = new GenericJackson2JsonRedisSerializer();// Key序列化template.setKeySerializer(new StringRedisSerializer());// Value序列化template.setValueSerializer(jacksonSerializer);// Hash Key序列化template.setHashKeySerializer(new StringRedisSerializer());// Hash Value序列化template.setHashValueSerializer(jacksonSerializer);template.afterPropertiesSet();return template;}
}

步骤五:创建线程池配置类(ThreadPoolConfig.java)

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;@Configuration
public class ThreadPoolConfig {@Value("${delay.queue.thread-pool-size:5}")private int threadPoolSize;@Bean("taskExecutor")public Executor taskExecutor() {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();// 核心线程数executor.setCorePoolSize(threadPoolSize);// 最大线程数executor.setMaxPoolSize(threadPoolSize * 2);// 队列大小executor.setQueueCapacity(100);// 线程名前缀executor.setThreadNamePrefix("delay-task-");// 拒绝策略executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());// 初始化executor.initialize();return executor;}
}

步骤六:创建延迟队列服务(DelayQueueService.java)

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ZSetOperations;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.Set;
import java.util.concurrent.Executor;@Slf4j
@Service
public class DelayQueueService {@Value("${delay.queue.key}")private String delayQueueKey;@Value("${delay.queue.batch-size}")private int batchSize;@Resourceprivate RedisTemplate<String, Object> redisTemplate;@Autowiredprivate Executor taskExecutor;/*** 添加延迟任务* @param task 任务对象* @param delaySeconds 延迟秒数*/public void addTask(DelayTask task, long delaySeconds) {long executeTime = System.currentTimeMillis() + (delaySeconds * 1000);task.setExecuteTime(executeTime);// 添加到Redis ZSetredisTemplate.opsForZSet().add(delayQueueKey, task, executeTime);log.info("添加延迟任务成功, 任务ID: {}, 执行时间: {}", task.getTaskId(), executeTime);}/*** 定时扫描任务(每5秒执行一次)*/@Scheduled(fixedRateString = "${delay.queue.interval}")public void scanExpiredTasks() {long now = System.currentTimeMillis();log.debug("开始扫描延迟队列, 当前时间: {}", now);// 获取当前时间之前的所有任务Set<ZSetOperations.TypedTuple<Object>> tasks = redisTemplate.opsForZSet().rangeByScoreWithScores(delayQueueKey, 0, now, 0, batchSize);if (tasks == null || tasks.isEmpty()) {log.debug("未找到待处理任务");return;}log.info("发现 {} 个待处理任务", tasks.size());for (ZSetOperations.TypedTuple<Object> tuple : tasks) {Object taskObj = tuple.getValue();if (taskObj instanceof DelayTask) {DelayTask task = (DelayTask) taskObj;// 使用线程池异步处理任务taskExecutor.execute(() -> processTask(task));}}}/*** 处理任务* @param task 延迟任务*/@Asyncpublic void processTask(DelayTask task) {try {log.info("开始处理任务: {}", task.getTaskId());// 根据任务类型执行不同逻辑switch (task.getType()) {case ORDER_TIMEOUT:handleOrderTimeout(task);break;case EMAIL_REMINDER:sendReminderEmail(task);break;case TASK_EXECUTION:executeScheduledTask(task);break;default:log.warn("未知任务类型: {}", task.getType());}// 处理完成后从队列中移除redisTemplate.opsForZSet().remove(delayQueueKey, task);log.info("任务处理完成并移除: {}", task.getTaskId());} catch (Exception e) {log.error("任务处理失败: {}", task.getTaskId(), e);handleProcessingError(task);}}// 示例:订单超时处理private void handleOrderTimeout(DelayTask task) {log.info("处理订单超时任务: {}", task.getContent());// 实际业务逻辑:取消订单、释放库存等// 模拟处理时间try {Thread.sleep(1000);} catch (InterruptedException ignored) {}}// 示例:发送提醒邮件private void sendReminderEmail(DelayTask task) {log.info("发送提醒邮件: {}", task.getContent());// 实际业务逻辑:调用邮件服务发送邮件}// 示例:执行定时任务private void executeScheduledTask(DelayTask task) {log.info("执行定时任务: {}", task.getContent());// 实际业务逻辑:执行定时任务}// 错误处理private void handleProcessingError(DelayTask task) {log.error("任务处理失败,加入死信队列: {}", task.getTaskId());// 可以将失败任务移到死信队列redisTemplate.opsForList().rightPush("delay:dead-letter", task);}
}

步骤七:创建测试Controller(DelayQueueController.java)

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;@RestController
@RequestMapping("/delay")
public class DelayQueueController {@Autowiredprivate DelayQueueService delayQueueService;/*** 添加延迟任务* @param type 任务类型 (1-订单超时, 2-邮件提醒, 3-定时任务)* @param seconds 延迟秒数* @param content 任务内容*/@PostMapping("/add")public String addDelayTask(@RequestParam("type") int type,@RequestParam("seconds") long seconds,@RequestParam("content") String content) {// 创建任务IDString taskId = "TASK-" + System.currentTimeMillis();// 转换任务类型DelayTask.TaskType taskType;switch (type) {case 1: taskType = DelayTask.TaskType.ORDER_TIMEOUT; break;case 2: taskType = DelayTask.TaskType.EMAIL_REMINDER; break;case 3: taskType = DelayTask.TaskType.TASK_EXECUTION; break;default: throw new IllegalArgumentException("无效的任务类型");}// 创建任务DelayTask task = new DelayTask(taskType, taskId, content, System.currentTimeMillis(), 0);// 添加任务delayQueueService.addTask(task, seconds);return "任务添加成功! ID: " + taskId;}/*** 查看队列状态*/@GetMapping("/status")public String queueStatus() {long size = delayQueueService.getQueueSize();return "当前延迟队列任务数量: " + size;}
}

启动类(DelayQueueApplication.java)

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;@SpringBootApplication
@EnableScheduling // 启用定时任务
@EnableAsync     // 启用异步方法
public class DelayQueueApplication {public static void main(String[] args) {SpringApplication.run(DelayQueueApplication.class, args);}
}

方案优势与注意事项

优势

  1. 高性能:利用Redis内存操作和ZSet有序特性

  2. 低延迟:定时任务扫描保证任务及时处理

  3. 高可靠:任务处理失败可进入死信队列

  4. 可扩展:线程池支持并行处理多个任务

  5. 灵活配置:支持批量处理大小、扫描间隔等参数配置

注意事项

  1. 任务幂等性:确保任务可重复处理而不产生副作用

  2. 任务超时处理:长时间任务需考虑超时机制

  3. Redis持久化:根据业务需求配置RDB或AOF

  4. 分布式环境:多实例部署时需考虑任务竞争问题

  5. 监控告警:添加队列积压监控和任务失败告警

扩展建议

  1. 添加管理界面

    • 查看队列中的任务

    • 手动重试失败任务

    • 统计任务处理成功率

  2. 分布式锁优化

    // 在scanExpiredTasks方法中
    String lockKey = "delay_queue_lock";
    Boolean lockAcquired = redisTemplate.opsForValue().setIfAbsent(lockKey, "locked", 30, TimeUnit.SECONDS);if (lockAcquired != null && lockAcquired) {try {// 执行扫描任务逻辑} finally {redisTemplate.delete(lockKey);}
    }

  3. 任务优先级支持

    // 在添加任务时,可将优先级加入score计算
    double score = executeTime + (priority * 0.001);
  4. 延迟时间精确控制

    • 使用Redisson的DelayedQueue组件

    • 或使用Redis的Keyspace通知功能

这个实现方案提供了一个完整、可扩展的延迟队列系统,适用于订单超时处理、定时提醒、延迟任务执行等多种业务场景。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/web/90062.shtml
繁体地址,请注明出处:http://hk.pswp.cn/web/90062.shtml
英文地址,请注明出处:http://en.pswp.cn/web/90062.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

MCP vs 传统集成方案:REST API、GraphQL、gRPC的终极对比

MCP vs 传统集成方案&#xff1a;REST API、GraphQL、gRPC的终极对比 &#x1f31f; Hello&#xff0c;我是摘星&#xff01; &#x1f308; 在彩虹般绚烂的技术栈中&#xff0c;我是那个永不停歇的色彩收集者。 &#x1f98b; 每一个优化都是我培育的花朵&#xff0c;每一个特…

SQL语句中锁的使用与优化

一、锁机制简介1.定义在数据库中&#xff0c;除了传统的计算资源&#xff08;如CPU、RAM、I/O等&#xff09;的争用以外&#xff0c;数据也是一种供需要用户共享的资源。如何保证数据并发访问的一致性、有效性是所有数据库必须解决的一个问题&#xff0c;锁冲突也是影响数据库并…

Linux笔记1——简介安装

操作系统给用户一个操作界面&#xff0c;用户通过操作界面使用系统资源Linux内核管理控制硬件&#xff0c;和硬件打交道SCSI&#xff08;盘&#xff09;sd**;第一个*表示磁盘顺序&#xff0c;第二个*表示分区。例如&#xff1a;sda\sdb\sdc,sda1,sda2NVMe&#xff08;盘&#x…

GoLand 部署第一个项目

前言&#xff1a;Go环境部署分为两种模式&#xff0c;一种是基于GOPATH部署&#xff08;老版本&#xff09;&#xff0c;另一种是基于Module部署&#xff08;新版本v1.11开始&#xff09;。GOPATH&#xff1a;需要配置GOPATH路径&#xff0c;将GOPATH目录视为工作目录&#xff…

Mosaic数据增强介绍

1. 核心概念与目标Mosaic 是一种在计算机视觉&#xff08;尤其是目标检测任务&#xff09;中非常流行且强大的数据增强技术。它最早由 Ultralytics 的 Alexey Bochkovskiy 在 YOLOv4 中提出并推广&#xff0c;后来被广泛应用于 YOLOv5, YOLOv7, YOLOv8 等模型以及其他目标检测框…

LINUX 722 逻辑卷快照

逻辑卷快照 lvcreate -L 128M -s -n lv1-snap /dev/vg1/lv1 lvs lvscan mount -o ro /dev/vg1/lv1 /mmt/lv1-snap dmsetup ls --tree 测试 lvs /dev/vg1/lv1-snap dd if/dev/zero of/uc1/test bs1M count40 lvs /dev/vg1/lv1-snap 问题 [rootweb ~]# cd /mnt [rootweb mnt]# m…

Springboot+vue个人健康管理系统的设计与实现

文章目录前言详细视频演示具体实现截图后端框架SpringBoot前端框架Vue持久层框架MyBaits成功系统案例&#xff1a;代码参考数据库源码获取前言 博主介绍:CSDN特邀作者、985高校计算机专业毕业、现任某互联网大厂高级全栈开发工程师、Gitee/掘金/华为云/阿里云/GitHub等平台持续…

数据结构 --栈和队链

一.栈的概念一种特殊的线性表&#xff0c;只能从固定的一端插入和删除元素。栈中元素遵循先进后出的原则。二.模拟实现public class MyStack {public int size;public int[] array;public MyStack(){array new int[10];}private void grow(){array Arrays.copyOf(array,array…

文档处理控件TX Text Control系列教程:使用 C# .NET 将二维码添加到 PDF 文档

PDF 文档通常是合同、发票、证书和报告的最终格式。尽管它们在设计上是静态的&#xff0c;但用户现在希望能够与它们交互、验证信息并直接从这些文件访问数字服务。这时&#xff0c;二维码就变得至关重要。 PDF 文档中的二维码将印刷或数字内容与动态在线体验连接起来。用户只需…

Google Chrome 谷歌浏览器全部版本集合

Google Chrome 谷歌浏览器全部版本集合 Collection of all software versions of Google Chrome. 项目介绍 本项目为Google Chrome谷歌浏览器的全部版本集合&#xff0c;方便大家下载旧版本使用。 因为Gitee项目限制仓库1G大小&#xff0c;所以许多谷歌浏览器版本无法上传。…

论文略读:Towards Safer Large Language Models through Machine Unlearning

ACL 2024大型语言模型&#xff08;LLMs&#xff09;的迅猛发展展现了其在多个领域的巨大潜力&#xff0c;这主要得益于其广泛的预训练知识和出色的泛化能力。然而&#xff0c;当面对问题性提示&#xff08;problematic prompts&#xff09;时&#xff0c;LLMs 仍然容易生成有害…

深度学习 ---参数初始化以及损失函数

深度学习 —参数初始化以及损失函数 文章目录深度学习 ---参数初始化以及损失函数一&#xff0c;参数初始化1.1 固定值初始化1.1.1 全0初始化1.1.2 全1初始化1.3 任意常数初始化1.2 随机初始化一&#xff0c;参数初始化 神经网络的参数初始化是训练深度学习模型的关键步骤之一…

JS--M端事件

移动端&#xff08;Mobile 端&#xff0c;简称 M 端&#xff09;开发中&#xff0c;由于设备特性&#xff08;触摸屏、手势操作等&#xff09;&#xff0c;需要处理一些与桌面端不同的事件。这些事件主要针对触摸交互、手势识别等场景 一、触摸事件&#xff08;Touch Events&am…

Linux网络编程-tcp

tcp、udp对比&#xff1a;UDP1. 特点无连接&#xff1a;无需建立连接即可发送数据。不可靠&#xff1a;不保证数据顺序或完整性。低延迟&#xff1a;适合实时性要求高的场景。2. 应用场景视频/音频流传输&#xff08;如直播&#xff09;。DNS 查询、在线游戏。TCP1. 特点面向连…

记一次flink资源使用优化

一.现状分析 现有任务的资源配置如下&#xff0c;根据ui监控中Garbage Collection可以发现&#xff0c;此任务频繁的发生GC&#xff0c;且老年代GC时间较久二.整体memory使用分析如下Framework Heap&#xff08;框架堆内存&#xff09;用于Flink框架自身的堆内存&#xff08;如…

Vue底层换成啥了?如何更新DOM的?

摘要&#xff1a;之前的vue是使用虚拟 DOM的&#xff0c;但是Vue 3.6 带来了一个意义重大的更新&#xff1a; Vapor Mode 渲染模式。Vue 渲染策略的演进&#xff1a; Vue 1.x&#xff1a; 基于模板渲染策略&#xff0c;直接将模板转换为DOM元素&#xff0c;并为每个DOM元素创建…

0722 数据结构顺序表

Part 1.顺序表的代码一.顺序表的内存申请head.h: typedef int datatype;typedef struct sqlist {//数据元素datatype data[MAXSIZE];//顺序表长度int len;}*sqlist; //*sqlist的作用: //sqlist:struct Sqlist * sqlist create();head.c: sqlist create() {sqlist list (sqlist)…

为何在 Vue 的 v-model 指令中不能使用可选链(Optional Chaining)?

Vue 的 v-model 是实现组件与数据双向绑定的核心指令之一&#xff0c;它本质上是一个语法糖&#xff0c;用于简化对表单元素和组件 props 的同步更新。然而&#xff0c;在 Vue 3&#xff08;以及 Vue 2 的某些模式下&#xff09;&#xff0c;开发者尝试在 v-model 中使用 JavaS…

基于单片机智能药盒/智能药箱/定时吃药系统

传送门 &#x1f449;&#x1f449;&#x1f449;&#x1f449;其他作品题目速选一览表 &#x1f449;&#x1f449;&#x1f449;&#x1f449;其他作品题目功能速览 概述 本设计实现了一种基于单片机的智能药盒&#xff0c;系统以微控制器&#xff08;如STM32&#xff…

(25)python+playwright自动化处理单选和多选按钮-中

1.简介上一篇中讲解和介绍的单选框有点多&#xff0c;而且由于时间的关系&#xff0c;决定今天讲解和分享复选框的相关知识。2.什么是单选框、复选框&#xff1f;单选按钮一般叫raido button&#xff0c;就像我们在电子版的单选答题过程一样&#xff0c;单选只能点击一次&#…