一、引言

在大数据处理的实际应用场景中,数据的高效存储与处理至关重要。Flink 作为一款强大的流式计算框架,能够对海量数据进行实时处理;而 ClickHouse 作为高性能的列式数据库,擅长处理大规模数据分析任务。Flink ClickHouse 连接器则将二者的优势结合起来,允许用户将 Flink 处理后的数据高效地写入 ClickHouse 数据库。下面我们将深入剖析其数据写入的源码实现,探究其背后的工作原理和设计思路。

二、整体架构概述

Flink ClickHouse 连接器的数据写入主要围绕 AbstractClickHouseOutputFormat 及其子类展开。AbstractClickHouseOutputFormat 作为抽象基类,定义了写入数据的基本流程和核心方法,为后续的具体实现提供了统一的框架。具体的写入逻辑由其子类 ClickHouseBatchOutputFormatClickHouseShardOutputFormat 实现,它们分别适用于不同的场景,以满足多样化的需求。

三、核心类及方法详细解析
1. ClickHouseConnectionOptions
// For testing.
@VisibleForTestingpublic ClickHouseConnectionOptions(String url) {this(url, null, null, null, null);}

这个构造函数是专门为测试目的而设计的。在测试环境中,为了简化测试用例的编写,我们可能只需要关注 URL 参数,而不需要设置其他复杂的配置。通过这个构造函数,我们可以方便地创建一个仅包含 URL 的 ClickHouseConnectionOptions 对象,从而更专注于对特定功能的测试。

2. AbstractClickHouseOutputFormat.Builder

AbstractClickHouseOutputFormat.Builder 类采用了建造者模式,用于构建 AbstractClickHouseOutputFormat 的实例。它提供了一系列的 withXXX 方法,允许用户通过链式调用的方式设置各种配置参数,最后通过 build 方法创建具体的输出格式实例。这种设计模式使得代码更加简洁易读,同时也提高了代码的可维护性。

public Builder withOptions(ClickHouseDmlOptions options) {this.options = options;return this;
}public Builder withConnectionProperties(Properties connectionProperties) {this.connectionProperties = connectionProperties;return this;
}

这些 withXXX 方法通过将传入的参数赋值给 Builder 类的成员变量,并返回 this 指针,实现了链式调用的效果。例如,用户可以这样使用:

AbstractClickHouseOutputFormat.Builder builder = new AbstractClickHouseOutputFormat.Builder();
builder.withOptions(options).withConnectionProperties(connectionProperties);
public AbstractClickHouseOutputFormat build() {Preconditions.checkNotNull(options);Preconditions.checkNotNull(fieldNames);Preconditions.checkNotNull(fieldTypes);Preconditions.checkNotNull(primaryKeys);Preconditions.checkNotNull(partitionKeys);if (primaryKeys.length > 0) {LOG.warn("If primary key is specified, connector will be in UPSERT mode.");LOG.warn("The data will be updated / deleted by the primary key, you will have significant performance loss.");}ClickHouseConnectionProvider connectionProvider = null;try {connectionProvider =new ClickHouseConnectionProvider(options, connectionProperties);DistributedEngineFull engineFullSchema =getDistributedEngineFull(connectionProvider.getOrCreateConnection(),options.getDatabaseName(),options.getTableName());boolean isDistributed = engineFullSchema != null;return isDistributed && options.isUseLocal()? createShardOutputFormat(connectionProvider.getOrCreateConnection(), engineFullSchema): createBatchOutputFormat();} catch (Exception exception) {throw new RuntimeException("Build ClickHouse output format failed.", exception);} finally {if (connectionProvider != null) {connectionProvider.closeConnections();}}
}

build 方法中,首先会对必要的参数进行非空检查,确保所有必需的配置都已正确设置。如果指定了主键,会发出警告,因为使用主键会使连接器进入 UPSERT 模式,这可能会导致性能下降。接着,会创建 ClickHouseConnectionProvider 对象,用于管理与 ClickHouse 数据库的连接。然后,尝试获取分布式引擎的完整信息,判断当前表是否为分布式表。根据是否为分布式表以及是否使用本地表,选择创建 ClickHouseShardOutputFormatClickHouseBatchOutputFormat 实例。最后,无论创建过程是否成功,都会关闭 ClickHouseConnectionProvider 以释放连接资源。

3. ClickHouseBatchOutputFormat 和 ClickHouseShardOutputFormat

