1 自动提交
1.1 原理:
Kafka 消费者后台线程每隔 auto.commit.interval.ms 自动提交最近一次 poll() 的 offset
无需开发者干预
1.2 示例:
enable.auto.commit=true
auto.commit.interval.ms=5000 # 每 5 秒自动提交一次
from confluent_kafka import Consumer, KafkaError
import sys# 配置消费者
conf = {'bootstrap.servers': 'localhost:9092','group.id': 'mygroup','auto.offset.reset': 'earliest','enable.auto.commit': True, # 自动提交'auto.commit.interval.ms': 5000 # 每个5s自动提交一次
}consumer = Consumer(conf)
consumer.subscribe(['my_topic'])while True:msg = consumer.poll(1.0)if msg is None:continueif msg.error():if msg.error().code() == KafkaError._PARTITION_EOF:continueprint(f"Error: {msg.error()}", file=sys.stderr)breaktry:# consumer.commit(msg) 不需要这行,这是手动提交时需要用的# 业务处理逻辑print(f"处理消息: {msg.value().decode('utf-8')}")except KeyboardInterrupt:print("中断信号已接收")finally:consumer.close()
2 手动提交
2.1 至多一次
2.1.1 原理
消息处理后立即提交偏移量;
即使处理失败也不会重试;
适合对消息丢失容忍度高的场景(如日志采集)。
2.1.2 示例:
enable.auto.commit=False:禁用自动提交偏移量
手动调用consumer.commit(msg)提交偏移量
auto.offset.reset=‘earliest’:从最早消息开始消费
from confluent_kafka import Consumer, KafkaError
import sys# 配置消费者
conf = {'bootstrap.servers': 'localhost:9092','group.id': 'mygroup','auto.offset.reset': 'earliest','enable.auto.commit': False
}consumer = Consumer(conf)
consumer.subscribe(['my_topic'])while True:msg = consumer.poll(1.0)if msg is None:continueif msg.error():if msg.error().code() == KafkaError._PARTITION_EOF:continueprint(f"Error: {msg.error()}", file=sys.stderr)breaktry:# 手动提交偏移量(最多一次核心)consumer.commit(msg)print(f"已提交偏移量: {msg.offset()}")# 业务处理逻辑print(f"处理消息: {msg.value().decode('utf-8')}")except KeyboardInterrupt:print("中断信号已接收")finally:consumer.close()
2.2 最少一次
2.2.1 原理
通过重试机制+手动提交偏移量实现:
- 消息处理失败时自动重试(最多3次)
- 成功处理后批量提交偏移量
- 延长轮询间隔避免重平衡
2.2.2 示例
该示例是批量处理消息,批量提交;当然也可以一次处理一条消息,并一次提交一条消息偏移量
from confluent_kafka import Consumer, KafkaError, TopicPartition
import time
import sys# 配置消费者
conf = {'bootstrap.servers': 'localhost:9092','group.id': 'my_group','auto.offset.reset': 'earliest','enable.auto.commit': False, # 手动提交控制'max.poll.interval.ms': 300000, # 延长轮询间隔'session.timeout.ms': 10000,'heartbeat.interval.ms': 3000
}consumer = Consumer(conf)
consumer.subscribe(['my_topic'])def process_with_retry(msg):"""带重试的消息处理"""for attempt in range(3):try:# 替换为实际业务逻辑print(f"处理消息: {msg.value().decode('utf-8')}")return Trueexcept Exception:time.sleep(1) # 指数退避可在此实现return Falsetry:while True:# 批量拉取10条消息msgs = consumer.consume(num_messages=10, timeout=1.0)processed = []for msg in msgs:if msg.error():continue# 处理消息(带重试)if process_with_retry(msg):processed.append(TopicPartition(msg.topic(), msg.partition(), msg.offset()))# 批量提交偏移量if processed:consumer.commit(offsets=processed)print(f"已提交偏移量: {[p.offset for p in processed]}")except KeyboardInterrupt:pass
finally:consumer.close()
补充:延长轮询间隔避免重平衡
核心概念解析:
- 轮询间隔:指Kafka消费者两次调用poll()方法拉取消息的时间间隔,由max.poll.interval.ms参数控制(默认5分钟)。
- 重平衡(Rebalance):当消费者组成员变动、主题/分区变化或心跳超时时,Kafka会触发分区重新分配,导致消费者暂停消费。
为什么延长轮询间隔能避免重平衡?
- 防止误判消费者宕机
- Kafka通过session.timeout.ms(默认10秒)和heartbeat.interval.ms(默认3秒)检测消费者存活。
- 若消息处理耗时超过max.poll.interval.ms(默认5分钟),Kafka会认为消费者已宕机,触发重平衡。
- 延长轮询间隔(如设为10分钟)可允许更长的消息处理时间,避免因业务逻辑耗时过长导致的误判重平衡。
- 避免频繁重平衡的连锁反应
- 重平衡期间消费者暂停消费,导致消息堆积和延迟。
- 频繁重平衡(如每5分钟触发一次)会显著降低吞吐量,延长端到端延迟。
关键参数配置建议:
参数 | 默认值 | 推荐值 | 作用 |
---|---|---|---|
max.poll.interval.ms | 5分钟 | 根据业务处理时间调整(如10-30分钟) | 控制两次poll的最大间隔,防止处理超时触发重平衡 |
session.timeout.ms | 10秒 | 30秒-1分钟 | 心跳超时时间,需大于heartbeat.interval.ms |
heartbeat.interval.ms | 3秒 | 2-5秒 | 心跳发送频率,建议设为session.timeout.ms的1/3 |
2.3 精确一次
2.3.1 幂等性消费 (Idempotent Consumption) - 推荐且最常用
思路:承认消息可能会被重复传递,但从业务逻辑上保证重复执行不会产生负面效果。
做法:在消费者的处理逻辑中,设计幂等性。例如:
为每条消息生成一个唯一 ID(可以是消息key,或自定义UUID)。
在处理前,先检查这个 ID 是否已经被处理过(比如在数据库里查一下)。
如果已处理,就直接跳过并提交位移(视为成功);如果未处理,则执行业务逻辑。
这是最有效、最通用的方法,因为它不依赖于任何特定技术,而是从业务设计上根本性地解决问题。
例如:
a) 对于流程中的消息,每条消息中包含唯一id,比如业务id,在数据库中将业务id作为Unique key,插入重复时会报duplicate key异常,不会导致数据库中出现脏数据
b) Redis中使用set存储业务id,天然幂等性
c) 如果不是上面两个场景,需要让生产者发送每条数据的时候,里面加一个全局唯一的 id,然后你这里消费到了之后,先根据这个 id 去比如 Redis 里查一下消费过吗?如果没有消费过,就执行相应业务进行处理,然后这个 id 写 Redis,最后提交偏移。如果消费过了,那如果消费过了,那就别处理了,保证不重复处理相同的消息即可
2.3.2 事务性输出 (Transactional Output) / 两阶段提交 (2PC) - 复杂且受限
思路:将消费者的“业务处理”和“位移提交”绑定为一个分布式事务。
做法:例如,使用 Kafka 的事务性生产者,将处理结果和位移提交到外部系统(如另一个Kafka主题)的操作放在一个事务里。但这通常需要外部系统(如数据库)也支持参与 Kafka 事务(通过 Kafka Connect),实现复杂度非常高,性能和可用性也会受影响。不推荐普通应用使用。