1.问题出现过程

在测试环境测试flink的job的任务消费kafka的情况,通过往job任务发送一条消息,然后flink web ui上消费出现了两条。然后通过重启JobManager和TaskManager后,任务从checkpoint恢复后就会出现重复消费。当任务不从checkpoint恢复的时候,任务不会出现重复消费的情况。由此可见是beam从checkpoint恢复的时候出现了重复消费的问题。

2.任务排查过程

由于我们beam使用的是FlinkRunner,所以Beam消费Kafka会基于Flink的Source的规范实现相关的Source。

Flink中的Source实现的几个重要的类:Source:工厂类负责实例化以下的几个组件SourceSplit:封装数据源的逻辑分片(如文件块、Kafka 分区区间)。SplitEnumerator:负责分片发现与分配逻辑。SourceReader:处理分片数据读取与反序列化。

在Beam中分别实现的Flink的KafkaSource是以下这几个类:

FlinkUnboundedSource
FlinkSourceSplit
FlinkSourceSplitEnumerator
FlinkSourceReaderBase <- FlinkUnboundedSourceReader

其中在Flink中Source算子的执行和SourceOpearator和SourceCoordinator这两个类有关,他们的执行顺序如下:

  1. 初始化阶段

    • SourceCoordinator 优先启动:在 JobMaster(JobManager)启动时,SourceCoordinator 作为独立组件被创建,并负责初始化 SplitEnumerator(分片枚举器)。

    • SourceOperator 后续启动:在 TaskManager 上,每个并行任务实例(Task)启动时,会初始化 SourceOperator,并在其open()方法中创建 SourceReader(数据读取器)。

  2. 运行时协作

    • 分片分配:SourceCoordinator 的 SplitEnumerator 通过 RPC 响应 SourceOperator 的分片请求(如AddSplitEvent),动态分配分片(Split)。

    • 数据读取:SourceOperator 将分配到的分片交给内部的 SourceReader,通过pollNext()方法读取数据并发送到下游。

SourceOperator类逻辑

@Internal
public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStreamOperator<OUT>implements OperatorEventHandler,PushingAsyncDataInput<OUT>,TimestampsAndWatermarks.WatermarkUpdateListener {
​/** The state that holds the currently assigned splits. */// 状态存储当前被分配的分片信息private ListState<SplitT> readerState;@Overridepublic void open() throws Exception {// 初始化Reader操作initReader();
​// in the future when we this one is migrated to the "eager initialization" operator// (StreamOperatorV2), then we should evaluate this during operator construction.if (emitProgressiveWatermarks) {eventTimeLogic =TimestampsAndWatermarks.createProgressiveEventTimeLogic(watermarkStrategy,sourceMetricGroup,getProcessingTimeService(),getExecutionConfig().getAutoWatermarkInterval());} else {eventTimeLogic =TimestampsAndWatermarks.createNoOpEventTimeLogic(watermarkStrategy, sourceMetricGroup);}
​// restore the state if necessary.// 从checkpoint状态中恢复出上一次被分配的分片信息final List<SplitT> splits = CollectionUtil.iterableToList(readerState.get());if (!splits.isEmpty()) {LOG.info("Restoring state for {} split(s) to reader.", splits.size());// 然后把分片信息添加到Reader中sourceReader.addSplits(splits);}
​// Register the reader to the coordinator.registerReader();
​sourceMetricGroup.idlingStarted();// Start the reader after registration, sending messages in start is allowed.sourceReader.start();
​eventTimeLogic.startPeriodicWatermarkEmits();}// SourceOperator处理算子的对应事件public void handleOperatorEvent(OperatorEvent event) {if (event instanceof WatermarkAlignmentEvent) {updateMaxDesiredWatermark((WatermarkAlignmentEvent) event);checkWatermarkAlignment();checkSplitWatermarkAlignment();} else if (event instanceof AddSplitEvent) {// 处理新增分片的事件:对应任务第一次消费,或者有心的分片增加了(对应到kafka中就是分区数增加了)handleAddSplitsEvent(((AddSplitEvent<SplitT>) event));} else if (event instanceof SourceEventWrapper) {sourceReader.handleSourceEvents(((SourceEventWrapper) event).getSourceEvent());} else if (event instanceof NoMoreSplitsEvent) {sourceReader.notifyNoMoreSplits();} else if (event instanceof IsProcessingBacklogEvent) {if (eventTimeLogic != null) {eventTimeLogic.emitImmediateWatermark(System.currentTimeMillis());}output.emitRecordAttributes(new RecordAttributesBuilder(Collections.emptyList()).setBacklog(((IsProcessingBacklogEvent) event).isProcessingBacklog()).build());} else {throw new IllegalStateException("Received unexpected operator event " + event);}}private void handleAddSplitsEvent(AddSplitEvent<SplitT> event) {try {List<SplitT> newSplits = event.splits(splitSerializer);numSplits += newSplits.size();if (operatingMode == OperatingMode.OUTPUT_NOT_INITIALIZED) {// For splits arrived before the main output is initialized, store them into the// pending list. Outputs of these splits will be created once the main output is// ready.outputPendingSplits.addAll(newSplits);} else {// Create output directly for new splits if the main output is already initialized.createOutputForSplits(newSplits);}// 将新增的分片信息添加到reader中。sourceReader.addSplits(newSplits);} catch (IOException e) {throw new FlinkRuntimeException("Failed to deserialize the splits.", e);}}
}

