背景
修改 source 算子

kafka_old_topic 消费任务运行一段时间后,暂停状态并保留。然后将 uid 和 topic 都改了,消费者 offset 会从 earliest 开始。

// before
FlinkKafkaConsumer consumer = KafkaConfig.getConsumer("kafka_old_topic");
consumer.setStartFromGroupOffsets();
DataStream<String> stream = env.addSource(consumer).uid("kafka-source-old").name("kafka-source-old");// after
FlinkKafkaConsumer consumer = KafkaConfig.getConsumer("kafka_new_topic");
consumer.setStartFromGroupOffsets();
DataStream<String> stream = env.addSource(consumer).uid("kafka-source-new").name("kafka-source-new");
新增 source 算子

但是只新增一个同样的 kafka-source-new 算子(old 保留,消费者 offset 却会从最近开始。

FlinkKafkaConsumer consumer = KafkaConfig.getConsumer("kafka_old_topic");
consumer.setStartFromGroupOffsets();
DataStream<String> stream = env.addSource(consumer).uid("kafka-source-old").name("kafka-source-old");// 新增
FlinkKafkaConsumer consumer = KafkaConfig.getConsumer("kafka_new_topic");
consumer.setStartFromGroupOffsets();
DataStream<String> stream = env.addSource(consumer).uid("kafka-source-new").name("kafka-source-new");
算子(链)子任务状态列表(operatorSubtaskStates)

针对第一种情况,job 的算子状态(localStates)有三个,分别对应xxx,

当给 Task【Source: kafka-source-new -> map-heart (org.apache.flink.streaming.runtime.tasks.SourceStreamTask) 】(被修改的 source)分配状态时,该 Task 的每个算子都会绑定一个状态(OperatorState):“kafka-source-new”、“map-heart”,只不过这两个 OperatorState 有点差异:

这两个算子状态的 operatorSubtaskStates (存储算子子任务的状态信息)集合一个为空,一个不为空。原因就是在分配 “kafka-source-new” 算子状态时,由于其不在 localState,于是走了默认的构造函数创建 OperatorState 对象:

其实关键点就在 operatorSubtaskStates 的封装。

TaskStateAssignment 任务状态分配

TaskStateAssignment 的构造方法有个核心参数 hasNonFinishedState。

如果当前 Task 的子任务状态列表(operatorSubtaskStates全集)不为空,该值就为 true。

一旦该值为 true,就会执行 assignTaskStateToExecutionJobVertices:

给当前 Task 的每个 subTask 赋值状态:

那么每个 subTask 都会有一份状态(JobManagerTaskRestore,绑定 checkpointId):

JobManagerTaskRestore(JM与TM状态交互中间站)

一个 Execution 就是一个 subTask:

Task 部署阶段(JM 向 TM 提交 Task 任务),TM 会根据 TaskDeploymentDescriptor 来恢复状态和创建算子(其中 taskRestore 就是 JobManagerTaskRestore,在 setInitialState 中赋值)。

TM 接收到提交任务请求时,解析出 taskRestore 创建任务状态管理器(TaskStateManager)

TaskStateManager(TM 的任务管理器)
算子子任务状态获取

prioritizedOperatorState:传入算子 ID,即可从 JobMangerTaskRestore 获取子任务状态。

  1. 如果 JobMangerTaskRestore 为 null,那么返回一个空的 PrioritizedOperatorSubtaskState(checkpoint设置为null

  1. 如果不为 null,则会从 JobManagerTaskRestore 中根据算子ID封装 PrioritizedOperatorSubtaskState。

StateInitializationContext(UDF-算子状态初始化上下文)

KafkaConsumerBase 在初始化状态阶段,会调用context.isRestored()判断是否从状态恢复:

算子状态句柄(StreamOperatorStateHandler)处理算子状态的初始化,该阶段会调用 UDFKafkaConsumerBase.initializeState初始化算子的本地状态并且 checkpointId 就是在这里被写入状态上下文 StateInitializationContext(该上下文是可以被用户访问的)。

StreamOperatorStateContext(initializeState全局上下文)

以上得知 checkpointId 来自context.getRestoredCheckpointId,那么 context (该上下文是不可以被用户访问的)从何而来?

算子状态初始化AbstractStreamOperator.initialState,利用 StreamTaskStateInitializer

封装 StreamOperatorStateContext:

那么 checkpointId 的封装肯定在streamTaskStateManger.streamOperatorStateContext中:

方法中通过 taskStateManger 封装算子状态,如果 prioritizedOperatorSubtaskState 为空对象,那么这里的 checkpointId 就为 null


针对第二种情况,task 的算子状态(有多个算子,算子链)不存在 localOperators 中,则默认使用构造方法封装 OperatorState,每个OperatorState 的 operatorSubtaskStates 集合都为空。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/news/913777.shtml
繁体地址,请注明出处:http://hk.pswp.cn/news/913777.shtml
英文地址,请注明出处:http://en.pswp.cn/news/913777.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

IDEA中application.yml配置文件不自动提示解决办法

今天在自己的电脑上使用IDEA的时候&#xff0c;发现在application配置文件里面输入配置项的时候没有提示&#xff0c;网上找了一圈也没解决&#xff0c;最后自己试出来了。 解决办法&#xff1a; 鼠标移动到配置文件上&#xff0c;单击右键-重写文件类型、选择YAML(捆绑)&#…

Vite 完整功能详解与 Vue 项目实战指南

Vite 完整功能详解与 Vue 项目实战指南 Vite 是下一代前端开发工具&#xff0c;由 Vue 作者尤雨溪开发&#xff0c;提供极速的开发体验和高效的生产构建。以下是完整功能解析和实战示例&#xff1a;一、Vite 核心功能亮点闪电般冷启动 基于原生 ES 模块&#xff08;ESM&#xf…

Vue 3 中使用路由参数跳转时 watch 触发重复请求问题详解

&#x1f4d8;Vue 3 中使用路由参数跳转时 watch 触发重复请求问题详解&#x1f516; 收藏 点赞 关注&#xff0c;掌握 Vue 3 路由参数监听中的隐藏陷阱&#xff0c;避免详情页、嵌套路由页误触发重复请求&#xff01;&#x1f9e9; 一、问题背景 在 Vue 3 项目中&#xff0c…

前端 项目更新通知 (plugin-web-update-notification)

项目版本更新迭代时&#xff0c;需提示用户更新系统&#xff0c;不然早时间不更新对用户体验很不好&#xff0c;所以在每次部署后需要提示用户&#xff0c;刷新静态资源。推荐插件 plugin-web-update-notification .具体配置 vite.config.js文件中 import { webUpdateNotice …

【力扣(LeetCode)】数据挖掘面试题0002:当面对实时数据流时您如何设计和实现机器学习模型?

文章大纲一、实时数据处理&#xff1a;构建低延迟的数据管道1. 数据接入与缓冲2. 实时清洗与校验3. 特征标准化与对齐二、模型设计&#xff1a;选择适配实时场景的模型架构1. 模型选择原则三、训练与更新策略&#xff1a;离线与在线协同&#xff0c;应对概念漂移1. 离线-在线协…

TongWeb8.0.9.0.3部署后端应用,前端访问后端报405(by sy+lqw)

问题描述&#xff1a; 客户前端部署在nginx上&#xff0c;后端部署在tongweb8上&#xff08;相当于前后端分离&#xff09;&#xff0c;登录的时候报错&#xff0c;f12看network&#xff0c;状态码405&#xff0c;如下所示&#xff1a;看console&#xff0c;如下所示&#xff1…

mysql互为主从失效,重新同步

一、分别登录服务器A和服务器B的mysqlmysql -u root -p 123456789二、分别查看数据库状态信息,下边两项参数有一项为NO就表示同步异常Slave_IO_Running:从服务器&#xff08;Slave&#xff09;中的 I/O 线程的运行状态Slave_SQL_Running:从服务器上的 SQL 线程是否正在运行mysq…

板凳-------Mysql cookbook学习 (十一--------6)

https://blog.csdn.net/weixin_43236925/article/details/146382981 清晰易懂的 PHP 安装与配置教程 12.6 查找每组行中含有最大或最小值的行 mysql> set max_price (select max(price) from painting); Query OK, 0 rows affected (0.01 sec)mysql> select artist.name…

ECS由浅入深第四节:ECS 与 Unity 传统开发模式的结合?混合架构的艺术

ECS由浅入深第一节 ECS由浅入深第二节 ECS由浅入深第三节 ECS由浅入深第四节 ECS由浅入深第五节 尽管 ECS 带来了显著的性能和架构优势&#xff0c;但在实际的 Unity 项目中&#xff0c;完全摒弃 GameObject 和 MonoBehaviour 往往是不现实的。Unity 引擎本身的大部分功能&…

Mac关闭触控板

打开 “有鼠标或无线触控板时忽略内建触控板”选项即可 参考&#xff1a;Mac如何关闭触控板防止误触&#xff1f;内置的设置就可以达成 - Mac天空

Python:Rich 终端富文本与界面样式工具库

🖌️ 1、简述 Rich 是一个强大的 Python 库,用于在终端中呈现富文本和精美的格式,让命令行界面(CLI)应用拥有现代、美观的输出效果。本文将深入介绍 Rich 的核心功能,并通过一系列实际示例展示其强大能力。 Rich 由 Will McGugan 开发,主要特点包括: 丰富的文本样式:支…

深入解析享元模式:通过共享技术高效支持大量细粒度对象

深入解析享元模式&#xff1a;通过共享技术高效支持大量细粒度对象 &#x1f31f; 嗨&#xff0c;我是IRpickstars&#xff01; &#x1f30c; 总有一行代码&#xff0c;能点亮万千星辰。 &#x1f50d; 在技术的宇宙中&#xff0c;我愿做永不停歇的探索者。 ✨ 用代码丈量世…

Docker高级管理

一、Docker 容器的网络模式 当项目大规模使用 Docker 时&#xff0c;容器通信的问题也就产生了。要解决容器通信问题&#xff0c;必须先了解很多关于网络的知识。Docker 的网络模式非常丰富&#xff0c;可以满足不同容器的通信要求&#xff0c;下表列出了这些网络模式的主要信息…

ABP VNext + Tye:本地微服务编排与调试

ABP VNext Tye&#xff1a;本地微服务编排与调试 &#x1f680; &#x1f4da; 目录ABP VNext Tye&#xff1a;本地微服务编排与调试 &#x1f680;TL;DR ✨一、环境与依赖 &#x1f6e0;️二、核心配置详解 &#x1f680;1. 主配置 tye.yaml三、多环境文件 &#x1f331;&am…

Vue响应式原理一:认识响应式逻辑

核心思想&#xff1a;当数据发生变化时&#xff0c;依赖该数据的代码能够自动重新执行Vue中的应用&#xff1a;在data或ref/reactive中定义的数据&#xff0c;当数据变化时template会自动更新template的本质&#xff1a; 是render()函数, 用变化之后的数据重新执行render()函数…

Redis:分组与设备在 Redis 中缓存存储设计

一、缓存存储结构设计 分组与设备的映射关系&#xff08;使用 Set 结构&#xff09;&#xff1a; 键格式&#xff1a;采用 group:{groupId}:devices 的格式作为 Redis 中 Set 的键&#xff0c;例如 group:1:devices 就代表了分组 ID 为 1 的分组所关联的设备集合。值内容&#…

Leetcode 3605. Minimum Stability Factor of Array

Leetcode 3605. Minimum Stability Factor of Array 1. 解题思路2. 代码实现 题目链接&#xff1a;3605. Minimum Stability Factor of Array 1. 解题思路 这一题的核心思路是二分法&#xff0c;本质上就是我们给定一个常数kkk&#xff0c;然后考察是否存在一个构造使得能够…

编译安装的Mysql5.7报“Couldn‘t find MySQL server (mysqld_safe)“的原因 笔记250709

编译安装的Mysql5.7报"Couldn’t find MySQL server (mysqld_safe)"的原因 笔记250709 MySQL 的安装路径与配置文件&#xff08;如 my.cnf 或 mysql.server&#xff09;中指定的 basedir 不一致。 mysqld_safe 文件实际位置与系统查找路径不匹配&#xff08;常见于自…

在 Ubuntu 下配置 oh-my-posh —— 普通用户 + root 各自使用独立主题(共享可执行)

&#x1f9e9; 在 Ubuntu 下配置 oh-my-posh —— 普通用户 root 各自使用独立主题&#xff08;共享可执行&#xff09;✅ 目标说明普通用户 使用 tokyonight_storm 主题 root 用户 使用 1_shell 主题 共用全局路径下的 oh-my-posh 可执行文件 正确加载 Homebrew 到环境变量中…

Spring Boot 项目中的多数据源配置

关键词&#xff1a;Spring Boot、多数据源配置、MySQL、SQL Server、Oracle、动态切换 ✅ 摘要 在实际企业级开发中&#xff0c;一个 Spring Boot 项目可能需要连接多个数据库&#xff0c;比如 MySQL、SQL Server 和 Oracle。不同的业务模块可能依赖不同的数据源&#xff0c;这…