在大数据处理领域,Flink SQL凭借其强大的处理能力和易用性,成为众多开发者的选择。与其他OLAP引擎类似,Flink SQL的SQL执行流程大致都需要经过词法解析、语法解析、生成抽象语法树(AST)、校验以及生成逻辑执行计划等步骤。整体流程可笼统地概括为两大阶段:从SQL到Operation的转换,再从Operation到Transformation的转换,最终进入分布式执行阶段。接下来,我们将以INSERT INTO语句为例,深入剖析Flink SQL的执行流程,探究其内部数据的流转机制以及如何基于Calcite进行功能拓展。

一、SQL到Operation转化:解析与转换的起点

Flink SQL的所有SQL执行入口为TableEnvironment,它提供了DML(数据操作语言)、DDL(数据定义语言)、DQL(数据查询语言)等功能,像executeSql、sqlQuery、registerFunction等方法都是其能力的体现,在日常开发中,executeSql方法的使用频率较高。

以INSERT INTO语句为例,探究Flink SQL到Operation的转换过程。这一过程首先借助Planner提供的基于Calcite的SQL解析器,将SQL字符串转换为Operation。具体而言,通过ParserImpl#parse()方法把字符串转化为ModifyOperation树,并生成新的TableResult对象,其核心步骤如下:

  1. 将SQL解析字符串转为SqlNode:利用Calcite的强大解析能力,对输入的SQL字符串进行词法和语法分析,生成SqlNode。这是整个转换过程的基础,只有准确生成SqlNode,后续操作才能顺利进行 。
  2. 将SqlNode校验:对生成的SqlNode进行严格校验,确保其符合语法和语义规则,为后续转换提供可靠保障。
  3. 通过SqlToOperationConverter#convert()将SqlNode转为ModifyOperation:依据SQL语句的类型和语义,将经过校验的SqlNode转换为对应的Operation,如INSERT INTO语句会被转换为ModifyOperation。

以INSERT INTO执行语句为例,其执行入口位于TableEnvironmentImpl#executeSql(),核心实现是ParserImpl#parse()方法。该方法获取基于Calcite进行语法拓展的SQL解析器,词法、语法解析以及校验工作均由Calcite完成。在使用Calcite解析之前,会优先调用ExtendedParser#parse()方法进行解析,此方法主要用于处理一些CalciteParser不支持的特殊命令,例如SET key=value中键和值标识符包含特殊字符的情况,这样可以避免引入新的保留关键字,提高解析的灵活性和兼容性。ParserImpl#parse部分实现如下:

CalciteParser parser = calciteParserSupplier.get();
FlinkPlannerImpl planner = validatorSupplier.get();
// 解析sql
Optional<Operation> command = EXTENDED_PARSER.parse(statement);
if (command.isPresent()) {return Collections.singletonList(command.get());
}
SqlNode parsed = parser.parse(statement);

上述代码通过Calcite#parse方法解析SQL后,成功得到SqlNode对象。而对于SqlNode的校验和转换为Operation的操作,则是通过SqlToOperationConverter#convert()方法实现。其内部借助flinkPlanner#validate对SqlNode进行校验,详细实现如下:

