I have following piece of configuration in flink.yaml: Key Value high-availability zookeeper high-availability.storageDir file:///home/flink/flink-ha-data high-availability.zookeeper.quorum localhost:2181 state.backend rocksdb state.backend.incremental true state.checkpoints.dir file:///home/flink/checkpoints
And in my code (Main.class): StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); env.setStateBackend(new RocksDBStateBackend("file:///home/flink/checkpoint-data", true)); env.enableCheckpointing(Duration.ofMinutes(5).toMillis()); Also the next class should to save data in store, when event is received: public class StateManager extends KeyedProcessFunction<String, String, String> { private ValueState<String> events; @Override public void processElement(String s, Context context, Collector<String> collector) throws Exception { System.out.println("events: " + events.value()); // Check last value for this key Model model = new Gson().fromJson(s, Model.class); events.update(model.toString()); } @Override public void open(Configuration parameters) throws Exception { ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("state", Types.STRING); events = getRuntimeContext().getState(stateDescriptor); System.out.println("In open"); } } But when I stop a job and start it again no saving data I see. I check it with printing data to sysout. There is null value after restarting job. But why do I get this behavior? Maybe my settings is not proper? Thanks, Yuri L. -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/