一、MySQL 服务器配置详解
1. 启用二进制日志(Binlog)

MySQL CDC 依赖二进制日志获取增量数据,需在 MySQL 配置文件(my.cnfmy.ini)中添加以下配置:

# 启用二进制日志
log-bin=mysql-bin
# 二进制日志格式(推荐ROW模式,记录行级变更)
binlog-format=ROW
# 启用GTID(高可用必备)
gtid-mode=ON
enforce-gtid-consistency=ON
# 从库同步时记录binlog(主从架构需要)
log-slave-updates=ON
# 避免长连接超时(大表快照时需要)
interactive_timeout=3600
wait_timeout=3600

配置说明

  • log-bin:指定二进制日志文件名前缀,MySQL 会自动生成如 mysql-bin.000001 的文件
  • binlog-format=ROW:相比 STATEMENT 模式,ROW 模式能精确记录每行数据的变更
  • gtid-mode:全局事务标识符,用于主从切换时保证数据一致性
  • log-slave-updates:若使用从库同步,需开启此配置让从库也记录 binlog
2. 创建专用用户并授权
-- 创建用户(替换为实际用户名和密码)
CREATE USER 'flink_cdc'@'localhost' IDENTIFIED BY 'flink123';-- 授予必要权限(重要:REPLICATION SLAVE 用于读取binlog)
GRANT SELECT, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'flink_cdc'@'localhost';-- 刷新权限
FLUSH PRIVILEGES;

权限说明

  • SELECT:读取表数据(快照阶段需要)
  • SHOW DATABASES:获取数据库列表(用于正则匹配监控库)
  • REPLICATION SLAVE:读取 binlog 必备权限
  • REPLICATION CLIENT:获取服务器状态(如binlog位置)
3. 配置唯一 Server ID

每个 Flink 作业需配置不同的 Server ID(避免 binlog 位置冲突):

# 在my.cnf中添加
server-id=1001  # 任意唯一整数,建议范围5400-6400

说明:若 Flink 作业并行度为 N,则 Server ID 可设为范围(如 5400-5400+N),例如:

-- Flink SQL 中通过Hints设置Server ID范围
SELECT * FROM mysql_table /*+ OPTIONS('server-id'='5401-5404') */;
二、Flink 环境配置步骤
1. 添加依赖(Maven 项目)

pom.xml 中添加 MySQL CDC 连接器依赖:

<dependency><groupId>com.ververica</groupId><artifactId>flink-sql-connector-mysql-cdc</artifactId><version>3.0.1</version><!-- 若使用Flink 1.14+,无需添加scope --><scope>provided</scope>
</dependency>
2. SQL Client 部署(非Maven环境)
  1. 下载连接器 JAR 包:flink-sql-connector-mysql-cdc-3.0.1.jar
  2. 将 JAR 包放入 $FLINK_HOME/lib/ 目录
  3. 重启 Flink 集群使依赖生效