def validate(sqlNode: SqlNode): SqlNode = {// 获取当前作业对元数据val validator = getOrCreateSqlValidator()validateInternal(sqlNode, validator)
}
private def validateInternal(sqlNode: SqlNode, validator: FlinkCalciteSqlValidator): SqlNode = {try {sqlNode.accept(new PreValidateReWriter(validator.getCatalogReader.unwrap(classOf[CatalogReader]), typeFactory))// 进行扩展验证。sqlNode match {case node: ExtendedSqlNode =>node.validate()case _ =>}// 不需要验证DDL的行类型并插入节点。if (sqlNode.getKind.belongsTo(SqlKind.DDL)|| .......) {return sqlNode}sqlNode match {case richExplain: SqlRichExplain =>val validated = validator.validate(richExplain.getStatement)richExplain.setOperand(0, validated)richExplaincase _ =>validator.validate(sqlNode)}}

经过上述校验流程,确保SqlNode合法后,会根据其类型转换为对应的Operation。例如,INSERT INTO语句会被转换为ModifyOperation,具体转换实现如下:

public static Optional<Operation> convert(FlinkPlannerImpl flinkPlanner, CatalogManager catalogManager, SqlNode sqlNode) {// validate the queryfinal SqlNode validated = flinkPlanner.validate(sqlNode);SqlToOperationConverter converter =new SqlToOperationConverter(flinkPlanner, catalogManager);if (validated instanceof SqlUseCatalog) {return Optional.of(converter.convertUseCatalog((SqlUseCatalog) validated));} else if (validated instanceof SqlShowCatalogs) {return Optional.of(converter.convertShowCatalogs((SqlShowCatalogs) validated));} else if (validated instanceof SqlShowCurrentCatalog) {return Optional.of(converter.convertShowCurrentCatalog((SqlShowCurrentCatalog) validated));}if (validated instanceof SqlCreateDatabase) {return Optional.of(converter.convertCreateDatabase((SqlCreateDatabase) validated));} else if (validated instanceof SqlDropDatabase) {return Optional.of(converter.convertDropDatabase((SqlDropDatabase) validated));} else if (validated instanceof SqlAlterDatabase) {return Optional.of(converter.convertAlterDatabase((SqlAlterDatabase) validated));} else if (validated instanceof SqlShowDatabases) {return Optional.of(converter.convertShowDatabases((SqlShowDatabases) validated));} ..................}

至此,Flink SQL完成了从SQL字符串到Operation的转换过程,为后续的处理奠定了基础。

二、Operation到Transformation:优化与执行计划生成

在完成SQL到Operation的转换后,接下来需要将Operation转换为Transformation。仍以INSERT INTO语句为例,SQL经过转换后会返回Operation的集合,取出集合中的第一个元素传入executeInternal方法中继续执行。该方法会依据Operation的类型进行相应处理,例如CreateTableOperation、DropTableOperation等会触发对catalogManager的相关操作;而对于INSERT INTO语句对应的ModifyOperation类型,则会传入executeInternal的另一个重载方法进行处理。根据Operation类型进行相应处理的部分代码如下:

if (operation instanceof ModifyOperation) {return executeInternal(Collections.singletonList((ModifyOperation) operation));
} else if (operation instanceof CreateTableOperation) {CreateTableOperation createTableOperation = (CreateTableOperation) operation;if (createTableOperation.isTemporary()) {catalogManager.createTemporaryTable(createTableOperation.getCatalogTable(),createTableOperation.getTableIdentifier(),createTableOperation.isIgnoreIfExists());} else {catalogManager.createTable(createTableOperation.getCatalogTable(),createTableOperation.getTableIdentifier(),createTableOperation.isIgnoreIfExists());}return TableResultImpl.TABLE_RESULT_OK;
}
....................

executeInternal的另一个重载方法是这一阶段的核心,其核心代码如下:

public TableResult executeInternal(List<ModifyOperation> operations) {List<Transformation<?>> transformations = translate(operations);List<String> sinkIdentifierNames = extractSinkIdentifierNames(operations);TableResult result = executeInternal(transformations, sinkIdentifierNames);result.await();return result;
}

从上述代码可以看出,从Operation到Transformation转换流程的核心方法是translate方法。该方法借助Planner#translate完成相关功能,Planner的默认实现类为PlannerBase。translate方法主要负责将Operation集合转换为Calcite的物理执行计划RelNode,然后对RelNode进行优化,最终将其转换为Transformation,具体代码如下:

override def translate(modifyOperations: util.List[ModifyOperation]): util.List[Transformation[_]] = {validateAndOverrideConfiguration()if (modifyOperations.isEmpty) {return List.empty[Transformation[_]]}// 借助translateToRel转换Operation为relNodeval relNodes = modifyOperations.map(translateToRel)// 优化物理执行计划val optimizedRelNodes = optimize(relNodes)// 获取优化后的执行计划转化为ExecNodeGraphval execGraph = translateToExecNodeGraph(optimizedRelNodes)// 根据作业类型,使用不同的Planner进行算子转换,如流处理则使用:StreamPlanner、批处理则使用:BatchPlannerval transformations = translateToPlan(execGraph)cleanupInternalConfigurations()transformations
}

Operation转换为RelNode的详细实现在translateToRel方法中,该方法也是获取Flink SQL血缘关系的核心实现。开发者如有相关需求,可以复写此方法,使Flink SQL具备更灵活的血缘解析功能。在RelNode的优化过程中,主要涉及CommonSubGraphBasedOptimizer、BatchCommonSubGraphBasedOptimizer、StreamCommonSubGraphBasedOptimizer这三个类,从名称即可看出,StreamCommonSubGraphBasedOptimizer负责流处理的优化,BatchCommonSubGraphBasedOptimizer则针对批处理进行优化。

RelNode经过优化器优化后,会转换为ExecNodeGraph,最后通过translateToPlan()方法将ExecNodeGraph转换为transformations流水线。至此,Flink SQL完成了从Operation到Transformation的转换,得到的transformations流水线与使用StreamExecutionEnvironment开发的方式类似,后续将依次经过流图/plan、作业图、执行图的转换,最终进入分布式执行阶段。

三、总结:深入理解Flink SQL执行精髓

通过以INSERT INTO语句为切入点,对Flink SQL执行流程的详细剖析,我们深入了解了其从SQL语句输入到分布式执行的全过程。在这个过程中,Calcite框架发挥了关键作用,Flink SQL基于Calcite进行拓展,实现了强大的SQL解析、校验和优化功能。

从SQL到Operation的转换,将用户输入的SQL语句转化为系统能够处理的操作对象;而Operation到Transformation的转换,则进一步将操作对象优化并转换为可执行的计划。这两个阶段紧密配合,确保了Flink SQL能够高效、准确地执行用户的指令。

深入理解Flink SQL的执行流程,不仅有助于开发者更好地使用Flink SQL进行大数据处理,还能在遇到性能问题或需要进行功能拓展时,快速定位问题并找到解决方案。同时,也为我们在其他类似的数据处理场景中,设计和优化执行流程提供了宝贵的经验借鉴。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/bicheng/86403.shtml
繁体地址,请注明出处:http://hk.pswp.cn/bicheng/86403.shtml
英文地址,请注明出处:http://en.pswp.cn/bicheng/86403.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

什么是redis

Redis是一个开源的、基于内存的高性能键值存储数据库&#xff0c;广泛用于缓存、消息队列、会话存储等场景。 - 核心特点&#xff1a; - 内存存储&#xff1a;数据存储在内存中&#xff0c;读写速度极快。 - 键值对&#xff1a;以键值对形式存储数据&#xff0c;键通常是字…

《从0到1:C/C++音视频开发自学指南》

开启自学之旅&#xff1a;为何选择 C/C 音视频开发 在当今数字化时代&#xff0c;音视频开发的应用场景极为广泛&#xff0c;深刻融入了我们生活与工作的方方面面。从火爆的直播行业&#xff0c;无论是电商直播中主播与观众的实时互动&#xff0c;还是游戏直播里精彩赛事的实时…

学习日记-spring-day37-6.25

知识点&#xff1a; 1.使用utillist进行配置 知识点 核心内容 重点 Spring框架中utl名称空间创建List 通过utl名称空间创建并管理集合对象&#xff0c;实现数据复用 utl list与普通list赋值的区别; 名称空间引入方法 无参构造器使用规则 当类中没有其他构造器时&#x…

【Python练习】012. 使用字符串的upper()方法将字符串转换为大写

012. 使用字符串的upper方法将字符串转换为大写 012. 使用字符串的upper()方法将字符串转换为大写示例代码运行结果代码解释 扩展&#xff1a;动态输入字符串示例运行 何时使用upper方法基本用法示例忽略大小写的字符串比较数据清洗标准化 注意事项 012. 使用字符串的upper()方…

Python Polars库详解:高性能数据处理的新标杆

在数据驱动的时代&#xff0c;高效的数据处理能力已成为开发者和数据科学家的核心竞争力。作为Pandas的强劲挑战者&#xff0c;Polars库凭借其基于Rust的底层架构和创新的表达式引擎&#xff0c;在性能测试中展现出惊人的速度优势。本文将深入解析Polars的核心特性、使用技巧及…

Go语言- 单元测试

实际开发中&#xff0c;需要保证单元功能正确。 传统方式&#xff1a;在main函数中直接调用&#xff0c;查看结合是否和预期一致。 缺点&#xff1a;1. 不方便 2. 不利于管理 因此&#xff0c;单元测试具有必要性 testing测试框架 Go语言中自带testing轻量级测试框架和go…

Vue移动端开发的适配方案与性能优化技巧

文章目录 1. 移动端适配方案1.1. 视口适配1.2. 基于rem/em的适配方案1.3. vw/vh视口单位适配1.4. 移动端UI组件库适配 2. 移动端性能优化技巧2.1. 虚拟列表实现长列表优化2.2. 图片懒加载与优化2.3. 减少首屏加载时间2.4. 事件节流与防抖 3. 移动端常见问题解决方案3.1. 移动端…

如何微调和部署OpenVLA在机器人平台上

这个教程来自这个英伟达网址 教程的目标是提供用于部署 VLA 模型的优化量化和推理方法&#xff0c;以及针对新机器人、任务和环境的参考微调流程。在一个自包含的仿真环境中&#xff0c;结合场景生成和领域随机化&#xff08;MimicGen&#xff09;对性能和准确性进行严格验证。…

深入剖析Flink内存管理:架构、调优与实战指南

在大数据处理领域&#xff0c;Apache Flink凭借强大的流处理和批处理能力备受青睐。而Flink内存管理机制&#xff0c;作为保障作业高效稳定运行的关键支柱&#xff0c;深刻影响着任务执行性能、资源利用率以及系统容错能力。理解并掌握Flink内存管理原理与优化策略&#xff0c;…

【力扣 C】动态规划专题目录

【力扣 简单 C】509. 斐波那契数https://blog.csdn.net/2503_92320911/article/details/148810148 【力扣 中等 C】983. 最低票价https://blog.csdn.net/2503_92320911/article/details/148833421 【力扣 中等 C】91. 解码方法https://blog.csdn.net/2503_92320911/article/d…

Linux 中如果网络连接丢失或无法找到网络设备

如下步骤 1. 检查网络服务状态 sudo systemctl status NetworkManager 如果服务未运行&#xff0c;启动并启用它&#xff1a; sudo systemctl start NetworkManager sudo systemctl enable NetworkManager ______ 2. 检查网络接口 ip add 确认网卡&#xff08;如 eth0、en…

【Linux 平台总线驱动开发实战】

Linux 平台总线驱动开发实战 一、平台总线驱动基础概念二、核心数据结构解析2.1 设备结构体 struct platform_device2.2 驱动结构体 struct platform_driver2.3 资源结构体 struct resource 三、驱动开发完整流程3.1 设备注册3.2 驱动注册3.3 设备与驱动匹配 四、编译与测试4.1…

LabVIEW液位上升图像识别 附件有源码

源程序在这里https://www.bjcyck.com/nd.jsp?fromColId101&id2675#_np101_331 本LabVIEW 程序实现基于图像灰度特征的液位上升监测与控制&#xff0c;通过读取序列液位上升图像&#xff0c;分析指定区域灰度变化获取液位斜率&#xff0c;依据设定标记位置实现液位上升到目…

git安装使用详细教程

git高速下载 macOS 系统 # 方法1&#xff1a;Homebrew&#xff08;推荐&#xff09; brew install git# 方法2&#xff1a;官方安装包 下载地址&#xff1a;https://sourceforge.net/projects/git-osx-installer/Linux 系统 # Debian/Ubuntu sudo apt update && sudo…

玛哈特机械矫平机:精密制造的“应力消除师”与“平整度雕刻家”

机械矫平机&#xff0c;作为金属板材加工链中的关键一环&#xff0c;其价值远不止于“压平”那么简单。它是材料科学、精密机械与控制技术的结晶&#xff0c;是确保高端制造品质的幕后功臣。本文将深入探讨其核心机理、进阶应用及未来方向。 一、 矫平机理再探&#xff1a;超越…

四色(定理/猜想)染色算法小软件Version1.11 2025.6.24 开发者:孝感动天/卧冰求鲤

四色(定理/猜想)染色算法小软件Version1.11 2025.6.24 开发者&#xff1a;孝感动天/卧冰求鲤 开发者&#xff1a;路人甲/打酱油 开发者&#xff1a;四色定要治理/四邻不安/相邻必反/草木皆兵/围棋紧箍/不是我~干的/和我无关 开发者&#xff1a;不是我/不是我干的&#xff0c…

SQL 分页方法全解析:从基础到高级应用

一、引言 在 Web 应用和数据分析中&#xff0c;分页是处理大量数据的必备功能。想象一下&#xff0c;如果没有分页&#xff0c;社交媒体的动态流、电商平台的商品列表都将变成无穷无尽的长页面&#xff0c;用户体验和系统性能都会受到严重影响。本文将深入探讨 SQL 中各种分页方…

STM32 adc采集数据存到SD卡中

F1板子实现adc采集模拟信号存储到SD卡中 STM32 adc采集数据存到SD卡中/STM32SD文件系统ADC采集/AD/adc_dma.c , 10291 STM32 adc采集数据存到SD卡中/STM32SD文件系统ADC采集/AD/adc_dma.h , 661 STM32 adc采集数据存到SD卡中/STM32SD文件系统ADC采集/CMSIS/core_cm3.c , 17273…

redis8.0新特性:布谷鸟过滤器(Cuckoo Filter)详解

文章目录 一、写在前面二、使用1、CF.RESERVE 创建布谷鸟过滤器2、CF.ADD 添加元素3、CF.ADDNX 不存在才添加4、CF.COUNT 判断元素添加次数5、CF.DEL 删除一次元素6、CF.EXISTS 判断元素是否存在7、CF.MEXISTS 批量判断元素是否存在8、CF.INFO 查看布谷鸟过滤器信息9、CF.INSER…

2025 Java秋招『面试避坑指南』:牛客网高频题分类精讲

前言 今天为大家整理了目前互联网出现率最高的大厂面试题&#xff0c;所谓八股文也就是指文章的八个部分&#xff0c;文体有固定格式:由破题、承题、起讲、入题、起股、中股、后股、束股八部分组成&#xff0c;题目一律出自四书五经中的原文。 初中级和中高级都有&#xff0c…