ClickHouseBatchOutputFormat 用于批量写入数据,它将多条记录打包成一个批次,一次性发送到 ClickHouse 数据库,从而减少了与数据库的交互次数,提高了写入性能。而 ClickHouseShardOutputFormat 用于分片写入数据,适用于分布式表。在分布式环境中,数据会被分散存储在多个分片上,ClickHouseShardOutputFormat 会根据分片策略将数据正确地分发到相应的分片上。

private ClickHouseBatchOutputFormat createBatchOutputFormat() {return new ClickHouseBatchOutputFormat(new ClickHouseConnectionProvider(options, connectionProperties),fieldNames,primaryKeys,partitionKeys,logicalTypes,options);
}private ClickHouseShardOutputFormat createShardOutputFormat(ClickHouseConnection connection, DistributedEngineFull engineFullSchema)throws SQLException {SinkShardingStrategy shardingStrategy;List<FieldGetter> fieldGetters = null;if (options.isShardingUseTableDef()) {Expression shardingKey = engineFullSchema.getShardingKey();if (shardingKey instanceof FieldExpr) {shardingStrategy = SinkShardingStrategy.VALUE;FieldGetter fieldGetter =getFieldGetterOfShardingKey(((FieldExpr) shardingKey).getColumnName());fieldGetters = singletonList(fieldGetter);} else if (shardingKey instanceof FunctionExpr&& "rand()".equals(shardingKey.explain())) {shardingStrategy = SinkShardingStrategy.SHUFFLE;fieldGetters = emptyList();} else if (shardingKey instanceof FunctionExpr&& "javaHash".equals(((FunctionExpr) shardingKey).getFunctionName())&& ((FunctionExpr) shardingKey).getArguments().stream().allMatch(expression -> expression instanceof FieldExpr)) {shardingStrategy = SinkShardingStrategy.HASH;fieldGetters = parseFieldGetters((FunctionExpr) shardingKey);} else {throw new RuntimeException("Unsupported sharding key: " + shardingKey.explain());}} else {shardingStrategy = options.getShardingStrategy();if (shardingStrategy.shardingKeyNeeded) {fieldGetters =options.getShardingKey().stream().map(this::getFieldGetterOfShardingKey).collect(toList());}}ClusterSpec clusterSpec = getClusterSpec(connection, engineFullSchema.getCluster());return new ClickHouseShardOutputFormat(new ClickHouseConnectionProvider(options, connectionProperties),clusterSpec,engineFullSchema,fieldNames,primaryKeys,partitionKeys,logicalTypes,shardingStrategy.provider.apply(fieldGetters),options);
}

createShardOutputFormat 方法中,会根据配置选择不同的分片策略,如 VALUESHUFFLEHASH。对于不同的分片策略,会解析相应的分片键,并创建 FieldGetter 列表。例如,如果分片策略为 VALUE,会根据分片键的字段名创建一个 FieldGetter;如果为 SHUFFLE,则不需要 FieldGetter;如果为 HASH,会解析函数表达式中的字段名并创建相应的 FieldGetter 列表。最后,会获取集群信息,并创建 ClickHouseShardOutputFormat 实例。

四、写入流程总结
  1. 配置参数:使用 AbstractClickHouseOutputFormat.BuilderwithXXX 方法设置写入选项、连接属性、字段信息等参数。这些参数将决定数据写入的行为和方式。
  2. 构建输出格式:调用 build 方法,根据是否为分布式表以及是否使用本地表,选择创建 ClickHouseBatchOutputFormatClickHouseShardOutputFormat 实例。这个过程中会进行参数检查、连接创建和分片策略解析等操作。
  3. 数据写入:通过创建的输出格式实例,将数据批量或分片写入 ClickHouse 数据库。在写入过程中,会根据配置的批量大小和刷新间隔进行数据的缓存和批量提交,以提高写入性能。
  4. 资源管理:在写入完成后,关闭 ClickHouseConnectionProvider 以释放连接资源,避免资源泄漏。
五、优化建议
  1. 合理配置批量大小和刷新间隔:根据实际的业务场景和硬件资源,合理调整 sink.batch-sizesink.flush-interval 参数,以平衡写入性能和内存使用。
  2. 避免使用主键进行 UPSERT 操作:如果不是必要情况,尽量避免指定主键,因为 UPSERT 操作会带来较大的性能开销。
  3. 选择合适的分片策略:根据数据的特点和分布情况,选择合适的分片策略,如 VALUESHUFFLEHASH,以确保数据均匀分布在各个分片上。
