参考
Redis队列详解(springboot实战)_redis 队列-CSDN博客
前言
MQ消息队列有很多种,比如RabbitMQ,RocketMQ,Kafka等,但是也可以基于redis来实现,可以降低系统的维护成本和实现复杂度,本篇介绍redis中实现消息队列的几种方案,并通过springboot实战使其更易懂。
1. 基于List的 LPUSH+BRPOP 的实现
2. PUB/SUB,订阅/发布模式
3. 基于Stream类型的实现
1、基于List的的实现
原理
使用rpush和lpush操作入队列,lpop和rpop操作出队列。
List支持多个生产者和消费者并发进出消息,每个消费者拿到都是不同的列表元素。
优点
一旦数据到来则立刻醒过来,消息延迟几乎为零。
缺点
-
不能重复消费,一旦消费就会被删除
-
不能做广播模式 , 不支持分组消费
-
lpop和rpop会一直空轮训,消耗资源 ,但可以 引入阻塞读blpop和brpop 同时也有新的问题 如果线程一直阻塞在那里,Redis客户端的连接就成了闲置连接,闲置过久,服务器一般会主动断开连接,减少闲置资源占用,这个时候blpop和brpop或抛出异常
代码
引入依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency><dependency><groupId>com.fasterxml.jackson.datatype</groupId><artifactId>jackson-datatype-jsr310</artifactId></dependency>
配置文件
server:port: ${SERVER_PORT:9210}# Spring
spring:application:# 应用名称name: ruoyi-redis-messageredis:host: localhostport: 6379password: 123456
启动类
@SpringBootApplication(exclude= {DataSourceAutoConfiguration.class})
public class RuoYiRedisMessageApplication
{public static void main(String[] args){SpringApplication.run(RuoYiRedisMessageApplication.class, args);System.out.println("(♥◠‿◠)ノ゙ ruoyi-redis-message启动成功");}
}
添加redis配置类
/*** redis配置*/
@Configuration
public class RedisConfig {private static final RedisSerializer<Object> SERIALIZER = createSerializer();@Beanpublic RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {// 创建 RedisTemplate 对象RedisTemplate<String, Object> template = new RedisTemplate<>();// 设置 RedisConnection 工厂。😈 它就是实现多种 Java Redis 客户端接入的秘密工厂。感兴趣的胖友,可以自己去撸下。template.setConnectionFactory(factory);// 使用 String 序列化方式,序列化 KEY 。template.setKeySerializer(RedisSerializer.string());template.setHashKeySerializer(RedisSerializer.string());// 使用 JSON 序列化方式(库是 Jackson ),序列化 VALUE 。template.setValueSerializer(SERIALIZER);template.setHashValueSerializer(SERIALIZER);return template;}private static RedisSerializer<Object> createSerializer() {ObjectMapper mapper = new ObjectMapper();mapper.registerModules(new JavaTimeModule());// 此项必须配置,否则会报java.lang.ClassCastException: java.util.LinkedHashMap cannot be cast to XXXmapper.activateDefaultTyping(mapper.getPolymorphicTypeValidator(), ObjectMapper.DefaultTyping.NON_FINAL);return new GenericJackson2JsonRedisSerializer(mapper);}}
队列方法
@Slf4j
@Service
public class ListRedisQueue {//队列名public static final String KEY = "listQueue";@Resourceprivate RedisTemplate redisTemplate;public void produce(String message) {redisTemplate.opsForList().rightPush(KEY, message);}public void consume() {while (true) {String msg = (String) redisTemplate.opsForList().leftPop(KEY);log.info("疯狂获取消息:" + msg);}}public void blockingConsume() {while (true) {List<Object> obj = redisTemplate.executePipelined(new RedisCallback<Object>() {@Overridepublic Object doInRedis(RedisConnection connection) throws DataAccessException {//队列没有元素会阻塞操作,直到队列获取新的元素或超时,5表示如果没元素就每五秒去拿一次消息return connection.bRPop(5, KEY.getBytes());}}, new StringRedisSerializer());for (Object str : obj) {log.info("blockingConsume获取消息 : {}", str);}}}}
测试
lPop/rPop消费数据
@Slf4j
@SpringBootTest
public class ListRedisTest {@Autowiredprivate ListRedisQueue listRedisQueue;@Testpublic void produce() {for (int i = 0; i < 5; i++) {listRedisQueue.produce("第"+i + "个数据");}}@Testpublic void consume() {produce();log.info("生产消息完毕");listRedisQueue.consume();}}
blpop / brpop 消费数据
@Testpublic void blockingConsume() {produce();log.info("生产消息完毕");listRedisQueue.blockingConsume();}
2、PUB/SUB,订阅/发布模式
原理
SUBSCRIBE,用于订阅信道
PUBLISH,向信道发送消息
UNSUBSCRIBE,取消订阅
此模式允许生产者只生产一次消息,由中间件负责将消息复制到多个消息队列,每个消息队列由对应的消费组消费。
优点
-
一个消息可以发布到多个消费者
-
消费者可以同时订阅多个信道,因此可以接收多种消息(处理时先根据信道判断)
-
消息即时发送,消费者会自动接收到信道发布的消息
缺点
-
消息发布时,如果客户端不在线,则消息丢失
-
消费者处理消息时出现了大量消息积压,则可能会断开通道,导致消息丢失
-
消费者接收消息的时间不一定是一致的,可能会有差异(业务处理需要判重)
代码
配置消息监听器
@Slf4j
@Component
public class RedisMessageListener implements MessageListener {@Autowiredprivate RedisTemplate<String, Object> redisTemplate;/*** 消息处理** @param message* @param pattern*/@Overridepublic void onMessage(Message message, byte[] pattern) {String channel = new String(pattern);log.info("onMessage --> 消息通道是:{}", channel);RedisSerializer<?> valueSerializer = redisTemplate.getValueSerializer();Object deserialize = valueSerializer.deserialize(message.getBody());log.info("反序列化的结果:{}", deserialize);if (deserialize == null) return;String md5DigestAsHex = DigestUtils.md5DigestAsHex(deserialize.toString().getBytes(StandardCharsets.UTF_8));log.info("计算得到的key: {}", md5DigestAsHex);Boolean result = redisTemplate.opsForValue().setIfAbsent(md5DigestAsHex, "1", 20, TimeUnit.SECONDS);if (Boolean.TRUE.equals(result)) {// redis消息进行处理log.info("接收的结果:{}", deserialize);} else {log.info("其他服务处理中");}}
}
实现MessageListener 接口,就可以通过onMessage()方法接收到消息了,该方法有两个参数:
-
参数 message 的 getBody() 方法以二进制形式获取消息体, getChannel() 以二进制形式获取消息通道
-
参数 pattern 二进制形式的消息通道(实际和 message.getChannel() 返回值相同)
绑定监听器
@Configuration
public class RedisMessageListenerConfig {@Beanpublic RedisMessageListenerContainer getRedisMessageListenerContainer(RedisConnectionFactory redisConnectionFactory,RedisMessageListener redisMessageListener) {RedisMessageListenerContainer messageListenerContainer = new RedisMessageListenerContainer();messageListenerContainer.setConnectionFactory(redisConnectionFactory);messageListenerContainer.addMessageListener(redisMessageListener, new ChannelTopic(PubSubRedisQueue.KEY));messageListenerContainer.addMessageListener(redisMessageListener, new ChannelTopic(PubSubRedisQueue.KEY2));return messageListenerContainer;}
}
RedisMessageListenerContainer 是为Redis消息侦听器 MessageListener 提供异步行为的容器。处理侦听、转换和消息分派的低级别详细信息。
本文使用的是主题订阅:ChannelTopic,你也可以使用模式匹配:PatternTopic,从而匹配多个信道。
这里我们同一个监听器订阅了两个信道
生产者
@Service
public class PubSubRedisQueue {//队列名public static final String KEY = "pub_sub_queue";public static final String KEY2 = "pub_sub_queue2";@Autowiredprivate RedisTemplate<String, Object> redisTemplate;public void produce(String message) {redisTemplate.convertAndSend(KEY, message);}public void produce2(String message) {redisTemplate.convertAndSend(KEY2, message);}
}
测试
@Slf4j
@RestController
@RequestMapping(value = "/pubSubRedis")
@Api(tags = "pubSubRedis测试")
public class PubSubRedisController {@Autowiredprivate PubSubRedisQueue pubSubRedisQueue;@GetMapping(value = "/pubsub/produce")@ApiOperation(value = "测试")public void produce(@RequestParam(name = "msg") String msg) {pubSubRedisQueue.produce(msg);}@GetMapping(value = "/pubsub/produce2")@ApiOperation(value = "测试2")public void produce2(@RequestParam(name = "msg") String msg) {pubSubRedisQueue.produce2(msg);}
}
可以看到监听器成功监听了两个信道的信息
3、基于Stream类型的实现(Redis Version5.0)
原理
Stream为redis 5.0后新增的数据结构。支持多播的可持久化消息队列,实现借鉴了Kafka设计。
Redis Stream的结构如上图所示,它有一个消息链表,将所有加入的消息都串起来,每个消息都有一个唯一的ID和对应的内容。消息是持久化的,Redis重启后,内容还在。
每个Stream都有唯一的名称,它就是Redis的key,在我们首次使用xadd指令追加消息时自动创建。
每个Stream都可以挂多个消费组,每个消费组会有个游标last_delivered_id在Stream数组之上往前移动,表示当前消费组已经消费到哪条消息了。每个消费组都有一个Stream内唯一的名称,消费组不会自动创建,它需要单独的指令xgroup create进行创建,需要指定从Stream的某个消息ID开始消费,这个ID用来初始化last_delivered_id变量。
每个消费组(Consumer Group)的状态都是独立的,相互不受影响。也就是说同一份Stream内部的消息会被每个消费组都消费到。
同一个消费组(Consumer Group)可以挂接多个消费者(Consumer),这些消费者之间是竞争关系,任意一个消费者读取了消息都会使游标last_delivered_id往前移动。每个消费者者有一个组内唯一名称。
消费者(Consumer)内部会有个状态变量pending_ids,它记录了当前已经被客户端读取的消息,但是还没有ack。如果客户端没有ack,这个变量里面的消息ID会越来越多,一旦某个消息被ack,它就开始减少。这个pending_ids变量在Redis官方被称之为PEL,也就是Pending Entries List,这是一个很核心的数据结构,它用来确保客户端至少消费了消息一次,而不会在网络传输的中途丢失了没处理。
优点
- 高性能:可以在非常短的时间内处理大量的消息。
- 持久化:支持数据持久化,即使Redis服务器宕机,也可以恢复之前的消息。
- 顺序性:保证消息的顺序性,即使是并发的消息也会按照发送顺序排列。
- 灵活性:可以方便地扩展和分布式部署,可以满足不同场景下的需求。
缺点
- 功能相对简单:Redis Stream相对于其他的消息队列,功能相对简单,无法满足一些复杂的需求。
- 不支持消息回溯:即消费者无法获取之前已经消费过的消息。
- 不支持多消费者分组:无法实现多个消费者并发消费消息的功能。
代码
自动ack消费者
@Slf4j
@Component
public class AutoAckStreamConsumeListener implements StreamListener<String, MapRecord<String, String, String>> {//分组名public static final String GROUP = "auto_ack_stream";@Autowiredprivate RedisTemplate<String, Object> redisTemplate;@Overridepublic void onMessage(MapRecord<String, String, String> message) {String stream = message.getStream();RecordId id = message.getId();Map<String, String> map = message.getValue();log.info("[自动ACK]接收到一个消息 stream:[{}],id:[{}],value:[{}]", stream, id, map);redisTemplate.opsForStream().delete(GROUP, id.getValue());}
}
手动ack消费者
@Slf4j
@Component
public class BasicAckStreamConsumeListener implements StreamListener<String, MapRecord<String, String, String>> {//分组名public static final String GROUP = "basic_ack_stream";@Autowiredprivate RedisTemplate<String, Object> redisTemplate;@Overridepublic void onMessage(MapRecord<String, String, String> message) {String stream = message.getStream();RecordId id = message.getId();Map<String, String> map = message.getValue();log.info("[手动ACK]接收到一个消息 stream:[{}],id:[{}],value:[{}]", stream, id, map);redisTemplate.opsForStream().acknowledge(stream, GROUP, id.getValue());//消费完毕删除该条消息redisTemplate.opsForStream().delete(GROUP, id.getValue());}
}
配置绑定关系
@Slf4j
@Configuration
public class RedisStreamConfiguration {@Autowiredprivate RedisConnectionFactory redisConnectionFactory;@Autowiredprivate AutoAckStreamConsumeListener autoAckStreamConsumeListener;@Autowiredprivate BasicAckStreamConsumeListener basicAckStreamConsumeListener;@Autowiredprivate RedisTemplate<String, Object> redisTemplate;@Bean(initMethod = "start", destroyMethod = "stop")public StreamMessageListenerContainer<String, MapRecord<String, String, String>> streamMessageListenerContainer() {AtomicInteger index = new AtomicInteger(1);int processors = Runtime.getRuntime().availableProcessors();ThreadPoolExecutor executor = new ThreadPoolExecutor(processors, processors, 0, TimeUnit.SECONDS,new LinkedBlockingDeque<>(), r -> {Thread thread = new Thread(r);thread.setName("async-stream-consumer-" + index.getAndIncrement());thread.setDaemon(true);return thread;});StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options =StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()// 一次最多获取多少条消息.batchSize(3)// 运行 Stream 的 poll task.executor(executor)// Stream 中没有消息时,阻塞多长时间,需要比 `spring.redis.timeout` 的时间小.pollTimeout(Duration.ofSeconds(3))// 获取消息的过程或获取到消息给具体的消息者处理的过程中,发生了异常的处理.errorHandler(throwable -> log.info("出现异常就来这里了" + throwable)).build();StreamMessageListenerContainer<String, MapRecord<String, String, String>> streamMessageListenerContainer =StreamMessageListenerContainer.create(redisConnectionFactory, options);// 独立消费// 消费组A,自动ack// 从消费组中没有分配给消费者的消息开始消费if (!isStreamGroupExists(StreamRedisQueue.KEY,AutoAckStreamConsumeListener.GROUP)){redisTemplate.opsForStream().createGroup(StreamRedisQueue.KEY,AutoAckStreamConsumeListener.GROUP);}streamMessageListenerContainer.receiveAutoAck(Consumer.from(AutoAckStreamConsumeListener.GROUP, "AutoAckConsumer"),StreamOffset.create(StreamRedisQueue.KEY, ReadOffset.lastConsumed()), autoAckStreamConsumeListener);// 消费组B,不自动ackif (!isStreamGroupExists(StreamRedisQueue.KEY,BasicAckStreamConsumeListener.GROUP)){redisTemplate.opsForStream().createGroup(StreamRedisQueue.KEY,BasicAckStreamConsumeListener.GROUP);}streamMessageListenerContainer.receive(Consumer.from(BasicAckStreamConsumeListener.GROUP, "BasicAckConsumer"),StreamOffset.create(StreamRedisQueue.KEY, ReadOffset.lastConsumed()), basicAckStreamConsumeListener);return streamMessageListenerContainer;}/*** 判断该消费组是否存在* @param streamKey* @param groupName* @return*/public boolean isStreamGroupExists(String streamKey, String groupName) {RedisStreamCommands commands = redisConnectionFactory.getConnection().streamCommands();//首先检查Stream Key是否存在,否则下面代码可能会因为尝试检查不存在的Stream Key而导致异常if (Boolean.FALSE.equals(redisTemplate.hasKey(streamKey))){return false;}//获取streamKey下的所有groupsStreamInfo.XInfoGroups xInfoGroups = commands.xInfoGroups(streamKey.getBytes());AtomicBoolean exists= new AtomicBoolean(false);assert xInfoGroups != null;xInfoGroups.forEach(xInfoGroup -> {if (xInfoGroup.groupName().equals(groupName)){exists.set(true);}});return exists.get();}
}
生产者
@Slf4j
@Service
public class StreamRedisQueue {//队列名public static final String KEY = "stream_queue";@Autowiredprivate RedisTemplate<String, Object> redisTemplate;public String produce(Map<String, String> value) {return Objects.requireNonNull(redisTemplate.opsForStream().add(KEY, value)).getValue();}public void createGroup(String key, String group){redisTemplate.opsForStream().createGroup(key, group);}}
测试
生产消息
@Slf4j
@RestController
@RequestMapping(value = "/streamRedis")
@Api(tags = "streamRedis测试")
public class StreamRedisController {@Autowiredprivate StreamRedisQueue streamRedisQueue;@GetMapping(value = "/stream/produce")@ApiOperation(value = "测试")public void streamProduce() {Map<String, String> map = new HashMap<>();map.put("刘德华", "大家好我是刘德华");map.put("周杰伦", "周杰伦");map.put("time", DateUtil.now());String result = streamRedisQueue.produce(map);log.info("返回结果:{}", result);}
}