手把手教你用 Flink + CDC 实现 MySQL 数据实时导入 StarRocks(干货)

如何利用 Apache Flink 结合 CDC(Change Data Capture,变更数据捕获)技术,将 MySQL 的数据实时导入 StarRocks,打造高效的实时数仓。这不仅是企业数字化转型的利器,也是技术人提升竞争力的绝佳实战场景!

在数据驱动的时代,实时性是企业的核心竞争力。传统的批量 ETL(抽取-转换-加载)方式往往因为延迟高、效率低而无法满足实时分析需求。而 Flink 作为流处理的王者,搭配 CDC 捕获 MySQL 的增量变更,再结合 StarRocks 的高性能分析能力,形成了一个强大的实时数据入湖方案。无论你是数据库工程师、数据分析师,还是对数仓建设感兴趣的初学者,这篇文章都将手把手带你完成从环境搭建到整库同步的实战流程。

通过这篇博文,你将学会如何安装 Flink 和 MySQL CDC 连接器,编写 YAML 文件实现整库同步,并将数据无缝导入 StarRocks。准备好服务器,泡杯咖啡,咱们一起开启这场实时入湖的实战之旅吧!

**准备好你的服务器,泡杯咖啡,咱们一起“上代码、上步骤、上实战”,开启这场实时入湖的硬核之旅!
**


文章目录

  • 手把手教你用 Flink + CDC 实现 MySQL 数据实时导入 StarRocks(干货)
  • 第一步:为什么选择 Flink + CDC + StarRocks?
  • 第二步:环境准备与工具安装
    • 具体配置
    • 2.0 MySql 配置
      • 验证是否是 binlog 模式
      • 如果没有 需要 启动 binlog 模式
      • 创建数据库和表 用来同步
    • 2.1 java 安装
    • 2.2 下载 Flink 1.20.1
    • 2.3 解压 Flink 1.20.1
    • 2.4 配置 Flink-1.20.1/conf/config.yaml
    • 2.5 下载 Flink-CDC-3.3.0
    • 2.6 解压 Flink-CDC-3.3.0
    • 2.7 下载驱动 Flink-cdc-pipeline-connector-mysql-3.3.0.jar
    • 2.8 下载驱动 Flink-cdc-pipeline-connector-starrocks-3.3.0.jar
    • 2.9 移到 Flink-cdc-3.3.0/lib 下
    • 2.10 下载 MySQL JDBC 驱动
    • 2.11 启动 Flink-1.20.1
    • 2.12 网络和数据库连接 测试
    • 2.13 创建 yaml
    • 2.14 运行 yaml
  • 最后验证 是否同步
    • 查看starrocks
    • 连接 192.168.5.128:8030 ,并输入 用户名密码


第一步:为什么选择 Flink + CDC + StarRocks?

Flink 是一个强大的流处理框架,擅长处理实时数据流,吞吐量高、延迟低,非常适合实时数仓场景。而 CDC 技术能捕获 MySQL 数据库的增量变更(比如插入、更新、删除操作),让我们无需全量扫描数据库,就能实时获取数据变化。至于 StarRocks,它是一个高性能的分析型数据库,查询速度快,支持实时分析场景。把这三者结合起来,简直是实时数据入湖的“黄金三角”!


第二步:环境准备与工具安装

先是vm 上 安装了 4台 linux centos 8 (如下图)
配置了 4台 VM 虚拟机
3台 安装 StarRocks
1台安装 mysql +flink+flinkcdc

具体可以参考Streaming ELT 同步 MySQL 到 StarRocks

但官方的例子 需要安装 docker 。当你真实配置你会发现,环境各有不同 ,不同
java 版本 ,Flink 和 cdc 用什么版本 ,有没有什么依赖性都需要考虑 问题。
包括 mysql 和 starocks 版本等.
我在实践中也出现的很多问题,经过很多尝试最终完成.

具体配置

组件安装 IP 地址角色/说明
Apache Flink192.168.5.131Flink 集群(JobManager + TaskManager)
Flink CDC 连接器192.168.5.131部署于 Flink 的 lib 目录,用于捕获 MySQL 变更数据
MySQL192.168.5.131源数据库,提供数据并启用 Binlog
StarRocks FE + BE192.168.5.128StarRocks 前端(FE)+ 后端(BE)
StarRocks BE192.168.5.129StarRocks 后端(BE)节点 2
StarRocks BE192.168.5.130StarRocks 后端(BE)节点 3

