Ok, this is clear. For now I went a different path though: I kept my business-logic POJOs Avro-generated, but introduced another, generic one, which just stores the schema and the payload bytes, and therefore never needs to change. With that in place, I use https://beam.apache.org/releases/javadoc/2.30.0/org/apache/beam/sdk/coders/DelegateCoder.html
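Roughly, the wrapper + DelegateCoder combination looks like this (a minimal sketch; the `SchemaAndPayload` class and its field names are illustrative, not my actual code):

```java
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.DelegateCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.values.KV;

// Hypothetical stable wrapper: carries the writer schema plus the
// Avro-serialized payload bytes. The business POJOs can keep evolving;
// this class, and therefore its coder, never changes.
public class SchemaAndPayload {
    public final String schemaJson;
    public final byte[] payload;

    public SchemaAndPayload(String schemaJson, byte[] payload) {
        this.schemaJson = schemaJson;
        this.payload = payload;
    }

    // DelegateCoder maps SchemaAndPayload to/from a KV<String, byte[]>,
    // which is encoded with coders whose wire format is fixed.
    public static Coder<SchemaAndPayload> coder() {
        return DelegateCoder.of(
            KvCoder.of(StringUtf8Coder.of(), ByteArrayCoder.of()),
            (SchemaAndPayload sp) -> KV.of(sp.schemaJson, sp.payload),
            (KV<String, byte[]> kv) -> new SchemaAndPayload(kv.getKey(), kv.getValue()));
    }
}
```

Decoding back into a concrete POJO then happens in user code, where the stored writer schema can be reconciled with the current reader schema.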
So far, this seems to allow me to add fields to POJOs and to remove them without state breakage. Thank you for the clarifications. I will re-open this thread if I run into issues.

Best Regards,
Pavel Solomin

Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin <https://www.linkedin.com/in/pavelsolomin>


On Fri, 9 Jul 2021 at 17:12, Luke Cwik <[email protected]> wrote:

> Yes.
>
> On Fri, Jul 9, 2021 at 1:07 AM Pavel Solomin <[email protected]> wrote:
>
>> Hello Luke, thanks for your reply.
>>
>> I do not set serialVersionUID myself, but when I change the Avro schema of my POJO, the generated class has that serialVersionUID changed.
>>
>> So, as I understand it, your suggestion is basically this:
>> 1 - don't use the Avro-generated POJO, since there is no control over serialVersionUID there, and write the POJO myself
>> 2 - set serialVersionUID and never change it, if I want to keep savepoints restorable
>> 3 - with (1) and (2) in place, I can add fields to the POJO without state breakage
>> 4 - this mechanism relies on Java's built-in serialization, so rules like these apply - http://www.jguru.com/faq/view.jsp?EID=5064
>>
>> Is that correct?
>>
>> Best Regards,
>> Pavel Solomin
>>
>> Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin <https://www.linkedin.com/in/pavelsolomin>
>>
>> On Wed, 7 Jul 2021 at 19:30, Luke Cwik <[email protected]> wrote:
>>
>>> Have you set the serialVersionUID in MyPojo?
>>>
>>> The serialVersionUID is used by Java to detect when objects change. See https://www.baeldung.com/java-serial-version-uid for more details.
>>>
>>> On Tue, Jul 6, 2021 at 6:33 PM Pavel Solomin <[email protected]> wrote:
>>>
>>>> Hello!
>>>>
>>>> I am using Beam 2.30.0 with Flink runner 1.11.
>>>>
>>>> The app gets input data as some POJOs, creates fixed windows of those, and then writes window files to AWS S3.
>>>>
>>>> As the data evolves, I want to add new fields to my POJOs (which are actually Avro-generated), and, since the new POJO is fully compatible with the old one, I would expect the state to be restored successfully once I deploy the changed application code.
>>>>
>>>> I am looking for code examples where such a case is solved, and couldn't find any. I couldn't find any section discussing the evolution of stateful processors in the docs either.
>>>>
>>>> I have tried multiple approaches so far:
>>>> - not specifying any coders (as my Avro-generated POJOs are already Serializable)
>>>> - using KryoCoder (mentioned in https://beam.apache.org/documentation/sdks/java/euphoria/)
>>>> - using AvroCoder (https://beam.apache.org/releases/javadoc/2.30.0/org/apache/beam/sdk/coders/AvroCoder.html)
>>>> - implementing my own (see below)
>>>>
>>>> public class MyPoJoCoder extends CustomCoder<MyPoJo> {
>>>>     @Override
>>>>     public void encode(MyPoJo value, OutputStream outStream) throws CoderException, IOException {
>>>>         ObjectOutputStream oos = new ObjectOutputStream(outStream);
>>>>         value.writeExternal(oos);
>>>>         oos.flush(); // flush so buffered bytes actually reach outStream
>>>>     }
>>>>
>>>>     @Override
>>>>     public MyPoJo decode(InputStream inStream) throws CoderException, IOException {
>>>>         MyPoJo value = new MyPoJo();
>>>>         ObjectInputStream ois = new ObjectInputStream(inStream);
>>>>         value.readExternal(ois);
>>>>         return value;
>>>>     }
>>>> }
>>>>
>>>> but I am still getting this exception:
>>>>
>>>> java.lang.Exception: Exception while creating StreamOperatorStateContext.
>>>>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:204)
>>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:247)
>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:474)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:470)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:529)
>>>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:724)
>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:549)
>>>>     at java.base/java.lang.Thread.run(Thread.java:829)
>>>> Caused by: org.apache.flink.util.FlinkException: Could not restore operator state backend for DoFnOperator_3ae0e54ab75888aa89ffd50dca3abb31_(1/1) from any of the 1 provided restore options.
>>>>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
>>>>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:265)
>>>>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:152)
>>>>     ... 9 more
>>>> Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore operator state backend
>>>>     at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:86)
>>>>     at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createOperatorStateBackend(RocksDBStateBackend.java:552)
>>>>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:256)
>>>>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
>>>>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
>>>>     ... 11 more
>>>> Caused by: java.lang.IllegalStateException: Could not Java-deserialize TypeSerializer while restoring checkpoint metadata for serializer snapshot 'org.apache.beam.runners.flink.translation.types.CoderTypeSerializer$LegacySnapshot'. Please update to the TypeSerializerSnapshot interface that removes Java Serialization to avoid this problem in the future.
>>>>     at org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot.restoreSerializer(TypeSerializerConfigSnapshot.java:138)
>>>>     at org.apache.flink.runtime.state.StateSerializerProvider.previousSchemaSerializer(StateSerializerProvider.java:189)
>>>>     at org.apache.flink.runtime.state.StateSerializerProvider.currentSchemaSerializer(StateSerializerProvider.java:164)
>>>>     at org.apache.flink.runtime.state.RegisteredOperatorStateBackendMetaInfo.getPartitionStateSerializer(RegisteredOperatorStateBackendMetaInfo.java:113)
>>>>     at org.apache.flink.runtime.state.OperatorStateRestoreOperation.restore(OperatorStateRestoreOperation.java:94)
>>>>     at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:83)
>>>>     ... 15 more
>>>> Caused by: java.io.InvalidClassException: com.mymodels.MyPoJo; local class incompatible: stream classdesc serialVersionUID = -4766138050980652522, local class serialVersionUID = -3493429883367292394
>>>>     at java.base/java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:689)
>>>>     at java.base/java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:2012)
>>>>     at java.base/java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1862)
>>>>     at java.base/java.io.ObjectInputStream.readClass(ObjectInputStream.java:1825)
>>>>     at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1650)
>>>>     at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2464)
>>>>     at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2358)
>>>>     at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2196)
>>>>     at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1679)
>>>>     at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2464)
>>>>     at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2358)
>>>>     at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2196)
>>>>     at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1679)
>>>>     at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2464)
>>>>     at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2358)
>>>>     at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2196)
>>>>     at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1679)
>>>>     at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2464)
>>>>     at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2358)
>>>>     at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2196)
>>>>     at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1679)
>>>>     at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2464)
>>>>     at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2358)
>>>>     at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2196)
>>>>     at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1679)
>>>>     at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:493)
>>>>     at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:451)
>>>>     at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:301)
>>>>     at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:116)
>>>>     at org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot.readSnapshot(TypeSerializerConfigSnapshot.java:113)
>>>>     at org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(TypeSerializerSnapshot.java:174)
>>>>     at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.deserializeV2(TypeSerializerSnapshotSerializationUtil.java:179)
>>>>     at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.read(TypeSerializerSnapshotSerializationUtil.java:150)
>>>>     at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(TypeSerializerSnapshotSerializationUtil.java:76)
>>>>     at org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters$CurrentReaderImpl.readStateMetaInfoSnapshot(StateMetaInfoSnapshotReadersWriters.java:219)
>>>>     at org.apache.flink.runtime.state.OperatorBackendSerializationProxy.read(OperatorBackendSerializationProxy.java:119)
>>>>     at org.apache.flink.runtime.state.OperatorStateRestoreOperation.restore(OperatorStateRestoreOperation.java:83)
>>>>     ... 16 more
>>>>
>>>> Best Regards,
>>>> Pavel Solomin
>>>>
>>>> Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin <https://www.linkedin.com/in/pavelsolomin>
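As a self-contained illustration of the serialVersionUID rules agreed on in this thread, here is a minimal sketch using only plain Java serialization (the class is hypothetical, not the actual MyPoJo):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;

public class SerialUidDemo {

    // A hand-written POJO (not Avro-generated) with a pinned serialVersionUID.
    // New fields may be added in later versions; the UID must never change
    // for previously serialized state to stay readable.
    static class MyPojo implements Serializable {
        private static final long serialVersionUID = 1L; // pin once, never change
        String name;
        int count;
    }

    // Round-trips the POJO through Java serialization, the same mechanism a
    // savepoint restore uses when the coder relies on Serializable.
    static MyPojo roundTrip(MyPojo in) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(in);
        }
        try (ObjectInputStream ois =
                new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
            return (MyPojo) ois.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        MyPojo p = new MyPojo();
        p.name = "example";
        p.count = 42;
        MyPojo restored = roundTrip(p);
        System.out.println(restored.name + " " + restored.count);
        // The pinned UID is what gets written into the stream's class
        // descriptor; this is the value that must match on restore.
        System.out.println(ObjectStreamClass.lookup(MyPojo.class).getSerialVersionUID());
    }
}
```

An Avro-regenerated class gets a freshly computed UID on each schema change, which is exactly what the `InvalidClassException` above reports; pinning the UID in a hand-written class avoids that check failing for compatible field additions.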
