Hi

1 counts 的数据丢失了能否详细描述一下呢?你预期是什么,看到什么现象
2 能否把你关于 counts 的其他代码也贴一下
3. 你的作业是否从 checkpoint 恢复了呢?这个可以从 JM log 来查看
4. 如果你确定是数据有丢失的话,或许你可以使用 state-process-api[1] 看一下是序列化出去有问题,还是 restore 回来有问题

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/libs/state_processor_api.html
Best,
Congxian


sun <[email protected]> 于2020年7月16日周四 下午6:16写道:

>
> 配置代码env.enableCheckpointing(1000);env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> //作业失败后不重启
> env.setRestartStrategy(RestartStrategies.noRestart());
> env.getCheckpointConfig().setCheckpointTimeout(500);
>
> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>
> env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> env.setStateBackend(new
> RocksDBStateBackend("file:///opt/flink/flink-1.7.2/checkpoints"));
>   使用状态的代码private transient ListState<String&gt; counts;
>
>
> @Override
> public void open(Configuration parameters) throws Exception {
>     StateTtlConfig ttlConfig = StateTtlConfig
>             .newBuilder(Time.minutes(30))
>             .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
>
> .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
>             .build();
>
>     ListStateDescriptor<String&gt; lastUserLogin = new
> ListStateDescriptor<&gt;("lastUserLogin", String.class);
>     lastUserLogin.enableTimeToLive(ttlConfig);
>     counts = getRuntimeContext().getListState(lastUserLogin);
> }
> 我重启了task managers 后。发现  counts  里面的数据都丢失了

回复