背景

最近在做 Flink 任务数据源切换时遇到 offset 消费问题,遂写篇文章记录下来。

切换时只修改了 source 算子的 topic,uid 等其他信息保持不变:

  1. 发布时,发现算子的消费者点位重置为earliest,导致消息积压。
  2. 消息积压后,打算通过时间戳重置点位到发布前,但是发现点位重置失效。

原因分析

source算子点位初始化模式

source算子点位初始化有两种方式:1)消费者组偏移量:setStartFromGroupOffsets;2)时间戳:setStartFromTimestamp。

消费组偏移量(FromGroupOffsets)

该方式会将 startupMode 初始化为 StartupMode.GROUP_OFFSETS:

startupMode枚举:

时间戳(FromTimestamp)

该方式会将 startupMode 初始化为 StartupMode.TIMESTAMP:

source 算子初始化

示例代码:

public static void main(String[] args) throws Exception {Configuration configuration = new Configuration();// configuration.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem");// configuration.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "xxx");configuration.setString("execution.savepoint.path", "xxx");configuration.setBoolean("execution.savepoint.ignore-unclaimed-state", true);// 创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);// 启用checkpointenv.enableCheckpointing(5000);env.setParallelism(1);env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);ParameterTool argTools = ParameterTool.fromArgs(args);env.getConfig().setGlobalJobParameters(argTools);// 添加数据源// "old_topic", "new_topic"FlinkKafkaConsumer consumer = KafkaConfig.getConsumer();consumer.setStartFromGroupOffsets();DataStream<String> stream = env.addSource(consumer).uid("kafka-source").name("kafka-source");SingleOutputStreamOperator<HeartEntity> heart = stream.map(new MapFunction<String, HeartEntity>() {@Overridepublic HeartEntity map(String value) throws Exception {HeartEntity heartEntity = JSON.parseObject(value, HeartEntity.class);return heartEntity;}}).uid("map-heart").name("map-heart");// 使用状态计数DataStream<Long> countStream = heart.keyBy(HeartEntity::getCommandNo).map(new RichMapFunction<HeartEntity, Long>() {private transient ValueState<Long> countState;private long count = 0;@Overridepublic void open(Configuration parameters) throws Exception {// 初始化状态ValueStateDescriptor<Long> descriptor = new ValueStateDescriptor<>("count-state", TypeInformation.of(Long.class));countState = getRuntimeContext().getState(descriptor);}@Overridepublic Long map(HeartEntity value) throws Exception {count++;countState.update(count);return countState.value();}}).uid("count-map").name("count-map");// 打印计数结果countStream.print().uid("print").name("print");// 启动Flink任务env.execute("Flink Kafka State Count Example");}
从状态启动
initializeState

状态初始化时,FlinkKafkaConsumerBase 执行 initializeState 方法中:当 source topic 从 sg_lock_heart_msg_topic 切换为 sg_tw_common_com_lock_heart_report_topic 时,可以看到新 topic 绑定的 source 算子仍然是从老 topic 的算子状态启动的,因为 uid 没变。

initializeState 往下走可以看到,restoreState 的是老 topic 分区的状态;

open

算子初始化时,如果状态不为空且 topic 分区不在状态中,那么就会把新的 topic 分区加入到状态中,并设置算子消费新分区的 startupMode 为 EARLIEST_OFFSET,即从最早的消息开始消费。

老的 topic 分区不会再消费,会被移除订阅。

订阅的 topic 分区

从指定时间戳启动

setStartFromTimestamp 设置启动模式为时间戳

然而在算子初始化时,由于从状态启动,新 topic分区 仍然会从 earliest 消费:

也就是说,checkpoint/savepoint 中存储的 source 点位状态在恢复时大于设置的时间戳。

解决方案

尝试一(修改 uid)

从 source 算子初始化的 open 过程可知,既然从状态启动时会将已存在 source 算子(uid在状态中)的新 topic 点位设置为最早,那么如果将新 topic 的 uid 改成与老 topic 的 uid 不一致,是否就能避免从 earliest 恢复:因为从状态恢复时新的 uid 并不在状态中,那么就不会走 open 中将新 topic 点位置为 earliest 的流程。

FlinkKafkaConsumer consumer = KafkaConfig.getConsumer("sg_tw_common_com_lock_heart_report_topic");
DataStream<String> stream = env.addSource(consumer).uid("kafka-source-new").name("kafka-source-new");

可以看到在状态初始化阶段(initializeState),source 算子的状态 (restoreState)被置为空集合,而不是 null。为什么?

当在算子初始化时,因为 restoreState 不为 null,仍然会进入点位重置的流程:

可以看到这里将新 topic 分区放入了 restoreState 中,且点位置为 earliest(StarupMode 枚举中,EARLIEST_OFFSET = -915623761775L)。

再往下走,restoreState 会将其中的新 topic 分区放入订阅的分区中

从此,新 topic 又从最早开始消费😓。那么方案尝试一是失败的!

在线上实际操作时,消费点位确实被重置到了 earliest,又导致积压了😦。

尝试二(修改消费者组)

有没有办法让 restoreState 置为 null 呢,那就真的不会走到点位重置的流程了🎊

突然看到 restoreState 的注释:

如果消费者从状态恢复,就会设置 restoreState。那怎么让消费者不从状态恢复?无状态启动肯定是不行的,不能让其他算子的状态丢了。那我直接换个消费组名!试一试呢

Properties props = new Properties();props.put("bootstrap.servers", "uat-kafka1.ttbike.com.cn:9092,uat-kafka2.ttbike.com.cn:9092,uat-kafka3.ttbike.com.cn:9092");props.put("group.id", "flink-label-engine-new");

还是不行,直到目前发现只要从状态启动,context 上下文会让代码走进给 restoreState 赋值的位置。

isRestored分析

isRestore分析

尝试三(新增拓扑图)

根据算子状态恢复可知,只要新增的 source 算子跟其他已有算子形成了算子链,如果以状态启动,那么 source 的点位就会被置为 earliest。

  1. 新增一个新 topic 的 source 算子和 sink 算子(要保证新增的算子与已有算子隔离,不会形成算子链),然后修改老 source 算子的 uid 和 topic 与新的一致。
// old: sg_lock_heart_msg_topic
FlinkKafkaConsumer consumer = KafkaConfig.getConsumer("sg_lock_heart_msg_topic");
consumer.setStartFromGroupOffsets();
DataStream<String> stream = env.addSource(consumer).uid("kafka-source-old").name("kafka-source-old");// new: sg_tw_common_com_lock_heart_report_topic
FlinkKafkaConsumer consumer_new = KafkaConfig.getConsumer("sg_tw_common_com_lock_heart_report_topic");
consumer_new.setStartFromGroupOffsets();
DataStream<String> stream_old = env.addSource(consumer_new).uid("kafka-source-new").name("kafka-source-new");
stream_old.print().uid("print-new").name("print-new");

由于从状态启动,且新加入的算子与其他算子隔离,老 source 算子的点位从状态启动;新 source 算子的点位被置为 GROUP_OFFSET。

1. 暂停并保存状态;

2. 修改老 source 算子的 uid 和 topic 与 新算子保持一致,同时删除新算子;

3. 然后从状态启动(/061c986d19612ae413ba794f68ff7727/chk-9),修改后的 source 算子点位从状态恢复:

4. 下游 “count-map”的状态是否正常:发送测试消息,可以看出状态没丢失

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/web/87654.shtml
繁体地址,请注明出处:http://hk.pswp.cn/web/87654.shtml
英文地址,请注明出处:http://en.pswp.cn/web/87654.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

如何录制带备注的演示文稿(LaTex Beamer + Pympress)

参考文献&#xff1a; Pympress 官网Avidemux 官网Audacity 官网FFmpeg 官网2025年度25大视频剪辑软件推荐2025最新音频降噪软件盘点&#xff0c;从入门到专业的6个高效工具如何用一段音频替换mp4视频格式的原有音频&#xff1f;免费简单易用的视频剪切编辑工具—AvidemuxFFmp…

VS Code 的 Copilot Chat 扩展程序

安装与启用 Copilot Chat 扩展 在 VS Code 中打开扩展市场&#xff08;快捷键 CtrlShiftX 或点击左侧活动栏的扩展图标&#xff09;。搜索“GitHub Copilot Chat”&#xff0c;点击安装。安装完成后需登录 GitHub 账户并授权 Copilot 权限。确保已订阅 GitHub Copilot 服务&am…

bash 脚本比较 100 个程序运行时间,精确到毫秒,脚本

脚本如下&#xff1a; #!/bin/bash# 设置测试次数 NUM_TESTS100 # 设置要测试的程序路径 PROGRAM"./your_program" # 替换为你的程序路径 # 设置程序参数&#xff08;如果没有参数则留空&#xff09; ARGS"" # 例如: "input.txt output.txt"#…

【Linux学习】Linux安装并配置Redis

安装Redis在Linux系统上安装Redis可以通过包管理器或源码编译两种方式进行。以下是两种方法的详细步骤。使用包管理器安装Redis&#xff08;以Ubuntu为例&#xff09;&#xff1a;sudo apt update sudo apt install redis-server通过源码编译安装Redis&#xff1a;wget https:/…

redis每种数据结构对应的底层数据结构原理

Redis 的每种数据结构(String、List、Hash、Set、Sorted Set)在底层都采用了不同的实现方式,根据数据规模和特性动态选择最优的编码(encoding)以节省内存和提高性能。以下是详细原理分析: 1. String(字符串) 底层实现: int:当存储整数值且可用 long 表示时,直接使用…

WPF控件大全:核心属性详解

WPF常用控件及核心属性 以下是WPF开发中最常用的控件及其关键属性&#xff08;按功能分类&#xff09;&#xff1a; 基础布局控件 Grid&#xff08;网格布局&#xff09; RowDefinitions&#xff1a;行定义集合&#xff08;如Height"Auto"&#xff09;ColumnDefinit…

马斯克脑机接口(Neuralink)技术进展,已经实现瘫痪患者通过BCI控制电脑、玩视频游戏、学习编程,未来盲人也能恢复视力了

目录 图片总结文字版总结1. 核心目标与愿景1.1 增强人类能力1.2 解决脑部疾病1.3 理解意识1.4 应对AI风险 2. 技术进展与产品2.1 Telepathy&#xff08;意念操控&#xff09;功能与目标技术细节参与者案例 2.2 Blindsight&#xff08;视觉恢复&#xff09;**功能与目标**技术细…

Vuex身份认证

虽说上一节我们实现了登录功能&#xff0c;但是实际上还是可以通过浏览器的地址来跳过登录访问到后台&#xff0c;这种可有可无的登录功能使得系统没有安全性&#xff0c;而且没有意义 为了让登录这个功能有意义&#xff0c;我们应该&#xff1a; 应当在用户登录成功之后给用户…

springboot中使用线程池

1.什么场景下使用线程池&#xff1f; 在异步的场景下&#xff0c;可以使用线程池 不需要同步等待&#xff0c; 不需要管上一个方法是否执行完毕&#xff0c;你当前的方法就可以立即执行 我们来模拟一下&#xff0c;在一个方法里面执行3个子任务&#xff0c;不需要相互等待 …

Flask+LayUI开发手记(十):构建统一的选项集合服务

作为前端最主要的组件&#xff0c;无论是layui-table表格还是layui-form表单&#xff0c;其中都涉及到选项列的处理。如果是普通编程&#xff0c;一个任务对应一个程序&#xff0c;自然可以就事论事地单对单处理&#xff0c;前后端都配制好选项&#xff0c;手工保证两者的一致性…

redis的数据初始化或增量更新的方法

做系统开发的时候&#xff0c;经常需要切换环境&#xff0c;做一些数据的初始化的工作&#xff0c;而redis的初始化&#xff0c;假如通过命令来执行&#xff0c;又太复杂&#xff0c;因为redis有很多种数据类型&#xff0c;全部通过敲击命令来初始化的话&#xff0c;打的命令实…

【PaddleOCR】OCR表格识别数据集介绍,包含PubTabNet、好未来表格识别、WTW中文场景表格等数据,持续更新中......

&#x1f9d1; 博主简介&#xff1a;曾任某智慧城市类企业算法总监&#xff0c;目前在美国市场的物流公司从事高级算法工程师一职&#xff0c;深耕人工智能领域&#xff0c;精通python数据挖掘、可视化、机器学习等&#xff0c;发表过AI相关的专利并多次在AI类比赛中获奖。CSDN…

sparkjar任务运行

mainclass&#xff1a; test.sparkjar.SparkJarTest

Web攻防-文件下载文件读取文件删除目录遍历路径穿越

知识点&#xff1a; 1、WEB攻防-文件下载&读取&删除-功能点&URL 2、WEB攻防-目录遍历&穿越-功能点&URL 黑盒分析&#xff1a; 1、功能点 文件上传&#xff0c;文件下载&#xff0c;文件删除&#xff0c;文件管理器等地方 2、URL特征 文件名&#xff1a; d…

使用LIMIT + OFFSET 分页时,数据重复的风险

在使用 LIMIT OFFSET 分页时&#xff0c;数据重复的风险不仅与排序字段的唯一性有关&#xff0c;还与数据变动&#xff08;插入、删除、更新&#xff09;密切相关。以下是详细分析&#xff1a; 一、数据变动如何导致分页异常 1. 插入新数据 场景&#xff1a;用户在浏览第 1 页…

Excel 数据透视表不够用时,如何处理来自多个数据源的数据?

当数据透视表感到“吃力”时&#xff0c;我们该怎么办&#xff1a; 数据量巨大&#xff1a;Excel工作表有104万行的限制&#xff0c;当有几十万行数据时&#xff0c;透视表和公式就会变得非常卡顿。数据来源多样&#xff1a;数据分散在多个Excel文件、CSV文件、数据库甚至网页…

cf(1034)Div3(补题A B C D E F)

哈&#xff0c;这个比赛在开了不久之后&#xff0c;不知道为啥卡了差不多20来分钟&#xff0c;后面卡着卡着就想睡觉了。实在是太困了.... 题目意思&#xff1a; Alice做一次操作&#xff0c;删除任意数字a,而Bob做一次操作删除b使得ab对4取余是3。 获胜条件&#xff0c;有人…

浏览器与服务器的交互

浏览器地址栏输入URL&#xff08;网址​​&#xff09; ​​​​(1) 服务器进行URL解析​​&#xff1a;验证URL格式&#xff0c;提取协议、域名等 ​​​​(2) 服务器进行DNS查询​​&#xff1a;将域名转换为IP地址&#xff08;可能涉及缓存或DNS预取&#xff09; ​​​​…

Spring Boot中POST请求参数校验的实战指南

在现代的Web开发中&#xff0c;数据校验是确保应用程序稳定性和安全性的关键环节。Spring Boot提供了强大而灵活的校验机制&#xff0c;能够帮助开发者轻松地对POST请求参数进行校验。本文将详细介绍如何在Spring Boot中实现POST请求参数的校验&#xff0c;并通过具体的代码示例…

Spring Boot + MyBatis/MyBatis Plus:XML中循环处理List参数的终极指南

重要提醒&#xff1a;使用Param注解时&#xff0c;务必导入正确的包&#xff01; import org.apache.ibatis.annotations.Param; 很多开发者容易错误导入Spring的Param&#xff0c;导致参数绑定失败&#xff01; 一、为什么需要传递List参数&#xff1f; 最常见的场景是动态构…