I think Avro has two modes of operation. Either you supply a schema
upfront or you let it auto-extract from a given type.

I don't know how the auto-extraction works but if you specify the schema
and version in your Beam Avro coder, then Avro will automatically
resolve the checkpoint data against the current version of the schema.

> and then that generated schema is saved within the
> Flink checkpoint, right?

Yes, the coder and its state are Java serialized. This is problematic if
the coder itself references classes for which Java serialization
breaks, because they cannot be restored then. Beam should really have a
dedicated interface for serializing coders instead of relying on Java
serialization.
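
To illustrate where the two serialVersionUID values in such an exception
come from, here is a minimal, JDK-only sketch (not Beam or Flink code).
The class names ImplicitDimension and PinnedDimension are hypothetical
stand-ins for a model class. When a Serializable class declares no
serialVersionUID, Java derives one from the class structure, so changing
the class typically changes the value and previously written streams are
rejected:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;

public class SerialVersionUidDemo {

    // Hypothetical model class with no explicit serialVersionUID:
    // the JVM computes one from the class structure.
    static class ImplicitDimension implements Serializable {
        String name = "region";
    }

    // Hypothetical model class that pins its serialVersionUID explicitly.
    static class PinnedDimension implements Serializable {
        private static final long serialVersionUID = 1L;
        String name = "region";
    }

    // Returns the serialVersionUID that Java serialization uses for the type.
    static long uidOf(Class<? extends Serializable> type) {
        return ObjectStreamClass.lookup(type).getSerialVersionUID();
    }

    // Serializes the value to bytes and reads it back with plain Java serialization.
    static Object roundTrip(Serializable value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Java serialization round trip failed", e);
        }
    }

    public static void main(String[] args) {
        // The implicit value is a hash of the class structure; the pinned value is fixed.
        System.out.println("implicit UID: " + uidOf(ImplicitDimension.class));
        System.out.println("pinned UID:   " + uidOf(PinnedDimension.class));
        PinnedDimension restored = (PinnedDimension) roundTrip(new PinnedDimension());
        System.out.println("restored name: " + restored.name);
    }
}
```

Pinning the UID only keeps Java serialization from rejecting the stream
outright. It does not provide the kind of schema evolution that a format
like Avro offers, so treat it as an illustration of the failure rather
than a fix.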

-Max

On 04.06.20 15:05, Ivan San Jose wrote:
> Thanks Max for your response. I'll try AvroCoder then. But I still
> have a question: I guess AvroCoder generates the Avro schema using
> Java reflection, and then that generated schema is saved within the
> Flink checkpoint, right?
> 
> On Wed, 2020-06-03 at 18:00 +0200, Maximilian Michels wrote:
>> Hi Ivan,
>>
>> Moving to the new type serializer snapshot interface is not going to
>> solve this problem because we cannot version the coder through the
>> Beam coder interface. That is only possible through Flink. However, it
>> is usually not trivial.
>>
>> In Beam, when you evolve your data model, the only way you can
>> maintain compatibility is to use a serialization format which can
>> evolve, e.g. Avro or Protobuf.
>>
>> Cheers,
>> Max
>>
>> On 03.06.20 16:47, Ivan San Jose wrote:
>>> Hi, we have a Beam application running with the Flink runner and we
>>> are struggling with Flink checkpoints. Every time we evolve the source
>>> code by modifying a Java model, an exception is thrown when trying to
>>> restore the last checkpoint taken:
>>>
>>> Caused by: java.lang.IllegalStateException: Could not Java-deserialize TypeSerializer while restoring checkpoint metadata for serializer snapshot 'org.apache.beam.runners.flink.translation.types.CoderTypeSerializer$LegacySnapshot'. Please update to the TypeSerializerSnapshot interface that removes Java Serialization to avoid this problem in the future.
>>>     at org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot.restoreSerializer(TypeSerializerConfigSnapshot.java:138)
>>>     at org.apache.flink.runtime.state.StateSerializerProvider.previousSchemaSerializer(StateSerializerProvider.java:189)
>>>     at org.apache.flink.runtime.state.StateSerializerProvider.currentSchemaSerializer(StateSerializerProvider.java:164)
>>>     at org.apache.flink.runtime.state.RegisteredOperatorStateBackendMetaInfo.getPartitionStateSerializer(RegisteredOperatorStateBackendMetaInfo.java:113)
>>>     at org.apache.flink.runtime.state.OperatorStateRestoreOperation.restore(OperatorStateRestoreOperation.java:94)
>>>     at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:83)
>>>     ... 12 more
>>> Caused by: java.io.InvalidClassException: internal.model.dimension.Dimension; local class incompatible: stream classdesc serialVersionUID = -223148029368332375, local class serialVersionUID = 4489864664852536553
>>>
>>> As you can see, the exception is complaining that the class has
>>> evolved and the two versions are no longer compatible.
>>>
>>> After checking some documentation and Beam source code...
>>> https://flink.apache.org/ecosystem/2020/02/22/apache-beam-how-beam-runs-on-top-of-flink.html
>>> (Serializers vs Coders)
>>> https://github.com/apache/beam/blob/785609f22d013411b7973bbf9e2d15c3c8171fb2/runners/flink/1.8/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java
>>>
>>> It seems that Beam coders are wrapped into Flink's TypeSerializers,
>>> and, in the end, Beam coders are the ones in charge of
>>> serializing/deserializing objects.
>>>
>>> Also, reading
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/state/custom_serialization.html#migrating-from-deprecated-serializer-snapshot-apis-before-flink-17
>>> and looking at the CoderTypeSerializer implementation, it seems Beam
>>> is implementing a deprecated interface and should be moved to the new
>>> TypeSerializerSnapshot one. To be honest, I don't know if that would
>>> solve my problem, but the exception clearly says "Please update to
>>> the TypeSerializerSnapshot interface that removes Java Serialization
>>> to avoid this problem in the future.", so I guess it would solve it.
>>>
>>> Can someone help me here? I could try to update the implementation
>>> if someone gives me some hints, because right now I'm a little bit
>>> lost.
>>>
>>> Thanks
>>>
>>>
>>> This email and any attachment to it are confidential. Unless you
>>> are the intended recipient, you may not use, copy or disclose
>>> either the message or any information contained in the message. If
>>> you are not the intended recipient, you should delete this email
>>> and notify the sender immediately. Any views or opinions expressed
>>> in this email are those of the sender only, unless otherwise
>>> stated. All copyright in any of the material in this email is
>>> reserved. All emails, incoming and outgoing, may be recorded and
>>> monitored for legitimate business purposes. We exclude all
>>> liability for any loss or damage arising or resulting from the
>>> receipt, use or transmission of this email to the fullest extent
>>> permitted by law.
>>>
>>
>>
> 
> 
