Hi 1 你需要回复一下我之前问你的问题:你可以从 JM log 看一下是否从 checkpoint 恢复了 2. 这里没有打印只是表明当前处理的 key 没有 state 数据,并不能表示 state 没有恢复回来,state 值是绑定到某个 key 上的(keyby 的 key)
Best, Congxian sun <[email protected]> 于2020年7月17日周五 下午5:22写道: > 你好:counts 的数据 我是在下面打印出来了 List<String> list = > Lists.newArrayList(counts.get()) ; > for(String ss : list){ > System.out.println("!!!" + ss); > log.info("!!!" + ss); > },但是我重启服务之后,之前存的那些内容打印不出来了。 > @Slf4j > public class FlatMapTestState extends RichFlatMapFunction<String, > Test222> { > > > private transient ListState<String> counts; > > > @Override > public void open(Configuration parameters) throws Exception { > StateTtlConfig ttlConfig = StateTtlConfig > .newBuilder(Time.minutes(30)) > .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) > > .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) > .build(); > > ListStateDescriptor<String> lastUserLogin = new > ListStateDescriptor<>("lastUserLogin", String.class); > lastUserLogin.enableTimeToLive(ttlConfig); > counts = getRuntimeContext().getListState(lastUserLogin); > } > > > @Override > public void flatMap(String s, Collector<Test222> collector) throws > Exception { > Test222 message = JSONUtil.toObject(s, new > TypeReference<Test222>() { > }); > > System.out.println(DateUtil.toLongDateString(new Date())); > log.info(DateUtil.toLongDateString(new Date())); > counts.add(message.getId()); > List<String> list = Lists.newArrayList(counts.get()) ; > for(String ss : list){ > System.out.println("!!!" + ss); > log.info("!!!" + ss); > } > log.info(DateUtil.toLongDateString(new Date())); > System.out.println(DateUtil.toLongDateString(new Date())); > } > } > > > > > > > > > > > ------------------ 原始邮件 ------------------ > 发件人: > "user-zh" > < > [email protected]>; > 发送时间: 2020年7月16日(星期四) 晚上8:16 > 收件人: "user-zh"<[email protected]>; > > 主题: Re: state无法从checkpoint中恢复 > > > > Hi > > 1 counts 的数据丢失了能否详细描述一下呢?你预期是什么,看到什么现象 > 2 能否把你关于 counts 的其他代码也贴一下 > 3. 你的作业是否从 checkpoint 恢复了呢?这个可以从 JM log 来查看 > 4. 如果你确定是数据有丢失的话,或许你可以使用 state-process-api[1] 看一下是序列化出去有问题,还是 restore 回来有问题 > > [1] > > https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/libs/state_processor_api.html > Best, > Congxian > > > sun <[email protected]> 于2020年7月16日周四 下午6:16写道: > > > > > > 配置代码env.enableCheckpointing(1000);env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); > > //作业失败后不重启 > > env.setRestartStrategy(RestartStrategies.noRestart()); > > env.getCheckpointConfig().setCheckpointTimeout(500); > > > > > env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); > > > > > env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); > > env.setStateBackend(new > > RocksDBStateBackend("file:///opt/flink/flink-1.7.2/checkpoints")); > > 使用状态的代码private transient ListState<String&gt; counts; > > > > > > @Override > > public void open(Configuration parameters) throws Exception { > > StateTtlConfig ttlConfig = StateTtlConfig > > > .newBuilder(Time.minutes(30)) > > > .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) > > > > .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) > > > .build(); > > > > ListStateDescriptor<String&gt; > lastUserLogin = new > > ListStateDescriptor<&gt;("lastUserLogin", String.class); > > lastUserLogin.enableTimeToLive(ttlConfig); > > counts = > getRuntimeContext().getListState(lastUserLogin); > > } > > 我重启了task managers 后。发现 counts 里面的数据都丢失了
