那应该是master failover后把快照信息丢失了,ha应该能解决这个问题。
董建 <[email protected]> 于2021年5月28日周五 下午6:24写道: > 稳定复现 > checkpoint 正常生成,在web ui和hdfs目录里边都可以确认。 > 我们jobmanager没有做ha,不知道是否是这个原因导致的? > 日志里边能看到是从指定的-s恢复的,没有指定-s的时候,重启的时候也并没有使用最新的checkpoint文件。 > 目前这个问题困扰了我很久,也没有一个好的思路,下一步先把ha搞起来再试试。 > >> org.apache.flink.configuration.GlobalConfiguration [] - > Loading > >> configuration property: execution.savepoint.path, > >> > hdfs:///user/flink/checkpoints/default/f9b85edbc6ca779b6e60414f3e3964f2/chk-100 > > > > > > > > > > > > > > > 在 2021-05-28 18:15:38,"刘建刚" <[email protected]> 写道: > >这种情况是不符合预期的。请问通过以下步骤可以稳定复现吗? > >1、从savepoint恢复; > >2、作业开始定期做savepoint; > >3、作业failover。 > >如果是的话,可能需要排查下checkpoint 文件是否存在,zookeeper上是否更新。 > >如果还是有问题,需要通过日志来排查了。 > > > >董建 <[email protected]> 于2021年5月28日周五 下午5:37写道: > > > >> 我遇到的问题现象是这样的 > >> > >> > >> > >> > >> 1、flink版本flink-1.12.2,启动命令如下,指定-s是因为job有做过cancel,这里重启。 > >> > >> > >> > >> > >> flink run -d -s > >> > hdfs:///user/flink/checkpoints/default/f9b85edbc6ca779b6e60414f3e3964f2/chk-100 > >> -t yarn-per-job -m yarn-cluser -D yarn.application.name=xxxx > >> /tmp/flink-1.0-SNAPSHOT.jar -c com.test.myStream --profile prod > >> > >> > >> > >> > >> 2、flink-conf.xml > >> > >> > >> > >> > >> state.checkpoints.dir: hdfs:///user/flink/checkpoints/default > >> > >> > >> > >> > >> 3、代码checkpoint设置 > >> > >> > >> > >> > >> StreamExecutionEnvironment env = > >> StreamExecutionEnvironment.getExecutionEnvironment(); > >> > >> > >> > >> > >> env.setRestartStrategy(RestartStrategies.fixedDelayRestart(100, > >> 10)); > >> > >> > >> > >> > >> CheckpointConfig checkpointConfig = env.getCheckpointConfig(); > >> > >> > >> > >> > >> > >> > checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); > >> > >> > >> > >> > >> env.enableCheckpointing(1 * 60 * 1000); > >> > >> > >> > >> > >> > >> checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); > >> > >> > >> > >> > >> checkpointConfig.setTolerableCheckpointFailureNumber(100); > >> > >> > >> > >> > >> checkpointConfig.setCheckpointTimeout(60 * 1000); > >> > >> > >> > >> > >> checkpointConfig.setMaxConcurrentCheckpoints(1); > >> > >> > >> > >> > >> 4、问题现象 > >> > >> > >> > >> > >> a)运维同事切换yarn > >> > resourc-manager,我的flink任务也会重启(重启后application-id和job-id并没有改变),但是jobmanager和taskmanager更换了机器 > >> > >> > >> > >> > >> > >> b)我的flink任务每隔1分钟做一个checkpoint,假如任务在上次重启后又运行了一段时间,checkpoint已经到了chk-200 > >> > >> > >> > >> > >> c)集群的同事再次切换resource-manager,这个时候预期是flink任务自动从chk-200 > >> restore,从日志中看还是从chk-100 restore的。 > >> > >> > >> > >> > >> d)任务大致逻辑是通过flink-mysql-cdc消费binlog,DebeziumSourceFunction<String> > >> sourceMilApplysLogStream = MySQLSource.<String>builder() > >> > >> > >> > >> > >> 重启导致了再次从chk-100的binlog pos开始消费,即了chk-100~chk-200之间的binlog重复消费 > >> > >> > >> > >> > >> e)日志中有打印如下,难道是因为上次我启动flink任务的时候用了-s,所以不会去自动找最新的checkpoint重启吗? > >> > >> > >> > >> > >> 2021-05-24 16:49:50,398 INFO > >> org.apache.flink.configuration.GlobalConfiguration [] - > Loading > >> configuration property: execution.savepoint.path, > >> > hdfs:///user/flink/checkpoints/default/f9b85edbc6ca779b6e60414f3e3964f2/chk-100 > >> > >> > >> > >> 预期是任务重启后自动从最新的checkpoint开始继续消费,即从chk-200开始消费 > >> > >> > >> > >> > >> 现在的问题是任务重启后总是从flink -s指定的checkpoint恢复的。 >
