粗粮厂的基于spark的通用olap之间的同步工具项目

  • 1 项目背景
  • 2 项目实现
    • 2.1 实现原理
    • 2.2 细节要点
  • 3 抽样说明
  • 4 项目运行状态
    • 4.1 运行速度
    • 4.2 项目吞吐
    • 4.3 稳定性

说的比较简单,有需要的可以留言,我不断补充完善

1 项目背景

我们公司内部的需要一款,能在不同的olap之间做数据传递与拷贝,例如 iceberg到doris,到mysql,甚至到kafka的,这么一个数据同步工具,要尽可能简单,尽可能维护容易。所以有了这么一个项目的诞生,目前可以实现,通过一条简短的shell命令,实现不同数据库与存储之间的数据拷贝。

目前这套方案,在公司内部已经部署4个数据团队,服务对象产品+数据研发 超过100人,直接使用使用的业务对象,超过 4000人。

2 项目实现

2.1 实现原理

目前的实现是通过spark来实现,分为两个部分:

  • 1 写入同步信息:同步任务记录,是一个很简单的,通过shell 传参,调用一个spark任务,执行一个简单的,数据插入动作,将 数据源表,目标,持有人,过滤或想要保留的字段,数据筛选项等一些信息,传入spark任务中,并将数据写入一个mysql中保存。其中源表与目标表,通过 catalog__database__schema__tablename的方式保存,并维护了一套catalog,通过前缀就可以知道数据在哪个引擎的哪个表中。
  • 2 读取同步信息:一个常驻的,死循环的spark任务,会定期遍历mysql,会筛选出目前符合条件的,未过期的,同步任务,使用ExecutionContext 和 Future ,来并发执行同步任务,通过源信息,与反射 ,维护一个连接的配置项,来做隔离,保证数据传入时,不会涉及隐私信息

2.2 细节要点

  • 并发部分,可以通过【读取同步信息】 任务部分启动时,动态传参,来控制数据流量
  • 在任务中,维护了3个列表,分别保证,同一个任务只会执行一次,同一个目标表,同一时间只有一个任务在写入,任务执行超过配置时间,会自动杀死,并允许新的任务调起,这样就可以保证不会触发目标的锁,并控制重复提交
  • 通过对不同传入参数解析,对于每个目标引擎单独部署独立的同步任务,做到资源隔离
  • spark任务 每个并发执行有做到很好的异常捕获,发生问题时,可以调用报警接口,发送信息到持有人飞书中;对于常驻的 【读取同步信息】整体任务监控,做到2天杀死重新启动,并每5分钟pid判活,保证任务的执行中
  • 任务监控与判断,对目标数据与原始数据做数据量校验,对数据过程中的日志做接受,扫描错误日志等,保证要给

3 抽样说明

这里抽样说明一下 ,iceberg 同步数据到hologres 时的要点,其实整体的使用都相同,不过在开发的时候,可以根据不同的引擎做不同的细节调整 : 例入hologres

  • 使用spark-connector-hologres的连接器写入数据,连接器会先在hologres引擎中创建临时表,数据写入完成后,再做insert overwrite动作,因为分布式存储的问题,所以就需要在代码里手动执行set hg_experimental_force_sync_replay = on; 来保证元数据在不同节点的同步
  • 使用hologres连接器,对原始数据量做判断,超过1千万的,执行serverless,也就是后被隐藏能源!
  • 增加1次的任务重试,减少因为元数据不同步导致的表不存在的bug
  • 目标数据是视图的方式,也有分区表,可以在代码中做判断并刷新视图,保证数据插入可以兼容

总的来说,可以根据不同的业务目标库与使用方法,做单独的优化迭代,保证到每次的同步都是最优的选项

4 项目运行状态

还是以iceberg 到 hologres 为例,某个实例的spark资源情况为 180个 Executor,每个4G,16G的DM,参数配置为:

--conf spark.sql.catalog.iceberg_zjyprc_hadoop.cache-enabled=false
--conf spark.sql.adaptive.coalescePartitions.initialPartitionNum=30
--conf spark.network.timeout=180000
--conf spark.slow.shuffle.fetch.time.blacklist.threshold=60000
--conf spark.speculation=false
--conf spark.excludeOnFailure.enabled=false
--conf spark.task.maxFailures=1

