Efficient Kafka Message Sending with Python
Below are examples of efficient Kafka message sending with Python, covering basic sending, batch processing, asynchronous callbacks, and related scenarios. The examples are based on the confluent-kafka library (recommended) and the kafka-python library, and all of the code has been tested.
Basic message sending (synchronous)
from confluent_kafka import Producer

producer = Producer({'bootstrap.servers': 'localhost:9092'})
producer.produce('test_topic', key='key1', value='Hello Kafka')
producer.flush()  # block until all queued messages are delivered
Basic message sending (asynchronous)
from confluent_kafka import Producer

def delivery_report(err, msg):
    if err:
        print(f'Message delivery failed: {err}')
    else:
        print(f'Message delivered to {msg.topic()} [{msg.partition()}]')

producer = Producer({'bootstrap.servers': 'localhost:9092'})
producer.produce('test_topic', value='Async message', callback=delivery_report)
producer.poll(0)  # serve pending delivery callbacks
producer.flush()
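When producing large volumes asynchronously, it helps to call producer.poll(0) on every iteration so delivery callbacks run and the internal queue keeps draining. A minimal sketch under that assumption (the topic name high_volume_topic and the message count are illustrative):

from confluent_kafka import Producer

def delivery_report(err, msg):
    if err is not None:
        print(f'Delivery failed: {err}')

producer = Producer({'bootstrap.servers': 'localhost:9092'})

for i in range(10000):
    producer.produce('high_volume_topic', value=f'Message {i}', callback=delivery_report)
    producer.poll(0)  # non-blocking: serve any pending delivery reports

producer.flush()  # wait for everything still in flight before exiting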
Batch message sending
from confluent_kafka import Producer

producer = Producer({'bootstrap.servers': 'localhost:9092'})
for i in range(100):
    producer.produce('batch_topic', value=f'Message {i}')
producer.flush()
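If a batch outruns the producer's local buffer, produce() raises BufferError. A hedged sketch of the usual back-off-and-retry pattern (the message count and topic name are illustrative):

from confluent_kafka import Producer

producer = Producer({'bootstrap.servers': 'localhost:9092'})

for i in range(500000):
    while True:
        try:
            producer.produce('batch_topic', value=f'Message {i}')
            break
        except BufferError:
            # Local queue is full: serve delivery callbacks to free space, then retry.
            producer.poll(0.5)
    producer.poll(0)

producer.flush()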
Sending messages with a key
from confluent_kafka import Producer

producer = Producer({'bootstrap.servers': 'localhost:9092'})
for user_id in ['user1', 'user2', 'user3']:
    producer.produce('user_events', key=user_id, value=f'Event for {user_id}')
producer.flush()
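With the default partitioner, messages that share a key always land on the same partition, so per-key ordering is preserved.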
High-performance configuration
from confluent_kafka import Producer

conf = {
    'bootstrap.servers': 'localhost:9092',
    'queue.buffering.max.messages': 100000,  # max messages buffered locally
    'queue.buffering.max.ms': 500,           # linger time before a batch is sent
    'batch.num.messages': 1000               # max messages per batch
}
producer = Producer(conf)
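A hedged sketch of driving such a tuned producer and measuring throughput (the topic name perf_topic and the message count are illustrative):

import time
from confluent_kafka import Producer

producer = Producer({
    'bootstrap.servers': 'localhost:9092',
    'queue.buffering.max.messages': 100000,
    'queue.buffering.max.ms': 500,
    'batch.num.messages': 1000,
})

start = time.time()
for i in range(50000):
    producer.produce('perf_topic', value=f'Payload {i}')
    producer.poll(0)  # keep callbacks and the local queue moving
producer.flush()

elapsed = time.time() - start
print(f'Sent 50000 messages in {elapsed:.2f}s ({50000 / elapsed:.0f} msg/s)')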
Message headers support
from confluent_kafka import Producer

producer = Producer({'bootstrap.servers': 'localhost:9092'})
headers = [('trace-id', '12345'), ('source', 'python-app')]
producer.produce('with_headers', value='Message', headers=headers)
producer.flush()
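On the consuming side, confluent-kafka exposes headers through msg.headers(); a brief sketch, assuming an illustrative group id and the topic used above:

from confluent_kafka import Consumer

consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'header-demo',
    'auto.offset.reset': 'earliest',
})
consumer.subscribe(['with_headers'])

msg = consumer.poll(5.0)
if msg is not None and msg.error() is None:
    # headers() returns a list of (name, value-bytes) tuples, or None
    for name, value in (msg.headers() or []):
        print(name, value)
consumer.close()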
Message timestamps
from confluent_kafka import Producer
import time

producer = Producer({'bootstrap.servers': 'localhost:9092'})
producer.produce('timed_topic', value='Timestamped', timestamp=int(time.time() * 1000))  # timestamp in milliseconds since the epoch
producer.flush()
Custom partition routing
Note: confluent-kafka does not accept a Python callable for the 'partitioner' setting; that option only takes the names of librdkafka's built-in partitioners (e.g. 'murmur2_random'). For fully custom routing, compute the partition yourself and pass it to produce():

from confluent_kafka import Producer

producer = Producer({'bootstrap.servers': 'localhost:9092'})

# Look up the topic's partition count, then route by hashing the key.
# Note: Python's hash() of str is not stable across processes; use a stable
# hash (e.g. zlib.crc32) if routing must be deterministic in production.
metadata = producer.list_topics('custom_partition', timeout=10)
num_partitions = len(metadata.topics['custom_partition'].partitions)

key = 'user123'
partition = hash(key) % num_partitions
producer.produce('custom_partition', key=key, value='Data', partition=partition)
producer.flush()
Message compression
from confluent_kafka import Producer

producer = Producer({
    'bootstrap.servers': 'localhost:9092',
    'compression.type': 'gzip'  # gzip, snappy, lz4 and zstd are supported
})
producer.produce('compressed_topic', value='Compressed message')
producer.flush()
Synchronous send with timeout control
from confluent_kafka import Producer, KafkaException

producer = Producer({'bootstrap.servers': 'localhost:9092'})
try:
    producer.produce('timeout_topic', value='Test')
    remaining = producer.flush(timeout=5)  # 5-second timeout; returns the number of messages still queued
    if remaining > 0:
        print(f'{remaining} messages were not delivered within the timeout')
except KafkaException as e:
    print(f'Send failed: {e}')
Basic sending with kafka-python
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='localhost:9092')
producer.send('python_topic', value=b'Message from kafka-python')
producer.flush()
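kafka-python can also serialize values for you; a small sketch using a JSON value_serializer (the topic name json_topic is illustrative):

import json
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),  # dict -> JSON bytes
)
producer.send('json_topic', value={'event': 'signup', 'user': 'user1'})
producer.flush()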
Keyed sending with kafka-python
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='localhost:9092')
producer.send('keyed_topic', key=b'user1', value=b'User event')
producer.flush()
Batch sending with kafka-python
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='localhost:9092')
for i in range(100):
    producer.send('batch_topic', value=f'Message {i}'.encode('utf-8'))
producer.flush()
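send() in kafka-python returns a future, so deliveries can also be tracked asynchronously; a brief sketch with illustrative callback names and topic:

from kafka import KafkaProducer

def on_success(metadata):
    print(f'Delivered to {metadata.topic} [{metadata.partition}] @ offset {metadata.offset}')

def on_error(exc):
    print(f'Delivery failed: {exc}')

producer = KafkaProducer(bootstrap_servers='localhost:9092')
future = producer.send('callback_topic', value=b'Tracked message')
future.add_callback(on_success)
future.add_errback(on_error)
producer.flush()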