Ok, this is clear.

For now, though, I went a different path: my business-logic POJOs are still
Avro-generated, but I introduced another, generic POJO which just stores the
schema and the payload bytes, and therefore never needs to change. With that
in place, I use DelegateCoder:
https://beam.apache.org/releases/javadoc/2.30.0/org/apache/beam/sdk/coders/DelegateCoder.html

So far, this seems to allow me to add fields to POJOs and to remove them,
without state breakage.
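
A minimal sketch of the generic "schema + payload bytes" envelope idea, using
only the JDK. The class name Envelope, its fields and the length-prefixed
layout are illustrative assumptions, not the code actually used in the
pipeline. In Beam, the byte[] form could presumably serve as the intermediate
type of a DelegateCoder (for example with ByteArrayCoder as the delegate), but
that wiring is not shown here.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/**
 * Hypothetical generic envelope. Its own shape never changes; only the
 * schema text and payload bytes it carries evolve with the business POJO.
 */
final class Envelope {
    final String schemaJson; // writer schema of the payload, e.g. Avro schema JSON
    final byte[] payload;    // the record, serialized with that schema

    Envelope(String schemaJson, byte[] payload) {
        this.schemaJson = schemaJson;
        this.payload = payload;
    }

    /** Layout: [schema length][schema bytes][payload length][payload bytes]. */
    byte[] toBytes() {
        byte[] schemaBytes = schemaJson.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(4 + schemaBytes.length + 4 + payload.length);
        buf.putInt(schemaBytes.length).put(schemaBytes);
        buf.putInt(payload.length).put(payload);
        return buf.array();
    }

    /** Reads back the layout written by toBytes(). */
    static Envelope fromBytes(byte[] bytes) {
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        byte[] schemaBytes = new byte[buf.getInt()];
        buf.get(schemaBytes);
        byte[] payload = new byte[buf.getInt()];
        buf.get(payload);
        return new Envelope(new String(schemaBytes, StandardCharsets.UTF_8), payload);
    }
}
```

Because the envelope stores the writer schema next to the payload, a reader
with a newer POJO schema can resolve the old bytes against it, while the class
that the state serializer sees stays the same across deployments.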

Thank you for the clarifications. I will re-open this thread if I run into
issues.

Best Regards,
Pavel Solomin

Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin
<https://www.linkedin.com/in/pavelsolomin>





On Fri, 9 Jul 2021 at 17:12, Luke Cwik <[email protected]> wrote:

> Yes.
>
> On Fri, Jul 9, 2021 at 1:07 AM Pavel Solomin <[email protected]>
> wrote:
>
>> Hello Luke, thanks for your reply.
>>
>> I do not set serialVersionUID myself, but when I change the Avro schema of
>> my POJO, the generated class gets a different serialVersionUID.
>>
>> So, as I understand your suggestion, it is basically this:
>> 1 - don't use the Avro-generated POJO, because there is no control over
>> serialVersionUID there, and write the POJO myself
>> 2 - set serialVersionUID and never change it, if I want to keep savepoints
>> restorable
>> 3 - with conditions (1) and (2) met, I can add fields to the POJO without
>> state breakage
>> 4 - this mechanism relies on Java built-in serialization, so rules like
>> these apply - http://www.jguru.com/faq/view.jsp?EID=5064
>>
>> Is that correct?
>>
>> Best Regards,
>> Pavel Solomin
>>
>> Tel: +351 962 950 692 <+351%20962%20950%20692> | Skype: pavel_solomin |
>> Linkedin <https://www.linkedin.com/in/pavelsolomin>
>>
>>
>>
>>
>>
>> On Wed, 7 Jul 2021 at 19:30, Luke Cwik <[email protected]> wrote:
>>
>>> Have you set the serialVersionUID in MyPojo?
>>>
>>> The serialVersionUID is used by Java to detect when objects change. See
>>> https://www.baeldung.com/java-serial-version-uid for more details.
>>>
>>> On Tue, Jul 6, 2021 at 6:33 PM Pavel Solomin <[email protected]>
>>> wrote:
>>>
>>>> Hello!
>>>>
>>>> I am using Beam 2.30.0 with Flink runner 1.11.
>>>>
>>>> The app gets input data as some POJOs, creates fixed windows of such,
>>>> and then writes window files to AWS S3.
>>>>
>>>> As the data evolves, I want to add new fields to my POJOs (which are
>>>> actually Avro-generated), and, given that the new POJO is fully compatible
>>>> with the old one, I would expect the state to be restored successfully once
>>>> I deploy the changed application code.
>>>>
>>>> I am looking for code examples where such a case is solved, and couldn't
>>>> find any. I couldn't find any section discussing the evolution of stateful
>>>> processors in the docs either.
>>>>
>>>> I tried multiple methods so far:
>>>> - not specifying any coders (as my Avro-generated POJOs are already
>>>> Serializable)
>>>> - using KryoCoder (mentioned in
>>>> https://beam.apache.org/documentation/sdks/java/euphoria/)
>>>> - using AvroCoder (
>>>> https://beam.apache.org/releases/javadoc/2.30.0/org/apache/beam/sdk/coders/AvroCoder.html
>>>> )
>>>> - implementing my own (see below)
>>>>
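>>>> import java.io.IOException;
>>>> import java.io.InputStream;
>>>> import java.io.ObjectInputStream;
>>>> import java.io.ObjectOutputStream;
>>>> import java.io.OutputStream;
>>>> import org.apache.beam.sdk.coders.CoderException;
>>>> import org.apache.beam.sdk.coders.CustomCoder;
>>>>
>>>> public class MyPoJoCoder extends CustomCoder<MyPoJo> {
>>>>     @Override
>>>>     public void encode(MyPoJo value, OutputStream outStream) throws CoderException, IOException {
>>>>         // writeExternal is provided by the Avro-generated class.
>>>>         ObjectOutputStream oos = new ObjectOutputStream(outStream);
>>>>         value.writeExternal(oos);
>>>>         // Flush so buffered bytes reach outStream; do not close a stream the coder does not own.
>>>>         oos.flush();
>>>>     }
>>>>
>>>>     @Override
>>>>     public MyPoJo decode(InputStream inStream) throws CoderException, IOException {
>>>>         MyPoJo value = new MyPoJo();
>>>>         ObjectInputStream ois = new ObjectInputStream(inStream);
>>>>         value.readExternal(ois);
>>>>         return value;
>>>>     }
>>>> }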
>>>>
>>>> but I am still getting this exception:
>>>>
>>>> java.lang.Exception: Exception while creating
>>>> StreamOperatorStateContext.
>>>>     at org.apache.flink.streaming.api.operators.
>>>> StreamTaskStateInitializerImpl.streamOperatorStateContext(
>>>> StreamTaskStateInitializerImpl.java:204)
>>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator
>>>> .initializeState(AbstractStreamOperator.java:247)
>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
>>>> .initializeStateAndOpenOperators(OperatorChain.java:290)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask
>>>> .lambda$beforeInvoke$0(StreamTask.java:474)
>>>>     at org.apache.flink.streaming.runtime.tasks.
>>>> StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47
>>>> )
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask
>>>> .beforeInvoke(StreamTask.java:470)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
>>>> StreamTask.java:529)
>>>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:724)
>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:549)
>>>>     at java.base/java.lang.Thread.run(Thread.java:829)
>>>> Caused by: org.apache.flink.util.FlinkException: Could not restore
>>>> operator state backend for
>>>> DoFnOperator_3ae0e54ab75888aa89ffd50dca3abb31_(1/1) from any of the 1
>>>> provided restore options.
>>>>     at org.apache.flink.streaming.api.operators.
>>>> BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure
>>>> .java:135)
>>>>     at org.apache.flink.streaming.api.operators.
>>>> StreamTaskStateInitializerImpl.operatorStateBackend(
>>>> StreamTaskStateInitializerImpl.java:265)
>>>>     at org.apache.flink.streaming.api.operators.
>>>> StreamTaskStateInitializerImpl.streamOperatorStateContext(
>>>> StreamTaskStateInitializerImpl.java:152)
>>>>     ... 9 more
>>>> Caused by: org.apache.flink.runtime.state.BackendBuildingException:
>>>> Failed when trying to restore operator state backend
>>>>     at org.apache.flink.runtime.state.
>>>> DefaultOperatorStateBackendBuilder.build(
>>>> DefaultOperatorStateBackendBuilder.java:86)
>>>>     at org.apache.flink.contrib.streaming.state.RocksDBStateBackend
>>>> .createOperatorStateBackend(RocksDBStateBackend.java:552)
>>>>     at org.apache.flink.streaming.api.operators.
>>>> StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(
>>>> StreamTaskStateInitializerImpl.java:256)
>>>>     at org.apache.flink.streaming.api.operators.
>>>> BackendRestorerProcedure.attemptCreateAndRestore(
>>>> BackendRestorerProcedure.java:142)
>>>>     at org.apache.flink.streaming.api.operators.
>>>> BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure
>>>> .java:121)
>>>>     ... 11 more
>>>> Caused by: java.lang.IllegalStateException: Could not Java-deserialize
>>>> TypeSerializer while restoring checkpoint metadata for serializer
>>>> snapshot
>>>> 'org.apache.beam.runners.flink.translation.types.CoderTypeSerializer$LegacySnapshot'.
>>>> Please update to the TypeSerializerSnapshot interface that removes Java
>>>> Serialization to avoid this problem in the future.
>>>>     at org.apache.flink.api.common.typeutils.
>>>> TypeSerializerConfigSnapshot.restoreSerializer(
>>>> TypeSerializerConfigSnapshot.java:138)
>>>>     at org.apache.flink.runtime.state.StateSerializerProvider
>>>> .previousSchemaSerializer(StateSerializerProvider.java:189)
>>>>     at org.apache.flink.runtime.state.StateSerializerProvider
>>>> .currentSchemaSerializer(StateSerializerProvider.java:164)
>>>>     at org.apache.flink.runtime.state.
>>>> RegisteredOperatorStateBackendMetaInfo.getPartitionStateSerializer(
>>>> RegisteredOperatorStateBackendMetaInfo.java:113)
>>>>     at org.apache.flink.runtime.state.OperatorStateRestoreOperation
>>>> .restore(OperatorStateRestoreOperation.java:94)
>>>>     at org.apache.flink.runtime.state.
>>>> DefaultOperatorStateBackendBuilder.build(
>>>> DefaultOperatorStateBackendBuilder.java:83)
>>>>     ... 15 more
>>>> Caused by: java.io.InvalidClassException: com.mymodels.MyPoJo; local
>>>> class incompatible: stream classdesc serialVersionUID =
>>>> -4766138050980652522, local class serialVersionUID =
>>>> -3493429883367292394
>>>>     at java.base/java.io.ObjectStreamClass.initNonProxy(
>>>> ObjectStreamClass.java:689)
>>>>     at java.base/java.io.ObjectInputStream.readNonProxyDesc(
>>>> ObjectInputStream.java:2012)
>>>>     at java.base/java.io.ObjectInputStream.readClassDesc(
>>>> ObjectInputStream.java:1862)
>>>>     at java.base/java.io.ObjectInputStream.readClass(ObjectInputStream
>>>> .java:1825)
>>>>     at java.base/java.io.ObjectInputStream.readObject0(
>>>> ObjectInputStream.java:1650)
>>>>     at java.base/java.io.ObjectInputStream.defaultReadFields(
>>>> ObjectInputStream.java:2464)
>>>>     at java.base/java.io.ObjectInputStream.readSerialData(
>>>> ObjectInputStream.java:2358)
>>>>     at java.base/java.io.ObjectInputStream.readOrdinaryObject(
>>>> ObjectInputStream.java:2196)
>>>>     at java.base/java.io.ObjectInputStream.readObject0(
>>>> ObjectInputStream.java:1679)
>>>>     at java.base/java.io.ObjectInputStream.defaultReadFields(
>>>> ObjectInputStream.java:2464)
>>>>     at java.base/java.io.ObjectInputStream.readSerialData(
>>>> ObjectInputStream.java:2358)
>>>>     at java.base/java.io.ObjectInputStream.readOrdinaryObject(
>>>> ObjectInputStream.java:2196)
>>>>     at java.base/java.io.ObjectInputStream.readObject0(
>>>> ObjectInputStream.java:1679)
>>>>     at java.base/java.io.ObjectInputStream.defaultReadFields(
>>>> ObjectInputStream.java:2464)
>>>>     at java.base/java.io.ObjectInputStream.readSerialData(
>>>> ObjectInputStream.java:2358)
>>>>     at java.base/java.io.ObjectInputStream.readOrdinaryObject(
>>>> ObjectInputStream.java:2196)
>>>>     at java.base/java.io.ObjectInputStream.readObject0(
>>>> ObjectInputStream.java:1679)
>>>>     at java.base/java.io.ObjectInputStream.defaultReadFields(
>>>> ObjectInputStream.java:2464)
>>>>     at java.base/java.io.ObjectInputStream.readSerialData(
>>>> ObjectInputStream.java:2358)
>>>>     at java.base/java.io.ObjectInputStream.readOrdinaryObject(
>>>> ObjectInputStream.java:2196)
>>>>     at java.base/java.io.ObjectInputStream.readObject0(
>>>> ObjectInputStream.java:1679)
>>>>     at java.base/java.io.ObjectInputStream.defaultReadFields(
>>>> ObjectInputStream.java:2464)
>>>>     at java.base/java.io.ObjectInputStream.readSerialData(
>>>> ObjectInputStream.java:2358)
>>>>     at java.base/java.io.ObjectInputStream.readOrdinaryObject(
>>>> ObjectInputStream.java:2196)
>>>>     at java.base/java.io.ObjectInputStream.readObject0(
>>>> ObjectInputStream.java:1679)
>>>>     at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream
>>>> .java:493)
>>>>     at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream
>>>> .java:451)
>>>>     at org.apache.flink.api.common.typeutils.
>>>> TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(
>>>> TypeSerializerSerializationUtil.java:301)
>>>>     at org.apache.flink.api.common.typeutils.
>>>> TypeSerializerSerializationUtil.tryReadSerializer(
>>>> TypeSerializerSerializationUtil.java:116)
>>>>     at org.apache.flink.api.common.typeutils.
>>>> TypeSerializerConfigSnapshot.readSnapshot(TypeSerializerConfigSnapshot
>>>> .java:113)
>>>>     at org.apache.flink.api.common.typeutils.TypeSerializerSnapshot
>>>> .readVersionedSnapshot(TypeSerializerSnapshot.java:174)
>>>>     at org.apache.flink.api.common.typeutils.
>>>> TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy
>>>> .deserializeV2(TypeSerializerSnapshotSerializationUtil.java:179)
>>>>     at org.apache.flink.api.common.typeutils.
>>>> TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy
>>>> .read(TypeSerializerSnapshotSerializationUtil.java:150)
>>>>     at org.apache.flink.api.common.typeutils.
>>>> TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(
>>>> TypeSerializerSnapshotSerializationUtil.java:76)
>>>>     at org.apache.flink.runtime.state.metainfo.
>>>> StateMetaInfoSnapshotReadersWriters$CurrentReaderImpl
>>>> .readStateMetaInfoSnapshot(StateMetaInfoSnapshotReadersWriters.java:219
>>>> )
>>>>     at org.apache.flink.runtime.state.OperatorBackendSerializationProxy
>>>> .read(OperatorBackendSerializationProxy.java:119)
>>>>     at org.apache.flink.runtime.state.OperatorStateRestoreOperation
>>>> .restore(OperatorStateRestoreOperation.java:83)
>>>>     ... 16 more
>>>>
>>>> Best Regards,
>>>> Pavel Solomin
>>>>
>>>> Tel: +351 962 950 692 <+351%20962%20950%20692> | Skype: pavel_solomin
>>>> | Linkedin <https://www.linkedin.com/in/pavelsolomin>
>>>>
>>>>
>>>>
>>>>
