一、架构概述

Apache Flink 提供的 Kafka 写入连接器是实现与 Kafka 消息队列集成的关键组件,支持多种语义保证和灵活配置选项。本文将深入分析 Flink Kafka 写入连接器的源码实现,包括架构设计、核心类、事务机制和性能优化等方面。

1.1 整体架构

Flink Kafka 写入连接器的核心组件包括:

  • KafkaSink:写入器的入口点,负责配置和创建写入器
  • KafkaWriter:实际执行消息写入的工作类
  • KafkaSerializationSchema:消息序列化接口
  • KafkaCommittableManager:管理事务提交的组件
  • FlinkKafkaProducer:旧版 Kafka 写入器实现(基于 RichSinkFunction)

整体数据流路径为:Flink 处理数据 -> SerializationSchema 序列化消息 -> KafkaWriter 写入 Kafka。

二、核心类与实现

2.1 KafkaSink 与构建器

KafkaSink 是创建 Kafka 写入器的主要入口点,采用构建器模式配置各项参数:

// KafkaSink.java
public class KafkaSink<IN> implements Sink<IN, KafkaCommittable, KafkaWriterState, KafkaWriter<IN>> {private final String bootstrapServers;private final KafkaSerializationSchema<IN> serializationSchema;private final DeliveryGuarantee deliveryGuarantee;private final String transactionalIdPrefix;private final Duration kafkaProducerConfigCheckInterval;private final Properties kafkaProducerConfig;// 私有构造函数private KafkaSink(...) {// 参数初始化}// 构建器方法public static <IN> KafkaSinkBuilder<IN> builder() {return new KafkaSinkBuilder<>();}@Overridepublic Writer<IN, KafkaCommittable, KafkaWriterState> createWriter(Sink.InitContext context,List<KafkaWriterState> states) throws IOException {// 创建 KafkaWriterreturn new KafkaWriter<>(bootstrapServers,serializationSchema,deliveryGuarantee,transactionalIdPrefix,context.metricGroup(),context.getUserCodeClassLoader(),states,kafkaProducerConfig,kafkaProducerConfigCheckInterval);}@Overridepublic Committer<KafkaCommittable> createCommitter() throws IOException {// 创建提交器return new KafkaCommitter(bootstrapServers,deliveryGuarantee,kafkaProducerConfig);}@Overridepublic GlobalCommitter<KafkaCommittable, KafkaGlobalCommittable> createGlobalCommitter() throws IOException {// 创建全局提交器return new KafkaGlobalCommitter(bootstrapServers,deliveryGuarantee,kafkaProducerConfig);}// 其他方法...
}

KafkaSinkBuilder 提供了流式配置接口,允许设置各种参数:

KafkaSink<String> sink = KafkaSink.<String>builder().setBootstrapServers("localhost:9092").setRecordSerializer(KafkaRecordSerializationSchema.builder().setTopic("topic1").setValueSerializationSchema(new SimpleStringSchema()).build()).setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE).build();

2.2 KafkaWriter 实现

KafkaWriter 是实际执行消息写入的核心类:

