hi
????UI??checkpoint??????????????????????????????checkpoint?????? 
??????????????????


JasonLee

On 2020-07-17 at 17:21, sun wrote:
??????counts ?????? ????????????????????
    List<String> list = Lists.newArrayList(counts.get());
    for (String ss : list) {
        System.out.println("!!!" + ss);
        log.info("!!!" + ss);
    }
????????????????????????????????????????????????????
@Slf4j
public class FlatMapTestState extends RichFlatMapFunction<String, Test222> {

    private transient ListState<String> counts;

    @Override
    public void open(Configuration parameters) throws Exception {
        StateTtlConfig ttlConfig = StateTtlConfig
                .newBuilder(Time.minutes(30))
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                .build();

        ListStateDescriptor<String> lastUserLogin =
                new ListStateDescriptor<>("lastUserLogin", String.class);
        lastUserLogin.enableTimeToLive(ttlConfig);
        counts = getRuntimeContext().getListState(lastUserLogin);
    }

    @Override
    public void flatMap(String s, Collector<Test222> collector) throws Exception {
        Test222 message = JSONUtil.toObject(s, new TypeReference<Test222>() {
        });

        System.out.println(DateUtil.toLongDateString(new Date()));
        log.info(DateUtil.toLongDateString(new Date()));
        counts.add(message.getId());
        List<String> list = Lists.newArrayList(counts.get());
        for (String ss : list) {
            System.out.println("!!!" + ss);
            log.info("!!!" + ss);
        }
        log.info(DateUtil.toLongDateString(new Date()));
        System.out.println(DateUtil.toLongDateString(new Date()));
    }
}
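
For anyone reading along who has not used state TTL: below is a rough toy model, in plain Java with no Flink dependency, of what the two settings in open() are documented to mean for a list state. OnCreateAndWrite means an element's timestamp is set only when it is written (reads do not refresh it), and NeverReturnExpired means expired elements are filtered out on read. Flink documents TTL on list state as per element, which is what the sketch assumes. The class TtlListModel, its methods, and the explicit nowMillis parameter are hypothetical names for illustration only; this is not how Flink implements it.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of a list state with per-element TTL (hypothetical, not Flink code). */
public class TtlListModel {

    /** One stored element together with the time it was written. */
    private static final class Entry {
        final String value;
        final long writtenAtMillis;

        Entry(String value, long writtenAtMillis) {
            this.value = value;
            this.writtenAtMillis = writtenAtMillis;
        }
    }

    private final long ttlMillis;
    private final List<Entry> entries = new ArrayList<>();

    public TtlListModel(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    /** OnCreateAndWrite: the timestamp is set when the element is written. */
    public void add(String value, long nowMillis) {
        entries.add(new Entry(value, nowMillis));
    }

    /** NeverReturnExpired: only elements younger than the TTL are returned; reading does not refresh them. */
    public List<String> get(long nowMillis) {
        List<String> live = new ArrayList<>();
        for (Entry e : entries) {
            if (nowMillis - e.writtenAtMillis < ttlMillis) {
                live.add(e.value);
            }
        }
        return live;
    }
}
```

With a 30 minute TTL, an element added at minute 0 is still returned at minute 25 but no longer at minute 31, while an element added at minute 20 is returned in both reads.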

------------------ Original Message ------------------
From: "user-zh" <[email protected]>;
Sent: 2020-07-16 (Thursday) 8:16
To: "user-zh" <[email protected]>;

Subject: Re: state??????checkpoint??????



Hi

1 counts ??????????????????????????????????????????????????????????
2 ???????????? counts ??????????????????
3. ?????????????? checkpoint ???????????????????? JM log ??????
4. ?????????????????????????????????????????? state-process-api[1] 
?????????????????????????????? restore ??????????

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/libs/state_processor_api.html

Best,
Congxian


sun <[email protected]> wrote on Thursday, 2020-07-16 at 6:16:

> ????????
> env.enableCheckpointing(1000);
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> //????????????????
> env.setRestartStrategy(RestartStrategies.noRestart());
> env.getCheckpointConfig().setCheckpointTimeout(500);
> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
> env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> env.setStateBackend(new RocksDBStateBackend("file:///opt/flink/flink-1.7.2/checkpoints"));
>
> ??????????????
> private transient ListState<String> counts;
>
> @Override
> public void open(Configuration parameters) throws Exception {
>     StateTtlConfig ttlConfig = StateTtlConfig
>             .newBuilder(Time.minutes(30))
>             .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
>             .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
>             .build();
>
>     ListStateDescriptor<String> lastUserLogin =
>             new ListStateDescriptor<>("lastUserLogin", String.class);
>     lastUserLogin.enableTimeToLive(ttlConfig);
>     counts = getRuntimeContext().getListState(lastUserLogin);
> }
>
> ????????task managers ???????? counts ??????????????????
