Flinkcdc 实现 MySQL 写入 Doris

Flinkcdc 实现 MySQL 写入 Doris

一、环境配置

Doris:3.0.4 + JDK 17
MySQL (业务数据库):5.7
MySQL(本地数据库):5.7
Flink:flink-1.19.1
flinkcdc:flink-cdc-3.3.0

二、环境搭建

  1. 下载 Flink 1.19.1
wget https://archive.apache.org/dist/flink/flink-1.19.1/flink-1.19.1-bin-scala_2.12.tgz

解压后得到 flink-1.19.1 目录,设置 FLINK_HOME 为 flink-1.19.1 所在目录

sudo vim /etc/profile.d/my_env.sh

添加 Flink 路径

export JAVA_HOME=/opt/module/java
export PATH=$PATH:$JAVA_HOME/bin
export FLINK_HOME=/opt/module/flink-1.19.1
export PATH=$PATH:$FLINK_HOME/bin 
  1. 读入数据通过在 conf/flink-conf.yaml 配置文件追加下列参数开启 checkpoint,每隔 3 秒做一次 checkpoint。
execution.checkpointing.interval: 3000
  1. 使用下面的命令启动 Flink 集群
./bin/start-cluster.sh

启动成功的话,可以在 http://localhost:8081/访问到 Flink Web UI,如下所示:
在这里插入图片描述
多次执行 start-cluster.sh 可以拉起多个 TaskManager。

部署 Doris 和 MySQL 这部分省略

添加配置文件
Flink lib 目录下:
在这里插入图片描述
flinkcdc 3.3.0 lib 目录下
在这里插入图片描述

三、通过 Flink CDC CLI 提交任务

编写任务配置 yaml 文件。 下面给出了一个整库同步的示例文件 mysql-to-doris.yaml:

################################################################################
# Description: Sync MySQL all tables to Doris
################################################################################
source:type: mysqlhostname: 172.16.11.154port: 3306username: rootpassword: xxxtables: app_db.\.*server-id: 5400-5404server-time-zone: 'Asia/Shanghai'
sink:type: dorisfenodes: 172.16.10.181:8030benodes: 172.16.10.181:8040username: rootpassword: 123456table.create.properties.light_schema_change: truetable.create.properties.replication_num: 1pipeline:name: Sync MySQL Database to Dorisparallelism: 1

其中: source 中的 tables: app_db..* 通过正则匹配同步 app_db 下的所有表。 sink 添加 table.create.properties.replication_num 参数是由于 Docker 镜像中只有一个 Doris BE 节点。

最后,通过命令行提交任务到 Flink Standalone cluster

bash bin/flink-cdc.sh mysql-to-doris.yaml

提交成功后,返回信息如:

Pipeline has been submitted to cluster.
Job ID: ae30f4580f1918bebf16752d4963dc54
Job Description: Sync MySQL Database to Doris

在 Flink Web UI,可以看到一个名为 Sync MySQL Database to Doris 的任务正在运行。
在这里插入图片描述
后续相关信息,参考官网即可
https://nightlies.apache.org/flink/flink-cdc-docs-release-3.3/zh/docs/get-started/quickstart/mysql-to-doris/

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/bicheng/79404.shtml
繁体地址,请注明出处:http://hk.pswp.cn/bicheng/79404.shtml
英文地址,请注明出处:http://en.pswp.cn/bicheng/79404.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【Linux庖丁解牛】—环境变量!

目录 1. 环境变量 1.1 概念介绍 1.2 命令行参数 1.3 一个例子,一个环境变量 1.4 认识更多的环境变量 1.5 获取环境变量的方法 a. 指令操作 b. 代码操作 1.6 理解环境变量的特性 a.环境变量具有全局特性 b.补充两个概念(为后面埋一个伏笔) 1. 环境变量 …

LangChain4j +DeepSeek大模型应用开发——7 项目实战 创建硅谷小鹿

这部分我们实现硅谷小鹿的基本聊天功能,包含聊天记忆、聊天记忆持久化、提示词 1. 创建硅谷小鹿 创建XiaoLuAgent package com.ai.langchain4j.assistant;import dev.langchain4j.service.*; import dev.langchain4j.service.spring.AiService;import static dev…

普通 html 项目也可以支持 scss_sass

项目结构示例 下载vscode的插件Live Sass Compiler 自动监听编译scss 下载插件Live Server 用于 web 服务器,打开 html 文件到浏览器,也可以不用这个,自己用 nginx 或者宝塔其他 web 工具 新建一个 index.scss打开,点击 vscode 底…

网工_IP协议

2025.02.17:小猿网&网工老姜学习笔记 第19节 IP协议 9.1 IP数据包的格式(首部数据部分)9.1.1 IP协议的首部格式(固定部分可变部分) 9.2 IP数据包分片(找题练)9.3 TTL生存时间的应用9.4 常见…

SQL语句练习 自学SQL网 在查询中使用表达式 统计

目录 Day 9 在查询中使用表达式 Day 10 在查询中进行统计 聚合函数 Day 11 在查询中进行统计 HAVING关键字 Day12 查询执行顺序 Day 9 在查询中使用表达式 SELECT id , Title , (International_salesDomestic_sales)/1000000 AS International_sales FROM moviesLEFT JOIN …

基于机器学习的舆情分析算法研究