说明
Flink 和 Flink CDC:均部署在 192.168.5.131,Flink CDC 连接器通常作为 JAR 文件放置在 Flink 的 lib 目录下,与 Flink 共享同一节点。

MySQL:与 Flink 部署在同一服务器(192.168.5.131),需确保 Binlog 已启用。

StarRocks:分布式部署,FE 和一个 BE 节点在 192.168.5.128,另外两个 BE 节点分别在 192.168.5.129 和 192.168.5.130,形成一个典型的多节点集群。

网络要求:确保所有 IP 地址之间网络互通,特别是 MySQL(3306 端口)、Flink(8081 等端口)、StarRocks(8030、9030 等端口)需开放相关端口。

关于 怎么安装 starrocks 可以访问 Starrocks 中文论坛
关于 怎么安装 mysql 可以访问 这篇三步搞定 mysql 8.0的安装

本案列不需要安装 Docker
在这里插入图片描述

CentOS8_cd_Flink
可以看到 配置不高,主要用来完成这个实验 ,
在这里插入图片描述
如果是生产环境建议以下配置
Flink 是一个资源密集型的流处理框架,对 CPU、内存、磁盘和网络有一定要求。以下是推荐的硬件配置:

开发/测试环境(单节点或小型集群)

: CPU:4 核 ~ 8 核(如 Intel Xeon 或 AMD EPYC,推荐 2.5 GHz
以上)。 内存:16 GB ~ 32 GB(Flink 作业和 JVM 堆内存需至少 8 GB)。 磁盘:500 GB
SSD(用于存储检查点、日志和临时数据)。 网络:千兆网卡(1 Gbps),确保低延迟数据传输。

生产环境(分布式集群):

CPU:16 核 ~ 32 核 per TaskManager(推荐多核 CPU 以支持高并行度)。 内存:64 GB ~ 128 GB
per TaskManager(建议为 Flink 分配 70%~80% 的内存,剩余用于操作系统)。 磁盘:1 TB ~ 2 TB
NVMe SSD(高 IOPS,适合检查点和状态存储)。 网络:万兆网卡(10 Gbps),支持高吞吐量数据传输。 节点数:至少 3
个节点(1 个 JobManager + 2 个 TaskManager),可根据任务规模扩展。

2.0 MySql 配置

验证是否是 binlog 模式

SHOW VARIABLES LIKE 'log_bin'; SHOW VARIABLES LIKE '%binlog%'; 

在这里插入图片描述
在这里插入图片描述

如果没有 需要 启动 binlog 模式

# /etc/my.cnf
[mysqld]
log-bin=mysql-bin          # 启用
binlog-format=ROW

– 1. 检查 binlog 是否启用 SHOW VARIABLES LIKE ‘log_bin’; – 必须是 ON

– 2. 设置格式为 ROW(最重要) SET GLOBAL binlog_format = ‘ROW’;

– 3. 设置合理的过期时间 SET GLOBAL binlog_expire_logs_seconds = 604800; – 7 天

创建数据库和表 用来同步

-- 创建数据库
CREATE DATABASE app_db;USE app_db;-- 创建 orders 表
CREATE TABLE `orders` (
`id` INT NOT NULL,
`price` DECIMAL(10,2) NOT NULL,
PRIMARY KEY (`id`)
);-- 插入数据
INSERT INTO `orders` (`id`, `price`) VALUES (1, 4.00);
INSERT INTO `orders` (`id`, `price`) VALUES (2, 100.00);-- 创建 shipments 表
CREATE TABLE `shipments` (
`id` INT NOT NULL,
`city` VARCHAR(255) NOT NULL,
PRIMARY KEY (`id`)
);-- 插入数据
INSERT INTO `shipments` (`id`, `city`) VALUES (1, 'beijing');
INSERT INTO `shipments` (`id`, `city`) VALUES (2, 'xian');-- 创建 products 表
CREATE TABLE `products` (
`id` INT NOT NULL,
`product` VARCHAR(255) NOT NULL,
PRIMARY KEY (`id`)
);-- 插入数据
INSERT INTO `products` (`id`, `product`) VALUES (1, 'Beer');
INSERT INTO `products` (`id`, `product`) VALUES (2, 'Cap');
INSERT INTO `products` (`id`, `product`) VALUES (3, 'Peanut');

2.1 java 安装

可以看到 java 已经安装好 版本 11
在这里插入图片描述

2.2 下载 Flink 1.20.1

wget https://mirrors.tuna.tsinghua.edu.cn/apache/flink/flink-1.20.1/flink-1.20.1-bin-scala_2.12.tgz

