1. 简介
RabbitMQ 的消息发送流程:
- producer 将消息发送给 broker,consumer 从 broker 中获取消息并消费
那么在这里就涉及到了两种消息发送,即 producer 与 broker 之间和 consumer 与 broker 之间。
“消息确认” 讨论的是 consumer 与 broker 之间的消息发送。
2. 为什么会有这个特性
当 broker 给 consumer 发送消息时,可能会出现下面两种情况:
- 消息未成功到达 consumer;
- 消息成功到达 consumer,但是 consumer 没有成功消费这条消息,如:在处理消息时发生异常等情况。
这时,就需要有一种解决方案,保证 broker 与 consumer 之间消息传输的可靠性,于是就有了消息确认这一特性。
3. 使用 RabbitMQ Java 时如何进行消息确认(不是重点)
public class ConsumerDemo1 {public static void main(String[] args) throws IOException, TimeoutException {//建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT);connectionFactory.setUsername(Constants.USERNAME);connectionFactory.setPassword(Constants.PASSWORD);connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();//创建信道Channel channel = connection.createChannel();//声明交换机channel.exchangeDeclare(Constants.DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT, true);//声明队列//如果队列不存在,就创建channel.queueDeclare(Constants.DIRECT_QUEUE1, true, false, false, null);//消费消息DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println(new String(body));}};channel.basicConsume(Constants.DIRECT_QUEUE1, true, consumer);}
}
这是一段路由模式的代码,在这段代码中,有下面一条语句:
channel.basicConsume(Constants.DIRECT_QUEUE1, true, consumer);
在这个方法中,有三个参数:
String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;
- queue:consumer 通过哪个队列获取 broker 发送的消息;
- autoAck:是否自动确认;
- callback:consumer 消费消息的逻辑。
其中,autoAck 就是消息确认的体现:
- autoAck 为 true:RabbitMQ 会将发送给 consumer 的消息视为已被成功接收和消费(consumer 可能并没有成功接收到或成功消费,但是 RabbitMQ 不管了),就会被将这条消息删除;
- autoAck 为 false:当 RabbitMQ 发送消息后,并不会马上就将消息删除,而是会等 consumer 调用 Basic.Ack,收到 ack 后,才会将消息删除。
将 autoAck 设置为 false 后,若 broker 长时间没有收到 consumer 发送的 ack 且 consumer 已经断开连接,就会将这条消息重新入队列,继续发送给 consumer 进行消费,此时,队列中的消息就分为了两种:
- 还未被发送的消息;
- 已经发送了的消息,但是没有收到 ack 而重新入队列等待被消费。
4. 在 spring 中使用 RabbitMQ 时如何进行消息确认
4.1 basicAck
在 spring 下的 Channel 类中提供了下面几种方法:
void basicAck(long deliveryTag, boolean multiple) throws IOException;
在这个方法中,有三个参数:
- deliveryTag:是 broker 给 consumer 发送消息的唯一标识,在一个 channel 中 deliveryTag 是唯一的;
- mulitple: 是否批量确认
使用这个方法后,就会告知 broker 这条消息已经成功被消费,可以将其删除。
4.2 basicNack
void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
在这个方法中,多了一个参数:
- requeue:是否重新入队列。
使用这个方法,就相当于给 broker 发送 nack,即这条消息没有被正确消费。
若 requeue 为 true,就会将这条消息重新入队列,继续给 consumer 消费;
若 requeue 为 false,broker 就会这条消息删除。
4.3 basicReject
void basicReject(long deliveryTag, boolean requeue) throws IOException;
这个方法与 basicNack 大致相同,此处省略。
4.4 配置
在 spring 中,提供了三种配置用于消息确认:
- none:当消息发送给 consumer,不管 consumer 是否成功消费了消息,broker 都会当作这条消息被成功消费了,然后删除这条消息;
- auto:在 consumer 处理消息时没有抛出异常时,就会确认消息,反之就不会确认,并且将消息重新放入队列中,进行下一次的消费;
- manual:手动确认,我们需要在代码中指定这条消息是消费成功还是消费失败,分别使用 basicAck 和 basicNack。
spring:rabbitmq:listener:simple:acknowledge-mode: none
5. 代码测试
@RequestMapping("/producer")
@RestController
public class ProducerController {@Autowiredprivate RabbitTemplate rabbitTemplate;@RequestMapping("/ack")public String ack() {String messageInfo = "consumer ack mode test...";rabbitTemplate.convertAndSend(Constants.ACK_EXCHANGE, Constants.ACK_ROUTINGKEY, messageInfo);return "消息发送成功";}
}
这段代码代表的是一个 producer,下面接收到的消息都是通过这段代码发送的。
5.1 none
① 无异常时的消费者代码:
@RabbitListener(queues = Constants.ACK_QUEUE)public void listener(Message message, Channel channel) throws IOException {long deliveryTag = message.getMessageProperties().getDeliveryTag();log.info("接收消息: {}; deliveryTag: {}", new String(message.getBody()), deliveryTag);}
代码运行结果如下:
我们可以通过访问 RabbitMQ 客户端来观察这条消息是否成功被消费:
可以看到,Messages 这一列中,Ready 和 Unacked 都为 0,表示消息被成功消费。
② 有异常时的消费者代码:
@RabbitListener(queues = Constants.ACK_QUEUE)public void listener(Message message, Channel channel) throws IOException {long deliveryTag = message.getMessageProperties().getDeliveryTag();int num = 1 / 0;log.info("接收消息: {}; deliveryTag: {}", new String(message.getBody()), deliveryTag);}
代码运行结果如下:
由于我们使用了除零操作,于是抛出了异常,我们可以通过访问 RabbitMQ 来观察这条消息是否被删除:
和上面一样,在 broker 中这条消息已经被删除,这与 none 配置性质一致。
5.2 auto
① 无异常时的消费者代码:
@Component
@Slf4j
public class AckListener {@RabbitListener(queues = Constants.ACK_QUEUE)public void listener(Message message, Channel channel) throws IOException {long deliveryTag = message.getMessageProperties().getDeliveryTag();log.info("接收消息: {}; deliveryTag: {}", new String(message.getBody()), deliveryTag);}}
代码运行结果如下:
在 RabbitMQ 客户端中显示,这条消息已经被成功消费:
② 有异常时的消费者代码:
@Component
@Slf4j
public class AckListener {@RabbitListener(queues = Constants.ACK_QUEUE)public void listener(Message message, Channel channel) throws IOException {long deliveryTag = message.getMessageProperties().getDeliveryTag();int num = 1 / 0;log.info("接收消息: {}; deliveryTag: {}", new String(message.getBody()), deliveryTag);}}
代码运行结果如下:
在运行结果中,一直会有报错产生,并且都是两个两个为一组,并且在报错信息中可以看到,producer 发送的消息一直在被消费,这是因为存在异常,就会导致这条消息一直在队列中,通过观察 RabbitMQ 客户端可以看出,这条消息依然保存在队列中:
5.3 manual
① 无异常的消费者代码如下:
@Component
@Slf4j
public class AckListener {@RabbitListener(queues = Constants.ACK_QUEUE)public void listener(Message message, Channel channel) throws IOException {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {log.info("接收消息: {}; deliveryTag: {}", new String(message.getBody()), deliveryTag);channel.basicAck(deliveryTag, false);} catch (Exception e) {channel.basicNack(deliveryTag, false, true);}}
}
在这段代码中,我们使用了 basicAck 和 basicNack 来进行消息确认,当消息处理成功后,就会执行 basicAck,告诉 broker 这条消息已经被成功消费,可以将其删除;当消息执行发生异常后,就会执行 basicNack,并且根据 requeue 参数决定如何处理这条消息。
代码运行结果如下:
RabbitMQ 客户端显示这条消息被成功消费:
② 有异常的消费者代码如下:
当 requeue 为 true:
@Component
@Slf4j
public class AckListener {@RabbitListener(queues = Constants.ACK_QUEUE)public void listener(Message message, Channel channel) throws IOException {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {log.info("接收消息: {}; deliveryTag: {}", new String(message.getBody()), deliveryTag);int n = 1 / 0;channel.basicAck(deliveryTag, false);} catch (Exception e) {channel.basicNack(deliveryTag, false, true);}}
}
在此处的 basicNack,将 requeue 设置为了 true,当消息处理失败后,就会将消息重新入队列,重新被消费:
我们可以看到,这条消息一直在被消费,并且 delivertTag 在递增。
并且从 RabbitMQ 客户端中可以看到,这条消息依然存在,等待被成功消费:
当 requeue 为 false:
当处理消息发生异常后,就会将消息从队列中删除。
代码运行结果如下:
虽然异常依然存在,但是消息却没有重复发送,并且 RabbitMQ 中也将这条消息删除: