Hi 1 counts 的数据丢失了能否详细描述一下呢?你预期是什么,看到什么现象 2 能否把你关于 counts 的其他代码也贴一下 3. 你的作业是否从 checkpoint 恢复了呢?这个可以从 JM log 来查看 4. 如果你确定是数据有丢失的话,或许你可以使用 state-process-api[1] 看一下是序列化出去有问题,还是 restore 回来有问题
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/libs/state_processor_api.html Best, Congxian sun <[email protected]> 于2020年7月16日周四 下午6:16写道: > > 配置代码env.enableCheckpointing(1000);env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); > //作业失败后不重启 > env.setRestartStrategy(RestartStrategies.noRestart()); > env.getCheckpointConfig().setCheckpointTimeout(500); > > env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); > > env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); > env.setStateBackend(new > RocksDBStateBackend("file:///opt/flink/flink-1.7.2/checkpoints")); > 使用状态的代码private transient ListState<String> counts; > > > @Override > public void open(Configuration parameters) throws Exception { > StateTtlConfig ttlConfig = StateTtlConfig > .newBuilder(Time.minutes(30)) > .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) > > .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) > .build(); > > ListStateDescriptor<String> lastUserLogin = new > ListStateDescriptor<>("lastUserLogin", String.class); > lastUserLogin.enableTimeToLive(ttlConfig); > counts = getRuntimeContext().getListState(lastUserLogin); > } > 我重启了task managers 后。发现 counts 里面的数据都丢失了