三、Flink MySQL CDC 表定义与参数详解
1. 完整建表示例(Flink SQL)
-- 设置checkpoint间隔(可选)
SET 'execution.checkpointing.interval' = '3s';-- 创建MySQL CDC表
CREATE TABLE mysql_orders (order_id INT,order_date TIMESTAMP(0),customer_name STRING,price DECIMAL(10, 5),product_id INT,order_status BOOLEAN,-- 可选:添加元数据列db_name STRING METADATA FROM 'database_name' VIRTUAL,table_name STRING METADATA FROM 'table_name' VIRTUAL,op_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,row_kind STRING METADATA FROM 'row_kind' VIRTUAL,PRIMARY KEY(order_id) NOT ENFORCED
) WITH ('connector' = 'mysql-cdc','hostname' = '192.168.1.100','port' = '3306','username' = 'flink_cdc','password' = 'flink123','database-name' = 'mydb','table-name' = 'orders',-- 可选参数详解'server-id' = '5401','scan.incremental.snapshot.enabled' = 'true','scan.incremental.snapshot.chunk.size' = '8096','scan.startup.mode' = 'initial','heartbeat.interval' = '30s','debezium.binary.handling.mode' = 'base64'
);
2. 核心参数详解
参数名必选默认值类型说明
connectorString固定为 mysql-cdc
hostnameStringMySQL 服务器IP或域名
usernameString连接MySQL的用户名
passwordString连接MySQL的密码
database-nameString监控的数据库名,支持正则表达式(如 ^(test).* 匹配以test开头的库)
table-nameString监控的表名,支持正则表达式(如 `orders
server-id5400-6400随机StringFlink作业的唯一标识,需与其他MySQL客户端(如主从复制)不同,并行作业建议设为范围(如 5401-5404
scan.incremental.snapshot.enabledtrueBoolean启用增量快照(并行读取大表,无需全局锁),建议保持默认
scan.startup.modeinitialString启动模式:initial(快照+binlog)、earliest-offset(从最早binlog开始)、latest-offset(从最新binlog开始)
heartbeat.interval30sDuration心跳间隔,用于更新binlog位置,避免长时间无变更时binlog被清理
debezium.binary.handling.modenoneString二进制数据处理模式:base64(转Base64字符串)、hex(转十六进制),适用于BLOB/VARBINARY类型
四、环境验证与测试
1. 准备测试数据(MySQL)
-- 创建测试数据库和表
CREATE DATABASE mydb;
USE mydb;
CREATE TABLE orders (order_id INT PRIMARY KEY,order_date TIMESTAMP,customer_name VARCHAR(100),price DECIMAL(10, 2),order_status BOOLEAN
);-- 插入测试数据
INSERT INTO orders VALUES 
(1, '2023-01-01 10:00:00', 'Alice', 100.50, true),
(2, '2023-01-02 11:00:00', 'Bob', 200.75, false);
2. 使用Flink SQL验证
-- 查询MySQL CDC表数据
SELECT * FROM mysql_orders;-- 观察输出:应显示插入的两条记录
-- 后续在MySQL中更新数据,Flink会实时捕获变更
UPDATE mydb.orders SET price = 150.00 WHERE order_id = 1;
3. DataStream API 验证示例
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.cdc.connectors.mysql.source.MySqlSource;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;public class MySqlCdcExample {public static void main(String[] args) throws Exception {// 创建MySQL SourceMySqlSource<String> source = MySqlSource.<String>builder().hostname("192.168.1.100").port(3306).databaseList("mydb").tableList("mydb.orders").username("flink_cdc").password("flink123").deserializer(new JsonDebeziumDeserializationSchema()) // 转为JSON格式.startupOptions(StartupOptions.initial()) // 初始模式(快照+binlog).build();// 配置Flink环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.enableCheckpointing(5000); // 5秒checkpointenv.fromSource(source, WatermarkStrategy.noWatermarks(), "MySQL CDC Source").print(); // 打印到控制台env.execute("MySQL CDC Test");}
}
4. 验证关键点
  1. 日志检查

    • Flink 日志应包含 Binlog offset on checkpoint 字样,表明成功获取 binlog 位置
    • Access deniedPermission denied 错误,确认MySQL权限正确
  2. 数据变更测试

    • 在MySQL中执行 INSERT/UPDATE/DELETE 操作,Flink 应实时输出变更数据
    • 查看输出中的 row_kind 字段:+I(插入)、-D(删除)、+U(更新后)、-U(更新前)
  3. 增量快照验证

    • 若表数据量大,查看Flink Web UI的并行度,增量快照模式下多个任务应并行读取
    • 日志中无 FLUSH TABLES WITH READ LOCK 相关记录,确认未获取全局锁
五、常见问题与解决方案
  1. 权限不足错误

    ERROR: Access denied for user 'flink_cdc'@'localhost' (using password: YES)
    
    • 解决方案:确认MySQL用户密码正确,重新执行授权语句,确保包含 REPLICATION SLAVE 权限
  2. Server ID冲突

    ERROR: Another MySQL binlog client is using the same server id
    
    • 解决方案:修改 server-id 为唯一值,或在Flink SQL中通过 'server-id'='5401-5404' 设置范围
  3. 增量快照失败

    ERROR: Table has no primary key, cannot split snapshot chunks
    
    • 解决方案:为表添加主键,或设置 scan.incremental.snapshot.chunk.key-column 为非空列(如 'scan.incremental.snapshot.chunk.key-column'='unique_id'
  4. binlog未启用

    ERROR: Binary logging is not enabled
    
    • 解决方案:检查MySQL配置文件,确认 log-bin 已启用,重启MySQL服务

通过以上步骤,可完成Flink MySQL CDC的环境配置与验证。生产环境中建议结合实际需求调整并行度、checkpoint策略和GTID配置,以确保数据一致性和系统稳定性。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/web/87552.shtml
繁体地址,请注明出处:http://hk.pswp.cn/web/87552.shtml
英文地址,请注明出处:http://en.pswp.cn/web/87552.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

如何查看自己电脑的CUDA版本?

在搜索栏输入命令提示符 打开 输入 nvidia-smi图片中的两个是CUDA版本和显卡的信息

opencv使用 GStreamer 硬解码和 CUDA 加速的方案

在Conda环境中从源代码编译OpenCV&#xff08;支持CUDA和GStreamer&#xff09; 以下是完整的方案步骤&#xff0c;包括必要的依赖库安装过程&#xff1a; 1. 安装Miniconda&#xff08;如果尚未安装&#xff09; # 下载Miniconda安装脚本 wget https://repo.anaconda.com/m…

Java面试宝典:多线程一

1. run() vs start() 陷阱题 下面程序的运行结果 public static void main(String[] args) {Thread t = new Thread(

【CSS-14-基础样式表Base.css】如何编写高质量的Base.css:前端样式重置与基础规范指南

在前端开发中&#xff0c;Base.css&#xff08;也称为重置样式表或基础样式表&#xff09;是整个项目样式的基石。它负责消除浏览器默认样式的差异&#xff0c;建立统一的样式基准&#xff0c;为后续开发提供一致的起点。一个精心设计的Base.css能够显著提高开发效率&#xff0…

探索Python数据科学工具链NumPyPandas与Scikit-learn

NumPy&#xff1a;数值计算的基石 NumPy是Python中用于科学计算的核心库&#xff0c;它提供了一个强大的N维数组对象&#xff0c;以及大量的数学函数库&#xff0c;能够高效地进行向量和矩阵运算。对于数据科学家而言&#xff0c;掌握NumPy是进行数据处理和算法实现的基础。 创…

八股学习(三)---MySQL

一、MySQL中的回表是什么&#xff1f;我的回答&#xff1a;MySQL回表指的是在查询使用非聚簇索引也就是二级索引时&#xff0c;叶子节点只存储了索引列的值和主键Id&#xff0c;若要查询其他字段&#xff0c;就要根据主键去聚簇索引查询完整的数据。这个过程就是回表。比如用na…

NeighborGeo:基于邻居的IP地理定位(一)

NeighborGeo:基于neighbors的IP地理定位 X. Wang, D. Zhao, X. Liu, Z. Zhang, T. Zhao, NeighborGeo: IP geolocation based on neighbors, Comput. Netw. 257 (2025) 110896, Abstract IP地址定位在网络安全、电子商务、社交媒体等领域至关重要。当前主流的图神经网络方法…

MySQL 8.0:窗口函数

一、基础知识 定义 窗口函数&#xff08;Window Function&#xff09;对查询结果集的子集&#xff08;“窗口”&#xff09;进行计算&#xff0c;保留原始行而非聚合为单行&#xff0c;适合复杂分析&#xff08;如排名、累积和&#xff09;。 基本语法&#xff1a; 函数名() OV…

AI 深度学习面试题学习

1.神经网络 1.1各个激活函数的优缺点? 1.2为什么ReLU常用于神经网络的激活函数? 1.在前向传播和反向传播过程中,ReLU相比于Sigmoid等激活函数计算量小; 2.避免梯度消失问题。对于深层网络,Sigmoid函数反向传播时,很容易就会出现梯度消失问题(在Sigmoid接近饱和区时,变换…

遇到该问题:kex_exchange_identification: read: Connection reset`的解决办法

kex_exchange_identification: read: Connection reset 是一个非常常见的 SSH 连接错误。它表明在 SSH 客户端和服务器建立安全连接的初始阶段&#xff08;密钥交换&#xff0c;Key Exchange&#xff09;&#xff0c;连接就被对方&#xff08;服务器&#xff09;强制关闭了。 …

(论文蒸馏)语言模型中的多模态思维链推理

&#xff08;论文总结&#xff09;语言模型中的多模态思维链推理 论文名称研究背景动机主要贡献研究细节两阶段框架实验结果促进收敛性摆脱人工标注错误分析与未来前景 论文名称 Multimodal Chain-of-Thought Reasoning in Language Models http://arxiv.org/abs/2302.00923 …

React Native 接入 eCharts

React Native 图表接入指南 概述 本文档详细介绍了在React Native项目中接入ECharts图表的完整步骤&#xff0c;包括依赖安装、组件配置、数据获取、图表渲染等各个环节。 目录 1. 环境准备2. 依赖安装3. 图表组件创建4. 数据获取Hook5. 图表配置6. 组件集成7. 国际化支持8…

基于C#的OPCServer应用开发,引用WtOPCSvr.dll

操作流程&#xff1a; 1.引入WtOPCSvr.dll文件 2.注册服务&#xff1a;使用UpdateRegistry方法注册&#xff0c;注意关闭应用时使用UnregisterServer取消注册。 3.初始化服务&#xff1a;使用InitWTOPCsvr初始化 4.使用CreateTag方法&#xff0c;创建标签 5.读写参数使用下面三…

Java类加载器getResource行为简单分析

今天尝试集成一个第三方SDK&#xff0c;在IDE里运行正常&#xff0c;放到服务器上却遇到了NPE&#xff0c;反编译一看&#xff0c;原来在这一行&#xff1a;String path Test.class.getClassLoader().getResource("").getPath(); // Test.class.getClassLoader().ge…

【CodeTop】每日练习 2025.7.4

Leetcode 1143. 最长公共子序列 动态规划解决&#xff0c;比较当前位置目标和实际字符串的字母&#xff0c;再根据不同情况计算接下来的情形。 class Solution {public int longestCommonSubsequence(String text1, String text2) {char[] t1 text1.toCharArray();char[] t2…

ES6从入门到精通:Promise与异步

Promise 基础概念Promise 是 JavaScript 中处理异步操作的一种对象&#xff0c;代表一个异步操作的最终完成或失败及其结果值。它有三种状态&#xff1a;Pending&#xff08;进行中&#xff09;、Fulfilled&#xff08;已成功&#xff09;、Rejected&#xff08;已失败&#xf…

数据结构:二维数组(2D Arrays)

目录 什么是二维数组&#xff1f; 二维数组的声明方式 方式 1&#xff1a;静态二维数组 方式 2&#xff1a;数组指针数组&#xff08;数组中存放的是指针&#xff09; 方式 3&#xff1a;双指针 二级堆分配 &#x1f4a1; 补充建议 如何用“第一性原理”去推导出 C 中…

HAProxy 和 Nginx的区别

HAProxy 和 Nginx 都是优秀的负载均衡工具&#xff0c;但它们在设计目标、适用场景和功能特性上有显著区别。以下是两者的详细对比&#xff1a;1. 核心定位特性HAProxyNginx主要角色专业的负载均衡器/代理Web 服务器 反向代理/负载均衡设计初衷高性能流量分发高并发 HTTP 服务…

基于Java+SpringBoot的健身房管理系统

源码编号&#xff1a;S586源码名称&#xff1a;基于SpringBoot的健身房管理系统用户类型&#xff1a;多角色&#xff0c;用户、教练、管理员数据库表数量&#xff1a;13 张表主要技术&#xff1a;Java、Vue、ElementUl 、SpringBoot、Maven运行环境&#xff1a;Windows/Mac、JD…

【MySQL安装-yum/手动安装,卸载,问题排查处理完整文档(linux)】

一.使用Yum仓库自动安装 步骤1:添加MySQL Yum仓库 sudo rpm -Uvh https://dev.mysql.com/get/mysql80-community-release-el7-6.noarch.rpm步骤2:安装MySQL服务器 sudo yum install mysql-server -y步骤3:启动并设置开机自启 sudo systemctl start mysqld sudo systemct…