Hello Luke, thanks for your reply.

I do not set serialVersionUID myself, but when I change the Avro schema of my
PoJo, the serialVersionUID of the generated class changes.
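For reference, the serialVersionUID that Java serialization will check for a class can be read with java.io.ObjectStreamClass. This applies whether the value comes from a declared constant (as in the Avro-generated class) or is computed by the JVM. The sketch below is illustrative only. PinnedPoJo and UnpinnedPoJo are hypothetical stand-ins, not the real MyPoJo. It compares a class with a pinned constant against one without a declared value.

```java
import java.io.ObjectStreamClass;
import java.io.Serializable;

public class PinnedUidCheck {
    // Hypothetical hand-written POJO with a pinned serialVersionUID. Adding
    // fields later leaves this value unchanged, because Java uses the constant.
    static class PinnedPoJo implements Serializable {
        private static final long serialVersionUID = 1L;
        String id;
    }

    // Hypothetical POJO without a declared constant: the JVM derives a default
    // from the class name, modifiers, interfaces and members, so structural
    // edits such as adding a field typically change it.
    static class UnpinnedPoJo implements Serializable {
        String id;
    }

    // Returns the serialVersionUID that Java serialization will check for cl.
    static long uidOf(Class<?> cl) {
        return ObjectStreamClass.lookup(cl).getSerialVersionUID();
    }

    public static void main(String[] args) {
        System.out.println("pinned:   " + uidOf(PinnedPoJo.class)); // prints "pinned:   1"
        System.out.println("unpinned: " + uidOf(UnpinnedPoJo.class));
    }
}
```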

So, as I understand your suggestion, it basically comes down to this:
1 - don't use Avro-generated PoJos, because there is no control over
serialVersionUID there, and write the PoJo myself
2 - set serialVersionUID and never change it, if I want to keep savepoints
restorable
3 - with conditions (1) and (2) met, I can add fields to the PoJo without
breaking state
4 - this mechanism relies on Java built-in serialization, so rules like
these apply - http://www.jguru.com/faq/view.jsp?EID=5064
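Points 2 and 3 can be checked in plain Java, without Flink or Beam. The sketch below is a simulation under stated assumptions. One JVM cannot load two versions of the same class, so it serializes a hypothetical V1, renames it to V2 inside the byte stream, and reads the result back. Both classes declare the same serialVersionUID, and V2 adds a field. Java serialization leaves that field at its default value (null) because the older stream does not contain it.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;

public class AddFieldDemo {
    // Hypothetical "old" POJO version, as it was when the savepoint was taken.
    static class V1 implements Serializable {
        private static final long serialVersionUID = 7L;
        String id = "abc";
    }

    // Hypothetical "new" version: same serialVersionUID, one extra field.
    static class V2 implements Serializable {
        private static final long serialVersionUID = 7L;
        String id;
        String email; // absent from the old data, so it is restored as null
    }

    static byte[] serialize(Object o) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(o);
        }
        return bytes.toByteArray();
    }

    // Simulates "old data for the new class" by rewriting the class name in the
    // stream. Only safe here because both names have the same length.
    static byte[] renameClass(byte[] data, String from, String to) {
        byte[] f = from.getBytes(StandardCharsets.UTF_8);
        byte[] t = to.getBytes(StandardCharsets.UTF_8);
        if (f.length != t.length) throw new IllegalArgumentException("length mismatch");
        byte[] copy = data.clone();
        for (int i = 0; i + f.length <= copy.length; i++) {
            boolean match = true;
            for (int j = 0; j < f.length && match; j++) match = copy[i + j] == f[j];
            if (match) System.arraycopy(t, 0, copy, i, t.length);
        }
        return copy;
    }

    static V2 restoreAsV2() throws IOException, ClassNotFoundException {
        byte[] old = renameClass(serialize(new V1()), V1.class.getName(), V2.class.getName());
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(old))) {
            return (V2) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        V2 restored = restoreAsV2();
        System.out.println(restored.id + " / " + restored.email); // prints "abc / null"
    }
}
```

Changing either serialVersionUID constant so the two no longer match makes readObject fail with an InvalidClassException instead.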

Is that correct?

Best Regards,
Pavel Solomin

Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin
<https://www.linkedin.com/in/pavelsolomin>

On Wed, 7 Jul 2021 at 19:30, Luke Cwik <[email protected]> wrote:

> Have you set the serialVersionUID in MyPojo?
>
> Java uses the serialVersionUID to detect when a class has changed. See
> https://www.baeldung.com/java-serial-version-uid for more details.
>
> On Tue, Jul 6, 2021 at 6:33 PM Pavel Solomin <[email protected]>
> wrote:
>
>> Hello!
>>
>> I am using Beam 2.30.0 with Flink runner 1.11.
>>
>> The app receives input data as POJOs, groups them into fixed windows, and
>> then writes the window files to AWS S3.
>>
>> As the data evolves, I want to add new fields to my POJOs (which are
>> actually Avro-generated), and, given that the new POJO is fully compatible
>> with the old one, I would expect the state to be restored successfully once
>> I deploy the changed application code.
>>
>> I have looked for code examples that solve this case and couldn't find
>> any. I couldn't find any section of the docs discussing the evolution of
>> stateful processors either.
>>
>> I have tried several approaches so far:
>> - not specifying any coders (as my Avro-generated POJOs are already
>> Serializable)
>> - using KryoCoder (mentioned in
>> https://beam.apache.org/documentation/sdks/java/euphoria/)
>> - using AvroCoder
>> (https://beam.apache.org/releases/javadoc/2.30.0/org/apache/beam/sdk/coders/AvroCoder.html)
>> - implementing my own coder (see below)
>>
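>> import com.mymodels.MyPoJo;
>> import java.io.IOException;
>> import java.io.InputStream;
>> import java.io.ObjectInputStream;
>> import java.io.ObjectOutputStream;
>> import java.io.OutputStream;
>> import org.apache.beam.sdk.coders.CoderException;
>> import org.apache.beam.sdk.coders.CustomCoder;
>>
>> public class MyPoJoCoder extends CustomCoder<MyPoJo> {
>>     @Override
>>     public void encode(MyPoJo value, OutputStream outStream)
>>             throws CoderException, IOException {
>>         ObjectOutputStream oos = new ObjectOutputStream(outStream);
>>         value.writeExternal(oos);
>>         // ObjectOutputStream buffers block data internally: flush it (but do
>>         // not close it, since the caller owns outStream) so no bytes are lost.
>>         oos.flush();
>>     }
>>
>>     @Override
>>     public MyPoJo decode(InputStream inStream)
>>             throws CoderException, IOException {
>>         MyPoJo value = new MyPoJo();
>>         ObjectInputStream ois = new ObjectInputStream(inStream);
>>         value.readExternal(ois);
>>         return value;
>>     }
>> }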
>>
>> but I am still getting this exception:
>>
>> java.lang.Exception: Exception while creating StreamOperatorStateContext.
>>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:204)
>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:247)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:474)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:470)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:529)
>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:724)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:549)
>>     at java.base/java.lang.Thread.run(Thread.java:829)
>> Caused by: org.apache.flink.util.FlinkException: Could not restore operator state backend for DoFnOperator_3ae0e54ab75888aa89ffd50dca3abb31_(1/1) from any of the 1 provided restore options.
>>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
>>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:265)
>>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:152)
>>     ... 9 more
>> Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore operator state backend
>>     at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:86)
>>     at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createOperatorStateBackend(RocksDBStateBackend.java:552)
>>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:256)
>>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
>>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
>>     ... 11 more
>> Caused by: java.lang.IllegalStateException: Could not Java-deserialize TypeSerializer while restoring checkpoint metadata for serializer snapshot 'org.apache.beam.runners.flink.translation.types.CoderTypeSerializer$LegacySnapshot'. Please update to the TypeSerializerSnapshot interface that removes Java Serialization to avoid this problem in the future.
>>     at org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot.restoreSerializer(TypeSerializerConfigSnapshot.java:138)
>>     at org.apache.flink.runtime.state.StateSerializerProvider.previousSchemaSerializer(StateSerializerProvider.java:189)
>>     at org.apache.flink.runtime.state.StateSerializerProvider.currentSchemaSerializer(StateSerializerProvider.java:164)
>>     at org.apache.flink.runtime.state.RegisteredOperatorStateBackendMetaInfo.getPartitionStateSerializer(RegisteredOperatorStateBackendMetaInfo.java:113)
>>     at org.apache.flink.runtime.state.OperatorStateRestoreOperation.restore(OperatorStateRestoreOperation.java:94)
>>     at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:83)
>>     ... 15 more
>> Caused by: java.io.InvalidClassException: com.mymodels.MyPoJo; local class incompatible: stream classdesc serialVersionUID = -4766138050980652522, local class serialVersionUID = -3493429883367292394
>>     at java.base/java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:689)
>>     at java.base/java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:2012)
>>     at java.base/java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1862)
>>     at java.base/java.io.ObjectInputStream.readClass(ObjectInputStream.java:1825)
>>     at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1650)
>>     at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2464)
>>     at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2358)
>>     at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2196)
>>     at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1679)
>>     at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2464)
>>     at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2358)
>>     at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2196)
>>     at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1679)
>>     at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2464)
>>     at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2358)
>>     at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2196)
>>     at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1679)
>>     at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2464)
>>     at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2358)
>>     at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2196)
>>     at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1679)
>>     at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2464)
>>     at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2358)
>>     at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2196)
>>     at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1679)
>>     at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:493)
>>     at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:451)
>>     at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:301)
>>     at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:116)
>>     at org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot.readSnapshot(TypeSerializerConfigSnapshot.java:113)
>>     at org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(TypeSerializerSnapshot.java:174)
>>     at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.deserializeV2(TypeSerializerSnapshotSerializationUtil.java:179)
>>     at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.read(TypeSerializerSnapshotSerializationUtil.java:150)
>>     at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(TypeSerializerSnapshotSerializationUtil.java:76)
>>     at org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters$CurrentReaderImpl.readStateMetaInfoSnapshot(StateMetaInfoSnapshotReadersWriters.java:219)
>>     at org.apache.flink.runtime.state.OperatorBackendSerializationProxy.read(OperatorBackendSerializationProxy.java:119)
>>     at org.apache.flink.runtime.state.OperatorStateRestoreOperation.restore(OperatorStateRestoreOperation.java:83)
>>     ... 16 more
>>
>> Best Regards,
>> Pavel Solomin
>>

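In the quoted trace, the innermost frames go through ObjectInputStream.readClass. That suggests the mismatch was hit while restoring a serialized Class reference to MyPoJo, possibly one held by the coder inside CoderTypeSerializer$LegacySnapshot, rather than while decoding the records themselves. Java writes the serialVersionUID into the stream even for a bare Class object. The stdlib-only sketch below reproduces that failure mode under stated assumptions. Payload and ClassHolder are hypothetical stand-ins for MyPoJo and a coder. The "old savepoint" is simulated by patching the UID bytes in the stream.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InvalidClassException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.ByteBuffer;

public class ClassRefUidDemo {
    // Hypothetical stand-in for MyPoJo, with a distinctive pinned serialVersionUID.
    static class Payload implements Serializable {
        private static final long serialVersionUID = 0x0123456789ABCDEFL;
    }

    // Hypothetical stand-in for a coder that keeps a Class<T> reference.
    static class ClassHolder implements Serializable {
        private static final long serialVersionUID = 1L;
        final Class<?> type;
        ClassHolder(Class<?> type) { this.type = type; }
    }

    static byte[] serialize(Object o) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(o);
        }
        return bytes.toByteArray();
    }

    static Object deserialize(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        }
    }

    // Overwrites the first 8-byte big-endian occurrence of `from` with `to`,
    // simulating data written when the class had a different serialVersionUID.
    static byte[] replaceLong(byte[] data, long from, long to) {
        byte[] needle = ByteBuffer.allocate(8).putLong(from).array();
        byte[] replacement = ByteBuffer.allocate(8).putLong(to).array();
        byte[] copy = data.clone();
        for (int i = 0; i + 8 <= copy.length; i++) {
            boolean match = true;
            for (int j = 0; j < 8 && match; j++) match = copy[i + j] == needle[j];
            if (match) {
                System.arraycopy(replacement, 0, copy, i, 8);
                return copy;
            }
        }
        throw new IllegalArgumentException("serialVersionUID not found in stream");
    }

    public static void main(String[] args) throws Exception {
        byte[] saved = serialize(new ClassHolder(Payload.class));

        // Same serialVersionUID on both sides: the Class reference restores fine.
        ClassHolder restored = (ClassHolder) deserialize(saved);
        System.out.println("restored type: " + restored.type.getSimpleName());

        // Different serialVersionUID in the stream: the same "local class
        // incompatible" InvalidClassException as in the Flink trace.
        byte[] tampered = replaceLong(saved, 0x0123456789ABCDEFL, 42L);
        try {
            deserialize(tampered);
        } catch (InvalidClassException e) {
            System.out.println("restore failed: " + e.getMessage());
        }
    }
}
```

If this reading of the trace is right, pinning serialVersionUID on the POJO matters even when the records themselves are encoded by a non-Java-serialization coder such as AvroCoder.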