最近我们组在大规模上线Flink SQL作业。首先,在进行跑批量初始化完历史数据后,剩下的就是消费Kafka历史数据进行追数了。但是发现某些作业的追数过程十分缓慢,要运行一晚上甚至三四天才能追上最新数据。由于是实时数仓指标计算上线初期,经常验证作业如果有问题就得重蹈覆辙重新追数,效率很低,于是我开始分析Flink SQL的优化。

问题


insert into tableB
select a, max(b), max(c), sum(d) ...
from tableA
group by a

上面这个作业的简化版SQL,主要就是做一个分组聚合:

  1. 从tableA分组聚合出结果插入tableB
  2. tableA的联合主键是:a,b(但是a的离散度已经很高了)
  3. tableA的Flink表类型为upset-kafka
  4. tableB的Flink表类型为HBase

初步分析


这个作业跑在集群上的job graph如下:

可以看到有三个vertex:

  1. 第一个是TableSourceScan
  2. 第二个是ChangelogNormalize
  3. 第三个是GroupAggregate

TableSourceScan接入tableA表的upsert-kafka流;

ChangelogNormalize对upset-kafka进行撤回语义的解析;

GroupAggregate对撤回流进行分组聚合,然后写入tableB的HBase;

优化思路1:local/global agg


agg分类:

  • group agg
select count(a) from t group by b
  • over agg
select count(a) over (partition by b order by c) from t
  • window agg
select count(a) from t group by tumble(ts, interval '10' seconds), b

local/global agg:

核心思想与hadoop的combiner是一致的,就是在mapreduce的过程中,在map阶段就做一个预聚合,即combine操作。

[图片上传失败…(image-c0ad24-1650075387085)]

带来的收益是:减少网络shuffle数据,提升计算引擎的性能。

前提条件:

  1. agg的所有agg function都是mergeable(实现merge方法)
  2. table.optimizer.agg-phase-strategy为AUTO或TWO_PHASE
  3. Stream下,minibatch开启;Batch下,AUTO会根据cost选择

解释说明:

mergeable其实就是能用分治法解决的计算问题,例如sum、count等,而avg就不能用分治法先计算部分元素的avg,再计算最终avg了,结果有时候会出错。

table.optimizer.agg-phase-strategy:默认为AUTO,意思是引擎尽量做预聚合;TWO_PHASE表示所有聚合操作都做预聚合;ONE_PHASE表示所有聚合都不做预聚合。

minibatch:即开启微批模式。主要有三个参数:

table.exec.mini-batch.enabled:是否开启,默认不开启
table.exec.mini-batch.size:微批的record buffer大小
table.exec.mini-batch.allow-latency:微批的time buffer大小

minibatch的本质就是平衡实时性和吞吐量的刻度尺。

所以,local/global agg一共需要三个参数控制。

验证


经过对比验证,在这个SQL场景下的效率提升很小。

local/global agg降低了第二个vertex即ChangelogNormalize的sent records的数据量,而并没有使得第一个vertex的数据处理效率有显著提升。

所以,这个作业的瓶颈并不在vertex间, 而在于第一个vertex的处理数据效率。

优化思路二:调大并行度


这个思路的关键在于source upsert-kafka的分区数,这是制约吞吐量的瓶颈。因为在upsert-kafka中,每个partition最多被一个Flink线程读取。

增加了10倍的并行度,source分区也增加10倍后,作业周转时间缩短了将近一半。

优化思路三:RocksDB性能调优


仔细分析这个SQL作业,是对一个联合主键的字段做group by,那么state一定会非常大。

经过在对这个表在数仓中的数据进行分析,发现这个字段的离散度几乎接近于主键的离散度。

而进行group by必然要根据每一条upsert kafka的数据去查验在flink statebackend中物化的source table中该字段值的分布情况,这应该是才是瓶颈所在!

沿着这个思路,开始分析Flink的statebackend机制。

这里我们简单回顾一下Flink statebackend(后面再做专题总结):