以上可以看到在SourceOperator中,SourceReader新增分片的地方有两个:Open()函数中从checkpoint中恢复的和handleAddSplitsEvent()中添加的分片信息,然后继续看看sourceReader.addSplits(newSplits)中调用的是FlinkSourceReaderBase#addSplits(newSplits)方法。

由于Beam中kafka的FlinkSourceReader分别对应有界和无界,所以中间有一个抽象的类FlinkSourceReaderBase

FlinkSourceReaderBase类

public abstract class FlinkSourceReaderBase<T, OutputT>implements SourceReader<OutputT, FlinkSourceSplit<T>> {// 这是一个队列,存储的是分片信息 private final Queue<FlinkSourceSplit<T>> sourceSplits = new ArrayDeque<>();@Overridepublic void addSplits(List<FlinkSourceSplit<T>> splits) {checkExceptionAndMaybeThrow();LOG.info("Adding splits {}", splits);// 往队列中添加了分片信息sourceSplits.addAll(splits);waitingForSplitChangeFuture.get().complete(null);}protected final Optional<ReaderAndOutput> createAndTrackNextReader() throws IOException {// 从队列中消费分片 FlinkSourceSplit<T> sourceSplit = sourceSplits.poll();if (sourceSplit != null) {// 然后根据分片创建对应的Reader,进行消费Kafka的数据。Source.Reader<T> reader = createReader(sourceSplit);ReaderAndOutput readerAndOutput = new ReaderAndOutput(sourceSplit.splitId(), reader, false);beamSourceReaders.put(sourceSplit.splitIndex(), readerAndOutput);return Optional.of(readerAndOutput);}return Optional.empty();}
}

所以看到以上的代码其实很清楚了,消费kafka重复很有可能是因为分片被重复添加导致的,由于在Kafka中KafkaConsumer在指定分区和Offset的情况下,是可以多个消费者在同一个消费者组中消费同一个分区的。

接下来使用arthas去监控sourceReader.addSplits(newSplits)的地方的调用情况:

// 监控SourceOperator#open()方法
watch org.apache.flink.util.CollectionUtil iterableToList '{params,returnObj,throwExp}'  -n 5  -x 3 
​
// 监控SourceOperator#handleAddSplitsEvent()方法
watch org.apache.flink.streaming.api.operators.SourceOperator handleAddSplitsEvent '{params,returnObj,throwExp}'  -n 5  -x 3 

最终观察到这两个地方都被调用了,所以问题就是因为checkpoint恢复的时候添加了分片信息,而从SourceCoordinator中调用FlinkSourceSplitEnumerator()计算分片的地方又添加了一次导致最终kafka消费重复了。

FlinkSourceSplitEnumerator类

