Flink 与 Flink CDC 版本兼容对照表
Flink 版本 | 支持的 Flink CDC 版本 | 关键说明 |
---|---|---|
Flink 1.11.x | Flink CDC 1.2.x | 早期版本,需注意 Flink 1.11.0 的 Bug(如 Upsert 写入问题),建议使用 1.11.1 及以上。 |
Flink 1.12.x | Flink CDC 2.0.x(MySQL 使用 flink-connector-mysql-cdc ) | Flink 1.12.x 支持 CDC 2.0.x,MySQL 使用新版 Connector。 |
Flink 1.13.x | Flink CDC 2.2.x, 2.3.x, 2.4.x | 2.2.x 起支持 Flink 1.13.x,2.4.x 兼容性更广(支持到 Flink 1.15.x)。 |
Flink 1.14.x | Flink CDC 2.2.x, 2.3.x, 2.4.x | 同 Flink 1.13.x,需注意 2.4.x 对 1.14.x 的支持。 |
Flink 1.15.x | Flink CDC 2.3.x, 2.4.x | 2.4.x 是 Flink 1.15.x 的推荐版本,支持增量快照框架。 |
Flink 1.16.x | Flink CDC 2.3.x, 2.4.x | 2.4.x 支持 Flink 1.16.x,但需注意部分功能可能受限。 |
Flink 1.17.x | Flink CDC 2.5.x 及以上(如 2.5.0) | 官方未声明 2.4.x 支持 Flink 1.17.x,需升级 Flink CDC 至 2.5+ 或降级 Flink 至 1.15.x。 |
Flink 2.0.x | 未明确说明(需参考最新 Flink CDC 文档) | Flink 2.0 为新版本,建议关注 Flink CDC 官方文档的最新支持情况。 |
Flink CDC 3.x | 仅支持 Flink 1.13.x 及以上(具体版本需看文档) | Flink CDC 3.x 是新一代数据集成框架,需与 Flink 1.13+ 配合使用。 |
1.flink cdc 的两种使用方式
source:type: mysql-cdchostname: localhostport: 3306username: rootpassword: "123456"database-list: app_dbtable-list: app_db.*scan.startup.mode: initialscan.incremental.snapshot.enabled: truescan.newly-added-table.enabled: truesink:type: dorisfenodes: 127.0.0.1:8030username: rootpassword: ""table.create.properties.light_schema_change: truetable.create.properties.replication_num: 1pipeline:name: Sync MySQL to Dorisparallelism: 2execution.runtime-mode: STREAMING
./bin/flink-cdc.sh run mysql-to-doris.yaml
2. flink cdc 另一种使用方式
<dependency><groupId>com.ververica</groupId><artifactId>flink-connector-postgres-cdc</artifactId><version>${flink.cdc.version}</version></dependency><dependency><groupId>com.ververica</groupId><artifactId>flink-connector-mysql-cdc</artifactId><version>${flink.cdc.version}</version></dependency>
package com.example.demo.cdc;import com.example.demo.ConnectionConstants;
import com.example.demo.deserial.SafeStringKafkaDeserializationSchema;
import com.example.demo.domain.TableData;
import com.example.demo.dynamic.ExtractKafaRowAndTableName;
import com.example.demo.sink.DynamicJdbcSink;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import java.util.*;public class FlinkKafkaSource {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(3);Properties kafkaProps = new Properties();kafkaProps.setProperty("bootstrap.servers", "192.168.64.141:9092");SafeStringKafkaDeserializationSchema schema = new SafeStringKafkaDeserializationSchema();//CustomKafkaDeserializationSchema schema = new CustomKafkaDeserializationSchema();FlinkKafkaConsumer<ConsumerRecord<String, String>> kafkaSource = new FlinkKafkaConsumer<>("part.t_part", // 匹配所有 testdb 下的表schema,kafkaProps);kafkaSource.setStartFromEarliest();DataStreamSource<ConsumerRecord<String, String>> ds = env.addSource(kafkaSource);ExtractKafaRowAndTableName extractRowAndTableName = new ExtractKafaRowAndTableName();SingleOutputStreamOperator<TableData> mapStream = ds.map(extractRowAndTableName);JdbcExecutionOptions options = JdbcExecutionOptions.builder().withBatchSize(1000).withBatchIntervalMs(2000).withMaxRetries(2).build();DynamicJdbcSink dynamicJdbcSink = new DynamicJdbcSink(ConnectionConstants.PG_DRIVER_CLSSNAME,ConnectionConstants.PG_URL,ConnectionConstants.PG_USER_NAME,ConnectionConstants.PG_PASSWORD);mapStream.addSink(dynamicJdbcSink);env.enableCheckpointing(5000); // 每 5 秒做一次 checkpointkafkaSource.setCommitOffsetsOnCheckpoints(true);env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);env.getCheckpointConfig().setMinPauseBetweenCheckpoints(2000); // 最小间隔env.getCheckpointConfig().setCheckpointTimeout(6000); // 超时时间env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); // 并行数env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);env.execute("Multi-table CDC to PostgreSQL via DataStream");}
}
使用flink cdc 写代码的时候jar 包方式,你需要考虑不同数据库的序列化和反序列化问题, yaml 方式就是没提供的功能你无法用不够灵活。
一、判断Flink CDC同步完成的常见方法
Flink CDC的同步分为全量同步和增量同步阶段,完成标志如下:
监控
currentEmitEventTimeLag
指标:- 这是核心判断依据。该指标表示数据从数据库产生到离开Source节点的时间延迟。
- 全量同步完成标志:当
currentEmitEventTimeLag
从 ≤0 变为 >0 时,表示已从全量阶段进入增量(Binlog)读取阶段。 - 原理:全量阶段该指标为0(无延迟),进入增量阶段后延迟值变为正数。
- 实现:通过Flink的Metrics系统(如Prometheus、Grafana)实时监控该指标。
检查日志输出:
- 在日志中搜索关键词
BinlogSplitReader is created
或全量同步结束
,这通常标志全量阶段完成。 - 全量同步完成后,日志会显示Binlog读取开始。
- 在日志中搜索关键词
观察作业状态和指标:
- 作业状态:通过Flink Web UI或API检查Job状态,若为
FINISHED
(仅限批处理任务),表示同步完成。 - 其他指标:
sourceIdleTime
:源空闲时间增加,可能表示无新数据。currentFetchEventTimeLag
:类似currentEmitEventTimeLag
,监控数据读取延迟。
- 作业状态:通过Flink Web UI或API检查Job状态,若为
验证目标数据:
- 对比源数据库和目标存储(如数据湖)的数据一致性:
- 全量同步后,目标数据应包含源数据库的所有记录。
- 使用数据校验工具(如比对哈希值)确保一致性。
- 对比源数据库和目标存储(如数据湖)的数据一致性:
二、为什么不用数据条数判断?
- 动态性:增量同步中数据持续流入,条数无法作为静态终点。
- 准确性问题:
- 数据删除、更新可能导致条数波动。
- 分布式系统中,分片同步可能不同步完成。
- 替代方案:上述指标和状态监控更实时可靠。
三、实践建议
- 实时监控:优先使用
currentEmitEventTimeLag
,结合Prometheus等工具告警。 - 自动化验证:在ETL管道中加入数据校验步骤,确保同步质量。
- 日志审计:定期审查日志,辅助异常排查。
如果您有具体同步场景(如MySQL到数据湖),可进一步优化方案。
Flink中的滑动窗口和滚动窗口
1. 滑动窗口(Sliding Window):
- 定义:滑动窗口有一个固定的大小,并且可以有重叠。这意味着数据项可能会被包含在一个或多个窗口中。
- 用途:适用于需要分析一段时间内的趋势或模式的情况,例如计算过去5分钟内每1分钟的数据平均值。
- 特点:
- 窗口大小和滑动步长可以独立配置。
- 可能导致较高的计算成本,因为它涉及到更多的窗口操作。
2. 滚动窗口(Tumbling Window):
- 定义:滚动窗口是滑动窗口的一种特殊情况,其中窗口之间没有重叠(即滑动步长等于窗口大小)。每个数据项只会属于一个特定的窗口。
- 用途:适合于定期汇总数据的场景,比如每天统计一次用户活动量。
- 特点:
- 简单易懂,实现起来相对直接。
- 数据不会跨窗口重复处理,减少了计算负担。
限流熔断机制中的滑动窗口
3. 限流熔断机制中的滑动窗口:
- 定义:在分布式系统或微服务架构中,为了防止某个服务过载而采取的一种保护措施。这里的滑动窗口通常用于监控请求速率,以便决定是否应该限制请求或触发熔断。
- 用途:主要用于控制流量、保护下游服务免受突发流量的影响。
- 特点:
- 主要关注点在于时间间隔内的请求数量或错误率。
- 实现方式可能包括固定大小的时间桶(buckets),随着时间推移,新的请求会进入最新的时间桶,而旧的时间桶会被丢弃。
- 目的是快速响应流量变化,提供即时反馈以调整系统的负载能力。
区别总结
- 应用场景不同:Flink的窗口函数主要用于流处理任务中的数据分析;而限流熔断机制中的滑动窗口则用于保障系统稳定性和可用性。
- 技术细节差异:Flink中的窗口涉及复杂的数据聚合逻辑,可能跨越多个节点进行分布式计算;相比之下,限流熔断机制中的滑动窗口更注重实时性和效率,通常在单个服务实例内部执行。
- 目标不同:前者旨在提取有价值的信息,如统计信息、模式识别等;后者的目标是通过限制请求频率来维持系统的健康状态。