由 Flink 管理的 keyed state 是一种分片的键/值存储,每个 keyed state 的工作副本都保存在负责该键的 taskmanager 本地中。另外,Operator state 也保存在机器节点本地。Flink 定期获取所有状态的快照,并将这些快照复制到持久化的位置,例如分布式文件系统。

如果发生故障,Flink 可以恢复应用程序的完整状态并继续处理,就如同没有出现过异常。

Flink 管理的状态存储在 state backend 中。Flink 有两种 state backend 的实现 – 一种基于 RocksDB 内嵌 key/value 存储将其工作状态保存在磁盘上的,另一种基于堆的 state backend,将其工作状态保存在 Java 的堆内存中。这种基于堆的 state backend 有两种类型:FsStateBackend,将其状态快照持久化到分布式文件系统;MemoryStateBackend,它使用 JobManager 的堆保存状态快照。

当使用基于堆的 state backend 保存状态时,访问和更新涉及在堆上读写对象。但是对于保存在 RocksDBStateBackend 中的对象,访问和更新涉及序列化和反序列化,所以会有更大的开销。但 RocksDB 的状态量仅受本地磁盘大小的限制。还要注意,只有 RocksDBStateBackend 能够进行增量快照,这对于具有大量变化缓慢状态的应用程序来说是大有裨益的。

所有这些 state backends 都能够异步执行快照,这意味着它们可以在不妨碍正在进行的流处理的情况下执行快照。

我们的线上一般采用的是RocksDB作为状态后端,checkpoint dir采用hdfs文件系统。其实我个人觉得这个应该根据作业的特性进行选择,根据我个人的经验以及知识沉淀,选择的主要因素是作业的state大小及对处理数据性能的要求:

  • RocksDBStateBackend可以突破内存的限制,rocksDB的数据逻辑结构和redis相似,但是数据的物理存储结构又和hbase相似,继承自levelDB的LSM树思想,缺点是性能太低
  • 而FsStateBackend是在做snapshot的时候才将内存的state持久化到远端,速度接近于内存状态
  • MemoryStateBackend是纯内存的,一般只用做调试。

但是由于这个大状态作业追数速度实在太慢,我甚至想过:

在追数的时候用FsStateBackend,并配置大内存,且把managed memory调成0,同时将ck的周期设置的很大,基本上不做ck,追上后savepoint。再把状态后端换成RocksDB,并且从FSSatebackend的savepoint处恢复,但是发现1.13才支持savepoint切换statebackend类型。

只剩下调优RocksDB一条路了。根据之前对HBase的LSM原理的理解,进行知识迁移,马上对RocksDB有了一定的认识。在HBase中调优效果最明显无乎:

blockcache读缓存、memStore写缓存、增加布隆过滤器、提升compact效率

沿着这个思路,再查阅了一番RocksDB资料后,决定先对如下参数进行调优:

  • state.backend.rocksdb.block.cache-size
state.backend.rocksdb.block.blocksize

Block 块是 RocksDB 保存在磁盘中的 SST 文件的基本单位,它包含了一系列列有序的 Key 和 Value 集合,可以设置固定的大小。

但是,通过增加 Block Size,会显著增加读放大(Read Amplification)效应,令读取数据时,吞吐量下降。原因是 Block Size增加以后,如果 Block Cache 的大小没有变,就会⼤大减少 Cache 中可存放的 Block 数。如果 Cache 中还存处理索引和过滤器等内容,那么可放置的数据块数目就会更少,可能需要更多的磁盘 IO 操作,找到数据就更更慢了,此时读取性能会大幅下降。反之,如果减小BlockSize,会让读的性能有不少提升,但是写性能会下降,⽽而且对 SSD 寿命也不利。

因此我的调优经验是,如果需要增加 Block Size 的大小来提升读写性能,请务必一并增加 Block Cache Size 的大小,这样才可以取得比较好的读写性能。Block Cache,缓存清除算法⽤用的是 LRU(Least Recently Used)。

验证


测试对比后发现,原本半天左右完成的作业只需要一到两个小时即可追上数据!