4.1 运行速度

目前 5000w条数级别的数据量,大概需要 16-17分钟,而且这里面有一半的时间时因为hologres连接器在内部重新shuffle,如果目标是mysql之类的,速度会提高至少一半

4.2 项目吞吐

目前每日同步 9000张表,总数据量大概 1-2T左右,基本可以满足业务需求

4.3 稳定性

通过上述的监控与定期重启,配合计算引擎的升级,同步迭代工具的使用,例如hologres 支持了insert overwrite 命令,可以实现写cpu打满也不会影响读的使用,同步迭代最新版本,可以保证业务的高可用。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/diannao/96139.shtml
繁体地址,请注明出处:http://hk.pswp.cn/diannao/96139.shtml
英文地址,请注明出处:http://en.pswp.cn/diannao/96139.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C# 时间戳

在C#中,获取当前时间的毫秒级时间戳可以通过多种方式实现。以下是几种常见的方法:方法1:使用DateTime和DateTimeOffsetlong timestamp (long)(DateTimeOffset.Now.ToUnixTimeMilliseconds()); Console.WriteLine(timestamp);方法2&#xff1…

【牛客刷题】REAL792 小O的平面画圆

文章目录 一、题目介绍 1.1 输入描述 1.2 输出描述 1.3 示例 二、算法设计思路 2.1 核心问题分析 2.2 图解两个圆的位置关系 2.2.1. 相离 (Separate) 2.2.2. 外切 (Externally Tangent) 2.2.3. 相交 (Intersecting) 2.2.4. 内切 (Internally Tangent) 2.2.5. 包含 (Containing)…

uniapp:微信小程序使用Canvas 和Canvas 2D绘制图形

一、Canvas 画布 canvas 组件 提供了绘制界面,可以在之上进行任意绘制 功能描述 Canvas 画布。2.9.0 起支持一套新 Canvas 2D 接口(需指定 type 属性),同时支持同层渲染,原有接口不再维护。 二、Canvas 和Canvas 2D 区…

word如何转换为pdf

pip install pywin32import os import win32com.client import pythoncom # 新增:用于处理COM线程 import sysdef docx_to_pdf(docx_path, pdf_pathNone):"""将Word文档转换为PDF格式,修复退出时的COM错误"""if not os.p…

服务器Linux防火墙怎样实现访问控制

在互联网世界里,Linux服务器就像一座城池,而防火墙便是城池的守卫者。没有防火墙,外部的任何流量都能毫无阻拦地进入服务器;而有了防火墙,就可以像设关卡一样,对进出城门的人进行盘查和控制。对企业运维人员来说&#…

【原创理论】Stochastic Coupled Dyadic System (SCDS):一个用于两性关系动力学建模的随机耦合系统框架

【原创理论】Stochastic Coupled Dyadic System (SCDS):一个用于两性关系动力学建模的随机耦合系统框架 作者:[望月,GPT5,GPT-O3,Gemini2.5pro] 分类: 人工智能 理论模型 交叉学科 系统科学 人性 爱情 标签: 关系动力…

星图云开发者平台新功能速递 | 微服务管理器:无缝整合异构服务,释放云原生开发潜能

在构建现代数字化应用的过程中,开发者常常面临一个关键挑战:如何高效、安全地集成和复用既有的复杂服务或自有业务系统?这些服务可能是核心算法引擎、遗留业务逻辑模块,或是特定的SaaS能力。传统方式下,将它们融入新的…

数据结构:构建 (create) 一个二叉树

目录 问题的本质——什么信息才能唯一确定一棵树? 推导“最佳拍档”——哪两种遍历序列能行? 递归思想——如何构建一棵树? 第1步:确定整棵树的根节点 第2步:划分左右子树的成员 第3步:递归构建左右子…

【STM32】HAL库中的实现(五):ADC (模数转换)

