Hello Luke, thanks for your reply. I do not set serialVersionUID myself, but when I change the Avro schema of my PoJo, the generated class gets a different serialVersionUID.
So, as I understand your suggestion, it is basically this:

1 - don't use the Avro-generated PoJo, because there is no control over serialVersionUID there, and write the PoJo myself
2 - set serialVersionUID and never change it, if I want to keep savepoints restorable
3 - with conditions (1) and (2), I can add fields to the PoJo without breaking state
4 - this mechanism relies on Java built-in serialization, so rules like these apply - http://www.jguru.com/faq/view.jsp?EID=5064

Is that correct?

Best Regards,
Pavel Solomin

Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin <https://www.linkedin.com/in/pavelsolomin>

On Wed, 7 Jul 2021 at 19:30, Luke Cwik <[email protected]> wrote:

> Have you set the serialVersionUID in MyPojo?
>
> The serialVersionUID is used by Java to detect when objects change. See
> https://www.baeldung.com/java-serial-version-uid for more details.
>
> On Tue, Jul 6, 2021 at 6:33 PM Pavel Solomin <[email protected]> wrote:
>
>> Hello!
>>
>> I am using Beam 2.30.0 with Flink runner 1.11.
>>
>> The app gets input data as some POJOs, creates fixed windows of such, and
>> then writes window files to AWS S3.
>>
>> As the data evolves, I want to add new fields to my POJOs (which are
>> actually avro-generated ones), and, given the new POJO is fully compatible
>> with the old one, I would expect the state to be successfully restored once
>> I deploy changed application code.
>>
>> I am looking for code examples where such case is solved, and couldn't
>> find any. Couldn't find any section discussing evolution of stateful
>> processors in the doc either.
>>
>> I tried multiple methods so far:
>> - not specifying any coders (as my avro-generated POJOs are already Serializable)
>> - using KryoCoder (mentioned in https://beam.apache.org/documentation/sdks/java/euphoria/)
>> - using AvroCoder (https://beam.apache.org/releases/javadoc/2.30.0/org/apache/beam/sdk/coders/AvroCoder.html)
>> - implementing my own (see below)
>>
>> public class MyPoJoCoder extends CustomCoder<MyPoJo> {
>>     @Override
>>     public void encode(MyPoJo value, OutputStream outStream) throws CoderException, IOException {
>>         ObjectOutputStream oos = new ObjectOutputStream(outStream);
>>         value.writeExternal(oos);
>>     }
>>
>>     @Override
>>     public MyPoJo decode(InputStream inStream) throws CoderException, IOException {
>>         MyPoJo value = new MyPoJo();
>>         ObjectInputStream ois = new ObjectInputStream(inStream);
>>         value.readExternal(ois);
>>         return value;
>>     }
>> }
>>
>> but still getting exception:
>>
>> java.lang.Exception: Exception while creating StreamOperatorStateContext.
>>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:204)
>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:247)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:474)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:470)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:529)
>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:724)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:549)
>>     at java.base/java.lang.Thread.run(Thread.java:829)
>> Caused by: org.apache.flink.util.FlinkException: Could not restore operator state backend for DoFnOperator_3ae0e54ab75888aa89ffd50dca3abb31_(1/1) from any of the 1 provided restore options.
>>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
>>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:265)
>>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:152)
>>     ... 9 more
>> Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore operator state backend
>>     at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:86)
>>     at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createOperatorStateBackend(RocksDBStateBackend.java:552)
>>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:256)
>>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
>>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
>>     ... 11 more
>> Caused by: java.lang.IllegalStateException: Could not Java-deserialize TypeSerializer while restoring checkpoint metadata for serializer snapshot 'org.apache.beam.runners.flink.translation.types.CoderTypeSerializer$LegacySnapshot'. Please update to the TypeSerializerSnapshot interface that removes Java Serialization to avoid this problem in the future.
>>     at org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot.restoreSerializer(TypeSerializerConfigSnapshot.java:138)
>>     at org.apache.flink.runtime.state.StateSerializerProvider.previousSchemaSerializer(StateSerializerProvider.java:189)
>>     at org.apache.flink.runtime.state.StateSerializerProvider.currentSchemaSerializer(StateSerializerProvider.java:164)
>>     at org.apache.flink.runtime.state.RegisteredOperatorStateBackendMetaInfo.getPartitionStateSerializer(RegisteredOperatorStateBackendMetaInfo.java:113)
>>     at org.apache.flink.runtime.state.OperatorStateRestoreOperation.restore(OperatorStateRestoreOperation.java:94)
>>     at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:83)
>>     ... 15 more
>> Caused by: java.io.InvalidClassException: com.mymodels.MyPoJo; local class incompatible: stream classdesc serialVersionUID = -4766138050980652522, local class serialVersionUID = -3493429883367292394
>>     at java.base/java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:689)
>>     at java.base/java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:2012)
>>     at java.base/java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1862)
>>     at java.base/java.io.ObjectInputStream.readClass(ObjectInputStream.java:1825)
>>     at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1650)
>>     at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2464)
>>     at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2358)
>>     at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2196)
>>     at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1679)
>>     at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2464)
>>     at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2358)
>>     at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2196)
>>     at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1679)
>>     at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2464)
>>     at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2358)
>>     at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2196)
>>     at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1679)
>>     at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2464)
>>     at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2358)
>>     at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2196)
>>     at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1679)
>>     at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2464)
>>     at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2358)
>>     at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2196)
>>     at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1679)
>>     at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:493)
>>     at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:451)
>>     at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:301)
>>     at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:116)
>>     at org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot.readSnapshot(TypeSerializerConfigSnapshot.java:113)
>>     at org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(TypeSerializerSnapshot.java:174)
>>     at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.deserializeV2(TypeSerializerSnapshotSerializationUtil.java:179)
>>     at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.read(TypeSerializerSnapshotSerializationUtil.java:150)
>>     at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(TypeSerializerSnapshotSerializationUtil.java:76)
>>     at org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters$CurrentReaderImpl.readStateMetaInfoSnapshot(StateMetaInfoSnapshotReadersWriters.java:219)
>>     at org.apache.flink.runtime.state.OperatorBackendSerializationProxy.read(OperatorBackendSerializationProxy.java:119)
>>     at org.apache.flink.runtime.state.OperatorStateRestoreOperation.restore(OperatorStateRestoreOperation.java:83)
>>     ... 16 more
>>
>> Best Regards,
>> Pavel Solomin
>>
>> Tel: +351 962 950 692 | Skype: pavel_solomin |
>> Linkedin <https://www.linkedin.com/in/pavelsolomin>
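To make points 2 and 3 at the top of this mail concrete, here is a minimal sketch in plain Java of a hand-written PoJo with a pinned serialVersionUID. The class name PinnedUidSketch, the field names and the value 1L are only illustrative assumptions, and nothing here is Beam- or Flink-specific. It shows only that an explicitly declared serialVersionUID is the one Java reports and uses for a round trip. Whether a savepoint restore through the Flink runner then behaves the same way is something I am assuming, not something this snippet proves.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;

public class PinnedUidSketch {

    // Hypothetical hand-written PoJo (not the Avro-generated class).
    static class MyPoJo implements Serializable {
        // Declared by hand so it does not change when fields are added.
        // The value 1L is arbitrary; what matters is that it stays fixed.
        private static final long serialVersionUID = 1L;

        String id;
        long timestamp;
        // A field added in a later release would go here. Java serialization
        // treats an added field as a compatible change: when older bytes are
        // read, the new field is left at its default (null, 0 or false).

        MyPoJo(String id, long timestamp) {
            this.id = id;
            this.timestamp = timestamp;
        }
    }

    // Serializes an object with Java built-in serialization.
    static byte[] serialize(Object value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        return bytes.toByteArray();
    }

    // Reads back an object written by serialize().
    static Object deserialize(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        // Reports the declared value instead of a hash computed from the class shape.
        long uid = ObjectStreamClass.lookup(MyPoJo.class).getSerialVersionUID();
        System.out.println("serialVersionUID = " + uid);

        MyPoJo restored = (MyPoJo) deserialize(serialize(new MyPoJo("a", 42L)));
        System.out.println(restored.id + " " + restored.timestamp);
    }
}
```

If the serialVersionUID line is removed, Java computes the value from the class structure, which matches the two different numbers in the InvalidClassException in the quoted trace.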
