📨 Redis Stream:轻量级消息队列深度解析

文章目录

  • 📨 Redis Stream:轻量级消息队列深度解析
  • 🧠 一、Stream 数据结构解析
    • 💡 Stream 核心概念
    • 📋 Stream 底层结构
  • ⚡ 二、消息生产与消费
    • 🚀 消息生产(XADD)
    • 📥 消息消费(XREAD)
    • 🆔 消息ID机制
  • 🔄 三、消费组机制详解
    • 💡 消费组核心概念
    • 🛠️ 消费组管理命令
    • ⚡ 消费组实战示例
  • 📊 四、Redis Stream vs Kafka
    • 📋 特性对比表
    • 🎯 适用场景对比
    • 🔧 技术选型建议
  • 🚀 五、实战应用案例
    • 🛒 案例1:订单处理队列
    • ⏰ 案例2:延迟消息实现
    • 🔄 案例3:消息回溯与重放
  • 💡 六、总结与最佳实践
    • 🎯 适用场景总结
    • 🔧 生产环境建议
    • 🚀 性能优化技巧

🧠 一、Stream 数据结构解析

💡 Stream 核心概念

Redis Stream 是 Redis 5.0 引入的​​持久化消息数据结构​​​​,它提供了完整的消息队列功能,包括消息持久化、消费组、消息确认等特性。

Stream
Entry 1
Entry 2
Entry 3
...
ID: 1640995200000-0
Field1: Value1
Field2: Value2

Stream 核心特性​​:

  • 📝 ​​消息持久化​​:所有消息持久化存储
  • 🔄 ​​消费组支持​​:多个消费组独立消费
  • ✅ ​​消息确认​​:确保消息至少消费一次
  • ⏰ ​​消息回溯​​:支持历史消息重新消费
  • 🚀 ​​高性能​​:基于内存的高吞吐量

📋 Stream 底层结构

​​Stream 内部实现​​:

// Redis Stream 底层结构
typedef struct stream {rax *rax;               // 基数树存储消息uint64_t length;        // 消息数量streamID last_id;       // 最后消息IDrax *cgroups;           // 消费组
} stream;// 消息ID结构
typedef struct streamID {uint64_t ms;            // 时间戳uint64_t seq;           // 序列号
} streamID;

​​消息存储格式​​:

+----------+----------+----------+----------+
| 消息ID    | 字段1     | 字段2     | 字段3     |
+----------+----------+----------+----------+
| 1640995200000-0 | name:张三 | age:25 | city:北京 |
+----------+----------+----------+----------+

⚡ 二、消息生产与消费

🚀 消息生产(XADD)

​​基本消息生产​​:

# 添加消息到流(自动生成ID)
XADD orders * user_id 1001 product_id 2001 quantity 2# 输出:1640995200000-0(生成的消息ID)# 指定消息ID
XADD orders 1640995200000-1 user_id 1002 product_id 2002 quantity 1# 限制Stream长度(近似修剪)
XADD orders MAXLEN ~ 1000 * user_id 1003 product_id 2003 quantity 3

​​Java 生产者示例​​:

public class StreamProducer {private Jedis jedis;public String produceMessage(String streamKey, Map<String, String> message) {// 自动生成消息IDreturn jedis.xadd(streamKey, StreamEntryID.NEW_ENTRY, message);}public String produceMessageWithId(String streamKey, String id, Map<String, String> message) {// 指定消息IDreturn jedis.xadd(streamKey, new StreamEntryID(id), message);}
}

📥 消息消费(XREAD)

​​独立消费者模式​​:

# 读取所有可用消息
XREAD STREAMS orders 0# 从指定ID开始读取
XREAD STREAMS orders 1640995200000-0# 阻塞读取新消息(最多等待5000ms)
XREAD BLOCK 5000 STREAMS orders $# 批量读取多条消息
XREAD COUNT 10 STREAMS orders 0

​​消费者组示例​​:

public class StreamConsumer {private Jedis jedis;public void consumeMessages(String streamKey, String consumerGroup, String consumerName) {while (true) {// 阻塞读取消息List<Entry<String, List<StreamEntry>>> messages = jedis.xreadGroup(consumerGroup, consumerName, XReadGroupParams.xReadGroupParams().block(5000),Collections.singletonMap(streamKey, StreamEntryID.UNRECEIVED_ENTRY));for (StreamEntry entry : messages.get(0).getValue()) {processMessage(entry);// 确认消息处理完成jedis.xack(streamKey, consumerGroup, entry.getID());}}}private void processMessage(StreamEntry entry) {Map<String, String> fields = entry.getFields();System.out.println("处理消息: " + fields);}
}

🆔 消息ID机制

​​ID生成策略​​:

# 时间戳-序列号格式
1640995200000-0  # 2022年1月1日 00:00:00 的第0条消息
1640995201000-0  # 1秒后的第0条消息
1640995201000-1  # 同一毫秒的第1条消息# 特殊ID含义
0-0              # 从开始读取
$                # 只读取新消息

​​ID操作示例​​:

# 查询消息范围
XRANGE orders 1640995200000-0 1640995201000-0# 反向查询
XREVRANGE orders + - COUNT 10# 删除特定消息
XDEL orders 1640995200000-0

🔄 三、消费组机制详解

💡 消费组核心概念

Stream
Consumer Group
Consumer 1
Consumer 2
Consumer 3
Message 1
Message 4
Message 2
Message 5
Message 3
Message 6

🛠️ 消费组管理命令

​​创建消费组​​:

# 创建消费组,从Stream开头消费
XGROUP CREATE orders order-group 0# 创建消费组,只消费新消息
XGROUP CREATE orders order-group $# 删除消费组
XGROUP DESTROY orders order-group# 查看Stream信息
XINFO STREAM orders# 查看消费组信息
XINFO GROUPS orders# 查看消费者信息
XINFO CONSUMERS orders order-group

​​消费者操作​​:

# 从消费组读取消息
XREADGROUP GROUP order-group consumer1 COUNT 10 STREAMS orders ># 确认消息处理
XACK orders order-group 1640995200000-0# 查看待处理消息
XPENDING orders order-group# 认领超时消息
XCLAIM orders order-group consumer2 3600000 1640995200000-0

⚡ 消费组实战示例

​​Java 消费组实现​​:

public class ConsumerGroupExample {public void startConsumer(String stream, String group, String consumer) {// 初始化消费组initConsumerGroup(stream, group);while (true) {try {// 读取消息List<Map.Entry<String, List<StreamEntry>>> messages = jedis.xreadGroup(group, consumer, XReadGroupParams.xReadGroupParams().block(1000),Collections.singletonMap(stream, ">"));for (StreamEntry entry : messages.get(0).getValue()) {processMessage(entry);// 确认消息jedis.xack(stream, group, entry.getID());}} catch (Exception e) {handleError(e);}}}private void initConsumerGroup(String stream, String group) {try {jedis.xgroupCreate(stream, group, null, false);} catch (Exception e) {// 消费组可能已存在System.out.println("消费组已存在: " + e.getMessage());}}
}

📊 四、Redis Stream vs Kafka

📋 特性对比表

特性Redis StreamApache Kafka优势方
部署复杂度极简(内置Redis)复杂(需要ZooKeeper)Redis
消息持久化支持(但受内存限制)支持(磁盘持久化)Kafka
吞吐量极高(10万+/秒)高(10万+/秒)相当
延迟极低(亚毫秒级)低(毫秒级)Redis
消息保留基于内存限制基于时间和大小Kafka
消费组支持支持相当
分区支持有限(单个Stream)完整支持Kafka
生态工具较少丰富Kafka
适用规模中小规模(GB级)大规模(TB级)Kafka

🎯 适用场景对比

低延迟/简单部署
大规模/高持久化
中等规模/平衡需求
消息场景需求
需求分析
选择Redis Stream
选择Kafka
根据团队熟悉度选择
实时通知
聊天应用
任务队列
日志收集
事件溯源
流处理

🔧 技术选型建议

​​选择 Redis Stream 当​​:

  • ✅ 需要极低的消息延迟
  • ✅ 希望简单部署和维护
  • ✅ 数据量在内存可容纳范围内
  • ✅ 已经使用 Redis 基础设施

​​选择 Kafka 当​​:

