目录
一、延时插件实现
1、版本要求
2、为运行新容器时安装
3、为已运行的容器安装
4、验证安装
5、代码编写
1. 配置类
2. 生产者
3. 消费者
二、死信队列实现
1、代码编写
1. 配置类
2. 生产者
3. 消费者
三、踩坑记录
1、发送消息失败
2、消息过期后未能转发到死信队列
3、消费者消费报错
一、延时插件实现
1、版本要求
RabbitMQ 3.5.7以上
2、为运行新容器时安装
# 1. 拉取带管理界面的镜像
docker pull rabbitmq:3.11-management
# 2. 启动容器并启用插件
docker run -d \--name rabbitmq \-p 5672:5672 \-p 15672:15672 \-e RABBITMQ_DEFAULT_USER=admin \-e RABBITMQ_DEFAULT_PASS=password \rabbitmq:3.11-management \bash -c "rabbitmq-plugins enable rabbitmq_delayed_message_exchange && rabbitmq-server"
3、为已运行的容器安装
# 1. 进入正在运行的容器
docker exec -it rabbitmq /bin/bash
# 2. 在容器内执行插件安装
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
# 3. 退出容器
exit
# 4. 重启容器使插件生效
docker restart rabbitmq
4、验证安装
# 方法1:检查插件列表
docker exec rabbitmq rabbitmq-plugins list | grep delayed
# 方法2:登录管理界面
# 访问 http://localhost:15672 (使用设置的账号密码登录)
# 在 "Exchanges" 标签页创建交换机时,Type 下拉框会出现 "x-delayed-message" 选项
5、代码编写
1. 配置类
@Configuration
public class RabbitMqConfig {public static final String DELAYED_EXCHANGE = "delayed.exchange";public static final String DELAYED_QUEUE = "delayed.queue";public static final String DELAYED_ROUTING_KEY = "delayed_routing_key";@Beanpublic CustomExchange delayedExchange() {Map<String, Object> args = new HashMap<>();args.put("x-delayed-type", "direct"); // 交换机类型return new CustomExchange(DELAYED_EXCHANGE,"x-delayed-message", // 固定类型true,false,args);}
@Beanpublic Queue delayedQueue() {return new Queue(DELAYED_QUEUE, true);}
@Beanpublic Binding delayedBinding(Queue delayedQueue, CustomExchange delayedExchange) {return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();}}
2. 生产者
public void send(String exchange, String routing_key,Object data, Integer delayMillis) {// 消息后处理器:设置延时和持久化MessagePostProcessor processor = message -> {// 毫秒message.getMessageProperties().setDelay(delayMillis);// 持久化message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);return message;};
rabbitTemplate.convertAndSend(exchange, routingKey, data, processor);
}
3. 消费者
@Component
@RabbitListener(queues = RabbitMqConfig.DELAYED_QUEUE)
public class DelayedListener {
@RabbitHandlerpublic void listener(String data, Channel channel, Message message) {log.warn("消息消费成功,消息内容:{}", data);MessageProperties properties = message.getMessageProperties();long deliveryTag = properties.getDeliveryTag()channel.basicAck(deliveryTag, false);}
}
二、死信队列实现
1、代码编写
1. 配置类
@Configuration
public class RabbitMqConfig {public static final String DELAYED_EXCHANGE = "delayed.exchange";public static final String DELAYED_QUEUE = "delayed.queue";public static final String DELAYED_ROUTING_KEY = "delayed_routing_key";
public static final String NORMAL_EXCHANGE = "normal.exchange";public static final String NORMAL_QUEUE = "normal.queue";public static final String NORMAL_ROUTING_KEY = "normal_routing_key";// 死信队列(延时队列)@Beanpublic Queue delayedQueue() {return QueueBuilder.durable(DELAYED_QUEUE).build();}
// 死信交换机@Beanpublic DirectExchange delayedExchange() {return new DirectExchange(DELAYED_EXCHANGE);}
// 绑定死信队列到死信交换机@Beanpublic Binding delayedBinding(Queue delayedQueue, DirectExchange delayedExchange) {return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY);}
// 普通队列@Beanpublic Queue normalQueue() {return QueueBuilder.durable(NORMAL_QUEUE).deadLetterExchange(DELAYED_EXCHANGE).deadLetterRoutingKey(DELAYED_ROUTING_KEY).build();}
// 普通交换机@Beanpublic DirectExchange normalExchange() {return new DirectExchange(NORMAL_EXCHANGE);}
// 绑定普通队列到普通交换机@Beanpublic Binding normalBinding(Queue normalQueue, DirectExchange normalExchange) {return BindingBuilder.bind(normalQueue).to(normalExchange).with(NORMAL_ROUTING_KEY);}}
2. 生产者
public void send(String exchange, String routing_key, Object data, Integer delayMillis) {String uuid = IdUtil.simpleUUID();// 消息入库略,uuid为主键MessageProperties properties = new MessageProperties();// 设置TTL,单位毫秒properties.setExpiration(String.valueOf(delayMillis));// 消息持久化(2 表示持久化)properties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
Message msg = rabbitTemplate.getMessageConverter().toMessage(data, properties);rabbitTemplate.send(exchange, routingKey, msg, new CorrelationData(uuid));
}
3. 消费者
@Component
@RabbitListener(queues = RabbitMqConfig.DELAYED_QUEUE)
public class DelayedListener {
@RabbitHandlerpublic void listener(String data, Channel channel, Message message) {log.warn("消息消费成功,消息内容:{}", data);MessageProperties properties = message.getMessageProperties();long deliveryTag = properties.getDeliveryTag()channel.basicAck(deliveryTag, false);}
}
三、踩坑记录
1、发送消息失败
原因:RabbitTemplate
配置了消息抵达确认,消息ID没有传值。
RabbitTemplate rabbitTemplate = new RabbitTemplate();
// 消息抵达确认通知
rabbitTemplate.setConfirmCallback((data, ack, cause) -> {String msgId = data.getId();if (ack) {log.info("消息抵达队列成功:{}", data);} else {log.error("消息未能发送成功,消息ID:{}", data.getId(), cause);}
});
生产者实际发送消息未传消息ID:
错误格式
rabbitTemplate.convertAndSend(exchange, routingKey, data);
正确格式
String uuid = IdUtil.simpleUUID();
rabbitTemplate.convertAndSend(exchange, routingKey, data, new CorrelationData(uuid));
2、消息过期后未能转发到死信队列
原因:正常消息未绑定死信队列,消息过期自动删除,而不会转发到死信队列中。
错误格式
@Bean
public Queue delayedNormalQueue() {return QueueBuilder.durable(NORMAL_QUEUE).build();
}
正确格式
@Bean
public Queue delayedNormalQueue() {return QueueBuilder.durable(NORMAL_QUEUE).deadLetterExchange(DELAYED_EXCHANGE) // 指定死信交换机.deadLetterRoutingKey(DELAYED_ROUTING_KEY) // 指定死信路由键.build();
}
3、消费者消费报错
原因:发送的消息由于自定义的 MessageProperties
,其中缺失了 contentType
参数,需要使用转化器进行转换,而不是直接发送消息。
错误格式
MessageProperties properties = new MessageProperties();
properties.setExpiration(String.valueOf(delayMillis));
Message msg = new Message(message.getBytes(), properties);
rabbitTemplate.convertAndSend(exchange, routingKey, msg, new CorrelationData(uuid));
正确格式
MessageProperties properties = new MessageProperties();
properties.setExpiration(String.valueOf(delayMillis));
Message msg = rabbitTemplate.getMessageConverter().toMessage(message, properties);
rabbitTemplate.send(exchange, routingKey, msg, new CorrelationData(uuid));