Hi

1 你需要回复一下我之前问你的问题:你可以从 JM log 看一下是否从 checkpoint 恢复了
2. 这里没有打印只是表明当前处理的 key 没有 state 数据,并不能表示 state 没有恢复回来,state 值是绑定到某个 key
上的(keyby 的 key)

Best,
Congxian


sun <[email protected]> 于2020年7月17日周五 下午5:22写道:

> 你好:counts 的数据 我是在下面打印出来了 List<String&gt; list =
> Lists.newArrayList(counts.get()) ;
>             for(String ss : list){
>                 System.out.println("!!!" + ss);
>                 log.info("!!!" + ss);
>             },但是我重启服务之后,之前存的那些内容打印不出来了。
> @Slf4j
> public class FlatMapTestState extends RichFlatMapFunction<String,
> Test222&gt; {
>
>
>     private transient ListState<String&gt; counts;
>
>
>     @Override
>     public void open(Configuration parameters) throws Exception {
>         StateTtlConfig ttlConfig = StateTtlConfig
>                 .newBuilder(Time.minutes(30))
>                 .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
>
> .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
>                 .build();
>
>         ListStateDescriptor<String&gt; lastUserLogin = new
> ListStateDescriptor<&gt;("lastUserLogin", String.class);
>         lastUserLogin.enableTimeToLive(ttlConfig);
>         counts = getRuntimeContext().getListState(lastUserLogin);
>     }
>
>
>     @Override
>     public void flatMap(String s, Collector<Test222&gt; collector) throws
> Exception {
>             Test222 message = JSONUtil.toObject(s, new
> TypeReference<Test222&gt;() {
>             });
>
>             System.out.println(DateUtil.toLongDateString(new Date()));
>             log.info(DateUtil.toLongDateString(new Date()));
>             counts.add(message.getId());
>             List<String&gt; list = Lists.newArrayList(counts.get()) ;
>             for(String ss : list){
>                 System.out.println("!!!" + ss);
>                 log.info("!!!" + ss);
>             }
>               log.info(DateUtil.toLongDateString(new Date()));
>             System.out.println(DateUtil.toLongDateString(new Date()));
>     }
> }
>
>
>
>
>
>
>
>
>
>
> ------------------&nbsp;原始邮件&nbsp;------------------
> 发件人:
>                                                   "user-zh"
>                                                                     <
> [email protected]&gt;;
> 发送时间:&nbsp;2020年7月16日(星期四) 晚上8:16
> 收件人:&nbsp;"user-zh"<[email protected]&gt;;
>
> 主题:&nbsp;Re: state无法从checkpoint中恢复
>
>
>
> Hi
>
> 1 counts 的数据丢失了能否详细描述一下呢?你预期是什么,看到什么现象
> 2 能否把你关于 counts 的其他代码也贴一下
> 3. 你的作业是否从 checkpoint 恢复了呢?这个可以从 JM log 来查看
> 4. 如果你确定是数据有丢失的话,或许你可以使用 state-process-api[1] 看一下是序列化出去有问题,还是 restore 回来有问题
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/libs/state_processor_api.html
> Best,
> Congxian
>
>
> sun <[email protected]&gt; 于2020年7月16日周四 下午6:16写道:
>
> &gt;
> &gt;
> 配置代码env.enableCheckpointing(1000);env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> &gt; //作业失败后不重启
> &gt; env.setRestartStrategy(RestartStrategies.noRestart());
> &gt; env.getCheckpointConfig().setCheckpointTimeout(500);
> &gt;
> &gt;
> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
> &gt;
> &gt;
> env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> &gt; env.setStateBackend(new
> &gt; RocksDBStateBackend("file:///opt/flink/flink-1.7.2/checkpoints"));
> &gt;&nbsp;&nbsp; 使用状态的代码private transient ListState<String&amp;gt; counts;
> &gt;
> &gt;
> &gt; @Override
> &gt; public void open(Configuration parameters) throws Exception {
> &gt;&nbsp;&nbsp;&nbsp;&nbsp; StateTtlConfig ttlConfig = StateTtlConfig
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> .newBuilder(Time.minutes(30))
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
> &gt;
> &gt; .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> .build();
> &gt;
> &gt;&nbsp;&nbsp;&nbsp;&nbsp; ListStateDescriptor<String&amp;gt;
> lastUserLogin = new
> &gt; ListStateDescriptor<&amp;gt;("lastUserLogin", String.class);
> &gt;&nbsp;&nbsp;&nbsp;&nbsp; lastUserLogin.enableTimeToLive(ttlConfig);
> &gt;&nbsp;&nbsp;&nbsp;&nbsp; counts =
> getRuntimeContext().getListState(lastUserLogin);
> &gt; }
> &gt; 我重启了task managers 后。发现&nbsp; counts&nbsp; 里面的数据都丢失了

回复