一、引言

在大数据处理流程中,从存储系统中高效读取数据是进行后续分析的基础。Flink ClickHouse 连接器为我们提供了从 ClickHouse 数据库读取数据的能力,使得我们可以将 ClickHouse 中存储的海量数据引入到 Flink 流处理或批处理作业中进行进一步的分析和处理。下面我们将深入解析其数据读取的源码实现,探索其背后的技术细节和优化策略。

二、整体架构概述

Flink ClickHouse 连接器的数据读取主要围绕 AbstractClickHouseInputFormat 及其子类展开。AbstractClickHouseInputFormat 是一个抽象基类,它定义了读取数据的基本流程和方法,为具体的读取实现提供了统一的接口。具体的读取逻辑由其子类 ClickHouseBatchInputFormatClickHouseShardInputFormat 实现,它们分别适用于不同的场景,以满足多样化的读取需求。

三、核心类及方法详细解析
1. AbstractClickHouseInputFormat
public abstract class AbstractClickHouseInputFormat extends RichInputFormat<RowData, InputSplit>implements ResultTypeQueryable<RowData> {protected final String[] fieldNames;protected final TypeInformation<RowData> rowDataTypeInfo;protected final Object[][] parameterValues;protected final String parameterClause;protected final String filterClause;protected final long limit;protected AbstractClickHouseInputFormat(String[] fieldNames,TypeInformation<RowData> rowDataTypeInfo,Object[][] parameterValues,String parameterClause,String filterClause,long limit) {this.fieldNames = fieldNames;this.rowDataTypeInfo = rowDataTypeInfo;this.parameterValues = parameterValues;this.parameterClause = parameterClause;this.filterClause = filterClause;this.limit = limit;}

AbstractClickHouseInputFormat 继承自 RichInputFormat 并实现了 ResultTypeQueryable 接口。它包含了一些重要的属性,如字段名、行数据类型信息、参数值、过滤条件和限制条件等。这些属性将在数据读取过程中发挥重要作用,用于指定读取的数据范围和格式。构造函数用于初始化这些属性,确保在创建 AbstractClickHouseInputFormat 实例时,所有必要的信息都已正确设置。

2. AbstractClickHouseInputFormat.Builder

AbstractClickHouseInputFormat.Builder 类同样采用了建造者模式,用于构建 AbstractClickHouseInputFormat 的实例。它提供了一系列的 withXXX 方法,允许用户通过链式调用的方式设置各种配置参数,最后通过 build 方法创建具体的输入格式实例。

public Builder withOptions(ClickHouseReadOptions readOptions) {this.readOptions = readOptions;return this;
}public Builder withConnectionProperties(Properties connectionProperties) {this.connectionProperties = connectionProperties;return this;
}

这些 withXXX 方法通过将传入的参数赋值给 Builder 类的成员变量,并返回 this 指针,实现了链式调用的效果。例如,用户可以这样使用:

AbstractClickHouseInputFormat.Builder builder = new AbstractClickHouseInputFormat.Builder();
builder.withOptions(readOptions).withConnectionProperties(connectionProperties);
public AbstractClickHouseInputFormat build() {Preconditions.checkNotNull(readOptions);Preconditions.checkNotNull(connectionProperties);Preconditions.checkNotNull(fieldNames);Preconditions.checkNotNull(fieldTypes);Preconditions.checkNotNull(rowDataTypeInfo);ClickHouseConnectionProvider connectionProvider = null;try {connectionProvider =new ClickHouseConnectionProvider(readOptions, connectionProperties);DistributedEngineFull engineFullSchema =getDistributedEngineFull(connectionProvider.getOrCreateConnection(),readOptions.getDatabaseName(),readOptions.getTableName());boolean isDistributed = engineFullSchema != null;if (isDistributed && readOptions.isUseLocal()) {initShardInfo(connectionProvider, engineFullSchema);initPartitionInfo();} else if (readOptions.getPartitionColumn() != null) {initPartitionInfo();}LogicalType[] logicalTypes =Arrays.stream(fieldTypes).map(DataType::getLogicalType).toArray(LogicalType[]::new);return isDistributed && readOptions.isUseLocal()? createShardInputFormat(logicalTypes, engineFullSchema): createBatchInputFormat(logicalTypes);} catch (Exception e) {throw new RuntimeException("Build ClickHouse input format failed.", e);} finally {if (connectionProvider != null) {connectionProvider.closeConnections();}}
}

build 方法中,首先会对必要的参数进行非空检查,确保所有必需的配置都已正确设置。接着,会创建 ClickHouseConnectionProvider 对象,用于管理与 ClickHouse 数据库的连接。然后,尝试获取分布式引擎的完整信息,判断当前表是否为分布式表。如果是分布式表且使用本地表,会初始化分片信息和分区信息;如果指定了分区列,也会初始化分区信息。最后,根据是否为分布式表以及是否使用本地表,选择创建 ClickHouseShardInputFormatClickHouseBatchInputFormat 实例。无论创建过程是否成功,都会关闭 ClickHouseConnectionProvider 以释放连接资源。

3. ClickHouseBatchInputFormat 和 ClickHouseShardInputFormat

ClickHouseBatchInputFormat 用于批量读取数据,它会一次性从 ClickHouse 数据库中读取多条记录,减少了与数据库的交互次数,提高了读取性能。而 ClickHouseShardInputFormat 用于分片读取数据,适用于分布式表。在分布式环境中,数据会被分散存储在多个分片上,ClickHouseShardInputFormat 会根据分片信息从各个分片上并行读取数据,从而提高读取效率。

private AbstractClickHouseInputFormat createShardInputFormat(LogicalType[] logicalTypes, DistributedEngineFull engineFullSchema) {return new ClickHouseShardInputFormat(new ClickHouseConnectionProvider(readOptions, connectionProperties),new ClickHouseRowConverter(RowType.of(logicalTypes)),readOptions,engineFullSchema,shardMap,shardValues,fieldNames,rowDataTypeInfo,parameterValues,parameterClause,filterClause,limit);
}private AbstractClickHouseInputFormat createBatchInputFormat(LogicalType[] logicalTypes) {return new ClickHouseBatchInputFormat(new ClickHouseConnectionProvider(readOptions, connectionProperties),new ClickHouseRowConverter(RowType.of(logicalTypes)),readOptions,fieldNames,rowDataTypeInfo,parameterValues,parameterClause,filterClause,limit);
}

这两个方法分别用于创建 ClickHouseShardInputFormatClickHouseBatchInputFormat 实例,会传入必要的参数,如连接提供者、行转换器、读取选项、字段名等。行转换器 ClickHouseRowConverter 用于将从 ClickHouse 数据库中读取的原始数据转换为 Flink 可以处理的 RowData 格式。

4. FilterPushDownHelper
public class FilterPushDownHelper {private static final Map<FunctionDefinition, SqlClause> FILTERS = new HashMap<>();static {FILTERS.put(BuiltInFunctionDefinitions.EQUALS, EQ);FILTERS.put(BuiltInFunctionDefinitions.NOT_EQUALS, NOT_EQ);FILTERS.put(BuiltInFunctionDefinitions.GREATER_THAN, GT);FILTERS.put(BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL, GT_EQ);FILTERS.put(BuiltInFunctionDefinitions.LESS_THAN, LT);FILTERS.put(BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL, LT_EQ);FILTERS.put(BuiltInFunctionDefinitions.IS_NULL, IS_NULL);FILTERS.put(BuiltInFunctionDefinitions.IS_NOT_NULL, IS_NOT_NULL);FILTERS.put(BuiltInFunctionDefinitions.AND, AND);FILTERS.put(BuiltInFunctionDefinitions.OR, OR);}public static String convert(List<ResolvedExpression> filters) {int filterSize = filters.size();return filters.stream().map(expression -> FilterPushDownHelper.convertExpression(expression, filterSize)).filter(Optional::isPresent).map(Optional::get).collect(joining(" AND "));}

FilterPushDownHelper 类用于将 Flink 的过滤表达式转换为 ClickHouse 可以理解的 SQL 过滤条件。通过静态初始化块,将 Flink 的内置函数定义映射到相应的 SQL 子句。convert 方法会将多个过滤表达式转换为一个 SQL 过滤条件字符串,从而实现过滤条件的下推。过滤条件下推可以减少从 ClickHouse 数据库中读取的数据量,提高读取效率。

四、读取流程总结
  1. 配置参数:使用 AbstractClickHouseInputFormat.BuilderwithXXX 方法设置读取选项、连接属性、字段信息等参数。这些参数将决定数据读取的范围、格式和方式。
  2. 构建输入格式:调用 build 方法,根据是否为分布式表以及是否使用本地表,选择创建 ClickHouseBatchInputFormatClickHouseShardInputFormat 实例。这个过程中会进行参数检查、连接创建、分片信息和分区信息的初始化等操作。
  3. 数据读取:通过创建的输入格式实例,从 ClickHouse 数据库批量或分片读取数据。在读取过程中,可以使用 FilterPushDownHelper 进行过滤条件的下推,减少不必要的数据传输,提高读取效率。
  4. 资源管理:在读取完成后,关闭 ClickHouseConnectionProvider 以释放连接资源,避免资源泄漏。
五、优化建议
  1. 合理使用过滤条件下推:尽量使用 FilterPushDownHelper 提供的功能,将过滤条件下推到 ClickHouse 数据库端进行处理,减少从数据库中读取的数据量。
  2. 并行读取数据:对于分布式表,可以使用 ClickHouseShardInputFormat 进行分片读取,并行从各个分片上读取数据,提高读取效率。
  3. 调整批量大小:根据实际的业务场景和硬件资源,合理调整批量大小,以平衡读取性能和内存使用。
六、结论

通过对 Flink ClickHouse 连接器数据读取源码的深入分析,我们了解了其核心类和方法的实现细节,以及数据读取的整体流程。这有助于我们在实际应用中更好地配置和优化数据读取过程,提高读取性能和准确性。同时,我们也可以根据具体的业务需求对源码进行扩展和定制,以满足更多复杂的场景。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/pingmian/87981.shtml
繁体地址,请注明出处:http://hk.pswp.cn/pingmian/87981.shtml
英文地址,请注明出处:http://en.pswp.cn/pingmian/87981.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

云原生技术与应用-容器技术技术入门与Docker环境部署

目录 一.Docker概述 1.什么是Docker 2.Docker的优势 3.Docker的应用场景 4.Docker核心概念 二.Docker安装 1.本安装方式使用阿里的软件仓库 2.Docker镜像操作 3.Docker容器操作 一.Docker概述 因为 Docker 轻便、快速的特性&#xff0c;可以使应用达到快速迭代的目的。每次小…

第2章,[标签 Win32] :匈牙利标记法

专栏导航 上一篇&#xff1a;第2章&#xff0c;[标签 Win32] &#xff1a;Windows 数据类型 回到目录 下一篇&#xff1a;第2章&#xff0c;[标签 Win32] &#xff1a;兼容 ASCII 字符与宽字符的 Windows 函数调用 本节前言 在初学编程的时候&#xff0c;我们给变量命令的…

从深度学习的角度看自动驾驶

从深度学习的角度看自动驾驶 A Survey of Autonomous Driving from a Deep Learning Perspective 我们探讨了深度学习在自主驾驶中的关键模块&#xff0c;例如感知&#xff0c;预测&#xff0c;规划以及控制。我们研究了自主系统的体系结构&#xff0c;分析了如何从模块化&…

java+vue+SpringBoo基于Hadoop的物品租赁系统(程序+数据库+报告+部署教程+答辩指导)

源代码数据库LW文档&#xff08;1万字以上&#xff09;开题报告答辩稿ppt部署教程代码讲解代码时间修改工具 技术实现 开发语言&#xff1a;后端&#xff1a;Java 前端&#xff1a;vue框架&#xff1a;springboot数据库&#xff1a;mysql 开发工具 JDK版本&#xff1a;JDK1.8 数…

【文献笔记】Automatic Chain of Thought Prompting in Large Language Models

Automatic Chain of Thought Prompting in Large Language Models 原文代码&#xff1a;https://github.com/amazon-research/auto-cot 标题翻译&#xff1a;大规模语言模型中的自动思维链提示 1. 内容介绍 在提示词中提供思考步骤被称为思维链&#xff08;CoT&#xff09;&…

【Behavior Tree】-- 行为树AI逻辑实现- Unity 游戏引擎实现

行为树简易敌人AI 前言&#xff1a; 有些天没更新新文章了&#xff0c;主要是最近科一有些头疼&#xff0c;而且最近琢磨这个行为树代码有些难受&#xff0c;但是终于熬出头了&#xff0c;MonoGame的系列会继续更新的&#xff0c;今天不说别的就说困扰我两三天的行为树 有限状态…

百度大模型开源,俩条命令、本地启动

百度大模型开源 本地启动手册 安装依赖&#xff1a; python -m pip install paddlepaddle-gpu3.1.0 -i https://www.paddlepaddle.org.cn/packages/stable/cu126/python -m pip install fastdeploy-gpu -i https://www.paddlepaddle.org.cn/packages/stable/fastdeploy-gpu-80_…

rabbitMQ读取不到ThreadLocal消息的bug

rabbitMQ读取不到ThreadLocal消息的bug 当使用消息队列时&#xff0c;监听队列不会运行到主线程上&#xff0c;线程消息之间是不会共享的&#xff0c;故属于主线程的ThreadLocal就读取不到数据的值 主线程名字&#xff1a;main使用消息队列的线程名字&#xff1a;ntContainer#2…

IDEA Maven报错 无法解析 com.taobao:parent:pom:1.0.1【100%解决 此类型问题】

IDEA Maven报错 无法解析com.taobao:parent:pom:1.0.1【100%解决 此类型问题】 报错日志 PS D:\Learn_Materials\IDEA_WorkSpace\Demo\spring_test_demo> mvn clean install -U [INFO] Scanning for projects... [WARNING] [WARNING] Some problems were encountered whi…

函数-1-字符串函数

函数-1-字符串函数字符串函数函数语法字符串函数的使用字符串函数语法案例演示实战练习字符串函数 函数 函数是一段可以直接被另一端程序调用的程序或代码 语法 SELECT 函数名(参数名)大家可能会有那么一点点疑惑, 为什么执行函数还需要加上SELECT语句? 总结一下, 因为SEL…

打破AI落地困局:易路iBuilder的“垂直深耕+开箱即用”破壁之道

中国企业的数字化转型已步入深水区&#xff0c;人力资源管理作为企业核心竞争力的关键引擎&#xff0c;正经历从“信息化”向“智能化”的范式跃迁。在这场以AI为驱动的组织效能革命中&#xff0c;​​易路人力资源科技​​凭借前瞻性的“软件AI服务”战略&#xff0c;推出国内…

Higress离线部署

1.前提条件检查docker和docker compose是否已经具备 [roothost151 ~]# docker -v Docker version 26.1.4, build 5650f9b [roothost151 ~]# docker composeUsage: docker compose [OPTIONS] COMMANDDefine and run multi-container applications with DockerOptions:--all-res…

利用AI技术快速提升图片编辑效率的方法

通过更换背景或进行其他创意编辑&#xff0c;可以为图片赋予新的生命力和视觉效果&#xff0c;使得创意表达更加自由灵活。这款AI抠图工具堪称强大&#xff0c;依托先进的阿尔法通道技术&#xff0c;能够精准、自然地实现图像抠取与背景更换。操作也非常简单&#xff0c;只需将…

Wend看源码-RAGFlow(上)

前言 最近在github上搜罗Rag相关项目的时候&#xff0c;我根据star 搜索到了目前star 最高的一些RAG 项目 &#xff0c;其中稳居榜首的就是RAGFlow。 RAG stars:>1000 language:Python pushed:>2025-01-01 github RAG 相关项目搜索结果 为了系统性的学习RAG 技术栈&#…

LangChain实现RAG检索增强

1:启动vllm的openai兼容server&#xff1a; export VLLM_USE_MODELSCOPETrue python -m vllm.entrypoints.openai.api_server --model qwen/Qwen-7B-Chat-Int4 --trust-remote-code -q gptq --dtype float16 --gpu-memory-utilization 0.6 2:构建向量数据库 from langchain_…

Redis基础(6):SpringDataRedis

SpringDataRedis简介 SpringData是Spring中专门进行数据操作的模块&#xff0c;包含了对于各种数据库的集成。其中对Redis的集成模块叫做SpringDataRedis&#xff08;官网地址&#xff1a;Spring Data Redis&#xff09;。其最核心的特点就是提供了不同Redis客户端的整合&…

B. Shrinking Array/缩小数组

B. Shrinking Array让我们称一个数组 b 为 i 美丽 &#xff0c;如果它至少包含两个元素&#xff0c;并且存在一个位置 |bi−bi1|≤1 使得 |x| (其中 x 是 #10# #11# 的绝对值)。给定一个数组 a &#xff0c;只要它至少包含两个元素&#xff0c;你就可以执行以下操作&#xff1a…

【学习笔记】Linux系统中SSH服务安全配置

一、背景知识 以ubuntu为例&#xff0c;查看ssh服务是否安全并配置&#xff0c;执行 ssh -V ssh的配置文件路径&#xff1a;/etc/ssh/sshd_config 二、SSH服务配置文件 1.端口和监听设置 Port 22 含义&#xff1a;指定SSH服务监听的端口号&#xff08;默认是22&#xff09…

FastAPI + Tortoise-ORM + Aerich 实现数据库迁移管理(MySQL 实践)

在 FastAPI 项目中&#xff0c;Tortoise-ORM 是一个轻量的异步 ORM 框架&#xff0c;适用于 async/await 场景。结合数据库迁移工具 Aerich&#xff0c;可以优雅地管理数据库表结构演进&#xff0c;本文将通过完整流程演示如何在 MySQL 环境下使用。&#x1f4e6; 一、环境准备…

7.7日 实验03-Spark批处理开发(2)

使用Spark处理数据文件检查数据检查$DATA_EXERCISE/activations里的数据&#xff0c;每个XML文件包含了客户在指定月份活跃的设备数据。拷贝数据到HDFS的/dw目录样本数据示例&#xff1a;<activations><activation timestamp"1225499258" type"phone&q…