感悟


性能调优就如同把脉治病,关键在于对症下药。

前期,要分析当前场景下真正制约性能的瓶颈所在,后期,在症结处用效果最明显的方式处理症结。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/pingmian/88988.shtml
繁体地址,请注明出处:http://hk.pswp.cn/pingmian/88988.shtml
英文地址,请注明出处:http://en.pswp.cn/pingmian/88988.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

HTML 树结构(DOM)深入讲解教程

一、HTML 树结构的核心概念 1.1 DOM(文档对象模型)的定义 DOM(Document Object Model)是 W3C 制定的标准接口,允许程序或脚本(如 JavaScript)动态访问和更新 HTML/XML 文档的内容、结构和样式。…

用鼠标点击终端窗口的时候出现:0;61;50M0;61;50M0;62;50M0

在做aws webrtc viewer拉流压测的过程中,我本地打开了多个终端,用于连接EC2实例: 一个终端用于启动 ‘并发master脚本’、监控master端的cpu、mem;一个终端用于监控master端的带宽情况;一个终端用于监控viewer端的cpu、…

C++-linux 5.gdb调试工具

GDB调试工具 在C/C开发中,程序运行时的错误往往比编译错误更难定位。GDB(GNU Debugger)是Linux环境下最强大的程序调试工具,能够帮助开发者追踪程序执行流程、查看变量状态、定位内存错误等。本章将从基础到进阶,全面讲…

Update~Read PLC for Chart ~ Log By Shift To be... Alarm AI Machine Learning

上图~ 持续迭代 1、增加报警弹窗,具体到哪个值,双边规格具体是多少 2、实时显示当前值的统计特征,Max Min AVG ... import tkinter as tk from tkinter import simpledialog import time import threading import queue import logging from datetime import datet…

es的自定义词典和停用词