在这里插入图片描述

2.3 解压 Flink 1.20.1

tar -xzf flink-1.20.1-bin-scala_2.12.tgz
cd flink-1.20.1
ll

在这里插入图片描述

在这里插入图片描述

2.4 配置 Flink-1.20.1/conf/config.yaml

vim flink-1.20.1/conf/config.yaml
改为 rest.address: 0.0.0.0
改为 rest.bind-address: 0.0.0.0

rest.address 和 rest.bind-address 是与 Flink 的 REST API 和 Web UI 相关的配置项,用于控制 Flink JobManager 的 REST 服务监听地址。这些配置决定了 Flink 的 Web UI 和客户端如何访问 JobManager。
在这里插入图片描述

2.5 下载 Flink-CDC-3.3.0

wget https://mirrors.tuna.tsinghua.edu.cn/apache/flink/flink-cdc-3.3.0/flink-cdc-3.3.0-bin.tar.gz

在这里插入图片描述

2.6 解压 Flink-CDC-3.3.0

tar -xzf flink-cdc-3.3.0-bin.tar.gz
cd flink-cdc-3.3.0

在这里插入图片描述

2.7 下载驱动 Flink-cdc-pipeline-connector-mysql-3.3.0.jar

wget "https://aliyun-osm-maven.oss-cn-shanghai.aliyuncs.com/repository/central/org/apache/flink/flink-cdc-pipeline-connector-mysql/3.3.0/flink-cdc-pipeline-connector-mysql-3.3.0.jar?Expires=1753349291&OSSAccessKeyId=LTAI5tQeTg2SkYgiUPXMyK7t&Signature=jnckjewN8vqzaE3tbupo691o8YY%3D" -O lib/flink-cdc-pipeline-connector-mysql-3.3.0.jar

2.8 下载驱动 Flink-cdc-pipeline-connector-starrocks-3.3.0.jar

wget "https://aliyun-osm-maven.oss-cn-shanghai.aliyuncs.com/repository/central/org/apache/flink/flink-cdc-pipeline-connector-starrocks/3.3.0/flink-cdc-pipeline-connector-starrocks-3.3.0.jar?Expires=1753349371&OSSAccessKeyId=LTAI5tQeTg2SkYgiUPXMyK7t&Signature=Fd0JxnlDxr1nKkP8wOoIHHhGV2c%3D" -O lib/flink-cdc-pipeline-connector-starrocks-3.3.0.jar

2.9 移到 Flink-cdc-3.3.0/lib 下

在这里插入图片描述

2.10 下载 MySQL JDBC 驱动

wget https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.0.33/mysql-connector-j-8.0.33.jar

2.11 启动 Flink-1.20.1

./flink-1.20.1/bin/start-cluster.sh

在这里插入图片描述

打开 网页
192.168.5.131 :8081,如果可以打开说明启动成功
在这里插入图片描述

点击 Task Managers ,说明基本正常

在这里插入图片描述

2.12 网络和数据库连接 测试

ping 192.168.5.131
ping 192.168.5.128telnet 192.168.5.131 3306
telnet 192.168.5.128 9030

在这里插入图片描述

2.13 创建 yaml

在这里插入图片描述

source:type: mysqlhostname: 192.168.5.131  # 修改为实际 MySQL 地址port: 3306username: rootpassword: 123456tables: app_db.\.*  #app_db修改为库名server-id: 5400-5404sink:type: starrocksjdbc-url: jdbc:mysql://192.168.5.128:9030  # 修改为实际 StarRocks 地址load-url: 192.168.5.128:8030username: rootpassword: 123456table.create.properties.replication_num: 1pipeline:name: MySQL to StarRocks Pipelineparallelism: 1

source:
type: mysql
hostname: 192.168.5.131 # MySQL 服务器 IP 地址
port: 3306 # MySQL 端口
username: root # 连接
MySQL 的用户名
password: 123456 # 连接 MySQL 的密码 tables:
app_db…* # 需要同步的表,支持正则,这里是 app_db 库下的所有表
server-id:
5400-5404 # MySQL binlog server_id 范围(Flink CDC 会随机选一个)

sink:
type: starrocks
jdbc-url: jdbc:mysql://192.168.5.128:9030 # StarRocks JDBC 连接地址
load-url: 192.168.5.128:8030 # StarRocks Stream Load 地址
username: root # StarRocks 用户名
password: 123456 # StarRocks 密码
table.create.properties.replication_num: 1 # 表副本数设置为1

