文章目录
- 1. 通知类型
- 2. 实现原理
- 2.1 Pub/Sub
- 2.1.1 基础知识点
- 2.1.2 频道和订阅者的存储通知原理
- 2.1.3 键空间通知
- 2.1.4 客户端消费
- 2.1.5 缺陷
- 2.2 Redis Stream
- 2.2.1 基础知识点
- 2.2.2 基础数据结构
- 2.2.3 消费者组管理
- 2.2.4 消息和消费者持久化
- 2.2.5 消息生产和消费
- 2.2.6 消费者拉取消息
- 2.2.7 消息分配
- 2.2.8 底层结构体
- 3. 使用示例
- 3.1 maven
- 3.2 yml文件
- 3.3 Configuration配置类
- 3.4 消费者
- 3.5 生产者
1. 通知类型
Redis内置了两种不同的订阅发布机制:
- 基于Pub/sub发布订阅的键通知机制:将订阅者以链表形式缓存在内存中,当key发生改动后将消息放入内存缓冲区,并由服务端逐个主动通知推送给订阅者
- 基于Redis Stream生产消费者通知机制:分为stream、消费者组和消费者,往stream中投递消息,将会分发给消费者组,组中消费者定期拉取组中消息消费
先说总结:相比使用Pub/Sub机制,更推荐使用redis stream代替,但还是分别介绍下Pub/Sub和stream机制。
2. 实现原理
2.1 Pub/Sub
2.1.1 基础知识点
Redis自带的发布订阅模式,分为频道和订阅链表。
频道可以精确订阅,也可以模糊订阅;订阅链表则记录了该频道的所有订阅客户端。
2.1.2 频道和订阅者的存储通知原理
频道和订阅链表是以字典的方式存储在内存中的,不会持久化。其中频道分为普通频道和键空间频道,其中普通频道通过往频道发送消息,从而分发给各个订阅者;而键空间频道是监听具体的key或事件,比如某个key值发生了修改,redis自动使用Pub/Sub机制发送给该键空间频道的订阅者。
当Redis往频道发送了消息,Redis会根据频道字典找出对应的订阅者,并主动推送给对应的客户端。该操作是以各机器节点为单元的,各个机器节点的频道订阅关系是互相隔离的,不会互相同步,因此如果频道A是在主节点上,频道B是在从节点上,想要监听到两个不同频道,就需要客户端和两个节点都建立通信关系。
Redis给不同客户端订阅者发送消息时,会先将消息写入缓冲区,再由Reactor的事件循环机制异步推送给客户端的socket。
2.1.3 键空间通知
键空间频道通知分为两种:监听key值的改动,如某个key被重新赋值,则会触发频道含有key,value值是set的通知;监听事件的触发,如某个key被重新赋值,则会触发频道含有set,value值是key的通知。
Redis集群的key会根据slot分布在不同的节点上,而键空间通知又是基于Pub/Sub机制的,因此如果想要监听到所有的键改动,就需要监听所有的节点。
2.1.4 客户端消费
客户端接收到服务端的通知后是阻塞处理的,因此最好使用异步线程处理业务逻辑,以提高通信效率。
2.1.5 缺陷
Pub/Sub的频道订阅者关系字段都是存储在内存中的,不会持久化,因此消息和订阅关系容易丢失。
Pub/Sub核心是广播模式,发送给订阅者后,无需订阅者ack。
如果频道的数据量过大,订阅者对应的缓冲区空间不够时,会导致客户端被动断开,且该模式无法重新消费历史消息,因此会丢失消息。
2.2 Redis Stream
2.2.1 基础知识点
Redis实现的持久化消息队列(MQ),分为生产者和消费者角色。
支持单消费者和消费者组模式,单消费者模式一般不推荐使用,不便于扩展,推荐使用消费者组模式。
生产者发消息前需要创建stream key和消费者组,消费者组内一个消息只会分发给一个消费者。
如果stream key对应了多个消费者组,则消息会发送给每个消费者组。
每条消息都有消息ID,消息支持持久化,通过RDB/AOF持久化到磁盘,消息ID为时间戳+递增值。
2.2.2 基础数据结构
消息ID使用Radix Tree实现,本质是前缀树,会进行前缀压缩以降低空间占用率,支持范围查询。
树节点使用Listpack存储消息键值对,分为总字节数、元素数量、节点内容和结束符,其中总字节数4字节24位,元素数量2字节16位,每个Listpack存储默认不超过4K,entry数默认不超过100条。
消息ID由时间戳-自增序号
组成,同一时间戳有多条则按接收顺序依次自增序号。
处理流程:
- 消息写入:生成消息ID,消息内容键值对存入对应节点的Listpack;
- 消息读取:根据消息ID前缀定位Listpack,遍历Listpack获取具体消息内容
同一时刻生产者发送了多条消息内容的存储格式:
- tree node:100000001- total- size- entry:100000001-0- entry:json1- entry:100000001-1- entry:value1- ...
- tree node:100000002- total- size- entry:100000002-0- entry:json2- entry:100000002-1- entry:value2- ...
其设计思想核心在于减少空间占用率。
2.2.3 消费者组管理
同一个stream可以关联多个消费者组,以消费者组名称为key,消费者组信息体为value,存储在Radix Tree中。
消费者组信息体中包含了最后一次消费点位,该组中待确认消息字典以及消费者字典信息。
待确认消息里面包含了消息分配时间、消息投递次数及对应的消费者信息,当消费者ack成功后将会删除。
待确认消息在消费者组内保存了一份,消费该消息的消费者也保存了一份。消费前消息只存储在基础的Radix树中,消费后将会在消费者组内和消费者中添加一条待确认消息,两者消息的内容是同步的,消费者的未确认消息是消费者组的子集。
消费者字典的key是消费者名称,value是消费者的信息体,其包含了最后活跃时间和该消费者私有的待确认消息集合。
消费者在消费者组内第一次拉取消息时就会被记录保存,消费者创建后即使长时间不再消费也不会被自动删除,只能通过显式删除消费者组对应的消费者或整个消费者组被删除才会同步删除。
2.2.4 消息和消费者持久化
和key的持久化一样,都使用了RDB/AOF机制来持久化消息和消费者。
- RDB周期性的扫描消息树和消费者组、消费者字典快照并序列化为二进制数据写入RDB文件中;
- AOF则会根据消息新增、创建消费者组和消费者这些命令来追加到AOF文件中。
一般使用RDB定期备份+AOF每秒保存的组合来确保恢复速度和数据安全。
2.2.5 消息生产和消费
在生产者发送消息时,需要确保redis中包含了该stream,而消费者在订阅消费者组中消息时,则需要确保stream下已创建了消费者组。
当生产者往stream发送消息时,会生成消息ID,消息会被直接保存在Radix Tree中作为原始数据。当消费者组要消费时再由Redis根据最后消费点位和读取方式分配消息给消费者。
创建消费者组时可指定该消费者组的起始点:
- 从头消费(0):创建消费者组时获取stream中消息ID最小的消息作为最后消费点位,后续消费者将会从该点位依次往后消费;
- 指定时间戳:可指定从哪个时间戳开始消费后续消息,若该时间戳没有消费,则从最接近的第一条开始消费;
- 仅消费最新的($和>):创建消费者组时获取stream中消息ID最大的消息作为最后消费点,后续消费者将只消费最新消息。
创建消费者组时$和>指令等价,$代表最后消费点位设置为stream的最大消费ID;而>代表从下一条未消费的消息开始读取。
消费者组创建后,$和>不等价,消费者组创建后会因为消费者断连导致很多消息未消费,>指令将会把未消费点位到最新消息点位都消费一遍,而$则会消费最大消息ID之后的消息,消费后更新最后消费点位,仅消费最新的。
组内的最后消费id仅在分配给消费者组消息时才更新。
2.2.6 消费者拉取消息
在Redis stream中,消费者消费消息是通过消费者主动拉取的。
共有两种拉取方式:阻塞式和非阻塞式拉取
- 阻塞式:指定阻塞参数,当阻塞时间为0,则无限期阻塞,保持连接活跃,直到有新消息或连接中断;当阻塞时间>0,则阻塞指定时间,阻塞期间有新消息则返回,超时后返回空值,消费者后续重新拉取;
- 非阻塞式:消费者从消费者组拉取时会立即返,如果有新消息则返回批量消息;如果没有新消息返回空值。
推荐使用的模式:阻塞式拉取,阻塞时间设置为3-5s。
其余模式的缺点:
- 无限期阻塞:会一直保持socket连接,占用Redis和消费者资源,且超过了服务器的连接超时时间也会断开连接;
- 非阻塞式:若客户端控制不好,很容易造成CPU空转消耗资源,也可能导致新消息无法及时处理。
2.2.7 消息分配
消息被保存在stream的Radix树中后,且消费者组里面有最后消费点位,此时多个消费者来拉取消息,消费者组会根据最后消费点位去Radix树中批量获取消息,按轮询分发的方式分配给消费者。
消费者在拉取消息时可配置count参数来指定该批次的大小,在阻塞式/非阻塞式拉取时,只要有消息就会直接返回,不会强制等到满足count时才会返回。只有当消息产生速度远大于消费速度时,消费者才能稳定一次性拉取count数量的消息。
轮询分发:根据消息ID顺序,依次分配给组内的消费者,与消费者的消费批次无关。
消费者指定的批次大小仅影响从服务端拉取的批次,和消息分配时的权重或速度无关。
若某消费者被分配了N条消息,但因不可抗拒因素该消费者一直未消费这些消息,过了idle空闲时间后,这些消息将会被空闲消息,等待转移,若一直未被转移,则一直保存在对应的pel集合中。
2.2.8 底层结构体
Stream在Redis中的基础结构体,rax代表Radix Tree树。
typedef struct stream {rax *rax; // 指向 Radix Tree,存储消息uint64_t length; // 当前消息总数streamID last_id; // 最新消息IDstreamID first_id; // 最早消息IDstreamID max_deleted_entry_id; // 已删除的最大消息IDuint64_t entries_added; // 历史累计消息数(含已删除)rax *cgroups; // 消费组列表(Radix Tree)
} stream;
消费者组,对应stream里面的cgroups指针,cgroups的key是消费者组名称,value是streamCG结构体。其中pel的key是消息ID,value是streamNACK结构体。
typedef struct streamCG {streamID last_id; // 组内最后分发的消息IDlong long entries_read; // 组已读消息数(含已确认)rax *pel; // 组内所有未确认消息(PEL)rax *consumers; // 组内消费者列表(Radix Tree)
} streamCG;
未确认消息结构体,在streamCG和streamConsumer结构体中使用。
typedef struct streamNACK {mstime_t delivery_time; // 最后一次投递时间戳uint64_t delivery_count; // 投递次数(重试计数)streamConsumer *consumer; // 当前持有该消息的消费者
} streamNACK;
消费者实例,分别对应streamCG的consumers树节点和streamNACK的consumer。pel为消费者私有的待确认消息。
typedef struct streamConsumer {mstime_t seen_time; // 最后活跃时间(判断消费者存活)sds name; // 消费者名称(客户端标识)rax *pel; // 消费者私有未确认消息列表
} streamConsumer;
3. 使用示例
鉴于Pub/Sub的诸多缺陷,生产只会考虑最新的Redis stream,如果Redis的版本低于5.0,无法使用MQ实现。
注:例子使用springboot实现,版本要求大于2.2.x
3.1 maven
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId><version>2.2.6.RELEASE</version>
</dependency>
3.2 yml文件
spring:redis:host: xxx.xxxport: 6379password: xxxxtimeout: 2000jedis:pool:max-active: 50max-idle: 10min-idle: 3max-wait: 2000
3.3 Configuration配置类
配置RedisTemplate
和StreamListener
的容器StreamMessageListenerContainer
,StreamMessageListenerContainer
主要配置的是阻塞超时时间和count批次大小。
@Configuration
public class RedisConfiguration {@Beanpublic RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {RedisTemplate<String, Object> template = new RedisTemplate<>();template.setConnectionFactory(redisConnectionFactory);template.setKeySerializer(new StringRedisSerializer());template.setValueSerializer(new StringRedisSerializer());template.setHashKeySerializer(new StringRedisSerializer());template.setHashValueSerializer(new StringRedisSerializer());template.afterPropertiesSet();return template;}@Beanpublic StreamMessageListenerContainer<String, ObjectRecord<String, String>> streamContainer(RedisConnectionFactory factory) {StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, String>> options =StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder().pollTimeout(Duration.ofSeconds(3)).batchSize(10).targetType(String.class).build();System.out.println("create StreamMessageListenerContainer");return StreamMessageListenerContainer.create(factory, options);}
}
3.4 消费者
使用springboot-redis
提供的订阅消费机制实现,缺点在于批量返回后没办法直接处理该批量,只能一个个处,若需要在StreamListener攒一批处理,需自己实现,或重写StreamMessageListenerContainer
后再重写StreamPollTask
。
@Component
public class StreamConsumer {private final StreamMessageListenerContainer<String, ObjectRecord<String, String>> container;@Autowiredprivate RedisTemplate<String, Object> redisTemplate;public static final String STREAM_KEY = "test:dev:stream";public static final String STREAM_GROUP = "test:dev:group";public static final String STREAM_GROUP2 = "test:dev:group2";public StreamConsumer(StreamMessageListenerContainer<String, ObjectRecord<String, String>> container) {this.container = container;}@PostConstructpublic void start() {container.start();System.out.println("start receive message");subscribe(STREAM_GROUP, ReadOffset.lastConsumed());subscribe(STREAM_GROUP2, ReadOffset.lastConsumed());}private void subscribe(String groupName, ReadOffset readOffset) {try {createStream();container.register(StreamMessageListenerContainer.StreamReadRequest.builder(StreamOffset.create(STREAM_KEY, readOffset)).consumer(Consumer.from(groupName, "streamConsumer")).autoAcknowledge(false).cancelOnError(t -> false).errorHandler(ex -> {System.out.println(groupName + " poll fail." + ex.getMessage());}).build(),message -> {try {System.out.println(groupName + " get message stream is " + message.getStream() +",value is " + message.getValue());long value = Long.parseLong(message.getValue());if ((value % 100) == 0) {System.out.println(groupName + " batch size is 100,execute business.");}Long result = redisTemplate.opsForStream().acknowledge(STREAM_KEY, STREAM_GROUP,message.getId());System.out.println(groupName + " ack result is " + result);} catch (Exception e) {System.out.println(groupName + " consume message fail");}});} catch (Exception e) {System.out.println(groupName + " subscribe fail");}}public void createStream() {if (Boolean.TRUE.equals(redisTemplate.hasKey(STREAM_KEY))) {return ;}redisTemplate.opsForStream().add(StreamRecords.newRecord().in(STREAM_KEY).ofObject("init"));String result = redisTemplate.opsForStream().createGroup(STREAM_KEY, STREAM_GROUP);System.out.println("create stream result is " + result);result = redisTemplate.opsForStream().createGroup(STREAM_KEY, STREAM_GROUP2);System.out.println("create stream2 result is " + result);}}
3.5 生产者
简单的一个demo,持续向stream中发送消息给消费者消费。
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.MOCK, classes = {Application.class})
public class RedisServiceTest {@Autowiredprivate RedisTemplate<String, Object> redisTemplate;@Testpublic void sendMessage() {for (int i = 0; i < 300; i++) {if (i % 50 == 0) {try {Thread.sleep(2000);} catch (InterruptedException e) {throw new RuntimeException(e);}}sendMessage(String.valueOf(i + 1000));}// deleteStream();try {Thread.sleep(40000);} catch (InterruptedException e) {e.printStackTrace();}}public void sendMessage(String message) {redisTemplate.opsForStream().add(StreamRecords.newRecord().in(StreamConsumer.STREAM_KEY).ofObject(message));}public void deleteStream() {Boolean result = redisTemplate.opsForStream().destroyGroup(StreamConsumer.STREAM_KEY, StreamConsumer.STREAM_GROUP);System.out.println("delete stream group result is " + result);result = redisTemplate.opsForStream().destroyGroup(StreamConsumer.STREAM_KEY, StreamConsumer.STREAM_GROUP2);System.out.println("delete stream group2 result is " + result);redisTemplate.delete(StreamConsumer.STREAM_KEY);}
}