??????counts ?????? ???????????????????? List<String> list =
Lists.newArrayList(counts.get()) ;
for(String ss : list){
System.out.println("!!!" + ss);
log.info("!!!" + ss);
}????????????????????????????????????????????????????
/**
 * Diagnostic flat-map operator: parses each incoming JSON string into a
 * {@code Test222}, appends its id to a TTL-enabled list state, and prints the
 * full state contents to both stdout and the logger.
 *
 * <p>Nothing is ever emitted to the downstream collector; the operator exists
 * only to observe what the state holds after each record.
 */
@Slf4j
public class FlatMapTestState extends RichFlatMapFunction<String, Test222> {

    /** List state of the ids added by {@link #flatMap}; assigned in {@link #open}. */
    private transient ListState<String> counts;

    /**
     * Registers the {@code "lastUserLogin"} list state with a 30-minute TTL.
     *
     * @param parameters configuration passed in by the runtime (not read here)
     * @throws Exception if the state cannot be obtained from the runtime context
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        ListStateDescriptor<String> descriptor =
                new ListStateDescriptor<>("lastUserLogin", String.class);

        // TTL is refreshed on create/write only; expired entries are never returned.
        descriptor.enableTimeToLive(
                StateTtlConfig.newBuilder(Time.minutes(30))
                        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                        .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                        .build());

        counts = getRuntimeContext().getListState(descriptor);
    }

    /**
     * Records the id of the parsed message in state and dumps the state contents.
     *
     * @param s         JSON text to be parsed into a {@code Test222}
     * @param collector downstream collector (unused: this operator emits nothing)
     * @throws Exception if parsing or state access fails
     */
    @Override
    public void flatMap(String s, Collector<Test222> collector) throws Exception {
        TypeReference<Test222> targetType = new TypeReference<Test222>() {
        };
        Test222 parsed = JSONUtil.toObject(s, targetType);

        // Timestamp before touching state: stdout first, then the logger.
        System.out.println(DateUtil.toLongDateString(new Date()));
        log.info(DateUtil.toLongDateString(new Date()));

        counts.add(parsed.getId());

        // Copy the state into a list, then print every stored id.
        List<String> storedIds = Lists.newArrayList(counts.get());
        storedIds.forEach(id -> {
            String line = "!!!" + id;
            System.out.println(line);
            log.info(line);
        });

        // Timestamp after the dump: logger first, then stdout.
        log.info(DateUtil.toLongDateString(new Date()));
        System.out.println(DateUtil.toLongDateString(new Date()));
    }
}
------------------ ???????? ------------------
??????:
"user-zh"
<[email protected]>;
????????: 2020??7??16??(??????) ????8:16
??????: "user-zh"<[email protected]>;
????: Re: state??????checkpoint??????
Hi
1 counts ??????????????????????????????????????????????????????????
2 ???????????? counts ??????????????????
3. ?????????????? checkpoint ???????????????????? JM log ??????
4. ?????????????????????????????????????????? state-process-api[1]
?????????????????????????????? restore ??????????
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/libs/state_processor_api.html
Best,
Congxian
sun <[email protected]> ??2020??7??16?????? ????6:16??????
>
>
????????env.enableCheckpointing(1000);env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> //????????????????
> env.setRestartStrategy(RestartStrategies.noRestart());
> env.getCheckpointConfig().setCheckpointTimeout(500);
>
>
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>
>
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> env.setStateBackend(new
> RocksDBStateBackend("file:///opt/flink/flink-1.7.2/checkpoints"));
> ??????????????private transient ListState<String>
counts;
>
>
> @Override
> public void open(Configuration parameters) throws Exception {
> StateTtlConfig ttlConfig = StateTtlConfig
>
.newBuilder(Time.minutes(30))
>
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
>
> .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
>
.build();
>
> ListStateDescriptor<String> lastUserLogin =
new
> ListStateDescriptor<>("lastUserLogin", String.class);
> lastUserLogin.enableTimeToLive(ttlConfig);
> counts =
getRuntimeContext().getListState(lastUserLogin);
> }
> ????????task managers ???????? counts ??????????????????