标题:基于机器学习的舆情分析算法研究 内容:1.摘要 随着互联网的飞速发展,舆情信息呈现爆炸式增长,如何快速准确地分析舆情成为重要课题。本文旨在研究基于机器学习的舆情分析算法,以提高舆情分析的效率和准确性。方法上,收集了近…

菲索旋转齿轮法:首次地面光速测量的科学魔术

一、当齿轮邂逅光束:19世纪的光速实验室 1849年,法国物理学家阿曼德菲索(Armand Fizeau)在巴黎郊外的一座庄园里,用一组旋转齿轮、一面镜子和一盏油灯,完成了人类首次地面光速测量。他的实验测得光速为315…

上位机知识篇---PSRAM和RAM

文章目录 前言一、RAM(Random Access Memory)1. 核心定义分类:SRAM(静态RAM)DRAM(动态RAM) 2. 关键特性SRAM优点缺点应用 DRAM优点缺点应用 3. 技术演进DDR SDRAMLPDDR(低功耗DRAM&a…

Qt QComboBox 下拉复选多选(multicombobox)

Qt QComboBox 下拉复选多选(multicombobox),备忘,待更多测试 【免费】QtQComboBox下拉复选多选(multicombobox)资源-CSDN文库

ElasticSearch深入解析(五):如何将一台电脑上的Elasticsearch服务迁移到另一台电脑上

文章目录 0.安装数据迁移工具1.导出数据2.导出mapping3.导出查询模板4.拷贝插件5.拷贝配置6.导入到目标电脑上 0.安装数据迁移工具 Elasticsearch dump是一个用于将Elasticsearch索引数据导出为JSON格式的工具。你可以使用Elasticsearch dump通过命令行或编程接口来导出数据。…

微服务中组件扫描(ComponentScan)的工作原理

微服务中组件扫描(ComponentScan)的工作原理 你的问题涉及到Spring框架中ComponentScan的工作原理以及Maven依赖管理的影响。我来解释为什么能够扫描到common模块的bean而扫描不到其他模块的bean。 根本原因 关键在于**类路径(Classpath)**的包含情况: Maven依赖…

Python镜像源配置:

1.用命令进行配置: 1. 使用命令行方式更改镜像源 可以直接通过 pip config 命令来设置全局或用户级别的镜像源地址。例如,使用清华大学开源软件镜像站作为新的索引 URL: pip config set global.index-url https://pypi.tuna.tsinghua.edu.…

【SpringBoot】Spring中事务的实现:声明式事务@Transactional、编程式事务

1. 准备工作 1.1 在MySQL数据库中创建相应的表 用户注册的例子进行演示事务操作,索引需要一个用户信息表 (1)创建数据库 -- 创建数据库 DROP DATABASE IF EXISTS trans_test; CREATE DATABASE trans_test DEFAULT CHARACTER SET utf8mb4;…

javascript 深拷贝和浅拷贝的区别及具体实现方案

一、核心区别 特性浅拷贝深拷贝复制层级仅复制对象的第一层属性递归复制对象的所有层级属性(包括嵌套对象和数组)引用关系嵌套对象/数组与原对象共享内存(引用拷贝)嵌套对象/数组与原对象完全独立(值拷贝)…

pytorch对应gpu版本是否可用判断逻辑

# gpu_is_ok.py import torchdef check_torch_gpu():# 打印PyTorch版本print(f"PyTorch version: {torch.__version__}")# 检查CUDA是否可用cuda_available torch.cuda.is_available()print(f"CUDA available: {cuda_available}")if cuda_available:# 打印…

国内无法访问GitHub官网的问题解决

作为一名程序员,在国内访问GitHub官网经常会遇到打开过慢或者访问失败的问题,但通过一些技巧可以改善访问体验。GitHub访问问题的根源在于GitHub官网访问不稳定的主要原因在于DNS解析过程。当我们直接访问github.com时,需要通过DNS服务器将域…

使用 MediaPipe 和 OpenCV 快速生成人脸掩膜(Face Mask)

在实际项目中,尤其是涉及人脸识别、换脸、图像修复等任务时,我们经常需要生成人脸区域的掩膜(mask)。这篇文章分享一个简单易用的小工具,利用 MediaPipe 和 OpenCV,快速提取人脸轮廓并生成二值掩膜图像。 …

【动态导通电阻】GaN功率器件中动态导通电阻退化的机制、表征及建模方法

2019年,浙江大学的Shu Yang等人在《IEEE Journal of Emerging and Selected Topics in Power Electronics》上发表了一篇关于GaN(氮化镓)功率器件动态导通电阻(Dynamic On-Resistance, RON)的研究论文。该文深入探讨了GaN功率器件中动态导通电阻退化的机制、表征方法、建模…

从括号匹配看栈:数据结构入门的实战与原理

在计算机科学的世界里,数据结构是程序员的 “瑞士军刀”,不同的数据结构适用于不同的场景,能高效解决各类问题。其中,栈作为一种简单却强大的数据结构,在很多实际应用中发挥着关键作用。今天,我们就通过一个…

Dubbo(89)如何设计一个支持多语言的Dubbo服务?

设计一个支持多语言的Dubbo服务需要考虑以下几个方面: 服务接口设计:确保服务接口的定义可以被不同语言实现。序列化协议:选择一个支持多语言的序列化协议,例如Protobuf、Thrift、gRPC等。服务注册与发现:确保服务注册…