pipeline:
name: MySQL to StarRocks Pipeline # 管道名称
parallelism: 1 # 并行度设置为1

2.14 运行 yaml

因为有依赖关系 需要放在 flink-cdc-3.3.0 目录下面
在这里插入图片描述

./bin/flink-cdc.sh --flink-home /root/flink-1.20.1 mysql-to-starrocks-pipeline.yaml

./bin/flink-cdc.sh - 这是 Flink CDC 的启动脚本
–flink-home /root/flink-1.20.1 - 指定 Flink 的安装目录
mysql-to-starrocks-pipeline.yaml - 配置文件路径

在这里插入图片描述

页面如下
在这里插入图片描述
在这里插入图片描述
设置

./bin/flink-cdc.sh -Dexecution.checkpointing.interval=3000 --flink-home /root/flink-1.20.1 mysql-to-starrocks-pipeline.yaml

-Dexecution.checkpointing.interval=3000
-D: 这是 JVM 参数前缀,用于设置系统属性
execution.checkpointing.interval: Flink 配置参数名
3000: 时间间隔,单位是毫秒,即 3000ms = 3秒

最后验证 是否同步

查看starrocks

可以从图中看到原来 starrocks 是都没有数据库的
现在有数据库,表也自动同步好了。
在这里插入图片描述

连接 192.168.5.128:8030 ,并输入 用户名密码

在这里插入图片描述
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/bicheng/92975.shtml
繁体地址,请注明出处:http://hk.pswp.cn/bicheng/92975.shtml
英文地址,请注明出处:http://en.pswp.cn/bicheng/92975.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Rust:anyhow 高效错误处理库核心用法详解

以下是 anyhow 库在 Rust 中的核心用法详解(结合最佳实践和示例): 🔰 一、anyhow 的核心价值 用于简化错误处理,尤其适合: 需要快速原型开发的应用需要丰富错误上下文(Context)的场…

阿里云服务linux安装单机版

一、单机安装Redis 阿里教程 下载地址:redis下载地址 1、首先需要安装Redis所需要的依赖: yum install -y gcc tcl 2、下载Redis 注:也可以自己下好然后上传到云服务 wget https://gitcode.net/weixin_44624117/software/-/raw/master/software/Li…

python之uv使用

文章目录安装与更新standalonepip 安装创建以及初始化项目依赖管理uv run直接在命令行运行python代码片段直接运行项目中可执行脚本文件运行python包中快捷指令uv项目本地运行调试细节vscode 中运行调试uv项目命令行运行深入理解 uv lock, uv sync, uv lockuv lock 行为解析:uv…

【CV 目标检测】①——目标检测概述

一、目标检测概述 1.目标检测 目标检测(Object Detection)的任务是找出图像中所有感兴趣的目标,并确定它们的类别(分类任务)和位置(回归任务) 目标检测中能检测出来的物体取决于当前任务&…

C#图形库SciChart与ScottPlot及LiveCharts2对比

一.概述 1.SciChart SciChart 是一个专为企业级应用设计的高性能数据可视化库,提供跨平台的图表解决方案,支持 .NET、JavaScript、iOS 和 Android 等多个平台。它以卓越的渲染性能、丰富的专业图表类型和强大的交互功能著称, 广泛应用于金…

Win10电脑密码忘记如何进入操作系统

http://xq128.com/zj.htmlhttps://share.feijipan.com/s/LbFdbUKl下载后,准备一个空的U盘,大于4G。将U盘制作为PE盘。之后将制作好的PE盘插入到电脑中,启动待去除密码的电脑台式机,启动后一直按住F12,进入BIOS。选择下…

[免费]基于Python的网易云音乐热门歌单可视化大屏项目(flask+pandas+echarts+request库)【论文+源码+SQL脚本】

大家好,我是python222_小锋老师,看到一个不错的基于Python的网易云音乐热门歌单可视化大屏项目(flaskpandasechartsrequest库),分享下哈。 项目视频演示 【免费】基于Python的网易云音乐热门歌单可视化大屏项目(flaskpandasecharts爬虫) Py…

AR 智能眼镜:从入门到未来

从零看懂 AR 智能眼镜:未来 10 年技术演进与新手入门指南 在这个数字技术飞速迭代的时代,AR 智能眼镜正从科幻电影走进现实。从 2025 年重量不足 35 克的消费级产品,到 2030 年成为 “第二大脑” 的生活刚需,再到 2040 年进化为神经接口终端,AR 智能眼镜的发展将重塑人类…

初识Vue2及MVVM理解

