// Checkpointing, restart-strategy and state-backend setup on the execution environment `env`.
// Checkpointing is enabled with an interval argument of 1000 and event time is selected.
????????env.enableCheckpointing(1000);env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
//????????????????
// The restart strategy is explicitly set to "no restart".
env.setRestartStrategy(RestartStrategies.noRestart());
// NOTE(review): the timeout passed here (500) is smaller than the interval passed to
// enableCheckpointing above (1000); presumably both are milliseconds — confirm this is intended.
env.getCheckpointConfig().setCheckpointTimeout(500);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// Externalized checkpoints are enabled with the RETAIN_ON_CANCELLATION cleanup mode.
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// RocksDB state backend constructed with a file:// URI as its argument.
env.setStateBackend(new
RocksDBStateBackend("file:///opt/flink/flink-1.7.2/checkpoints"));
// List state of strings; transient, and assigned in open() from the runtime context.
??????????????private transient ListState<String> counts;
/**
 * Initializes the {@code counts} list state when the function is opened.
 *
 * <p>Builds a {@code ListStateDescriptor<String>} named "lastUserLogin", attaches a
 * 30-minute time-to-live configuration to it, and obtains the state handle from the
 * runtime context.
 *
 * @param parameters configuration supplied to {@code open}; not read by this method
 * @throws Exception declared in the signature; nothing in this body throws explicitly
 */
@Override
public void open(Configuration parameters) throws Exception {
    ListStateDescriptor<String> descriptor =
            new ListStateDescriptor<>("lastUserLogin", String.class);

    // TTL settings: update type OnCreateAndWrite, visibility NeverReturnExpired.
    descriptor.enableTimeToLive(
            StateTtlConfig.newBuilder(Time.minutes(30))
                    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                    .build());

    counts = getRuntimeContext().getListState(descriptor);
}
????????task managers ???????? counts ??????????????????