I have the following configuration in flink-conf.yaml:

high-availability: zookeeper
high-availability.storageDir: file:///home/flink/flink-ha-data
high-availability.zookeeper.quorum: localhost:2181
state.backend: rocksdb
state.backend.incremental: true
state.checkpoints.dir: file:///home/flink/checkpoints
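One thing I am not sure about: as I understand the docs, checkpoints are deleted when a job is cancelled unless retention is configured, for example with the following key (this is not in my file yet, so it is only my guess at what might be missing):

```yaml
# Not in my current config -- my understanding is that this keeps the
# latest checkpoint after a manual cancel, so the job can be resumed from it.
execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
```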

And in my code (Main.class):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// The second argument enables incremental checkpoints.
env.setStateBackend(new RocksDBStateBackend("file:///home/flink/checkpoint-data", true));
env.enableCheckpointing(Duration.ofMinutes(5).toMillis());

The following class is supposed to save data to state when an event is received:

import com.google.gson.Gson;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class StateManager extends KeyedProcessFunction<String, String, String> {

    // State handles are not serializable, so the field is transient.
    private transient ValueState<String> events;

    @Override
    public void processElement(String s, Context context, Collector<String> collector) throws Exception {
        // Print the last value stored for the current key.
        System.out.println("events: " + events.value());

        Model model = new Gson().fromJson(s, Model.class);
        events.update(model.toString());
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        ValueStateDescriptor<String> stateDescriptor =
                new ValueStateDescriptor<>("state", Types.STRING);
        events = getRuntimeContext().getState(stateDescriptor);
        System.out.println("In open");
    }
}
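For context, this is roughly how StateManager is wired into the job (the socket source and the identity key selector here are placeholders for illustration, not my real source):

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Main {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Placeholder source -- the real job reads from elsewhere.
        DataStream<String> input = env.socketTextStream("localhost", 9999);
        input
            .keyBy(value -> value)          // ValueState is scoped per key
            .process(new StateManager())
            .print();
        env.execute("StateManager example");
    }
}
```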


But when I stop the job and start it again, I don't see any saved data. I checked by printing the state value to stdout, and after restarting the job events.value() returns null.

Why do I get this behavior? Are my settings wrong?

Thanks,
Yuri L.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