1、什么是Vue Vue是一款用于构建用户界面的JavaScript框架。它基于标准HTML、CSS和JavaScript构建,并提供了一套声明式的、组件化的编程模型,可以高效地开发用户界面。 Vue.js是一套构建用户界面的渐进式框架,采用自底向上增量开发的设计&…

Rust:专业级错误处理工具 thiserror 详解

Rust:专业级错误处理工具 thiserror 详解 thiserror 是 Rust 中用于高效定义自定义错误类型的库,特别适合库开发。相比 anyhow 的应用级错误处理,thiserror 提供更精确的错误控制,让库用户能模式匹配具体错误。 📦 基…

Python网络爬虫(一) - 爬取静态网页

文章目录一、静态网页概述1. 静态网页介绍2. 静态网页爬取技术Requests介绍二、安装 Requests 库三、发送请求并获取响应1. 发送 GET 请求1.1 get() 方法介绍1.2 get() 方法签名介绍1.3 get() 方法参数介绍1.4 示例:发送get请求2. 发送 POST 请求2.1 post() 方法介绍…

.NET/C# webapi框架下给swagger的api文档中显示注释(可下载源码)

bg&#xff1a;.NET/C#真的是越来越凉了。用的是.net9&#xff0c;创建完自带一个天气预报api拿来测试就行 1、在Controllers中弄多几个&#xff0c;并写上注释 /// <summary> /// Post注释 /// </summary> /// <returns></returns> [HttpPost] publ…

2508C++,检测S模式

原文 可用Windows.System.Profile.WindowsIntegrityPolicy类检测S模式. //C# using Windows.System.Profile; if (WindowsIntegrityPolicy.IsEnabled) {//系统在S模式if (WindowsIntegrityPolicy.CanDisable) {//系统在S模式,但可退出S模式suggestCompanion true;} else {//系…

Coding Exercising Day 9 of “Code Ideas Record“:StackQueue part 01

文章目录1. Theoretical basisThe C standard library has multiple versions. To understand the implementation principles of stack and queue, we must know which STL version we are using.The stack and queue discussed next are data structures in *SGI STL*. Only …

Mysql数据仓库备份脚本

Mysql数据仓库备份脚本 #!/bin/bash# MySQL数据库完整备份脚本 # 功能: 查询所有数据库 -> 分别导出 -> 压缩打包# 配置区域 # MySQL连接信息 MYSQL_USER"root" MYSQL_PASSWORD"root" MYSQL_HOST"localhost" MYSQL_PORT"3306"…

基于嵌入式Linux RK3568 qt 车机系统开发

嵌入式系统、Qt/QML 与车机系统的发展趋势分析 1. RK3568 开发板与 OpenGL ES 3 支持&#xff0c;为图形应用打下坚实基础 RK3568 是瑞芯微&#xff08;Rockchip&#xff09;推出的一款高性能、低功耗的64位处理器&#xff0c;广泛用于工业控制、智能终端、嵌入式车载系统等领…

OceanBase架构设计

本文主要参考《大规模分布式存储系统》 基本结构客户端&#xff1a;发起请求。 RootServer&#xff1a;管理集群中的所有服务器&#xff0c;子表数据分布及副本管理&#xff0c;一般为一主一备&#xff0c;数据强同步。 UpdateServer&#xff1a;存储增量变更数据&#xff0c;一…

[Element-plus]动态设置组件的语言

nuxt element-plus国际化vue element-plus国际化<template><div class"container"> <!-- <LangSwitcher />--><button click"toggle(zh-cn)">中文</button><button click"toggle(en)">English<…

【VS Code - Qt】如何基于Docker Linux配置Windows10下的VS Code,开发调试ARM 版的Qt应用程序?

如何在Windows 10上配置VS Code以开发和调试ARM版Qt应用程序。这需要设置一个基于Docker的Linux环境。首先&#xff0c;让我们了解一下你的具体需求和环境&#xff1a;你有一个Qt项目&#xff08;看起来是医学设备相关的设置程序&#xff09;目标平台是ARM架构你希望在Windows …

linux常见故障系列文章 1-linux进程挂掉原因总结和排查思路

问题一 &#xff1a;运行时常见的进程崩溃原因 内存不足&#xff09; **0. 内存不足 内存不足&#xff08;OOM Killer&#xff09; 排查 OOM&#xff1a;free -h → dmesg → ps aux --sort-%mem 预防 OOM&#xff1a;限制关键进程内存、调整 OOM Killer 策略、增加 swap 长期优…