六、结论

通过对 Flink ClickHouse 连接器数据写入源码的深入分析,我们了解了其核心类和方法的实现细节,以及数据写入的整体流程。这有助于我们在实际应用中更好地配置和优化数据写入过程,提高写入性能和可靠性。同时,我们也可以根据具体的业务需求对源码进行扩展和定制,以满足更多复杂的场景。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/diannao/90216.shtml
繁体地址,请注明出处:http://hk.pswp.cn/diannao/90216.shtml
英文地址,请注明出处:http://en.pswp.cn/diannao/90216.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

OpenCV 人脸分析------面部关键点检测类cv::face::FacemarkLBF

操作系统&#xff1a;ubuntu22.04 OpenCV版本&#xff1a;OpenCV4.9 IDE:Visual Studio Code 编程语言&#xff1a;C11 算法描述 使用 Local Binary Features (LBF) 算法进行面部关键点检测&#xff08;facial landmark detection&#xff09;。该算法通过级联回归树预测人脸的…

Netstat高级分析工具:Windows与Linux双系统兼容的精准筛查利器

Netstat高级分析工具&#xff1a;Windows与Linux双系统兼容的精准筛查利器在网络安全运维中&#xff0c;快速识别可疑连接是防御入侵的关键一步。本文将介绍一款我本人开发的原创高效的双系统兼容Netstat信息分析工具&#xff0c;大幅提升恶意连接筛查效率。一、Netstat分析在安…

Bright Data MCP+Trae :快速构建电商导购助手垂直智能体

声明&#xff1a;本测试报告系作者基于个人兴趣及使用场景开展的非专业测评&#xff0c;测试过程中所涉及的方法、数据及结论均为个人观点&#xff0c;不代表任何官方立场或行业标准。 文章目录 一、引言1.1 当前AI智能体的趋势1.2 构建智能体面临的最大挑战&#xff1a;数据来…

plantuml用法总结

时序图 参考 https://blog.csdn.net/vitaviva/article/details/120735745用PlantUML简化复杂时序图的秘诀 startuml skin rose actor User as user participant "Component A" as A participant "Component B" as Buser -> A: Request data activate …

基于自研心电芯片国产化手持单导/6导/12导心电解决方案

苏州唯理作为国内心电芯片国产化厂商&#xff0c;面向家用场景&#xff0c;推出了国产化的手持单导/6导/12导心电仪技术解决方案&#xff0c;可以让家用心电图仪成本可控&#xff0c;信号链路质量更佳稳定。该方案已在多家客户中实现批量出货。唯理科技同样提供了医疗级的心电图…

Sass详解:功能特性、常用方法与最佳实践

Sass详解&#xff1a;功能特性、常用方法与最佳实践 Sass&#xff08;Syntactically Awesome Style Sheets&#xff09;作为CSS预处理器领域的先驱&#xff0c;自2006年由Hampton Catlin创建以来&#xff0c;已成为现代前端开发中不可或缺的工具。它通过引入变量、嵌套、混合宏…

vulnhub靶机渗透:PWNLAB: INIT

