基于redis的stream结构作为消息队列,实现异步秒杀下单

需求:

  • 创建一个Stream类型的消息队列,名为stream.oreders

  • 修改之前的秒杀下单Lua脚本,在认定有抢够资格后,直接向stream.orders中添加消息,内容包括voucherId、userId、orderId

  • 项目启动后,开启一个线程任务,尝试获取stream.orders中的消息,完成下单

首先,使用命令行来创建消息队列:

image-20250702162909923

其次,需要修改Lua脚本:

------ Generated by EmmyLua(https://github.com/EmmyLua)--- Created by 20893.--- DateTime: 2025/6/27 15:48----- 1.参数列表--1.1优惠券IDlocal voucherId = ARGV[1]--1.2用户IDlocal userId = ARGV[2]-- 1.3 订单IDlocal orderId = ARGV[3]--2.数据key--2.1库存keylocal stockKey = 'seckill:stock:' .. voucherId--2.2订单keylocal orderKey = 'seckill:order:' .. voucherId--3.业务逻辑--3.1判断库存是否充足-- redis.call('get',stockKey)中取出的值是String类型,需要将其转化成int类型,调用tonumber()方法if  (tonumber(redis.call('get', stockKey))<= 0) then--3.2. 库存不足 返回1return 1end--3.3判断用户是否重复下单if (redis.call('sismember', orderKey, userId) == 1) then--3.4. 重复下单 返回2return 2end--3.5扣减库存redis.call('incrby', stockKey, -1)--3.6记录订单redis.call('sadd', orderKey, userId)--3.7发送消息到队列中,xadd stream.orders * k1 v1 ...redis.call('xadd', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId)return 0

还需要修改Java业务代码:

 public Result seckillVoucher(Long voucherId) {//获取用户Long userId = UserHolder.getUser().getId();long orderId = redisIdWorker.nextId("order");//1.执行Lua脚本Long result = stringRedisTemplate.execute(SECKILL_SCRIPT, Collections.emptyList(),voucherId.toString(),userId.toString(),String.valueOf(orderId));//2.判断结果是否为0int r = result.intValue();if (result != 0){//2.1.不为0,代表没有购买资格return Result.fail(r == 1 ? "库存不足" : "不能重复下单");}//获取代理对象proxy = (IVoucherOrderService) AopContext.currentProxy();//3.返回订单IDreturn Result.ok(orderId);

最终,根据我们的伪代码进行对异步线程中业务流程的修改

while(true){//尝试今天队列,使用阻塞模式,最长等待2000毫秒Object msg = redis.call("xreadgroup group g1 c1 count 1 block 200 streams s1 >");if(msg == null){//null说明没有消息,继续下一次continue;}try{//处理消息,完成后需要确认消息(ACK)handleMessage(msg);}catch(Exception e){while(true){Object msg = redis.call("xreadgroup group g1 c1 count 1 block 200 streams s1 0");if(msg == null){//null说明没有异常消息,所有消息已确认,结束循环break;}try{//说明有异常消息,再次处理handleMessage(msg);}catch(Exception e){//再次出现异常,记录日志,继续循环continue;}}}}

代码展示:

 private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();//该注解表示在类初始化之后执行@PostConstructprivate void init(){SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());}private  class VoucherOrderHandler implements Runnable{String queueName = "stream.orders";@Overridepublic void run() {while ( true){try {//获取消息队列中的订单信息 xreadgroup group g1 c1 count 1 block 2000 streams stream.orders >List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),StreamOffset.create(queueName, ReadOffset.lastConsumed()));//判断消息获取是否成功if(list == null || list.isEmpty()){//2.1如果获取失败,说明没有消息,继续下一次循环continue;}//2.2.解析消息中的数据MapRecord<String, Object, Object> entries = list.get(0);Map<Object, Object> value = entries.getValue();//将map对象转换成订单对象VoucherOrder order = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);//2.2.如果获取成功,可以下单handleVoucherOrder(order);//3.ACK确认 sack stream.order g1 idstringRedisTemplate.opsForStream().acknowledge(queueName,"g1",entries.getId());​} catch (Exception e) {log.error("获取订单信息异常",e);handlePendingList();}}​}​private void handlePendingList() {while( true){try {//获取pending-list中的订单信息 xreadgroup group g1 c1 count 1 streams stream.orders 0List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),StreamOffset.create(queueName, ReadOffset.from("0")));//判断消息获取是否成功if(list == null || list.isEmpty()){//2.1如果获取失败,说明pending-list没有消息,继续下一次循环break;}//2.2.解析消息中的数据MapRecord<String, Object, Object> entries = list.get(0);Map<Object, Object> value = entries.getValue();//将map对象转换成订单对象VoucherOrder order = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);//2.2.如果获取成功,可以下单handleVoucherOrder(order);//3.ACK确认 sack stream.order g1 idstringRedisTemplate.opsForStream().acknowledge(queueName,"g1",entries.getId());​} catch (Exception e) {log.error("获取pending-list订单信息异常",e);//如果害怕因为报错导致陷入循环,可以设置休眠时间try {Thread.sleep(20);} catch (InterruptedException ex) {throw new RuntimeException(ex);}}}}}

整体业务流程描述:

首先尝试从消息队列中读取数据,如果数据获取失败,直接进行下一次循环,再来读一次消息队列,如果获取成功,说明有订单信息需要处理,就去解析订单信息,完成下单,在进行ack确认。在进行下一次读取,继续循环,如果在处理消息的过程抛出异常,导致该消息没有确认,那么该消息就会进入pending-list,就被catch到,在catch中执行handlePendinglist()函数,在该函数中,首先去pending-list获取未确认消息,如果读到,则解析消息,处理,下单,ack确认,如果没有异常消息,就会直接跳出循环,异常处理结束,如果再抛异常,就再度循环。直到pending-list中所有异常全部处理完成为止。

进行测试:

image-20250702173402911

image-20250702173423837

image-20250702173452042

image-20250702173524978

再次测试:

image-20250702173728568

至此优化秒杀下单的业务需求完成。

希望对大家有所帮助。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/web/87693.shtml
繁体地址,请注明出处:http://hk.pswp.cn/web/87693.shtml
英文地址,请注明出处:http://en.pswp.cn/web/87693.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Zephyr RTOS 防止中断影响数据写入

目录 概述 1 中断保护核心策略 1.1 中断锁定/解锁 (IRQ Locking) 1.2 自旋锁 (Spin Locks) 2 高级保护技术 2.1 双重缓冲技术 2.2 RCU (Read-Copy-Update) 模式 3 中断安全数据写入模式 3.1 FIFO队列保护 3.2 原子操作保护 4 性能优化策略 4.1 分区数据保护 4.2 中断…

Hinge×亚矩云手机:以“深度连接”为名,重构云端社交的“真实感”

当传统婚恋社交应用困于“浅层匹配”“硬件性能瓶颈”与“信任成本高企”&#xff0c;当Z世代对“灵魂共鸣、沉浸体验、隐私安全”的需求愈发迫切&#xff0c;以“设计让你删除的应用”为理念的Hinge&#xff0c;正携手亚矩云手机开启一场“云端深度社交革命”——用云端算力破…

OpenSSL 内存泄漏修复全景:119 个历史 Commit 的类型分析与防御启示

1 前言 openssl 开源库作为 C/C 项目中常用的组件库&#xff0c;截至 2025年7月4日 &#xff0c;openssl 的提交记录包含 119 个 Fix memory leak 。 本文基于源码 Commit 分析&#xff0c;揭示了 OpenSSL 内存泄漏修复从被动应对到主动防御的演进趋势&#xff0c;给各位 C/C…

十一、Python 3.13 的新特性和更新内容

1. 性能提升 1.1 解释器性能优化 更快的启动速度&#xff1a;Python 3.13 启动时间比 3.12 快约 10-15%。内存使用优化&#xff1a;减少了内存占用&#xff0c;特别是在处理大型数据结构时。 1.2 字节码优化 新的字节码指令&#xff1a;引入了更高效的字节码指令&#xff0…

后端 Maven打包 JAR 文件、前端打包dist文件、通过后端服务访问前端页面、Nginx安装与部署

打包 JAR 文件通常使用 Maven 或 Gradle 构建工具&#xff08;Spring Boot 项目默认推荐 Maven&#xff09;。以下是详细步骤和常见问题解答&#xff1a; 一、后端 Maven打包 JAR 文件 1. 确保项目是 Spring Boot 项目 项目结构应包含 pom.xml&#xff08;Maven 配置文件&am…

大数据系列 | 日志数据采集工具Filebeat的架构分析及应用

大数据系列 | 日志数据采集工具Filebeat的架构分析及应用 1. Filebeat的由来2. Filebeat原理架构分析3. Filebeat的应用3.1. 安装Filebeat3.2. 实战采集应用程序日志1. Filebeat的由来 在介绍Filebeat之前,先介绍一下Beats。Beats是一个家族的统称,Beats家族有8个成员,早期的…

基于 Vue + RuoYi 架构设计的商城Web/小程序实训课程

以下是基于 Vue RuoYi 架构设计的商城Web/小程序实训课程方案&#xff0c;结合企业级开发需求与教学实践&#xff0c;涵盖全栈技术栈与实战模块&#xff1a; &#x1f4da; 一、课程概述 目标&#xff1a;通过Vue前端 RuoYi后端&#xff08;Spring Boot&#xff09;开发企业…

Puppeteer 相关漏洞-- Google 2025 Sourceless

题目的代码非常简单,核心只有这一句 page.goto(url, { timeout: 2000 });方案1 Puppeteer 是一个常用的自动化浏览器工具&#xff0c;默认支持 Chrome&#xff0c;但也可以配置支持 Firefox。然而&#xff0c;当 Puppeteer 运行在 Firefox 上时&#xff0c;会自动关闭一些安全特…

LucidShape 2024.09 最新

LucidShape的最新版本2024.09带来了一系列新功能与增强功能&#xff0c;旨在解决光学开发者面临的最常见和最复杂的挑战。从微透镜阵列&#xff08;MLA&#xff09;的自动掩模计算&#xff0c;到高级分析功能的改进&#xff0c;LucidShape 2024.09致力于简化工作流程并增强设计…

mini-electron使用方法

把在官方群里“官方132版”目录里下载的包里的minielectron_x64.exe解压到你本地某个目录&#xff0c;改名成electron.exe&#xff0c;比如G:\test\ele_test\mini_electron_pack\electron.exe。 修改你项目的package.json文件。一个例子是&#xff1a; {"name": &q…

Android 网络全栈攻略(七)—— 从 OkHttp 拦截器来看 HTTP 协议二

Android 网络全栈攻略系列文章&#xff1a; Android 网络全栈攻略&#xff08;一&#xff09;—— HTTP 协议基础 Android 网络全栈攻略&#xff08;二&#xff09;—— 编码、加密、哈希、序列化与字符集 Android 网络全栈攻略&#xff08;三&#xff09;—— 登录与授权 Andr…

45-使用scale实现图形缩放

45-使用scale实现图形缩放_哔哩哔哩_bilibili45-使用scale实现图形缩放是一次性学会 Canvas 动画绘图&#xff08;核心精讲50个案例&#xff09;2023最新教程的第46集视频&#xff0c;该合集共计53集&#xff0c;视频收藏或关注UP主&#xff0c;及时了解更多相关视频内容。http…

软件开发早期阶段,使用存储过程的优势探讨:敏捷开发下的利器

在现代软件开发中&#xff0c;随着持续集成与敏捷开发的深入推进&#xff0c;开发团队越来越重视快速响应需求变更、快速上线迭代。在这种背景下&#xff0c;传统将业务逻辑全部放在应用层的方式在某些阶段显得笨重。本文将探讨在软件开发初期&#xff0c;特别是在需求尚不稳定…

『 C++入門到放棄 』- string

C 學習筆記 - string 一、什麼是string ? string 是 C 中標準函數庫中的一個類&#xff0c;其包含在 中 該類封裝了C語言中字符串操作&#xff0c;提供內存管理自動化與更多的操作 支持複製、比較、插入、刪除、查找等功能 二、常用接口整理 類別常用方法 / 說明建立與指…

ARM架构下C++程序堆溢出与栈堆碰撞问题深度解析

ARM架构下C程序堆溢出与栈堆碰撞问题深度解析 一、问题背景&#xff1a;从崩溃现象到内存异常 在嵌入式系统开发中&#xff0c;程序崩溃是常见但棘手的问题。特别是在ARM架构设备上&#xff0c;一种典型的崩溃场景如下&#xff1a;程序在执行聚类算法或大规模数据处理时突然终…

.NET9 实现排序算法(MergeSortTest 和 QuickSortTest)性能测试

在 .NET 9 平台下&#xff0c;我们对两种经典的排序算法 MergeSortTest&#xff08;归并排序&#xff09;和 QuickSortTest&#xff08;快速排序&#xff09;进行了性能基准测试&#xff08;Benchmark&#xff09;&#xff0c;以评估它们在不同数据规模下的执行效率、内存分配及…

RabbitMQ - SpringAMQP及Work模型

一、概述RabbitMQ是一个流行的开源消息代理&#xff0c;支持多种消息传递协议。它通常用于实现异步通信、解耦系统组件和分布式任务处理。Spring AMQP是Spring框架下的一个子项目&#xff0c;提供了对RabbitMQ的便捷访问和操作。本文将详细介绍RabbitMQ的工作模型&#xff08;W…

微信小程序51~60

1.界面交互-loading提示框 loading提示框用于增加用户体验&#xff0c; 对应的API有两个&#xff1a; wx.showLoading()显示loading提示框wx.hideLoading()关闭loading提示框 Page({getData () {//显示loading提示框wx.showLoading({//提示内容不会自动换行&#xff0c;多出来的…

SqueezeBERT:计算机视觉能为自然语言处理在高效神经网络方面带来哪些启示?

摘要 人类每天阅读和撰写数千亿条消息。得益于大规模数据集、高性能计算系统和更优的神经网络模型&#xff0c;自然语言处理&#xff08;NLP&#xff09;技术在理解、校对和组织这些消息方面取得了显著进展。因此&#xff0c;将 NLP 部署于各类应用中&#xff0c;以帮助网页用…

Springboot开发常见注解一览

注解用法常用参数Configuration用于标记类为配置类&#xff0c;其中通过Bean方法定义Spring管理的组件。它替代XML配置&#xff0c;用Java代码声明对象创建逻辑&#xff0c;并确保单例等容器特性生效。相当于给Spring提供一个“制造说明书”来组装应用部件RestControllerRestCo…