????????env.enableCheckpointing(1000);env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
//????????????????
env.setRestartStrategy(RestartStrategies.noRestart());
env.getCheckpointConfig().setCheckpointTimeout(500);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.setStateBackend(new 
RocksDBStateBackend("file:///opt/flink/flink-1.7.2/checkpoints"));            
??????????????private transient ListState<String&gt; counts;


@Override
public void open(Configuration parameters) throws Exception {
    StateTtlConfig ttlConfig = StateTtlConfig
            .newBuilder(Time.minutes(30))
            .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
            
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
            .build();

    ListStateDescriptor<String&gt; lastUserLogin = new 
ListStateDescriptor<&gt;("lastUserLogin", String.class);
    lastUserLogin.enableTimeToLive(ttlConfig);
    counts = getRuntimeContext().getListState(lastUserLogin);
}
????????task managers ????????  counts  ??????????????????

回复