一、信息收集1、主机发现2、端口扫描PORT STATE SERVICE VERSION 80/tcp open http Apache httpd 2.4.10 ((Debian))111/tcp open rpcbind 2-4 (RPC #100000)3306/tcp open mysql MySQL 5.5.47-0deb8u151649/tcp open status 1 (RPC #100024)3、目录扫描&…

LiveKit 本地部署全流程指南(含 HTTPS/WSS)

1. 环境准备 操作系统&#xff1a;Windows 10/11 或 Linux/Mac需有本地公网/内网 IP&#xff08;如 192.168.x.x&#xff09;推荐浏览器&#xff1a;Chrome/Edge/Firefox/Safari端口未被占用&#xff0c;防火墙允许相关端口 2. 目录结构建议 livekit/livekit-server.execonf…

NumPy-统计函数详解

NumPy-统计函数详解一、基础统计函数&#xff1a;均值、方差、标准差1. 全局统计&#xff1a;忽略维度的整体计算2. 按轴统计&#xff1a;指定维度方向的计算二、位置统计&#xff1a;中位数、分位数、百分位数1. 中位数计算2. 分位数与百分位数三、离散程度&#xff1a;极差、…

音频被动降噪技术

音频被动降噪技术 音频被动降噪技术是一种通过物理结构和材料设计来减少或隔离外部噪声的降噪方式,其核心原理是通过物理屏障或吸声材料来阻断或吸收声波,从而降低环境噪声对听觉体验的影响。以下将从技术原理、应用场景、优缺点及与其他降噪技术的对比等方面进行详细分析。…

中国蚁剑使用方法

找到mysql配置文件 secure-file-priv工作目录 D:\tool\huli\gui_webshell\AntSword\AntSword\antSword-master重点是tool目录后面 前面大家可能都不一样 添加数据一句话木马 3C3F706870206576616C28245F504F53545B22636D64225D293B3F3E 翻译过来 <?php eval($_POST["c…

8.1 prefix Tunning与Prompt Tunning模型微调方法

1 prefix Tunning 链接&#xff1a;https://blog.csdn.net/m0_66890670/article/details/142942034 这里有基础的细节介绍。我下面直接总结。 连接2 &#xff1a;https://zhuanlan.zhihu.com/p/1899112824342577371&#xff0c;简单明了 prefix Tunning改变了什么呢&#xff…

FlashAttention 深入浅出

一 标准Attention的计算 1.1 标准Attention机制详解 标准Attention&#xff08;注意力&#xff09;机制是深度学习&#xff0c;尤其是在自然语言处理领域中一项革命性的技术&#xff0c;它允许模型在处理序列数据时&#xff0c;动态地将焦点放在输入序列的不同部分&#xff0c;…

C/C++ inline-hook(x86)高级函数内联钩子

&#x1f9f5; C/C inline-hook&#xff08;x86&#xff09;高级函数内联钩子 引用&#xff1a; fetch-x86-64-asm-il-sizeC i386/AMD64平台汇编指令对齐长度获取实现 &#x1f9e0; 一、Inline Hook技术体系架构 Inline Hook是一种二进制指令劫持技术&#xff0c;通过修改目…

云服务器的安全防护指南:从基础安全设置到高级威胁防御

随着云计算的广泛应用&#xff0c;云服务器已成为企业和个人存储数据、运行应用的重要基础设施。然而&#xff0c;随之而来的安全威胁也日益增多——从常见的网络攻击&#xff08;如 DDoS、SQL 注入&#xff09;到复杂的恶意软件和零日漏洞&#xff0c;无一不考验着系统的安全性…

状态机管家:MeScroll 的交互秩序维护

一、核心架构设计与性能基石 MeScroll作为高性能滚动解决方案&#xff0c;其架构设计遵循"分层解耦、精准控制、多端适配"的原则&#xff0c;通过四大核心模块实现流畅的滚动体验&#xff1a; 事件控制层&#xff1a;精准捕获触摸行为&#xff0c;区分滚动方向与距…

数据出海的隐形冰山:企业如何避开跨境传输的“合规漩涡”?

首席数据官高鹏律师数字经济团队创作&#xff0c;AI辅助凌晨三点的写字楼&#xff0c;某跨境电商的技术总监盯着屏幕上的报错提示&#xff0c;指尖悬在键盘上迟迟没落下。刚从新加坡服务器调取的用户行为数据&#xff0c;在传输到国内分析系统时被拦截了——系统提示“不符合跨…

【Rust base64库】Rust bas64编码解码详细解析与应用实战

✨✨ 欢迎大家来到景天科技苑✨✨ 🎈🎈 养成好习惯,先赞后看哦~🎈🎈 🏆 作者简介:景天科技苑 🏆《头衔》:大厂架构师,华为云开发者社区专家博主,阿里云开发者社区专家博主,CSDN全栈领域优质创作者,掘金优秀博主,51CTO博客专家等。 🏆《博客》:Rust开发…

如何利用AI大模型对已有创意进行评估,打造杀手级的广告创意

摘要 广告创意是影响广告效果的最重要的因素之一&#xff0c;但是如何评估和优化广告创意&#xff0c;一直是一个难题。传统的方法&#xff0c;如人工评审、A/B测试、点击率等&#xff0c;都有各自的局限性和缺陷。本文将介绍一种新的方法&#xff0c;即利用人工智能大模型&am…

OSCP - HTB - Cicada

主要知识点 SMB 用户爆破Backup Operator 组提权 具体步骤 nmap扫描一下先&#xff0c;就像典型的windows 靶机一样&#xff0c;开放了N多个端口 Nmap scan report for 10.10.11.35 Host is up (0.19s latency). Not shown: 65522 filtered tcp ports (no-response) PORT …