I. DB2 Database Core Configuration
1. Enable archive logging and CDC support
```sql
-- Connect to the database as the DB2 administrator
CONNECT TO mydb USER db2inst1 USING password;

-- Enable archive logging (required by CDC)
UPDATE DATABASE CONFIGURATION USING LOGARCHMETH1 DISK:/db2log/archive;
QUIESCE DATABASE IMMEDIATE FORCE CONNECTIONS;
BACKUP DATABASE mydb;
UNQUIESCE DATABASE;

-- Verify the logging mode (to filter with grep, run this from the OS shell
-- through the DB2 CLP instead: db2 get db cfg for mydb | grep LOGARCHMETH1)
GET DATABASE CONFIGURATION FOR mydb;
-- Expected output: LOGARCHMETH1 (Log archive method 1) = DISK:/db2log/archive
```
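Optionally, the same setting can be checked with plain SQL instead of the CLP; a minimal sketch, assuming the `SYSIBMADM` administrative views are enabled and readable by your user:

```sql
-- Read the current log-archive settings from the DBCFG admin view
SELECT NAME, VALUE
FROM SYSIBMADM.DBCFG
WHERE NAME IN ('logarchmeth1', 'logarchmeth2');
```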
Then create the capture schema and control table:

```sql
-- Create the capture schema and the control table
-- (the control table tracks the capture process)
CREATE SCHEMA cdc;
SET SCHEMA cdc;

CREATE TABLE cdc.control (
  id INTEGER NOT NULL PRIMARY KEY,
  last_commit_time TIMESTAMP
);
INSERT INTO cdc.control VALUES (1, CURRENT_TIMESTAMP);
```
2. Enable change data capture on the source table
```sql
-- Enable CDC for the target table (example: the products table)
SET SCHEMA myschema;

-- Capture buffer table (adjust the columns to match the actual table);
-- it must exist before the trigger below is created
CREATE TABLE cdc.products_cdc_buffer (
  operation CHAR(1),
  op_ts TIMESTAMP,
  id INT,
  name VARCHAR(100),
  description VARCHAR(255),
  weight DECIMAL(10,3)
);

-- Capture trigger: writes every change into the buffer table.
-- When running this through the CLP, use a non-default statement
-- terminator (e.g. db2 -td@) so the semicolons inside the body parse.
CREATE TRIGGER products_cdc_trg
  AFTER INSERT OR UPDATE OR DELETE ON products
  REFERENCING NEW AS n OLD AS o
  FOR EACH ROW MODE DB2SQL
BEGIN ATOMIC
  IF INSERTING THEN
    INSERT INTO cdc.products_cdc_buffer (operation, op_ts, id, name, description, weight)
    VALUES ('I', CURRENT_TIMESTAMP, n.id, n.name, n.description, n.weight);
  ELSEIF UPDATING THEN
    -- record both the before image and the after image of the update
    INSERT INTO cdc.products_cdc_buffer (operation, op_ts, id, name, description, weight)
    VALUES ('U', CURRENT_TIMESTAMP, o.id, o.name, o.description, o.weight);
    INSERT INTO cdc.products_cdc_buffer (operation, op_ts, id, name, description, weight)
    VALUES ('U', CURRENT_TIMESTAMP, n.id, n.name, n.description, n.weight);
  ELSEIF DELETING THEN
    INSERT INTO cdc.products_cdc_buffer (operation, op_ts, id, name, description, weight)
    VALUES ('D', CURRENT_TIMESTAMP, o.id, o.name, o.description, o.weight);
  END IF;
END;
```
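Before involving Flink at all, it can be worth smoke-testing the trigger directly in DB2. A minimal sketch with an illustrative throwaway row (this assumes `myschema.products` already exists; it is created in Section IV if not):

```sql
-- Insert a throwaway row and confirm the trigger captured it
INSERT INTO myschema.products VALUES (999, 'smoke-test', 'trigger check', 0.1);
COMMIT;

-- Expect one 'I' row for id 999 near the top of the buffer
SELECT operation, op_ts, id, name
FROM cdc.products_cdc_buffer
ORDER BY op_ts DESC
FETCH FIRST 5 ROWS ONLY;

-- Clean up the test row (the trigger will also log a 'D' row for it)
DELETE FROM myschema.products WHERE id = 999;
COMMIT;
```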
II. Flink Environment Integration
1. Add Maven dependencies
```xml
<!-- Flink DB2 CDC connector -->
<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-sql-connector-db2-cdc</artifactId>
    <version>3.0.1</version>
    <scope>provided</scope>
</dependency>
<!-- DB2 JDBC driver -->
<dependency>
    <groupId>com.ibm.db2</groupId>
    <artifactId>jcc</artifactId>
    <version>11.5.0.0</version>
</dependency>
```
2. Deploy to the SQL Client
- Download the JAR packages:
  - `flink-sql-connector-db2-cdc-3.0.1.jar`
  - `db2jcc4.jar`
- Place both JARs under `$FLINK_HOME/lib/`, then restart the Flink cluster.
III. Flink SQL Table Definition and Parameter Reference
1. Complete CREATE TABLE example (with metadata columns)
```sql
-- Configure checkpointing (optional)
SET 'execution.checkpointing.interval' = '5s';

-- Create the DB2 CDC table
CREATE TABLE db2_products (
  id INT,
  name STRING,
  description STRING,
  weight DECIMAL(10, 3),
  -- Metadata columns: expose change metadata
  db_name STRING METADATA FROM 'database_name' VIRTUAL,
  schema_name STRING METADATA FROM 'schema_name' VIRTUAL,
  op_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'db2-cdc',
  'hostname' = '192.168.1.100',
  'port' = '50000',
  'username' = 'db2inst1',
  'password' = 'password',
  'database-name' = 'mydb',
  'schema-name' = 'myschema',
  'table-name' = 'products',
  'server-time-zone' = 'Asia/Shanghai',
  'scan.startup.mode' = 'initial'
);
```
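To watch the change stream end to end, the CDC table can be routed into a sink. A minimal sketch using Flink's built-in `print` connector; the sink table name is illustrative:

```sql
-- Hypothetical print sink: every change row pulled from DB2 is written
-- to the TaskManager logs / SQL client output
CREATE TABLE products_sink (
  id INT,
  name STRING,
  description STRING,
  weight DECIMAL(10, 3),
  op_ts TIMESTAMP_LTZ(3),
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'print'
);

INSERT INTO products_sink
SELECT id, name, description, weight, op_ts FROM db2_products;
```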
2. Parameter reference
| Parameter | Required | Default | Type | Description |
|---|---|---|---|---|
| `connector` | Yes | (none) | String | Fixed value `db2-cdc` |
| `hostname` | Yes | (none) | String | IP address or hostname of the DB2 server |
| `username` | Yes | (none) | String | Database user name |
| `password` | Yes | (none) | String | Database password |
| `database-name` | Yes | (none) | String | Database name (e.g. `mydb`) |
| `schema-name` | Yes | (none) | String | Schema name (e.g. `myschema`) |
| `table-name` | Yes | (none) | String | Table name (e.g. `products`) |
| `port` | No | 50000 | Integer | Database port |
| `scan.startup.mode` | No | initial | String | Startup mode: `initial` (take a snapshot on first start, then stream changes) or `latest-offset` (read only the latest changes) |
| `server-time-zone` | No | system time zone | String | Session time zone of the database server (e.g. `Asia/Shanghai`); affects TIMESTAMP conversion |
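As an example of `scan.startup.mode`, a second table can reuse the definition above and override only that option via a `LIKE` clause (the table name is illustrative; by default Flink's `LIKE` clause includes everything and lets the new `WITH` options overwrite duplicates):

```sql
-- Hypothetical variant that skips the initial snapshot and reads only
-- new changes; all other options are inherited from db2_products
CREATE TABLE db2_products_latest
WITH ('scan.startup.mode' = 'latest-offset')
LIKE db2_products;
```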
IV. Environment Verification and Testing
1. Prepare test data (DB2)
```sql
-- Create the test table (if it does not exist yet)
CONNECT TO mydb USER db2inst1 USING password;
SET SCHEMA myschema;

CREATE TABLE products (
  id INT NOT NULL PRIMARY KEY,
  name VARCHAR(100),
  description VARCHAR(255),
  weight DECIMAL(10,3)
);

-- Insert test data
INSERT INTO products VALUES (1, 'Product A', 'Test product A', 1.5);
INSERT INTO products VALUES (2, 'Product B', 'Test product B', 2.3);
COMMIT;
```
2. Verify with Flink SQL
```sql
-- Query the DB2 CDC table (the first run triggers the snapshot read)
SELECT * FROM db2_products;
```

Then, in DB2:

```sql
-- Change a row on the DB2 side
UPDATE myschema.products SET weight = 1.8 WHERE id = 1;
COMMIT;
-- Watch the Flink output: the updated record should appear,
-- with op_ts set to the change time
```
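Since the CDC table is a changelog source, a continuous aggregate over it is re-emitted as changes arrive, which makes a convenient second check. A minimal sketch:

```sql
-- The count and average update whenever a change is captured; with the
-- test data above, avg_weight should move from 1.9 to 2.05 after the UPDATE
SELECT COUNT(*) AS product_count, AVG(weight) AS avg_weight
FROM db2_products;
```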
3. Verify with the DataStream API
```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.cdc.connectors.db2.Db2Source;

public class Db2SourceExample {
    public static void main(String[] args) throws Exception {
        // Configure the DB2 source
        SourceFunction<String> sourceFunction = Db2Source.<String>builder()
                .hostname("192.168.1.100")
                .port(50000)
                .database("mydb")
                .tableList("myschema.products") // schema-qualified table name
                .username("db2inst1")
                .password("password")
                .deserializer(new JsonDebeziumDeserializationSchema()) // emit change events as JSON strings
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(3000); // commit offsets via checkpoints every 3 s

        env.addSource(sourceFunction)
           .print()
           .setParallelism(1); // a single task keeps the console output ordered

        env.execute("DB2 CDC Test");
    }
}
```
V. Common Problems and Solutions
1. Archive logging not enabled
   - Error: `ERROR: DB2 CDC requires archive logging to be enabled`
   - Fix: enable archive logging with `UPDATE DATABASE CONFIGURATION` as shown in Section I (including the backup), then restart the database.
2. Insufficient privileges to create triggers
   - Error: `ERROR: User does not have permission to create triggers`
   - Fix: grant the user the database authorities needed to create the capture objects: `GRANT CREATETAB, BINDADD, IMPLICIT_SCHEMA, CREATE_NOT_FENCED_ROUTINE ON DATABASE TO USER db2inst1;`
3. Unsupported data type (BOOLEAN)
   - Error: `ERROR: BOOLEAN type is not supported in SQL Replication on DB2`
   - Fix: replace BOOLEAN columns with SMALLINT (0/1) or CHAR(1) ('Y'/'N'); a migration sketch follows this list.
4. Timestamp conversion errors
   - Fix: set the `server-time-zone` option explicitly: `'server-time-zone' = 'Asia/Shanghai'`.
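For issue 3, a minimal migration sketch, assuming a hypothetical `is_active` BOOLEAN column on `myschema.products`:

```sql
-- Add a SMALLINT replacement column, copy the values, drop the BOOLEAN
ALTER TABLE myschema.products ADD COLUMN is_active_flag SMALLINT;
UPDATE myschema.products
  SET is_active_flag = CASE WHEN is_active THEN 1 ELSE 0 END;
ALTER TABLE myschema.products DROP COLUMN is_active;
-- DB2 usually leaves the table in reorg-pending state after DROP COLUMN:
--   CALL SYSPROC.ADMIN_CMD('REORG TABLE myschema.products');
COMMIT;
```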
VI. Production Tuning Recommendations
1. Performance tuning
   - Tune `debezium.poll.interval.ms` (e.g. `500`) to control the change-polling interval, and `debezium.snapshot.fetch.size` (e.g. `2048`) to optimize snapshot reads; see the sketch after this list.
2. High availability
   - Use a DB2 HADR (High Availability Disaster Recovery) cluster; have the Flink job connect to the primary node and make sure log replication is healthy.
3. Monitoring and maintenance
   - Periodically purge the CDC buffer table: `DELETE FROM cdc.products_cdc_buffer WHERE op_ts < CURRENT_TIMESTAMP - 1 DAY;`
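The `debezium.*` keys from item 1 are passed through the table's WITH clause to the embedded Debezium connector. A minimal sketch reusing the `db2_products` table from Section III (the values are examples, not recommendations):

```sql
-- Hypothetical tuned variant; the debezium.* options are forwarded to
-- Debezium, everything else is inherited from db2_products
CREATE TABLE db2_products_tuned
WITH (
  'debezium.poll.interval.ms' = '500',
  'debezium.snapshot.fetch.size' = '2048'
)
LIKE db2_products;
```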
The steps above cover the full Flink DB2 CDC configuration and verification workflow. In production, pay particular attention to the DB2 logging mode, trigger privilege management, and BOOLEAN type compatibility to ensure data consistency and system stability.