背景
最近在做 Flink 任务数据源切换时遇到 offset 消费问题,遂写篇文章记录下来。
切换时只修改了 source 算子的 topic,uid 等其他信息保持不变:
- 发布时,发现算子的消费者点位重置为earliest,导致消息积压。
- 消息积压后,打算通过时间戳重置点位到发布前,但是发现点位重置失效。
原因分析
source算子点位初始化模式
source算子点位初始化有两种方式:1)消费者组偏移量:setStartFromGroupOffsets;2)时间戳:setStartFromTimestamp。
消费组偏移量(FromGroupOffsets)
该方式会将 startupMode 初始化为 StartupMode.GROUP_OFFSETS:
startupMode枚举:
时间戳(FromTimestamp)
该方式会将 startupMode 初始化为 StartupMode.TIMESTAMP:
source 算子初始化
示例代码:
public static void main(String[] args) throws Exception {Configuration configuration = new Configuration();// configuration.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem");// configuration.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "xxx");configuration.setString("execution.savepoint.path", "xxx");configuration.setBoolean("execution.savepoint.ignore-unclaimed-state", true);// 创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);// 启用checkpointenv.enableCheckpointing(5000);env.setParallelism(1);env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);ParameterTool argTools = ParameterTool.fromArgs(args);env.getConfig().setGlobalJobParameters(argTools);// 添加数据源// "old_topic", "new_topic"FlinkKafkaConsumer consumer = KafkaConfig.getConsumer();consumer.setStartFromGroupOffsets();DataStream<String> stream = env.addSource(consumer).uid("kafka-source").name("kafka-source");SingleOutputStreamOperator<HeartEntity> heart = stream.map(new MapFunction<String, HeartEntity>() {@Overridepublic HeartEntity map(String value) throws Exception {HeartEntity heartEntity = JSON.parseObject(value, HeartEntity.class);return heartEntity;}}).uid("map-heart").name("map-heart");// 使用状态计数DataStream<Long> countStream = heart.keyBy(HeartEntity::getCommandNo).map(new RichMapFunction<HeartEntity, Long>() {private transient ValueState<Long> countState;private long count = 0;@Overridepublic void open(Configuration parameters) throws Exception {// 初始化状态ValueStateDescriptor<Long> descriptor = new ValueStateDescriptor<>("count-state", TypeInformation.of(Long.class));countState = getRuntimeContext().getState(descriptor);}@Overridepublic Long map(HeartEntity value) throws Exception {count++;countState.update(count);return countState.value();}}).uid("count-map").name("count-map");// 打印计数结果countStream.print().uid("print").name("print");// 启动Flink任务env.execute("Flink Kafka State Count Example");}
从状态启动
initializeState
状态初始化时,FlinkKafkaConsumerBase 执行 initializeState 方法中:当 source topic 从 sg_lock_heart_msg_topic 切换为 sg_tw_common_com_lock_heart_report_topic 时,可以看到新 topic 绑定的 source 算子仍然是从老 topic 的算子状态启动的,因为 uid 没变。
initializeState 往下走可以看到,restoreState 的是老 topic 分区的状态;
open
算子初始化时,如果状态不为空且 topic 分区不在状态中,那么就会把新的 topic 分区加入到状态中,并设置算子消费新分区的 startupMode 为 EARLIEST_OFFSET,即从最早的消息开始消费。
老的 topic 分区不会再消费,会被移除订阅。
订阅的 topic 分区
从指定时间戳启动
setStartFromTimestamp 设置启动模式为时间戳
然而在算子初始化时,由于从状态启动,新 topic分区 仍然会从 earliest 消费:
也就是说,checkpoint/savepoint 中存储的 source 点位状态在恢复时大于设置的时间戳。
解决方案
尝试一(修改 uid)
从 source 算子初始化的 open 过程可知,既然从状态启动时会将已存在 source 算子(uid在状态中)的新 topic 点位设置为最早,那么如果将新 topic 的 uid 改成与老 topic 的 uid 不一致,是否就能避免从 earliest 恢复:因为从状态恢复时新的 uid 并不在状态中,那么就不会走 open 中将新 topic 点位置为 earliest 的流程。
FlinkKafkaConsumer consumer = KafkaConfig.getConsumer("sg_tw_common_com_lock_heart_report_topic");
DataStream<String> stream = env.addSource(consumer).uid("kafka-source-new").name("kafka-source-new");
可以看到在状态初始化阶段(initializeState),source 算子的状态 (restoreState)被置为空集合,而不是 null。为什么?
当在算子初始化时,因为 restoreState 不为 null,仍然会进入点位重置的流程:
可以看到这里将新 topic 分区放入了 restoreState 中,且点位置为 earliest(StarupMode 枚举中,EARLIEST_OFFSET = -915623761775L)。
再往下走,restoreState 会将其中的新 topic 分区放入订阅的分区中
从此,新 topic 又从最早开始消费😓。那么方案尝试一是失败的!
在线上实际操作时,消费点位确实被重置到了 earliest,又导致积压了😦。
尝试二(修改消费者组)
有没有办法让 restoreState 置为 null 呢,那就真的不会走到点位重置的流程了🎊
突然看到 restoreState 的注释:
如果消费者从状态恢复,就会设置 restoreState。那怎么让消费者不从状态恢复?无状态启动肯定是不行的,不能让其他算子的状态丢了。那我直接换个消费组名!试一试呢
Properties props = new Properties();props.put("bootstrap.servers", "uat-kafka1.ttbike.com.cn:9092,uat-kafka2.ttbike.com.cn:9092,uat-kafka3.ttbike.com.cn:9092");props.put("group.id", "flink-label-engine-new");
还是不行,直到目前发现只要从状态启动,context 上下文会让代码走进给 restoreState 赋值的位置。
isRestored分析
isRestore分析
尝试三(新增拓扑图)
根据算子状态恢复可知,只要新增的 source 算子跟其他已有算子形成了算子链,如果以状态启动,那么 source 的点位就会被置为 earliest。
- 新增一个新 topic 的 source 算子和 sink 算子(要保证新增的算子与已有算子隔离,不会形成算子链),然后修改老 source 算子的 uid 和 topic 与新的一致。
// old: sg_lock_heart_msg_topic
FlinkKafkaConsumer consumer = KafkaConfig.getConsumer("sg_lock_heart_msg_topic");
consumer.setStartFromGroupOffsets();
DataStream<String> stream = env.addSource(consumer).uid("kafka-source-old").name("kafka-source-old");// new: sg_tw_common_com_lock_heart_report_topic
FlinkKafkaConsumer consumer_new = KafkaConfig.getConsumer("sg_tw_common_com_lock_heart_report_topic");
consumer_new.setStartFromGroupOffsets();
DataStream<String> stream_old = env.addSource(consumer_new).uid("kafka-source-new").name("kafka-source-new");
stream_old.print().uid("print-new").name("print-new");
由于从状态启动,且新加入的算子与其他算子隔离,老 source 算子的点位从状态启动;新 source 算子的点位被置为 GROUP_OFFSET。
1. 暂停并保存状态;
2. 修改老 source 算子的 uid 和 topic 与 新算子保持一致,同时删除新算子;
3. 然后从状态启动(/061c986d19612ae413ba794f68ff7727/chk-9),修改后的 source 算子点位从状态恢复:
4. 下游 “count-map”的状态是否正常:发送测试消息,可以看出状态没丢失