// KafkaWriter.java
public class KafkaWriter<IN> implements SinkWriter<IN, KafkaCommittable, KafkaWriterState> {private final KafkaSerializationSchema<IN> serializationSchema;private final DeliveryGuarantee deliveryGuarantee;private final String transactionalIdPrefix;private final int subtaskId;private final int totalNumberOfSubtasks;private final KafkaProducer<byte[], byte[]> kafkaProducer;private final Map<Long, TransactionHolder> ongoingTransactions;private final List<TransactionHolder> pendingTransactions;private final List<TransactionHolder> completedTransactions;private final List<KafkaWriterState> recoveredStates;private final Duration producerConfigCheckInterval;private final Properties kafkaProducerConfig;private TransactionHolder currentTransaction;private long currentCheckpointId;public KafkaWriter(...) {// 初始化参数this.serializationSchema = serializationSchema;this.deliveryGuarantee = deliveryGuarantee;this.transactionalIdPrefix = transactionalIdPrefix;this.subtaskId = subtaskId;this.totalNumberOfSubtasks = totalNumberOfSubtasks;this.ongoingTransactions = new LinkedHashMap<>();this.pendingTransactions = new ArrayList<>();this.completedTransactions = new ArrayList<>();this.recoveredStates = recoveredStates;this.producerConfigCheckInterval = producerConfigCheckInterval;this.kafkaProducerConfig = kafkaProducerConfig;// 创建 KafkaProducerthis.kafkaProducer = createKafkaProducer();// 如果是精确一次语义,初始化事务if (deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE) {initializeTransactions();}}@Overridepublic void write(IN element, Context context) throws IOException {// 序列化消息ProducerRecord<byte[], byte[]> record = serializationSchema.serialize(element,context.timestamp(),context.partition(),context.topic());// 根据不同的语义保证写入消息if (deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE) {// 在精确一次语义下,确保事务处于活动状态ensureTransactionActive(context.currentProcessingTime());// 发送消息到 KafkakafkaProducer.send(record, (metadata, exception) -> {if (exception != null) {// 处理发送失败的情况}});} else {// 在至少一次或最多一次语义下,直接发送消息kafkaProducer.send(record);}}@Overridepublic List<KafkaCommittable> prepareCommit(boolean flush) throws IOException {// 准备提交,返回待提交的事务List<KafkaCommittable> committables = new ArrayList<>();if (deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE) {// 对于精确一次语义,将当前事务标记为待提交if (currentTransaction != null) {pendingTransactions.add(currentTransaction);committables.add(currentTransaction.toCommittable());currentTransaction = null;}}return committables;}@Overridepublic List<KafkaWriterState> snapshotState(long checkpointId) throws IOException {// 快照当前状态List<KafkaWriterState> states = new ArrayList<>();if (deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE) {// 对于精确一次语义,创建事务状态快照if (currentTransaction != null) {states.add(currentTransaction.toWriterState());}}return states;}// 其他核心方法...
}

2.3 事务管理器实现

Flink Kafka 写入连接器通过事务机制实现精确一次语义:

// TransactionHolder.java
public class TransactionHolder {private final String transactionalId;private final long checkpointId;private final KafkaProducer<byte[], byte[]> producer;private final boolean isRecovered;private boolean isAborted;public TransactionHolder(String transactionalId,long checkpointId,KafkaProducer<byte[], byte[]> producer,boolean isRecovered) {this.transactionalId = transactionalId;this.checkpointId = checkpointId;this.producer = producer;this.isRecovered = isRecovered;this.isAborted = false;}public void begin() {producer.beginTransaction();}public void commit() {if (!isAborted) {producer.commitTransaction();}}public void abort() {if (!isAborted) {producer.abortTransaction();isAborted = true;}}// 转换为可提交对象public KafkaCommittable toCommittable() {return new KafkaCommittable(transactionalId, checkpointId, isRecovered);}// 转换为写入器状态public KafkaWriterState toWriterState() {return new KafkaWriterState(transactionalId, checkpointId);}// 其他方法...
}

三、精确一次语义实现

Flink Kafka 写入连接器通过 Kafka 的事务 API 实现精确一次语义:

3.1 事务初始化

// KafkaWriter.java
private void initializeTransactions() {// 恢复之前的事务if (!recoveredStates.isEmpty()) {for (KafkaWriterState state : recoveredStates) {String transactionalId = state.getTransactionalId();long checkpointId = state.getCheckpointId();// 创建恢复的事务KafkaProducer<byte[], byte[]> producer = createTransactionalProducer(transactionalId);TransactionHolder recoveredTransaction = new TransactionHolder(transactionalId, checkpointId, producer, true);ongoingTransactions.put(checkpointId, recoveredTransaction);}// 按检查点 ID 排序List<Long> sortedCheckpointIds = new ArrayList<>(ongoingTransactions.keySet());Collections.sort(sortedCheckpointIds);// 恢复事务状态for (long checkpointId : sortedCheckpointIds) {TransactionHolder transaction = ongoingTransactions.get(checkpointId);try {transaction.producer.initTransactions();} catch (ProducerFencedException e) {// 处理异常}}// 创建新的当前事务createNewTransaction();} else {// 如果没有恢复的状态,直接创建新事务createNewTransaction();}
}

3.2 消息写入与事务管理

// KafkaWriter.java
private void ensureTransactionActive(long currentTime) {// 检查是否需要创建新事务if (currentTransaction == null) {createNewTransaction();}// 检查生产者配置是否需要更新if (producerConfigCheckInterval != null && currentTime - lastProducerConfigCheckTime >= producerConfigCheckInterval.toMillis()) {checkAndRecreateProducerIfNeeded();lastProducerConfigCheckTime = currentTime;}
}private void createNewTransaction() {// 生成新的事务 IDString transactionalId = generateTransactionalId();currentCheckpointId++;// 创建新的事务生产者KafkaProducer<byte[], byte[]> producer = createTransactionalProducer(transactionalId);// 初始化事务producer.initTransactions();// 创建事务持有者currentTransaction = new TransactionHolder(transactionalId, currentCheckpointId, producer, false);// 开始事务currentTransaction.begin();// 记录正在进行的事务ongoingTransactions.put(currentCheckpointId, currentTransaction);
}

3.3 事务提交与恢复

// KafkaCommitter.java
public class KafkaCommitter implements Committer<KafkaCommittable> {private final DeliveryGuarantee deliveryGuarantee;private final Properties kafkaProducerConfig;private transient Map<String, KafkaProducer<byte[], byte[]>> producers;public KafkaCommitter(String bootstrapServers,DeliveryGuarantee deliveryGuarantee,Properties kafkaProducerConfig) {this.deliveryGuarantee = deliveryGuarantee;this.kafkaProducerConfig = new Properties();this.kafkaProducerConfig.putAll(kafkaProducerConfig);this.kafkaProducerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);}@Overridepublic List<KafkaCommittable> commit(List<KafkaCommittable> committables) throws IOException {List<KafkaCommittable> failedCommittables = new ArrayList<>();for (KafkaCommittable committable : committables) {try {// 获取或创建生产者KafkaProducer<byte[], byte[]> producer = getOrCreateProducer(committable.getTransactionalId());// 如果是恢复的事务,需要先初始化if (committable.isRecovered()) {producer.initTransactions();}// 提交事务producer.commitTransaction();} catch (Exception e) {// 记录失败的提交failedCommittables.add(committable);}}return failedCommittables;}// 其他方法...
}

四、性能优化与调优

Flink Kafka 写入连接器提供了多种性能优化选项:

4.1 批量写入配置

// 在构建 KafkaSink 时配置批量写入参数
KafkaSink<String> sink = KafkaSink.<String>builder().setBootstrapServers("localhost:9092").setRecordSerializer(...).setProperty("batch.size", "16384")      // 批次大小,单位字节.setProperty("linger.ms", "5")          // 等待时间,增加批处理机会.setProperty("buffer.memory", "33554432") // 生产者缓冲区大小.build();

4.2 压缩配置

// 配置消息压缩
KafkaSink<String> sink = KafkaSink.<String>builder().setBootstrapServers("localhost:9092").setRecordSerializer(...).setProperty("compression.type", "lz4") // 压缩类型:none, gzip, snappy, lz4, zstd.build();

4.3 异步发送配置

// 配置异步发送参数
KafkaSink<String> sink = KafkaSink.<String>builder().setBootstrapServers("localhost:9092").setRecordSerializer(...).setProperty("max.in.flight.requests.per.connection", "5") // 每个连接允许的最大未完成请求数.setProperty("acks", "all") // 确认模式:0, 1, all.build();

五、错误处理与恢复机制

Flink Kafka 写入连接器提供了完善的错误处理和恢复机制:

5.1 重试机制

// 配置生产者重试参数
KafkaSink<String> sink = KafkaSink.<String>builder().setBootstrapServers("localhost:9092").setRecordSerializer(...).setProperty("retries", "3")                 // 重试次数.setProperty("retry.backoff.ms", "100")      // 重试退避时间.setProperty("delivery.timeout.ms", "120000") // 消息传递超时时间.build();

5.2 异常处理

// KafkaWriter.java
private void handleSendException(ProducerRecord<byte[], byte[]> record, Exception exception) {// 记录异常信息LOG.error("Error sending record to Kafka: {}", record, exception);// 根据异常类型进行不同处理if (exception instanceof RetriableException) {// 可重试异常,记录重试次数retryCount++;if (retryCount > maxRetries) {// 超过最大重试次数,抛出异常throw new IOException("Failed to send record after retries", exception);}// 重试发送kafkaProducer.send(record, this::handleSendResult);} else {// 不可重试异常,立即抛出throw new IOException("Failed to send record", exception);}
}

六、总结

Flink Kafka 写入连接器通过精心设计的架构和实现,提供了高性能、可靠且灵活的 Kafka 数据写入能力。其核心组件包括写入器、序列化器和事务管理器,共同实现了精确一次语义、批量写入和错误恢复等关键特性。通过深入理解其源码实现,开发者可以更好地使用和调优该连接器,满足不同场景下的数据处理需求。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/bicheng/86590.shtml
繁体地址,请注明出处:http://hk.pswp.cn/bicheng/86590.shtml
英文地址,请注明出处:http://en.pswp.cn/bicheng/86590.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

强化学习理论基础:从Q-learning到PPO的算法演进(2)

文章目录 Policy gradient思想(REINFORCE算法)优势函数PPO(Proximal Policy Optimization)Policy gradient思想(REINFORCE算法) 下面我们来探讨一下Policy gradient策略,也就是REINFORCE算法。 在玩剪刀石头布这个简单的游戏中,我们可以有不同的策略。一种是完全随机地…

Oracle数据库文件变成32k故障恢复--惜分飞

最近一个客户数据库重启系统之后,数据文件大小变为了32kb,我接手的不是第一现场(客户那边尝试了rman还原操作),查看alert日志,数据库最初报错 Wed Jun 18 13:09:23 2025 alter database open Block change tracking file is current. Read of datafile D:\APP\ADMINISTRATOR\OR…

移动端 uniapp 写一个可自由拖拽的小键盘

写之前要考虑&#xff1a; 键盘展开后&#xff0c;不能超过手机边缘在底部展开键盘&#xff0c;键盘应出现在展开按钮上方&#xff1b;以此类推重复点击展开按钮&#xff0c;关闭键盘 效果&#xff1a; 代码如下&#xff0c;有些按键逻辑还需要优化 <template><vi…

《二分枚举答案(配合数据结构)》题集

文章目录 1、模板题集2、课内题集3、课后题集1. 字符串哈希2. 并查集3. ST表 1、模板题集 分巧克力 2、课内题集 倒水 冶炼金属 连续子序列的个数 3、课后题集 括号内的整数代表完整代码行数。 1. 字符串哈希 你猜猜是啥题(60) 2. 并查集 拯救萌萌(72) 3. ST表 GCD不小…

PY32F030单片机,优势替代ST GD,主频48MHz,带LED数码管驱动

PY32F030是一款高性能32位单片机&#xff0c;采用ARM Cortex-M0内核&#xff0c;工作频率高达48MHz&#xff0c;具备64KB Flash和8KB SRAM。它支持1.7V~5.5V宽电压范围&#xff0c;集成多路I2C、SPI、USART通讯外设&#xff0c;配备12位ADC、16位定时器和比较器&#xff0c;适用…

Rockchip Uboot中修改固件探测的存储介质

Rockchip Uboot中修改固件探测的存储介质 Rockchip uboot中支持从 eMMC、SDcard、NAND 、SPI_NAND、SPI_NOR等存储介质引导固件。 uboot的spl启动的时候会默认呢都会去探测这些介质&#xff0c;这样会导致探测时间变长&#xff0c;在实际产品中可以根据产品需求进行个性化的配…

动手学Python:从零开始构建一个“文字冒险游戏”

动手学Python&#xff1a;从零开始构建一个“文字冒险游戏” 大家好&#xff0c;我是你的技术向导。今天&#xff0c;我们不聊高深的框架&#xff0c;也不谈复杂的算法&#xff0c;我们来做一点“复古”又极具趣味性的事情——用Python亲手打造一个属于自己的文字冒险游戏&…

基于Kafka实现企业级大数据迁移的完整指南

在大数据时代&#xff0c;数据迁移已成为企业数字化转型过程中的常见需求。本文将详细介绍如何利用Kafka构建高可靠、高性能的大数据迁移管道&#xff0c;涵盖从设计到实施的完整流程。 一、为什么选择Kafka进行数据迁移&#xff1f; Kafka作为分布式消息系统&#xff0c;具有…

GEO引领品牌大模型种草:迈向Web3.0与元宇宙的认知新空间

在数字技术的演进历程中&#xff0c;我们正经历着从Web2.0到Web3.0、从平面互联网到沉浸式元宇宙的范式转变。这一转变不仅重塑了数字空间的形态和交互方式&#xff0c;更深刻改变了品牌与用户的连接模式和价值创造逻辑。而在这个新兴的数字疆域中&#xff0c;生成式引擎优化&a…

【机器学习与数据挖掘实战 | 医疗】案例18:基于Apriori算法的中医证型关联规则分析

【作者主页】Francek Chen 【专栏介绍】 ⌈ ⌈ ⌈机器学习与数据挖掘实战 ⌋ ⌋ ⌋ 机器学习是人工智能的一个分支,专注于让计算机系统通过数据学习和改进。它利用统计和计算方法,使模型能够从数据中自动提取特征并做出预测或决策。数据挖掘则是从大型数据集中发现模式、关联…

83、高级特性-自定义starter细节

83、高级特性-自定义starter细节 自定义Spring Boot Starter可以将通用功能封装成可复用的模块&#xff0c;简化其他项目的配置和使用。以下是创建自定义Starter的详细步骤和关键细节&#xff1a; ### 1. 项目结构 通常&#xff0c;自定义Starter包含两个模块&#xff1a; ####…

专注推理查询(ARQs):一种提升大型语言模型指令遵循度、决策准确性和防止幻觉的结构化方法

大型语言模型&#xff08;LLMs&#xff09;在客户服务、自动化内容创作和数据检索方面变得至关重要。然而&#xff0c;它们的有效性常常因其在多次交互中无法始终如一地遵循详细指令而受到限制。在金融服务和客户支持系统等高风险环境中&#xff0c;严格遵循指南是必不可少的&a…

华为云Flexus+DeepSeek征文 | DeepSeek驱动的医疗AI Agent:智能问诊系统开发完整指南

华为云FlexusDeepSeek征文 | DeepSeek驱动的医疗AI Agent&#xff1a;智能问诊系统开发完整指南 &#x1f31f; 嗨&#xff0c;我是IRpickstars&#xff01; &#x1f30c; 总有一行代码&#xff0c;能点亮万千星辰。 &#x1f50d; 在技术的宇宙中&#xff0c;我愿做永不停歇…

【大模型水印论文阅读2】前缀文本编码、均匀性约束

TOC &#x1f308;你好呀&#xff01;我是 是Yu欸 &#x1f680; 感谢你的陪伴与支持~ 欢迎添加文末好友 &#x1f30c; 在所有感兴趣的领域扩展知识&#xff0c;不定期掉落福利资讯(*^▽^*) 写在最前面 版权声明&#xff1a;本文为原创&#xff0c;遵循 CC 4.0 BY-SA 协议。…

破茧时刻,与光同行

凌晨五点的闹钟刺破薄雾&#xff0c;我摸黑打开台灯。摊开的数学错题本上&#xff0c;函数图像在暖黄的光晕里舒展&#xff0c;像等待破译的密码。这样的清晨已持续三百多个日夜&#xff0c;我知道&#xff0c;在无数个相似的时刻里&#xff0c;总有千万盏台灯在黑暗中次第亮起…

Learning PostgresSQL读书笔记: 第8章 Triggers and Rules

本章将讨论以下内容&#xff1a; • 探索 PostgreSQL 中的规则 • 管理 PostgreSQL 中的触发器 • 事件触发器 探索 PostgreSQL 中的规则 文档中的这段话阐述了rule和trigger的区别&#xff1a; PostgreSQL 规则系统允许定义在数据库表中插入、更新或删除时执行的替代操作。粗…

信创国产化替代中的开发语言选择分析

在信息技术应用创新(信创)国产化替代过程中&#xff0c;选择合适的开发语言至关重要。以下是适合信创环境的开发语言及其优势分析&#xff1a; 主流适合信创的编程语言 1. Java 优势&#xff1a;跨平台特性(JVM)、丰富的生态体系、企业级应用成熟 信创适配&#xff1a;国内有…

Android 中 函数实现多个返回值的几种方式

在编程中&#xff0c;函数通常只能返回一个值。但通过使用对象封装、Pair、Triple、数组、列表或 Bundle 方式&#xff0c;可以轻松地返回多个值。 1、对象封装方式 创建数据类来封装需要返回的多个值。 data class Result(val code: Int, val message: String)fun getMultiV…

Leetcode百题斩-DP

又到了最好玩的dp了&#xff0c;各种玄学转移也算是其乐无穷。前段时间刚做的LCA正是这种题的小试牛刀&#xff0c;如果当时就把这个专题刷完了&#xff0c;或许我现在已经从西溪园区跑到云谷园区了。 不过&#xff0c;恐怖如斯的dp专题居然只给了一道hard&#xff0c;基本也没…

策略模式与工厂模式的黄金组合:从设计到实战

策略模式和工厂模式是软件开发中最常用的两种设计模式&#xff0c;当它们结合使用时&#xff0c;能产生11>2的效果。本文将通过实际案例&#xff0c;阐述这两种模式的协同应用&#xff0c;让代码架构更优雅、可维护性更强。 一、为什么需要组合使用&#xff1f; 单独使用的…