  • ✅ 需要处理海量消息(TB级别)
  • ✅ 需要长时间消息保留
  • ✅ 需要强大的流处理生态
  • ✅ 有专业的运维团队

🚀 五、实战应用案例

🛒 案例1:订单处理队列

​​订单队列架构​​:

XADD
减库存
处理支付
发送通知
订单服务
orders_stream
订单消费组
库存服务
支付服务
通知服务
库存DB
支付DB
消息服务

​​订单生产者​​:

public class OrderProducer {public void createOrder(Order order) {Map<String, String> fields = new HashMap<>();fields.put("order_id", order.getId());fields.put("user_id", order.getUserId());fields.put("amount", order.getAmount().toString());fields.put("status", "created");String messageId = jedis.xadd("orders_stream", StreamEntryID.NEW_ENTRY, fields);log.info("订单消息已发送: {}", messageId);}
}

​​库存消费者​​:

public class InventoryConsumer {public void processOrders() {while (true) {List<Map.Entry<String, List<StreamEntry>>> messages = jedis.xreadGroup("order_group", "inventory_consumer", XReadGroupParams.xReadGroupParams().block(1000),Collections.singletonMap("orders_stream", ">"));for (StreamEntry entry : messages.get(0).getValue()) {try {reduceInventory(entry.getFields());jedis.xack("orders_stream", "order_group", entry.getID());} catch (Exception e) {log.error("处理库存失败: {}", entry.getID(), e);}}}}
}

⏰ 案例2:延迟消息实现

​​延迟消息方案​​:

XADD
定时检查
XADD
生产者
延迟队列
等待
消费者
就绪队列
真实消费者

​​延迟消息实现​​:

public class DelayedMessageService {private ScheduledExecutorService scheduler;public void sendDelayedMessage(String stream, Map<String, String> message, long delayMs) {// 存储到延迟队列String id = jedis.xadd("delayed:" + stream, StreamEntryID.NEW_ENTRY, message);// 定时任务处理延迟scheduler.schedule(() -> {// 从延迟队列移动到就绪队列moveToReadyQueue(stream, id);}, delayMs, TimeUnit.MILLISECONDS);}private void moveToReadyQueue(String stream, String messageId) {// 读取延迟消息List<StreamEntry> entries = jedis.xrange("delayed:" + stream, messageId, messageId);if (!entries.isEmpty()) {StreamEntry entry = entries.get(0);// 添加到就绪队列jedis.xadd(stream, StreamEntryID.NEW_ENTRY, entry.getFields());// 删除延迟消息jedis.xdel("delayed:" + stream, messageId);}}
}

🔄 案例3:消息回溯与重放

​​消息重放服务​​:

public MessageReplayService {public void replayMessages(String stream, String startId, String endId) {// 创建临时消费组用于重放String replayGroup = "replay_" + System.currentTimeMillis();jedis.xgroupCreate(stream, replayGroup, new StreamEntryID(startId), false);// 重放消息List<StreamEntry> messages = jedis.xrange(stream, startId, endId);for (StreamEntry message : messages) {processReplayMessage(message.getFields());}// 清理临时消费组jedis.xgroupDestroy(stream, replayGroup);}
}

💡 六、总结与最佳实践

🎯 适用场景总结

​​适合使用 Redis Stream​​:

场景推荐度理由
实时通知系统✅✅✅低延迟,简单易用
任务队列✅✅✅持久化,消费组支持
聊天消息✅✅✅时序消息,快速存取
事件溯源✅✅消息回溯能力
日志收集中小规模日志
金融交易需要更强持久化保证

🔧 生产环境建议

​​配置优化​​:

# redis.conf 优化配置
maxmemory 4gb
maxmemory-policy allkeys-lru
stream-node-max-bytes 4096
stream-node-max-entries 100# 监控配置
slowlog-log-slower-than 10000
latency-monitor-threshold 100

​​监控指标​​:

# 监控Stream状态
redis-cli xinfo stream orders_stream# 监控消费组
redis-cli xinfo groups orders_stream# 监控内存使用
redis-cli info memory | grep used_memory_stream# 监控消息积压
redis-cli xlen orders_stream

​​故障处理​​:

# 处理消息积压
# 1. 增加消费者数量
# 2. 调整消费组参数
# 3. 清理过期消息# 处理内存不足
# 1. 设置Stream最大长度
# 2. 启用Stream修剪
# 3. 监控内存使用

🚀 性能优化技巧

​​1. 批量处理​​:

// 批量读取消息
List<Map.Entry<String, List<StreamEntry>>> messages = jedis.xreadGroup(group, consumer, XReadGroupParams.xReadGroupParams().count(100).block(1000),Collections.singletonMap(stream, ">"));// 批量确认消息
for (StreamEntry entry : messages) {pendingAck.add(entry.getID());
}
jedis.xack(stream, group, pendingAck.toArray(new StreamEntryID[0]));

​​2. 内存优化​​:

# 定期修剪Stream
XADD orders MAXLEN ~ 10000 * field1 value1# 监控大Stream
redis-cli memory usage orders_stream

​​3. 消费者优化​​:

// 消费者负载均衡
public class ConsumerBalancer {public static String getConsumerName(String serviceId) {return "consumer_" + serviceId + "_" + ThreadLocalRandom.current().nextInt(1000);}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/bicheng/96177.shtml
繁体地址,请注明出处:http://hk.pswp.cn/bicheng/96177.shtml
英文地址,请注明出处:http://en.pswp.cn/bicheng/96177.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Android studio的adb和终端的adb互相抢占端口

在Android Studio调试时&#xff0c;有时候也需要借助终端的adb命令&#xff0c;他们互相抢占端 口&#xff0c;导致调试麻烦解决如下&#xff1a;① 终端adb的版本是&#xff1a;1.0.39路径是:/usr/lib/android-sdk/platform-tools/adb② Android Studio使用的adb来源于Androi…

GEO服务商推荐:移山科技以划时代高精尖技术引领AI搜索优化新纪元

引言&#xff1a;AI搜索生态重塑与GEO优化战略地位跃升AI技术对信息检索范式的颠覆GEO优化在企业增长中的核心作用第一章&#xff1a;AI搜索新纪元的企业营销挑战与机遇生成式AI成为用户主要信息入口的行业趋势企业在AI搜索中的“答案主权”争夺战GEO优化服务商的核心能力模型&…

Android SystemServer 系列专题【AttentionManagerService】

AttentionManagerService是framework中用来实现屏幕感知的一个系统级服务&#xff0c;他继承于systemserver。我们可以通过dumpsys attention来获取他的一些信息。如下针对屏幕感知的功能的引入来针对这个服务进行一个介绍。1、屏幕感知Settings UI实现屏幕感知的功能在A14上面…

nginx 反向代理使用变量的坑

nginx采用反向代理的时候使用变量的坑 正常情况&#xff1a; location ~ ^/prod-api(?<rest>/.*)?$ {# 假设 $mes_backend 形如: http://127.0.0.1:16889proxy_pass $mes_backend$rest$is_args$args;proxy_http_version 1.1;proxy_set_header Host $host;…

Origin绘制径向条形图|科研论文图表教程

数据排列格式截图&#xff0c;请查看每张图↘右下角水印 目录 数据排列格式截图&#xff0c;请查看每张图↘右下角水印 本 期 导 读 No.1 理解图形 1 定义 2 特点 3 适用场景 No.2 画图教程 1 导入数据&#xff0c;绘制图形 2 设置绘图细节 本 期 导 读 径…

MySQL InnoDB 的 MVCC 机制

前言 多版本并发控制&#xff08;MVCC&#xff09;是 MySQL InnoDB 存储引擎实现高性能事务的核心机制。它通过创建数据快照&#xff0c;使得读写操作可以无锁并发&#xff0c;极大地提升了数据库的并发性能。本文将深入探讨 MVCC 的工作原理、实现细节以及它与事务隔离级别的紧…

景区负氧离子气象站:引领绿色旅游,畅吸清新每一刻

在绿色旅游成为消费主流的今天&#xff0c;游客对 “清新空气” 的需求不再是模糊的期待&#xff0c;而是可感知、可选择的具体体验。景区负氧离子气象站的出现&#xff0c;正以科技之力重塑绿色旅游格局&#xff0c;让 “畅吸清新每一刻” 从口号变为触手可及的现实&#xff0…

Pytorch笔记一之 cpu模型保存、加载与推理

Pytorch笔记一之 cpu模型保存、加载与推理 1.保存模型 首先&#xff0c;在加载模型之前&#xff0c;我们需要了解如何保存模型。PyTorch 提供了两种保存模型的方法&#xff1a;保存整个模型和仅保存模型的状态字典&#xff08;state dict&#xff09;。推荐使用第二种方式&…

当AI在代码车间组装模块:初级开发者的创意反成「核心算法」

前言&#xff1a;哈喽&#xff0c;大家好&#xff0c;今天给大家分享一篇文章&#xff01;并提供具体代码帮助大家深入理解&#xff0c;彻底掌握&#xff01;创作不易&#xff0c;如果能帮助到大家或者给大家一些灵感和启发&#xff0c;欢迎收藏关注哦 &#x1f495; 目录当AI在…

技术视界 | 跨域机器人通信与智能系统:打破壁垒的开源探索

8 月 16 日&#xff0c;在 OpenLoong 社区举办的第九期线下分享会上&#xff0c;国家地方共建人形机器人创新中心的软件开发负责人 Amadeus 博士带来了一场主题为“跨域机器人通信与智能系统&#xff1a;打破行业壁垒的创新方案”的演讲。深入探讨了当前机器人领域的一个关键痛…

Android入门到实战(八):从发现页到详情页——跳转、传值与RecyclerView多类型布局

一. 引言在上一篇文章里&#xff0c;我们从零开始实现了 App 的 发现页面&#xff0c;通过网络请求获取数据&#xff0c;并使用 RecyclerView 展示了剧集列表。但光有发现页还不够&#xff0c;用户在点击一部剧时&#xff0c;自然希望进入到一个更详细的页面&#xff0c;去查看…

【工具】41K star!网页一键变桌面应用

项目中遇到了一个需要将现有的 web 页面打包成一个 桌面应用 的需求。 最一开始想到的是 Electron&#xff0c;但是它还需要一些开发工作并且打包后的应用体积比较大&#xff0c;调研后发现了开源工具 Pake。 它能让你用最轻量的方式&#xff0c;把任何网页一键打包成跨平台桌…

浪潮CD1000-移动云电脑-RK3528芯片-2+32G-安卓9-2种开启ADB ROOT刷机教程方法

浪潮CD1000-移动云电脑-RK3528芯片-232G-安卓9-2种开启ADB ROOT刷机教程方法 往期文章&#xff1a; 浪潮CD1000-移动云电脑-RK3528芯片-232G-安卓9-开启ADB ROOT破解教程 地址1&#xff1a;浪潮CD1000-移动云电脑-RK3528芯片-232G-开启ADB ROOT破解教程-CSDN博客 中国移动浪潮…

Day23_【机器学习—聚类算法—K-Means聚类 及评估指标SSE、SC、CH】

一、聚类算法概念属于无监督学习算法&#xff0c;即有特征无标签&#xff0c;根据样本之间的相似性&#xff0c;将样本划分到不同的类别中。所谓相似性可以理解为欧氏距离、曼哈顿距离、切比雪夫距离... 。分类按颗粒度分为&#xff1a;粗聚类、细聚类。按实现方法分为&#xf…

android seekbar显示刻度

SeekBar简介 SeekBar是Android中的一个可交互UI组件&#xff0c;允许用户通过拖动滑块在特定范围内选择数值。继承自ProgressBar&#xff0c;但增加了用户手动调节功能&#xff0c;常用于音量控制、亮度调节等场景。 核心属性 android:maxHeight // 背景高度 android:progres…

【高并发内存池】五、页缓存的设计

文章目录Ⅰ. page cache页缓存的结构设计Ⅱ. 完善central cache中的 get_span() 函数Ⅲ. 实现页缓存获取span对象的接口Ⅰ. page cache页缓存的结构设计 ​ 首先页缓存还是一个哈希桶的结构&#xff0c;但是和前两者不同的是&#xff0c;页缓存的哈希桶中存放的是一个或者多个…

Elasticsearch(text和keyword)区别分析

text:全文检索类型,经过分词处理,支持模糊匹配‌ keyword:精确匹配类型,适用于聚合、排序和过滤‌ text 1. 核心属性 ‌analyzer属性‌: 指定用于索引和搜索的分词器 默认使用标准分析器(Standard Analyzer) 示例:"analyzer": "ik_max_word"(中文…

通过tailscale实现一台电脑上vscode通过ssh连接另一台电脑上的VMware Linux 虚拟机

当需要通过一台windows电脑上的vscode来ssh连接另一台电脑上的linux虚拟机进行远程操作&#xff0c;可以通过tailscale来实现。 Linux虚拟机上安装tailscale 由于挂代理下载仍然很慢&#xff0c;而清华镜像源又没有tailscale的软件包&#xff0c;所以可以通过下载 DEB 包安装…

[Upscayl图像增强] docs | 前端 | Electron工具(web->app)

链接&#xff1a;https://upscayl.org/docs&#xff1a;Upscayl Upscayl是一款桌面应用程序&#xff0c;允许用户使用人工智能放大和增强图像。 提供了一个用户友好的图形界面&#xff08;渲染器用户界面&#xff09;&#xff0c;用户可以选择图像或文件夹&#xff0c;从多种AI…

阿里云通义MoE全局均衡技术:突破专家负载失衡的革新之道

MoE模型的基本原理与核心价值 混合专家模型&#xff08;Mixture of Experts&#xff0c;MoE&#xff09;是当前AI大模型领域最重要的架构创新之一&#xff0c;其核心思想是通过多个“专家”网络协同处理输入数据&#xff0c;并由门控网络动态选择或组合各个专家的输出&#xf…