Re: Reset of transient variables in state to default values.

2021-10-20 Thread Yun Tang
Hi, The RocksDB state backend picks the registered Kryo serializer for normal read/write access and for checkpoint/restore. Moreover, since key-values are serialized to be stored in RocksDB, it effectively deep-copies them, which avoids later unexpected modification. For FileSystem/HashMap state backend,

Re: Reset of transient variables in state to default values.

2021-10-18 Thread Arvid Heise
That's what I would try out, but I'm not sure whether the state backend would pick that up. @Yun Tang do you know more? On Mon, Oct 18, 2021 at 9:37 AM Alex Drobinsky wrote: > Hi Arvid, > > It sounds like a good direction, do I need to register my state class with > KryoSerializer, similar to this

Re: Reset of transient variables in state to default values.

2021-10-18 Thread Alex Drobinsky
Hi Arvid, It sounds like a good direction. Do I need to register my state class with KryoSerializer, similar to this? env.getConfig().registerTypeWithKryoSerializer(IPSessionOrganizer.proto.SourceOutput.class, ProtobufSerializer.class); On Mon, Oct 18, 2021 at 10:32, Arvid Heise wrote: > Hi Alex,

Re: Reset of transient variables in state to default values.

2021-10-18 Thread Arvid Heise
Hi Alex, could you also log the identity hash code (or something similar) of the related instance? I suspect that it's not the field that is set to null but that you get a clone where the field is null. In that case, you need to add a specific KryoSerializer to initialize it (or just go with a
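The check suggested above can be tried outside Flink first. The following is a minimal sketch, not the poster's code: `DemoState`, `handle`, `roundTrip` and `describe` are hypothetical names, and plain Java serialization stands in for whatever copy mechanism the state backend actually uses (the thread suggests Kryo, which is an open point here). It logs `System.identityHashCode` next to the null check, so a cloned instance can be told apart from the original instance having its field reset.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class IdentityCheck {

    /** Hypothetical stand-in for the state class; only the transient field matters here. */
    public static class DemoState implements Serializable {
        private static final long serialVersionUID = 1L;
        public transient Object handle = new Object(); // plays the role of currentFile
        public String fileName = "demo";
    }

    /** Copies the object via plain Java serialization (an assumption; Flink may use Kryo instead). */
    public static DemoState roundTrip(DemoState in) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(in);
            }
            try (ObjectInputStream ois =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (DemoState) ois.readObject();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Builds a log line: identity hash of the instance plus whether the transient field is set. */
    public static String describe(DemoState s) {
        return "state@" + Integer.toHexString(System.identityHashCode(s))
                + " handleIsNull=" + (s.handle == null);
    }

    public static void main(String[] args) {
        DemoState original = new DemoState();
        DemoState copy = roundTrip(original);
        System.out.println(describe(original)); // the instance that was created locally
        System.out.println(describe(copy));     // a different instance with the transient field unset
    }
}
```

If the log lines in the real job show the null field on an instance that differs from the one where the field was set, that would point to a copy being handed out rather than the field being overwritten.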

Re: Reset of transient variables in state to default values.

2021-10-12 Thread Alex Drobinsky
Hi Jing, The job doesn't restart from a checkpoint; it's a brand new clean job, no exceptions happened during execution, no restarts :) The state is keyed state, so a new key means a new state object. In that situation currentFile is null, as expected, and this is handled without issues. Before I

Re: Reset of transient variables in state to default values.

2021-10-12 Thread JING ZHANG
Hi Alex, Since you use `FileSystemStateBackend`, I think currentFile occasionally becoming null is not caused by periodic checkpoints, because if the job is running without failover or restore from a checkpoint, reading/writing value state on `FileSystemStateBackend` does not cause serialization and

Re: Reset of transient variables in state to default values.

2021-10-12 Thread Yun Tang
Hi Alex, Since you use the custom MultiStorePacketState class as the value state type, Flink should use the Kryo serializer [1] to serialize your class when accessing RocksDB state or when checkpointing via FileSystemStateBackend, and I don't know whether Kryo would serialize your transient field. If you're

Re: Reset of transient variables in state to default values.

2021-10-11 Thread Alex Drobinsky
It would be difficult to provide even a semblance of the complete product; however, I could try to provide enough details to reproduce the problem. A standard source would do: DataStream stream = env.addSource( new FlinkKafkaConsumer<>(topic, new AbstractDeserializationSchema() {

Re: Reset of transient variables in state to default values.

2021-10-11 Thread JING ZHANG
Hi Alex, It is a little weird. Would you please provide a program that reproduces the problem, including the DataStream job code and the related classes? I need to do some debugging to find the cause. Best, JING ZHANG On Mon, Oct 11, 2021 at 5:50 PM, Alex Drobinsky wrote: > Hi Jing Zhang, > > I'm using the

Re: Reset of transient variables in state to default values.

2021-10-11 Thread Alex Drobinsky
Hi Jing Zhang, I'm using the FileSystem backend. I also implemented a readObject method to support a proper restart procedure: private void readObject(ObjectInputStream ois) throws ClassNotFoundException, IOException { ois.defaultReadObject(); logger.info("Deserialized
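For context on the hook quoted above, here is a minimal sketch of how a private `readObject` can re-create a transient field. `DemoState`, `buffer` and `roundTrip` are hypothetical names, and a `StringBuilder` stands in for the file handle. The hook is only invoked by Java's `ObjectInputStream`; whether the serializer used by the state backend ever calls it is not established in this thread.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class TransientRestore {

    /** Hypothetical state class: one persistent field, one transient field rebuilt on read. */
    public static class DemoState implements Serializable {
        private static final long serialVersionUID = 1L;
        public transient StringBuilder buffer = new StringBuilder(); // stand-in for currentFile
        public String fileName;

        public DemoState(String fileName) {
            this.fileName = fileName;
        }

        /** Called by ObjectInputStream only; restores non-transient fields, then rebuilds the rest. */
        private void readObject(ObjectInputStream ois) throws ClassNotFoundException, IOException {
            ois.defaultReadObject();
            // Field initializers do not run during deserialization, so re-create the value here.
            buffer = new StringBuilder();
        }
    }

    /** Round-trips the object through plain Java serialization. */
    public static DemoState roundTrip(DemoState in) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(in);
            }
            try (ObjectInputStream ois =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (DemoState) ois.readObject();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        DemoState original = new DemoState("packets.bin");
        original.buffer.append("in-flight data");
        DemoState copy = roundTrip(original);
        System.out.println("fileName kept: " + copy.fileName);
        System.out.println("buffer re-created and empty: " + (copy.buffer.length() == 0));
    }
}
```

A serializer that bypasses `ObjectInputStream` would skip this method, which is one way a copy could surface with the transient field still null.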

Re: Reset of transient variables in state to default values.

2021-10-11 Thread JING ZHANG
Hi Alex, Which state backend did you choose? If you chose MemoryStateBackend or FsStateBackend, the `transient` keyword may have no effect, because a heap-based backend such as MemoryStateBackend does not serialize state for regular read/write accesses but keeps it as objects on the heap. If you chose RocksDBStateBackend, I

Reset of transient variables in state to default values.

2021-10-11 Thread Alex Drobinsky
Dear Flink community, I have the following state class (irrelevant fields removed): public class MultiStorePacketState implements Serializable { public transient RandomAccessFile currentFile = null; public long timerValue; public String fileName; public String exportedFileName;