我遇到的问题现象是这样的



1、flink版本flink-1.12.2,启动命令如下,指定-s是因为job有做过cancel,这里重启。




flink run -d -s 
hdfs:///user/flink/checkpoints/default/f9b85edbc6ca779b6e60414f3e3964f2/chk-100 
-t yarn-per-job -m yarn-cluser -D yarn.application.name=xxxx 
/tmp/flink-1.0-SNAPSHOT.jar  -c com.test.myStream  --profile prod




2、flink-conf.xml




state.checkpoints.dir: hdfs:///user/flink/checkpoints/default




3、代码checkpoint设置




   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();




       env.setRestartStrategy(RestartStrategies.fixedDelayRestart(100, 10));




       CheckpointConfig checkpointConfig = env.getCheckpointConfig();




       
checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);




       env.enableCheckpointing(1 * 60 * 1000);




       checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);




       checkpointConfig.setTolerableCheckpointFailureNumber(100);




       checkpointConfig.setCheckpointTimeout(60 * 1000);




       checkpointConfig.setMaxConcurrentCheckpoints(1);




4、问题现象




a)运维同事切换yarn 
resourc-manager,我的flink任务也会重启(重启后application-id和job-id并没有改变),但是jobmanager和taskmanager更换了机器




      b)我的flink任务每隔1分钟做一个checkpoint,假如任务在上次重启后又运行了一段时间,checkpoint已经到了chk-200




c)集群的同事再次切换resource-manager,这个时候预期是flink任务自动从chk-200 restore,从日志中看还是从chk-100 
restore的。




d)任务大致逻辑是通过flink-mysql-cdc消费binlog,DebeziumSourceFunction<String> 
sourceMilApplysLogStream = MySQLSource.<String>builder()




  重启导致了再次从chk-100的binlog pos开始消费,即了chk-100~chk-200之间的binlog重复消费




e)日志中有打印如下,难道是因为上次我启动flink任务的时候用了-s,所以不会去自动找最新的checkpoint重启吗?




2021-05-24 16:49:50,398 INFO  
org.apache.flink.configuration.GlobalConfiguration           [] - Loading 
configuration property: execution.savepoint.path, 
hdfs:///user/flink/checkpoints/default/f9b85edbc6ca779b6e60414f3e3964f2/chk-100



预期是任务重启后自动从最新的checkpoint开始继续消费,即从chk-200开始消费




现在的问题是任务重启后总是从flink -s指定的checkpoint恢复的。

回复