??????????2??????
1????????????????checkpoint????????chk- ??????chk-1??????chk-2
........,????????????????????
2????????????????????checkpoint??????state??????
??????????????????????????????????????
// Create a local execution environment with the Flink web UI enabled.
// NOTE: `conf` is defined outside this snippet.
final StreamExecutionEnvironment streamExecutionEnvironment =
        StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);

// State backend selection. Only one backend is active; the alternatives are kept
// fully commented out (previously the RocksDB line was wrapped and its second half
// was left uncommented, which broke compilation).
//   StateBackend stateBackend =
//       new RocksDBStateBackend("hdfs://10.100.51.101:9000/flink/checkpoints", true);
//   StateBackend stateBackend = new MemoryStateBackend();
StateBackend stateBackend = new FsStateBackend("file:///flink/checkpoints");
streamExecutionEnvironment.setStateBackend(stateBackend);

// Trigger a checkpoint every 1000 ms.
streamExecutionEnvironment.enableCheckpointing(1000);
// Exactly-once processing guarantee for checkpointed state.
streamExecutionEnvironment.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// Leave at least 500 ms between the end of one checkpoint and the start of the next.
streamExecutionEnvironment.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
// Abort a checkpoint that has not completed within 60000 ms.
streamExecutionEnvironment.getCheckpointConfig().setCheckpointTimeout(60000);
// Allow only one checkpoint to be in progress at a time.
streamExecutionEnvironment.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// Keep the externalized checkpoint data when the job is cancelled, so the job can
// later be restored from it (it must then be cleaned up manually).
streamExecutionEnvironment.getCheckpointConfig()
        .enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);