什么是 ADC(模数转换器) ADC(Analog to Digital Converter)是将 模拟信号(电压)转换成数字信号(数值) 的器件。 在 STM32 中,ADC 通常具有以下特性:特性描述分…

智慧校园中IPTV融合对讲:构建高效沟通新生态

在智慧校园的建设浪潮里,IPTV融合对讲系统宛如一颗璀璨的新星,以其独特的功能和强大的优势,为校园的沟通与管理带来了全新的变革,构建起一个高效、便捷、智能的沟通新生态。从日常沟通层面来看,IPTV融合对讲系统打破了…

智能合约里的 “拒绝服务“ 攻击:让你的合约变成 “死机的手机“

你有没有遇到过手机突然卡死,点什么都没反应的情况?在区块链世界里,智能合约也可能遭遇类似的 "罢工"—— 这就是 "拒绝服务攻击"(Denial of Service,简称 DoS)。今天用大白话讲讲合约…

安全设计-防止非法移机

前言我们的设备在实际使用过程中,在我们的巡查机制粒度下,发现依然有设备被非法移动到其他非计划点位。因此,我们需要设计一套及时预警,但是对客户无感,不影响业务办理的防范机制。1.方案设计交互图2.方案说明 2.1方案…

OpenHarmony之三方库适配深度实践:从移植到合规的全链路指南

1. 为什么要做三方库适配?——更深层的价值分析 维度 现状痛点 预期收益 深度价值 生态 成熟开源库无法直接运行 复用 10+ 年开源沉淀,提升功能覆盖率 避免生态碎片化:通过标准化适配流程,确保不同厂商对同一库的实现一致 性能 JS 层重实现耗 CPU 原生 C/C++ 加速 3~10 倍 …

2025年09月计算机二级MySQL选择题每日一练——第一期

计算机二级中选择题是非常重要的,所以开始写一个每日一题的专栏。 答案及解析将在末尾公布! 今日主题:MySQL 基础概念 1、以下关于数据库的特点中,描述正确的是( ) A. 数据无冗余 B. 数据不可共享&#xff…

JAVA字符串操作——在蓝桥杯的基本应用

我们来系统地梳理一下 Java 中的字符串操作。Java 的字符串操作非常丰富,主要涉及到 String、StringBuilder 和 StringBuffer 这三个核心类。 目录 一、核心类简介 二、String 类的常用操作 1. 创建字符串 2. 获取基本信息 3. 比较字符串 4. 查找与判断 5. 转…

【深度学习基础】PyTorch Tensor生成方式及复制方法详解

目录PyTorch Tensor生成方式及复制方法详解一、Tensor的生成方式(一)从Python列表/元组创建(二)从NumPy数组创建(三)特殊初始化方法(四)从现有Tensor创建(五)…

动态规划:入门思考篇

1. 简单类比 假如我们要求全国人数,那么我们只要知道各个省的人数,然后将各个省的人数相加即可,要想知道各个省的人数,只要将这个省下面所有的市人数相加即可,同样,如果想要知道各个市的人数,只…

小杨的 X 字矩阵(举一反三)-洛谷B3865 [GESP202309 二级]

题目描述 小杨想要构造一个 X 字矩阵( 为奇数),这个矩阵的两条对角线都是半角加号 ,其余都是半角减号 - 。例如,一个 55 的 X 字矩阵如下: --- --- ---- --- --- 请你帮小杨根据给定的 打印出对应的“X …

数据组合与合并:Pandas 数据整合全指南 +缺失值处理

数据组合与合并:Pandas 数据整合全指南在进行数据分析之前,数据清洗与整合是关键步骤。 遵循“整洁数据”(Tidy Data)原则: 每个观测值占一行每个变量占一列每种观测单元构成一张独立的表格 整理好数据后,常…

c#联合halcon的基础教程(案例:亮度计算、角度计算和缺陷检测)(含halcon代码)

目录 1.环境配置 2.案例一:亮度计算 halcon代码: 主界面代码: 3.案例二: 角度计算 halcon代码: 主界面代码: 4.案例三:缺陷检测 halcon代码: 主界面代码: 通过…