一、MySQL 服务器配置详解
1. 启用二进制日志(Binlog)
MySQL CDC 依赖二进制日志获取增量数据,需在 MySQL 配置文件(my.cnf
或 my.ini
)中添加以下配置:
# 启用二进制日志
log-bin=mysql-bin
# 二进制日志格式(推荐ROW模式,记录行级变更)
binlog-format=ROW
# 启用GTID(高可用必备)
gtid-mode=ON
enforce-gtid-consistency=ON
# 从库同步时记录binlog(主从架构需要)
log-slave-updates=ON
# 避免长连接超时(大表快照时需要)
interactive_timeout=3600
wait_timeout=3600
配置说明:
log-bin
:指定二进制日志文件名前缀,MySQL 会自动生成如mysql-bin.000001
的文件binlog-format=ROW
:相比 STATEMENT 模式,ROW 模式能精确记录每行数据的变更gtid-mode
:全局事务标识符,用于主从切换时保证数据一致性log-slave-updates
:若使用从库同步,需开启此配置让从库也记录 binlog
2. 创建专用用户并授权
-- 创建用户(替换为实际用户名和密码)
CREATE USER 'flink_cdc'@'localhost' IDENTIFIED BY 'flink123';-- 授予必要权限(重要:REPLICATION SLAVE 用于读取binlog)
GRANT SELECT, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'flink_cdc'@'localhost';-- 刷新权限
FLUSH PRIVILEGES;
权限说明:
SELECT
:读取表数据(快照阶段需要)SHOW DATABASES
:获取数据库列表(用于正则匹配监控库)REPLICATION SLAVE
:读取 binlog 必备权限REPLICATION CLIENT
:获取服务器状态(如binlog位置)
3. 配置唯一 Server ID
每个 Flink 作业需配置不同的 Server ID(避免 binlog 位置冲突):
# 在my.cnf中添加
server-id=1001 # 任意唯一整数,建议范围5400-6400
说明:若 Flink 作业并行度为 N,则 Server ID 可设为范围(如 5400-5400+N
),例如:
-- Flink SQL 中通过Hints设置Server ID范围
SELECT * FROM mysql_table /*+ OPTIONS('server-id'='5401-5404') */;
二、Flink 环境配置步骤
1. 添加依赖(Maven 项目)
在 pom.xml
中添加 MySQL CDC 连接器依赖:
<dependency><groupId>com.ververica</groupId><artifactId>flink-sql-connector-mysql-cdc</artifactId><version>3.0.1</version><!-- 若使用Flink 1.14+,无需添加scope --><scope>provided</scope>
</dependency>
2. SQL Client 部署(非Maven环境)
- 下载连接器 JAR 包:flink-sql-connector-mysql-cdc-3.0.1.jar
- 将 JAR 包放入
$FLINK_HOME/lib/
目录 - 重启 Flink 集群使依赖生效
三、Flink MySQL CDC 表定义与参数详解
1. 完整建表示例(Flink SQL)
-- 设置checkpoint间隔(可选)
SET 'execution.checkpointing.interval' = '3s';-- 创建MySQL CDC表
CREATE TABLE mysql_orders (order_id INT,order_date TIMESTAMP(0),customer_name STRING,price DECIMAL(10, 5),product_id INT,order_status BOOLEAN,-- 可选:添加元数据列db_name STRING METADATA FROM 'database_name' VIRTUAL,table_name STRING METADATA FROM 'table_name' VIRTUAL,op_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,row_kind STRING METADATA FROM 'row_kind' VIRTUAL,PRIMARY KEY(order_id) NOT ENFORCED
) WITH ('connector' = 'mysql-cdc','hostname' = '192.168.1.100','port' = '3306','username' = 'flink_cdc','password' = 'flink123','database-name' = 'mydb','table-name' = 'orders',-- 可选参数详解'server-id' = '5401','scan.incremental.snapshot.enabled' = 'true','scan.incremental.snapshot.chunk.size' = '8096','scan.startup.mode' = 'initial','heartbeat.interval' = '30s','debezium.binary.handling.mode' = 'base64'
);
2. 核心参数详解
参数名 | 必选 | 默认值 | 类型 | 说明 |
---|---|---|---|---|
connector | 是 | 无 | String | 固定为 mysql-cdc |
hostname | 是 | 无 | String | MySQL 服务器IP或域名 |
username | 是 | 无 | String | 连接MySQL的用户名 |
password | 是 | 无 | String | 连接MySQL的密码 |
database-name | 是 | 无 | String | 监控的数据库名,支持正则表达式(如 ^(test).* 匹配以test开头的库) |
table-name | 是 | 无 | String | 监控的表名,支持正则表达式(如 `orders |
server-id | 否 | 5400-6400随机 | String | Flink作业的唯一标识,需与其他MySQL客户端(如主从复制)不同,并行作业建议设为范围(如 5401-5404 ) |
scan.incremental.snapshot.enabled | 否 | true | Boolean | 启用增量快照(并行读取大表,无需全局锁),建议保持默认 |
scan.startup.mode | 否 | initial | String | 启动模式:initial (快照+binlog)、earliest-offset (从最早binlog开始)、latest-offset (从最新binlog开始) |
heartbeat.interval | 否 | 30s | Duration | 心跳间隔,用于更新binlog位置,避免长时间无变更时binlog被清理 |
debezium.binary.handling.mode | 否 | none | String | 二进制数据处理模式:base64 (转Base64字符串)、hex (转十六进制),适用于BLOB/VARBINARY类型 |
四、环境验证与测试
1. 准备测试数据(MySQL)
-- 创建测试数据库和表
CREATE DATABASE mydb;
USE mydb;
CREATE TABLE orders (order_id INT PRIMARY KEY,order_date TIMESTAMP,customer_name VARCHAR(100),price DECIMAL(10, 2),order_status BOOLEAN
);-- 插入测试数据
INSERT INTO orders VALUES
(1, '2023-01-01 10:00:00', 'Alice', 100.50, true),
(2, '2023-01-02 11:00:00', 'Bob', 200.75, false);
2. 使用Flink SQL验证
-- 查询MySQL CDC表数据
SELECT * FROM mysql_orders;-- 观察输出:应显示插入的两条记录
-- 后续在MySQL中更新数据,Flink会实时捕获变更
UPDATE mydb.orders SET price = 150.00 WHERE order_id = 1;
3. DataStream API 验证示例
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.cdc.connectors.mysql.source.MySqlSource;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;public class MySqlCdcExample {public static void main(String[] args) throws Exception {// 创建MySQL SourceMySqlSource<String> source = MySqlSource.<String>builder().hostname("192.168.1.100").port(3306).databaseList("mydb").tableList("mydb.orders").username("flink_cdc").password("flink123").deserializer(new JsonDebeziumDeserializationSchema()) // 转为JSON格式.startupOptions(StartupOptions.initial()) // 初始模式(快照+binlog).build();// 配置Flink环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.enableCheckpointing(5000); // 5秒checkpointenv.fromSource(source, WatermarkStrategy.noWatermarks(), "MySQL CDC Source").print(); // 打印到控制台env.execute("MySQL CDC Test");}
}
4. 验证关键点
-
日志检查:
- Flink 日志应包含
Binlog offset on checkpoint
字样,表明成功获取 binlog 位置 - 无
Access denied
或Permission denied
错误,确认MySQL权限正确
- Flink 日志应包含
-
数据变更测试:
- 在MySQL中执行
INSERT/UPDATE/DELETE
操作,Flink 应实时输出变更数据 - 查看输出中的
row_kind
字段:+I
(插入)、-D
(删除)、+U
(更新后)、-U
(更新前)
- 在MySQL中执行
-
增量快照验证:
- 若表数据量大,查看Flink Web UI的并行度,增量快照模式下多个任务应并行读取
- 日志中无
FLUSH TABLES WITH READ LOCK
相关记录,确认未获取全局锁
五、常见问题与解决方案
-
权限不足错误:
ERROR: Access denied for user 'flink_cdc'@'localhost' (using password: YES)
- 解决方案:确认MySQL用户密码正确,重新执行授权语句,确保包含
REPLICATION SLAVE
权限
- 解决方案:确认MySQL用户密码正确,重新执行授权语句,确保包含
-
Server ID冲突:
ERROR: Another MySQL binlog client is using the same server id
- 解决方案:修改
server-id
为唯一值,或在Flink SQL中通过'server-id'='5401-5404'
设置范围
- 解决方案:修改
-
增量快照失败:
ERROR: Table has no primary key, cannot split snapshot chunks
- 解决方案:为表添加主键,或设置
scan.incremental.snapshot.chunk.key-column
为非空列(如'scan.incremental.snapshot.chunk.key-column'='unique_id'
)
- 解决方案:为表添加主键,或设置
-
binlog未启用:
ERROR: Binary logging is not enabled
- 解决方案:检查MySQL配置文件,确认
log-bin
已启用,重启MySQL服务
- 解决方案:检查MySQL配置文件,确认
通过以上步骤,可完成Flink MySQL CDC的环境配置与验证。生产环境中建议结合实际需求调整并行度、checkpoint策略和GTID配置,以确保数据一致性和系统稳定性。