????



------------------ ???????? ------------------
??????:                                                                         
                                               "user-zh"                        
                                                            
<[email protected]&gt;;
????????:&nbsp;2020??9??3??(??????) ????4:10
??????:&nbsp;"user-zh"<[email protected]&gt;;

????:&nbsp;Re:??????checkpoint??????state















???????????????? ????????checkpoint????????????????????????????checkpint???? 





?? 2020-09-03 16:03:41??"sun" <[email protected]&gt; ??????
&gt;??????????2??????
&gt;
&gt;1????????????????checkpoint????????chk-&amp;nbsp; ??????chk-1??????chk-2 
........,????????????????????
&gt;
&gt;2????????????????????checkpoint??????state??????
&gt;
&gt;??????????????????????????????????????
&gt;
&gt;
&gt;
&gt;final StreamExecutionEnvironment streamExecutionEnvironment = 
StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
&gt;
&gt;//&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 
StateBackend stateBackend = new 
RocksDBStateBackend("hdfs://10.100.51.101:9000/flink/checkpoints",true);
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 
StateBackend stateBackend = new FsStateBackend("file:///flink/checkpoints");
&gt;//&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 
StateBackend stateBackend = new MemoryStateBackend();
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 
streamExecutionEnvironment.setStateBackend(stateBackend);
&gt;
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 
streamExecutionEnvironment.enableCheckpointing(1000);
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 
streamExecutionEnvironment.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 
streamExecutionEnvironment.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 
streamExecutionEnvironment.getCheckpointConfig().setCheckpointTimeout(60000);
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 
streamExecutionEnvironment.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 
streamExecutionEnvironment.getCheckpointConfig()
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
 
.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

回复