在 Elasticsearch 中,自定义词典是优化分词效果的核心手段,尤其适用于中文或专业领域的文本处理。以下是关于 ES 自定义词典的完整指南: 为什么需要自定义词典? 默认分词不足: ES 自带的分词器(如 Standard…

微算法科技技术突破:用于前馈神经网络的量子算法技术助力神经网络变革

随着量子计算和机器学习的迅猛发展,企业界正逐步迈向融合这两大领域的新时代。在这一背景下,微算法科技(NASDAQ:MLGO)成功研发出一套用于前馈神经网络的量子算法,突破了传统神经网络在训练和评估中的性能瓶颈。这一创新…

一文读懂循环神经网络(RNN)—语言模型+读取长序列数据(2)

目录 读取长序列数据 为什么需要 “读取长序列数据”? 读取长序列数据的核心方法 1. 滑动窗口(Sliding Window) 2. 分段截取(Segmentation) 3. 滚动生成(Rolling Generation) 4. 关键信息…

Oracle Virtualbox 虚拟机配置静态IP

Oracle Virtualbox 虚拟机配置静态IP VirtualBox的网卡,默认都是第一个不能自定义,后续新建的可以自定义。 新建NAT网卡、host主机模式网卡 依次点击:管理->工具->网络管理器新建host主机模式网卡 这个网卡的网段自定义,创建…

Linux RAID1 创建与配置实战指南(mdadm)

Linux RAID1 创建与配置实战指南(mdadm)一、RAID1 核心价值与实战目标RAID1(磁盘镜像) 通过数据冗余提供高可靠性:当单块硬盘损坏时,数据不丢失支持快速阵列重建读写性能略低于单盘(镜像写入开销…

MySQL数据库----函数

目录函数1,字符串函数2,数值函数3,日期函数4,流程函数函数 1,字符串函数 MySQL中内置了很多字符串函数 2,数值函数 3,日期函数 4,流程函数

1.2 vue2(组合式API)的语法结构以及外部暴露

vue2 vue3中可以写vue2的语法&#xff0c;vue2的结构像一个花盆里的根&#xff08;根组件App.vue&#xff09;&#xff0c;根上可以插上不同的枝杈和花朵&#xff08;组件&#xff09;。 组件的结构&#xff1a; // 这里写逻辑行为 <script lang"ts"> export d…

Swift 解 LeetCode 324:一步步实现摆动排序 II,掌握数组重排的节奏感

文章目录摘要描述题解答案题解代码&#xff08;Swift&#xff09;题解代码分析步骤一&#xff1a;排序数组步骤二&#xff1a;左右指针分段步骤三&#xff1a;按位置交错插入示例测试及结果示例 1示例 2示例 3&#xff08;边界情况&#xff09;时间复杂度分析空间复杂度分析总结…

使用SQLMAP的文章管理系统CMS的sql注入渗透测试

SQLMAP注入演示&#xff1a;抓包拿到Cookie:召唤sqlmap&#xff1a;sqlmap -u "http://192.168.1.99:8085/show.php?id34" --cookie "pma_langzh_CN; kbqug_admin_username2621-PL_LxhFjyVe43ZuQvht6MI5q0ZcpRVV5FI0pzQ6XR8; kbqug_siteid2621-PL_LxhFjyVe4yA5…

I3C通信协议核心详解

一、物理层与电气特性双线结构 SCL&#xff08;串行时钟线&#xff09;&#xff1a;主设备控制&#xff0c;支持 推挽&#xff08;Push-Pull&#xff09;输出&#xff08;高速模式&#xff09;和 开漏&#xff08;Open-Drain&#xff09;&#xff08;兼容I2C模式&#xff09;。…

Docker搭建Redis哨兵集群

Redis提供了哨兵机制实现主从集群下的故障转移&#xff0c;其中包含了对主从服务的检测、自动故障恢复和通知。 1.环境 centos7、redis6.2.4、MobaXterm 目的&#xff1a; 搭建redis的主从同步哨兵集群&#xff08;一主一从三哨兵&#xff09; 2.步骤 1.主从集群的搭建 主从…

暑假Python基础整理 --异常处理及程序调试

异常概念 在程序运行过程中&#xff0c;经常会遇到各种各样的错误&#xff0c;这些错误统称为“异常”。如下表是Python常见的异常与描述&#xff1a; 异常描述NameError尝试访问一个未声明的变量引发错误IndexError索引超出序列范围引发错误IndentationError缩进错误ValueErr…

k8s-高级调度(二)

目录 Taint(污点)与Toleration(容忍) Taint&#xff08;污点&#xff09;&#xff1a;节点的排斥标记 Toleration&#xff08;容忍&#xff09;&#xff1a;Pod的适配声明 与节点亲和性的对比 警戒(cordon)和转移(drain) Cordon&#xff1a;节点隔离&#xff08;阻止新 Po…

基于OpenCV的深度学习人脸识别系统开发全攻略(DNN+FaceNet核心技术选型)

核心技术选型表 技术组件版本/型号用途OpenCV DNN4.5.5人脸检测FaceNet (facenet-pytorch)0.5.0人脸特征提取MiniConda最新版Python环境管理PyTorch1.8.0FaceNet运行基础OpenVINO2021.4模型加速(可选)SSD Caffe模型res10_300x300高精度人脸检测 一、环境准备与项目搭建 1.1 M…

【AI News | 20250714】每日AI进展

AI Repos 1、All-Model-Chat All Model Chat 是一款为Google Gemini API家族设计的网页聊天应用&#xff0c;支持多模态输入&#xff08;图片、音频、PDF等&#xff09;和多种模型&#xff08;如Gemini Flash、Imagen&#xff09;。它提供了丰富的自定义功能&#xff0c;包括高…

C 语言(二)

主要包括变量与常量、数据类型、存储方式、数制转换以及字符处理等内容一、变量与常量在 C 语言中&#xff0c;变量是用来存储数据的命名空间&#xff0c;它会在内存中分配地址。例如&#xff1a;int i; i 12345; 其中 i 是变量&#xff0c;12345 是常量。常量表示在程序运行过…