一、幂等性实现
1.1 什么是幂等性?
幂等性是指同一条消息无论被消费多少次,业务结果都只生效一次,防止重复扣款、重复发货等问题。
RabbitMQ 的投递模式是“至少一次交付”(at-least-once delivery),如果消费者处理失败或者没有及时确认,消息会被多次投递。如果业务本身不具备幂等性,就可能导致重复扣款、重复发货等严重后果。
1.2 实现思路
RabbitMQ 只负责消息的可靠投递,而不会记录每条消息是否已经被成功消费。因此,需要由消费者端维护消费状态,常见做法是借助 Redis 实现去重逻辑。
消息在生产阶段应携带全局唯一的 message_id(例如订单号:order:10010)。在消费逻辑中,先通过 Redis 的原子命令 SETNX 尝试写入该 message_id:①如果 SETNX返回1,表示第一次消费,可以处理;②如果返回0,表示已消费,直接忽略
二、消息重放实现
在RabbitMQ中,ack和nack机制是保证可靠投递、实现重放的关键。
2.1 ack和nack
如果你的消费逻辑里既没有调用ack,也没有调用nack,消息状态会一直unacked。只要没确认,就永远不会删除消息。
(1) ack
确认消息已被消费成功。当消费者调用:
ch.basic_ack(delivery_tag=method.delivery_tag)
RabbitMQ就会把消息从队列里永久删除。只要你ack了,这条消息就不可能再来了。
(2) nack
告诉RabbitMQ“我没处理好”。有两种方式:
# 发送nack并重入队列
# RabbitMQ会立刻把消息放回队列,再投递给其他消费者。
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)# 发送nack不重入队列
# 消息就会被丢弃(或者,如果绑定了死信队列,就转入死信队列)。
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
2.2 实现代码
下方代码实现了以下关键功能:
1. 消息通过 SETNX + EXPIRE 在 Redis 中写入幂等标记,确保同一消息只会被一个消费者处理。
2. 如果标记已存在,判断是“已完成”还是“正在处理”,分别选择直接确认或稍后重试。
3. 业务处理成功后将标记更新为 done 并延长过期,表示消费已完成。
4. 如果处理失败,删除标记以便下次重新消费,并根据重试次数决定是否放弃或重试。
生产者代码
import pika
import uuidconnection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='test_queue', durable=True)message_id = str(uuid.uuid4())
body = "test message" # 可以通过推送body = "fail message" 模拟消费异常properties = pika.BasicProperties(delivery_mode=2,message_id=message_id
)channel.basic_publish(exchange='',routing_key='test_queue',body=body,properties=properties
)print(f"[x] Sent '{body}' with message_id {message_id}")connection.close()
消费者代码
import pika
import redis
import time# Redis
r = redis.StrictRedis(host='localhost', port=6379, db=0)# RabbitMQ
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='test_queue', durable=True)MAX_RETRY = 5def callback(ch, method, properties, body):message_id = properties.message_idif not message_id:import hashlibmessage_id = hashlib.md5(body).hexdigest()redis_key = f"msg:{message_id}"retry_key = f"retry:{message_id}"# 尝试用SETNX写入幂等标记result = r.setnx(redis_key, "processing")if not result:status = r.get(redis_key)if status and status.decode() == "done":# 已经处理过ch.basic_ack(delivery_tag=method.delivery_tag)print(f"[!] Duplicate message detected: {message_id}")else:# 正在处理,稍后重试ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)print(f"[!] Message {message_id} is being processed by another consumer.")return# SETNX成功,要设置过期时间,防止永久占用r.expire(redis_key, 300) # 300秒try:# 获取重试次数retry_count = r.get(retry_key)if retry_count is None:retry_count = 0else:retry_count = int(retry_count)print(f"[x] Processing message: {body.decode()} (retry: {retry_count})")# 模拟失败if "fail" in body.decode():raise Exception("Simulated failure")# 业务逻辑# ...# 处理成功,改为done并延长过期r.set(redis_key, "done")r.expire(redis_key, 24*60*60)r.delete(retry_key)ch.basic_ack(delivery_tag=method.delivery_tag)print("[+] Message processed successfully")except Exception as e:retry_count += 1r.set(retry_key, retry_count)r.expire(retry_key, 24*60*60)print(f"[!] Error processing message (retry {retry_count}): {e}")# 失败时删除幂等标记,下次可以继续处理r.delete(redis_key)if retry_count >= MAX_RETRY:ch.basic_ack(delivery_tag=method.delivery_tag)print("[!] Max retries reached, moving message to dead letter log.")else:ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='test_queue',on_message_callback=callback,auto_ack=False
)print("[*] Waiting for messages. CTRL+C to exit")
channel.start_consuming()