public class FlinkSourceSplitEnumerator<T>implements SplitEnumerator<FlinkSourceSplit<T>, Map<Integer, List<FlinkSourceSplit<T>>>> {private static final Logger LOG = LoggerFactory.getLogger(FlinkSourceSplitEnumerator.class);private final SplitEnumeratorContext<FlinkSourceSplit<T>> context;private final Source<T> beamSource;private final PipelineOptions pipelineOptions;private final int numSplits;private final Map<Integer, List<FlinkSourceSplit<T>>> pendingSplits;// 这里标识split计算是否被初始化过private boolean splitsInitialized;  public FlinkSourceSplitEnumerator(SplitEnumeratorContext<FlinkSourceSplit<T>> context,Source<T> beamSource,PipelineOptions pipelineOptions,int numSplits) {this.context = context;this.beamSource = beamSource;this.pipelineOptions = pipelineOptions;this.numSplits = numSplits;this.pendingSplits = new HashMap<>(numSplits);// 这里看到永远都是false,所以无论有没有从checkpoint恢复过,这里都会执行过一次。 this.splitsInitialized = false;}@Overridepublic void start() {context.callAsync(() -> {// 执行分片计算的操作,计算哪些kafka分区被分配给哪个并行度try {LOG.info("Starting source {}", beamSource);List<? extends Source<T>> beamSplitSourceList = splitBeamSource();Map<Integer, List<FlinkSourceSplit<T>>> flinkSourceSplitsList = new HashMap<>();int i = 0;for (Source<T> beamSplitSource : beamSplitSourceList) {int targetSubtask = i % context.currentParallelism();List<FlinkSourceSplit<T>> splitsForTask =flinkSourceSplitsList.computeIfAbsent(targetSubtask, ignored -> new ArrayList<>());splitsForTask.add(new FlinkSourceSplit<>(i, beamSplitSource));i++;}return flinkSourceSplitsList;} catch (Exception e) {throw new RuntimeException(e);}},(sourceSplits, error) -> {if (error != null) {throw new RuntimeException("Failed to start source enumerator.", error);} else {pendingSplits.putAll(sourceSplits);// 这里标识设置为true了 splitsInitialized = true;// 将分配好的分片信息通过rpc发送给SourceOpeartor,对应并行度的task取对应并行度的分片信息。sendPendingSplitsToSourceReaders();}});}}

以上看到FlinkSourceSplitEnumerator被初始化的时候splitsInitialized被设置为false,然后接着看实例化FlinkSourceSplitEnumerator的FlinkSource中的逻辑。

public abstract class FlinkSource<T, OutputT>implements Source<OutputT, FlinkSourceSplit<T>, Map<Integer, List<FlinkSourceSplit<T>>>> {// 这里是没有checkpoint的时候执行的 @Overridepublic SplitEnumerator<FlinkSourceSplit<T>, Map<Integer, List<FlinkSourceSplit<T>>>>createEnumerator(SplitEnumeratorContext<FlinkSourceSplit<T>> enumContext) throws Exception {return new FlinkSourceSplitEnumerator<>(enumContext, beamSource, serializablePipelineOptions.get(), numSplits);}
​// 这里是从checkppoint中恢复的地方@Overridepublic SplitEnumerator<FlinkSourceSplit<T>, Map<Integer, List<FlinkSourceSplit<T>>>>restoreEnumerator(SplitEnumeratorContext<FlinkSourceSplit<T>> enumContext,Map<Integer, List<FlinkSourceSplit<T>>> checkpoint)throws Exception {// 在这里实例化了FlinkSourceSplitEnumeratorFlinkSourceSplitEnumerator<T> enumerator =new FlinkSourceSplitEnumerator<>(enumContext, beamSource, serializablePipelineOptions.get(), numSplits);checkpoint.forEach((subtaskId, splitsForSubtask) -> enumerator.addSplitsBack(splitsForSubtask, subtaskId));return enumerator;}}

以上看到在实例化FlinkSourceSplitEnumerator的地方,只要是从checkpoint中恢复的时候,将标识splitsInitialized设置为true,那么就不会从checkpoint中恢复的时候,去重复计算和添加分片从而导致重复消费了。

3.问题解决

后来在Beam的2.64.0版本中,发现这个bug已经被修复了,FlinkSource中restoreEnumerator的地方已经加上了判断逻辑了。

public class FlinkSourceSplitEnumerator<T>implements SplitEnumerator<FlinkSourceSplit<T>, Map<Integer, List<FlinkSourceSplit<T>>>> {
​@Overridepublic SplitEnumerator<FlinkSourceSplit<T>, Map<Integer, List<FlinkSourceSplit<T>>>>restoreEnumerator(SplitEnumeratorContext<FlinkSourceSplit<T>> enumContext,Map<Integer, List<FlinkSourceSplit<T>>> checkpoint)throws Exception {// 这里将splitInitialized标识设置为了trueSplitEnumerator<FlinkSourceSplit<T>, Map<Integer, List<FlinkSourceSplit<T>>>> enumerator =createEnumerator(enumContext, true);checkpoint.forEach((subtaskId, splitsForSubtask) -> enumerator.addSplitsBack(splitsForSubtask, subtaskId));return enumerator;}public SplitEnumerator<FlinkSourceSplit<T>, Map<Integer, List<FlinkSourceSplit<T>>>>createEnumerator(SplitEnumeratorContext<FlinkSourceSplit<T>> enumContext, boolean splitInitialized)throws Exception {
​if (boundedness == Boundedness.BOUNDED) {return new LazyFlinkSourceSplitEnumerator<>(enumContext, beamSource, serializablePipelineOptions.get(), numSplits, splitInitialized);} else {return new FlinkSourceSplitEnumerator<>(enumContext, beamSource, serializablePipelineOptions.get(), numSplits, splitInitialized);}}}
​
​
public class FlinkSourceSplitEnumerator<T>implements SplitEnumerator<FlinkSourceSplit<T>, Map<Integer, List<FlinkSourceSplit<T>>>> {public FlinkSourceSplitEnumerator(SplitEnumeratorContext<FlinkSourceSplit<T>> context,Source<T> beamSource,PipelineOptions pipelineOptions,int numSplits,boolean splitsInitialized) {
​this.context = context;this.beamSource = beamSource;this.pipelineOptions = pipelineOptions;this.numSplits = numSplits;this.pendingSplits = new HashMap<>(numSplits);this.splitsInitialized = splitsInitialized;}
​@Overridepublic void start() {// 这里加上了判断逻辑了,为true不会执行了if (!splitsInitialized) {initializeSplits();}}
​private void initializeSplits() {context.callAsync(() -> {try {LOG.info("Starting source {}", beamSource);List<? extends Source<T>> beamSplitSourceList = splitBeamSource();Map<Integer, List<FlinkSourceSplit<T>>> flinkSourceSplitsList = new HashMap<>();int i = 0;for (Source<T> beamSplitSource : beamSplitSourceList) {int targetSubtask = i % context.currentParallelism();List<FlinkSourceSplit<T>> splitsForTask =flinkSourceSplitsList.computeIfAbsent(targetSubtask, ignored -> new ArrayList<>());splitsForTask.add(new FlinkSourceSplit<>(i, beamSplitSource));i++;}return flinkSourceSplitsList;} catch (Exception e) {throw new RuntimeException(e);}},(sourceSplits, error) -> {if (error != null) {throw new RuntimeException("Failed to start source enumerator.", error);} else {pendingSplits.putAll(sourceSplits);splitsInitialized = true;sendPendingSplitsToSourceReaders();}});}
}

4.其他问题

从上可以看到Beam的KafkaSource实际上对比Flink原生的KafkaSource其实还有很多功能上的不足,比如说:

1.Beam中KafkaSource当从checkpoint恢复任务时,且这时候手动增加了Kafka的分区数实际上是不会被消费到的。

2.Beam中KafkaSource没有动态分区发现的功能,既不能在不手动重启任务且不从checkpoint恢复的情况下下消费到新分区的。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/web/87074.shtml
繁体地址,请注明出处:http://hk.pswp.cn/web/87074.shtml
英文地址,请注明出处:http://en.pswp.cn/web/87074.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

关于 java:9. Java 网络编程

一、Socket 编程 Socket&#xff08;套接字&#xff09;是网络通信的端点&#xff0c;是对 TCP/IP 协议的编程抽象&#xff0c;用于实现两台主机间的数据交换。 通俗来说&#xff1a; 可以把 Socket 理解为“电话插口”&#xff0c;插上后客户端和服务端才能“通话”。 Sock…

主流零信任安全产品深度介绍

腾讯 iOA 零信任安全管理系统 功能&#xff1a;提供零信任接入、终端安全、数据防泄密等十余种功能模块。可实现基于身份的动态访问控制、终端安全一体化防护、数据防泄密体系等。核心优势&#xff1a;基于腾讯内部千万级终端实践打磨&#xff0c;沉淀丰富场景方案&#xff0c…

LabVIEW装配车体挠度无线测量

针对轨道交通车辆装配过程中车体挠度测量需求&#xff0c;基于LabVIEW开发无线快速测量系统&#xff0c;采用品牌硬件构建高精度数据采集与传输架构。系统通过 ZigBee 无线传输技术、高精度模数转换模块及激光位移传感器&#xff0c;实现装配车体挠度的实时、自动、非接触测量&…

java微服务-linux单机CPU接近100%优化

你这个场景&#xff1a; 4核16G 机器 同时运行了 8个 Spring Boot 微服务&#xff0c;每个 JAR 文件 100多 MB 导致 CPU 接近100% 确实是一个常见但资源紧绷的部署情境。下面是分层的优化建议&#xff0c;包括 JVM、系统、服务架构等多个方面&#xff0c;帮助你 降 CPU、稳…

MySQL表的约束和基本查询

一.表的约束 1.1空属性 当我们填写问卷的时候,经常会有不允许为空的问题,比如电话号,姓名等等.而mysql上我们可以在创建表的时候,如果想要某一列不允许为空,可以加上not null来加以限制: mysql> create table myclass( -> class_name varchar(20) not null, -> cla…

VBA代码解决方案第二十六讲:如何新建EXCEL工作簿文件

《VBA代码解决方案》(版权10028096)这套教程是我最早推出的教程&#xff0c;目前已经是第三版修订了。这套教程定位于入门后的提高&#xff0c;在学习这套教程过程中&#xff0c;侧重点是要理解及掌握我的“积木编程”思想。要灵活运用教程中的实例像搭积木一样把自己喜欢的代码…

【unity游戏开发——网络】套接字Socket的重要API

注意&#xff1a;考虑到热更新的内容比较多&#xff0c;我将热更新的内容分开&#xff0c;并全部整合放在【unity游戏开发——网络】专栏里&#xff0c;感兴趣的小伙伴可以前往逐一查看学习。 文章目录 1、Socket套接字的作用2、Socket类型与创建3、核心属性速查表4、关键方法指…

计算机网络(二)应用层HTTP协议

目录 1、HTTP概念 ​编辑2、工作流程​​ 3、HTTP vs HTTPS​​ 4、HTTP请求特征总结​ 5、持久性和非持久性连接 非持久连接&#xff08;HTTP/1.0&#xff09;​​ ​​持久连接&#xff08;HTTP/1.1&#xff09;​​ 1、HTTP概念 HTTP&#xff08;HyperText Transfer …

c# IO密集型与CPU密集型任务详解,以及在异步编程中的使用示例

文章目录 IO密集型与CPU密集型任务详解&#xff08;C#示例&#xff09;一、基本概念1. IO密集型任务2. CPU密集型任务 二、C#示例1. IO密集型示例1.1 文件操作异步示例1.2 网络请求异步示例1.3 数据库操作异步示例 2. CPU密集型示例2.1 基本CPU密集型异步处理2.2 并行处理CPU密…

用lines_gauss的width属性提取缺陷

自己做了一个图&#xff0c;这个图放在资源里了 结果图是这样&#xff08;这里只结算了窄区&#xff09; 代码和备注如下 read_image (Image11, C:/Users/Administrator/Desktop/分享/15/11.png) rgb1_to_gray (Image11, GrayImage) invert_image (GrayImage, ImageInvert) thr…

从0到100:房产中介小程序开发笔记(中)

背景调研 为中介带来诸多优势&#xff0c;能借助它打造专属小程序&#xff0c;方便及时更新核实租赁信息&#xff0c;确保信息准确无误&#xff0c;像房屋的大致地址、租金数额、租赁条件、房源优缺点等关键信息都能清晰呈现。还可上传房屋拍摄照片&#xff0c;这样用户能提前…

【AI 时代的网络爬虫新形态与防护思路研究】

网络爬虫原理与攻击防护的深度研究报告 网络爬虫技术已进入AI驱动的4.0时代&#xff0c;全球自动化请求流量占比突破51%&#xff0c;传统防御手段在面对高度仿真的AI爬虫时已显疲态。基于2025年最新数据&#xff0c;深入剖析网络爬虫的基本原理、工作流程、分类与攻击方式&…

低代码平台架构设计与关键组件

低代码平台的架构设计是其核心能力的关键支撑&#xff0c;需要平衡可视化开发的便捷性、生成应用的健壮性与性能、可扩展性以及企业级需求&#xff08;如安全、多租户、集成&#xff09;。以下是一个典型的企业级低代码平台架构概览及其关键组件&#xff1a; https://example.…

电商 ERP 系统集成接口指南

电商 ERP 系统的高效运行依赖于与多个业务系统的无缝对接&#xff0c;需要集成的核心接口包括&#xff1a;商品管理、订单处理、库存同步、物流配送、客户管理、财务结算等。这些接口是实现数据互通、业务协同的关键桥梁。 一、电商 ERP 系统集成所需接口类型 &#xff08;一…

Python实现对WPS协作群进行群消息自动推送

前言 本文是该专栏的第59篇,后面会持续分享python的各种干货知识,值得关注。 相信有些同学在工作或者项目中,都会使用到“WPS协作”作为办公聊天软件。如果说,有些项目的监控预警正好需要你同步到WPS协作群,这个时候需要怎么去做呢? 而本文,笔者将基于WPS协作,通过Py…

js严格模式和非严格模式

好的&#xff0c;这是一个非常基础且重要的概念。我们来详细解析一下 JavaScript 中的严格模式&#xff08;Strict Mode&#xff09;和非严格模式&#xff08;Sloppy Mode&#xff09;。 可以把它想象成参加一场考试&#xff1a; 非严格模式&#xff1a;就像是开卷、不计时的…

板凳-------Mysql cookbook学习 (十一--------1)

第11章&#xff1a;生成和使用序列 11.0 引言 11.1 创建一个序列列并生成序列值 CREATE TABLE insect ( id INT UNSIGNED NOT NULL AUTO_INCREMENT, PRIMARY KEY (id)&#xff0c;name VARCHAR(30) NOT NULL,date DATE NOT NULL,origin VARCHAR(30) NOT NULL); 字段说明 ‌id…

Vue3 中 Excel 导出的性能优化与实战指南

文章目录 Vue3 中 Excel 导出的性能优化与实战指南引言&#xff1a;为什么你的导出功能会卡死浏览器&#xff1f;一、前端导出方案深度剖析1.1 xlsx (SheetJS) - 轻量级冠军1.2 exceljs - 功能强大的重量级选手 二、后端导出方案&#xff1a;大数据处理的救星2.1 为什么大数据需…

安卓RecyclerView实现3D滑动轮播效果全流程实战

安卓RecyclerView实现3D滑动轮播效果全流程实战 1. 前言 作为一名学习安卓的人,在接触之前和之后两种完全不同的想法: 好看和怎么实现 当初接触到RecyclerView就觉得这个控件就可以把关于列表的所有UI实现,即便不能,也是功能十分强大 放在现在依然是应用最广的滑动列表控…

电机控制——电机位置传感器零位标定

在有感FOC算法中电机位置是一个重要的输入&#xff0c;电机位置传感器的作用就是测量电机的旋转角度&#xff0c;通常是输出sin(Theta)和cos(Theta)两路模拟信号&#xff0c;根据这两路模拟信号测得电机旋转绝对角度。注意传感器测量的是机械角度&#xff0c;不是电角度。 关于…