本文为个人学习笔记整理,仅供交流参考,非专业教学资料,内容请自行甄别
文章目录
- 概述
- 一、生产者端操作
- 二、消费者端操作
- 三、消费组操作
- 四、状态查询操作
- 五、确认消息
- 六、消息队列的选择
概述
Stream是Redis5.0推出的支持多播的可持久化的消息队列
:
图片来源:图灵学院
如上图所示,Stream依旧是key,value的形式,key对应的是队列的名称,而value的结构则是上图的链表,其主要的结构:
- ID:每条消息都有一个唯一的ID,如果没有指定,则使用Redis自带的生成策略,格式为
当前的毫秒级别时间戳-该毫秒时间点内的消息序号
,是单调递增的。 - Consumer Group:消费组,一个Stream中可以包含多个消费组,而每个消费组又由多个消费者组成。每个消费组是互相独立的,共同消费队列中的消息。
- last_delivered_id:是一个游标,表示当前消费组已经消费到哪条消息了。同一个消费组中的任何一个消费者读取了消息都会使last_delivered_id往前移动。
- Consumer:消费者,同一个消费组中的消费者是竞争关系,并能在组内由唯一的名称。
- pending_ids[]:用于记录当前客户端已经读取,但是尚未ACK的消息。如果客户端没有 ack,这个变量里面的消息 ID 会越来越多。是Stream的ACK机制的实现,保证消息的可靠投递。
- Message Content:消息内容,和其他类型的存储格式类似,都是key-value的格式。
一、生产者端操作
向队列中添加一条消息,会返回消息的ID,队列不存在则会创建。
- xadd stream:创建队列。
- streamtest1:队列的名称,前缀需要加上stream。
- *:自动生成队列中消息的ID 一定是单调有序自增的。
127.0.0.1:6381> xadd streamtest1 * name zhangsan age 18
"1751714750056-0"
查看队列的长度:
127.0.0.1:6381> xlen streamtest1
xrange streamtest1 - + 查看队列中所有的元素:
- xrange:关键字
- streamtest1 :队列名称
- -:表示查询范围的最小值,可以指定消息ID
- +:表示查询范围的最大值,可以指定消息ID
1) 1) "1751714750056-0"2) 1) "name"2) "zhangsan"3) "age"4) "18"
2) 1) "1751714996108-0"2) 1) "name"2) "lisi"3) "age"4) "20"
3) 1) "1751715003212-0"2) 1) "name"2) "wanger"3) "age"4) "25"
xdel 删除指定ID的消息。
127.0.0.1:6381> xdel streamtest1 1751715003212-0
(integer) 1
二、消费者端操作
从队列中读取消息:
- xread:从队列读取消息
- count:读取消息的条数
- streams:关键字
- streamtest1:队列名称
- 0-0:指定读取消息的范围,可以指定ID
127.0.0.1:6381> xread count 1 streams streamtest1 0-0
1) 1) "streamtest1"2) 1) 1) "1751714750056-0"2) 1) "name"2) "zhangsan"3) "age"4) "18"
可以指定读取的范围:
127.0.0.1:6381> xread count 1 streams streamtest1 1751714750056-0
1) 1) "streamtest1"2) 1) 1) "1751714996108-0"2) 1) "name"2) "lisi"3) "age"4) "20"
从队列的尾部读取数据,加$,但是默认不返回数据:
127.0.0.1:6381> xread count 1 streams streamtest1 $
(nil)
需要配合使用block阻塞 + 另一个生产者写入一个新消息,:
生产者发送消息:
127.0.0.1:6381> xadd streamtest1 * name zhaoliu age 17
"1751716196234-0"
消费者阻塞等待生产者发送消息:
127.0.0.1:6381> xread block 0 count 1 streams streamtest1 $
1) 1) "streamtest1"2) 1) 1) "1751716196234-0"2) 1) "name"2) "zhaoliu"3) "age"4) "17"
(44.14s)
一般来说客户端如果想要使用 xread 进行顺序消费,一定要记住当前消费到哪里了,也就是返回的消息
ID。下次继续调用 xread 时,将上次返回的最后一个消息 ID 作为参数传递进去,就可以继续消费后续的
消息。
三、消费组操作
创建消费者群组:
- XGROUP create:创建消费者群组
- streamtest1:队列名称
- group1:群组名称
- 0-0:从消息队列头部进行消费,如果是$ 从尾部消费
127.0.0.1:6381> XGROUP create streamtest1 group1 0-0
OK
127.0.0.1:6381> XGROUP create streamtest1 group2 $
OK
消费者群组读取消息,同样支持阻塞读取
- XREADGROUP group:关键字,群组从队列读取消息
- group1:群组名称
- g1:具体的消费者名称,群组内唯一
- count:关键字,读取的数量
- streams:关键字
- streamtest1:队列名称
127.0.0.1:6381> XREADGROUP group group1 g1 count 1 streams streamtest1 >
1) 1) "streamtest1"2) 1) 1) "1751714750056-0"2) 1) "name"2) "zhangsan"3) "age"4) "18"
在读取前,队列中的状态,group1中的consumers和pending都是0。
127.0.0.1:6381> XINFO groups streamtest1
1) 1) "name"2) "group1"3) "consumers"4) (integer) 05) "pending"6) (integer) 07) "last-delivered-id"8) "0-0"
读取之后,有了一个消费者,并且待确认的数量为1。
127.0.0.1:6381> XINFO groups streamtest1
1) 1) "name"2) "group1"3) "consumers"4) (integer) 15) "pending"6) (integer) 17) "last-delivered-id"8) "1751714750056-0"
四、状态查询操作
查询队列的状态:
- XINFO stream:关键字
- streamtest1:队列的名称
127.0.0.1:6381> XINFO stream streamtest11) "length" # 长度2) (integer) 33) "radix-tree-keys" 4) (integer) 15) "radix-tree-nodes"6) (integer) 27) "groups" # 队列中群组个数8) (integer) 29) "last-generated-id" # 最后生成消息的ID
10) "1751716196234-0"
11) "first-entry" # 队列中第一个元素
12) 1) "1751714750056-0"2) 1) "name"2) "zhangsan"3) "age"4) "18"
13) "last-entry" # 队列中最后一个元素
14) 1) "1751716196234-0"2) 1) "name"2) "zhaoliu"3) "age"4) "17"
查询消费者组的信息:
- XINFO groups:关键字
- streamtest1:队列的名称
127.0.0.1:6381> XINFO groups streamtest1
1) 1) "name" # 群组名称2) "group1"3) "consumers" # 消费者4) (integer) 05) "pending" # 待消费数量6) (integer) 07) "last-delivered-id" 8) "0-0"
2) 1) "name"2) "group2"3) "consumers"4) (integer) 05) "pending"6) (integer) 07) "last-delivered-id"8) "1751716196234-0"
查询某个消费者组中消费者的信息:
127.0.0.1:6381> XINFO consumers streamtest1 group1
1) 1) "name"2) "g1"3) "pending" #代表查询了消息,但是没有确认的数量4) (integer) 15) "idle"6) (integer) 398240
五、确认消息
当我使用group1消费者组中的g1消费者对streamtest1队列中的元素进行消费时,会返回一个Id和具体的元素信息:
此时我还没有手动ack,查询消费者的信息,pending为1代表待确认。
队列中所有的元素:
确认消息的命令:
- XACK:消息确认
- streamtest1:队列名称
- group1:分组名称
- 1751714996108-0:待确认的消息的ID
127.0.0.1:6381> XACK streamtest1 group1 1751714996108-0
(integer) 1
再去查询消费者的信息,发现pending变成了0:
再次获取消息,获取的是已消费元素的下一个消息
,当队列中所有的消息都ack之后,再次尝试获取消息,获取到的是nil。
消息的ack,仅仅是将消息从 消费者组的 Pending Entries List中移除,消息仍保留在 Stream 主体中,直到被主动删除。
六、消息队列的选择
基于Redis实现消息队列通常有以下的方式:
- List结构的lpush + brpop。
- PUB/SUB模式。
- Stream消息队列。
如果一定需要使用Redis实现消息队列的功能,推荐使用Stream实现。前两者都有比较明显的弊端:
- lpush + brpop的方案,如果线程一直阻塞,超过了一定的时间,客户端会断开连接,那么执行POP命令的线程就会抛出异常。并且消息的消费是点到点的,不支持分组消费,以及广播模式,重复消费。
- PUB/SUB模式的方案,如果发布者发送消息时,订阅者不在线,那么这条消息就会丢失。并且无法存储消息。Pub/Sub 模式不适合做消息存储,消息积压类的业务,而是擅长处理广播,即时通讯,即时反馈的业务。
使用Redis stream的注意点:
- Stream的消息过多怎么办?限制队列的长度,xadd 可以指定参数maxLen 防止队列爆满。
- 消费者没有对消息ack怎么办?消费组中的消费者有一个pending_ids的集合,没有ack,这个集合会越变越长。尽可能快速消费并ack。
- 出现死信问题怎么办?某个消息任意消费者消费都会出现异常,无法ack。通过xpending查询投递次数,超过一定的次数就认为是死信,执行xdel命令删除消息。
- 如何保证高可用?集群部署Redis,或者使用主从 + 哨兵迷失
- 如何进行消息分区?自己通过一致性hash算法实现。