See my answers inline.

> Sorry, but I'm afraid I'm not understanding the scenario well... What's
> the point of keeping a reference to the serialized class if
> AVRO/Protobuf are using schemas?

They keep a reference to the class because they produce that type when
they deserialize data.

> I mean, I've understood that AVRO and Protobuf follow schemas in order
> to serialize/deserialize an object in a binary way. So I'd expect to
> have the following things saved into the checkpoint:
>   - AVRO/Protobuf schema of the serialized object, stored in JSON or
> using the Java serialization mechanism
>   - Object binary serialized according to the schema

By default, we serialize the entire Coder because Flink expects it to be
present when the savepoint/checkpoint is restored. It would be more
convenient if we could just use the latest serializer instead, but that
does not seem possible because loading the savepoint/checkpoint is
decoupled from loading new serializers.

> So, when the checkpoint is restored, it would check whether the newly
> generated schema of the object being restored is compatible with the
> old one and, if it is, just read the schema+binary data saved into the
> checkpoint to restore the object.

Compatibility checking is up to the coder; Beam itself has nothing to do
with this. Flink just loads the old coder and tries to use it to read the
checkpoint data. Flink also has an interface for performing a serializer
compatibility check. It allows coder migration by first reading the data
with the old coder and then writing it with the new one. We currently do
not make use of this because Beam lacks an interface to check
compatibility. However, I imagine we could keep a list of coders for
which we implement such a check. That's also how Flink does it for its
own serializers.
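
To make the migration idea concrete, here is a minimal sketch in plain Java, with neither Beam nor Flink on the classpath. `SimpleCoder`, `VersionedStringCoder`, `Compatibility` and `StateMigration` are hypothetical names, not Beam or Flink APIs; the three-valued result only mirrors the kind of answer a Flink serializer compatibility check gives (compatible as-is, compatible after migration, incompatible). The check lives in the coder itself, which is roughly what a per-coder list of supported checks would look like, and migration decodes every element with the old coder and re-encodes it with the new one.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical minimal coder interface (not Beam's Coder API). */
interface SimpleCoder<T> {
    void encode(T value, DataOutputStream out) throws IOException;
    T decode(DataInputStream in) throws IOException;
}

/** Mirrors the kinds of answers a Flink-style compatibility check gives. */
enum Compatibility { COMPATIBLE_AS_IS, COMPATIBLE_AFTER_MIGRATION, INCOMPATIBLE }

/** Hypothetical coder whose wire format changed between version 1 and version 2. */
final class VersionedStringCoder implements SimpleCoder<String> {
    final int version;

    VersionedStringCoder(int version) { this.version = version; }

    @Override
    public void encode(String value, DataOutputStream out) throws IOException {
        if (version == 1) {
            out.writeUTF(value);                   // v1: 2-byte length + modified UTF-8
        } else {
            byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
            out.writeInt(bytes.length);            // v2: 4-byte length + standard UTF-8
            out.write(bytes);
        }
    }

    @Override
    public String decode(DataInputStream in) throws IOException {
        if (version == 1) {
            return in.readUTF();
        }
        byte[] bytes = new byte[in.readInt()];
        in.readFully(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    /** The coder itself decides which of its older versions it can migrate from. */
    Compatibility checkCompatibility(VersionedStringCoder newCoder) {
        if (newCoder.version == version) return Compatibility.COMPATIBLE_AS_IS;
        if (version == 1 && newCoder.version == 2) return Compatibility.COMPATIBLE_AFTER_MIGRATION;
        return Compatibility.INCOMPATIBLE;
    }
}

final class StateMigration {
    static <T> byte[] encodeAll(List<T> values, SimpleCoder<T> coder) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);
        for (T value : values) coder.encode(value, out);
        out.flush();
        return bos.toByteArray();
    }

    static <T> List<T> decodeAll(byte[] bytes, int count, SimpleCoder<T> coder) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes));
        List<T> values = new ArrayList<>();
        for (int i = 0; i < count; i++) values.add(coder.decode(in));
        return values;
    }

    /** Migration: read every element with the old coder, then write it with the new one. */
    static <T> byte[] migrate(byte[] state, int count, SimpleCoder<T> oldCoder, SimpleCoder<T> newCoder)
            throws IOException {
        return encodeAll(decodeAll(state, count, oldCoder), newCoder);
    }
}
```

A restore would call `checkCompatibility` first and only run `migrate` when the answer is "compatible after migration"; anything else would have to fail the restore.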

> Also, I don't understand what you meant when you said that a reference
> to the Beam Coder is saved into the checkpoint, because the error I'm
> getting references the Java model class ("Caused by:
> java.io.InvalidClassException: internal.model.dimension.POJOModel;
> local class incompatible: stream classdesc serialVersionUID =
> -223148029368332375, local class serialVersionUID =
> 4489864664852536553"), not the coder itself.

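It's because the coder needs to know the type it produces, so it keeps a
class reference. Without this, the coder wouldn't be able to instantiate
the correct type. When the coder is Java-serialized into the checkpoint,
that class reference is written together with the class's
serialVersionUID, which is why the exception names your model class
rather than the coder.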
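
A small sketch using plain Java serialization shows the effect. `Dimension` and `ClassHoldingCoder` are hypothetical stand-ins, not Beam classes. Writing a `Class` object records the class name and its serialVersionUID; on restore, `ObjectInputStream` compares that UID with the local class and fails with "local class incompatible". Since one program cannot load two versions of a class, the sketch simulates the class change by patching the recorded UID in the serialized bytes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;

/** Hypothetical stand-in for the user's model class. */
class Dimension implements Serializable {
    private static final long serialVersionUID = 1L;
    String name;
}

/** Hypothetical coder-like object holding a non-transient reference to the type it produces. */
class ClassHoldingCoder implements Serializable {
    private static final long serialVersionUID = 1L;
    final Class<?> type;

    ClassHoldingCoder(Class<?> type) { this.type = type; }
}

final class SnapshotDemo {
    static byte[] serialize(Object o) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(o);
        }
        return bos.toByteArray();
    }

    static Object deserialize(byte[] bytes) throws IOException, ClassNotFoundException {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return ois.readObject();
        }
    }

    /**
     * Simulates "the model class changed" by overwriting the serialVersionUID recorded for
     * className. In a class descriptor the name (2-byte length + bytes) is followed directly
     * by the 8-byte UID.
     */
    static byte[] patchRecordedUid(byte[] bytes, String className, long newUid) {
        byte[] name = className.getBytes(StandardCharsets.UTF_8);
        search:
        for (int i = 0; i + 2 + name.length + 8 <= bytes.length; i++) {
            if (bytes[i] != (byte) (name.length >>> 8) || bytes[i + 1] != (byte) name.length) {
                continue;
            }
            for (int j = 0; j < name.length; j++) {
                if (bytes[i + 2 + j] != name[j]) continue search;
            }
            byte[] patched = bytes.clone();
            int uidPos = i + 2 + name.length;
            for (int k = 0; k < 8; k++) {
                patched[uidPos + k] = (byte) (newUid >>> (56 - 8 * k)); // big-endian, like writeLong
            }
            return patched;
        }
        throw new IllegalArgumentException("no class descriptor found for " + className);
    }
}
```

Restoring the unpatched bytes works; restoring the patched bytes throws `java.io.InvalidClassException` with the same "local class incompatible: stream classdesc serialVersionUID = ..." message as in the report, even though no `Dimension` instance was ever serialized.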

Both AvroCoder and ProtoCoder reference the class, which makes the coder
unusable if the class changes. You need to use a coder which does not do
that (making the class reference "transient" would work).
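
A sketch of the "transient" variant, again with a hypothetical `LazyTypeCoder` rather than a real Beam coder: only the class name is serialized, and the `Class` object is re-resolved lazily after the restore, so the model class's serialVersionUID never ends up in the snapshot. It assumes the model class is visible to `LazyTypeCoder`'s own class loader; inside Flink you would likely need the user-code class loader instead.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Hypothetical stand-in for the user's model class; its serialVersionUID may change freely. */
class Dimension implements Serializable {
    private static final long serialVersionUID = 4489864664852536553L;
    String name;
}

/**
 * Hypothetical coder-like object that only serializes the class *name*. The Class object is
 * transient, so the snapshot does not record the model class's serialVersionUID.
 */
class LazyTypeCoder implements Serializable {
    private static final long serialVersionUID = 1L;
    private final String typeName;
    private transient Class<?> type; // null after deserialization, resolved on first use

    LazyTypeCoder(Class<?> type) {
        this.typeName = type.getName();
        this.type = type;
    }

    Class<?> type() {
        if (type == null) {
            try {
                // Assumption: the model class is visible to this class's own class loader.
                type = Class.forName(typeName, false, LazyTypeCoder.class.getClassLoader());
            } catch (ClassNotFoundException e) {
                throw new IllegalStateException("Cannot load " + typeName, e);
            }
        }
        return type;
    }

    /** Java-serializes the coder and reads it back, like a snapshot/restore cycle. */
    static LazyTypeCoder roundTrip(LazyTypeCoder coder) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(coder);
        }
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
            return (LazyTypeCoder) ois.readObject();
        }
    }
}
```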


=> What is the best solution for your problem?
Without us changing anything in Beam, the best solution is to write a
coder which allows schema migration, like
AvroCoder/ProtoCoder/SchemaCoder, but does not keep any non-transient
references to the type class. It has to lazily initialize the schema from
the current data class.
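
Roughly, such a coder could look like the following sketch. It is plain Java rather than an implementation of Beam's `Coder`, and `LazySchemaCoder` and `Dimension` are hypothetical; the "schema" here is just the set of `String` fields discovered by reflection, whereas a real coder would derive an Avro, Protobuf or Beam schema. Only the class name is serialized with the coder. Each element is written as field-name/value pairs, so data written by an older class version can still be decoded: unknown fields are skipped and fields missing from the data stay `null`.

```java
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.Serializable;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical model class; a later version may add or remove String fields. */
class Dimension {
    String id;
    String label;

    public Dimension() {}
}

/**
 * Hypothetical sketch (not Beam's Coder API): only the class name is Java-serialized, while
 * the "schema" (here: the String fields of the class) is derived lazily from whatever
 * version of the class is on the classpath when the coder is first used after a restore.
 */
class LazySchemaCoder<T> implements Serializable {
    private static final long serialVersionUID = 1L;
    private final String typeName;
    private transient Class<T> type;
    private transient Map<String, Field> schema;

    LazySchemaCoder(Class<T> type) { this.typeName = type.getName(); }

    @SuppressWarnings("unchecked")
    private void ensureSchema() {
        if (schema != null) return;
        try {
            type = (Class<T>) Class.forName(typeName, false, LazySchemaCoder.class.getClassLoader());
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("Cannot load " + typeName, e);
        }
        Map<String, Field> fields = new TreeMap<>(); // sorted for a deterministic encoding
        for (Field f : type.getDeclaredFields()) {
            int mod = f.getModifiers();
            if (!Modifier.isStatic(mod) && !Modifier.isTransient(mod) && f.getType() == String.class) {
                f.setAccessible(true);
                fields.put(f.getName(), f);
            }
        }
        schema = fields;
    }

    /** Writes the field count, then (name, present?, value) for every field. */
    void encode(T value, DataOutputStream out) throws IOException {
        ensureSchema();
        out.writeInt(schema.size());
        try {
            for (Map.Entry<String, Field> entry : schema.entrySet()) {
                String v = (String) entry.getValue().get(value);
                out.writeUTF(entry.getKey());
                out.writeBoolean(v != null);
                if (v != null) out.writeUTF(v);
            }
        } catch (IllegalAccessException e) {
            throw new IOException(e);
        }
    }

    /** Fields unknown to the current class are skipped; fields absent in the data stay null. */
    T decode(DataInputStream in) throws IOException {
        ensureSchema();
        try {
            T result = type.getDeclaredConstructor().newInstance();
            int count = in.readInt();
            for (int i = 0; i < count; i++) {
                String name = in.readUTF();
                String v = in.readBoolean() ? in.readUTF() : null;
                Field f = schema.get(name);
                if (f != null) f.set(result, v);
            }
            return result;
        } catch (ReflectiveOperationException e) {
            throw new IOException(e);
        }
    }
}
```

Handling other field types and nested objects is where an existing format helps; Avro, for example, resolves data written with an old (writer) schema against the current (reader) schema.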


> Could we try using schemas?

@Reuven We could. I think it would also cause problems, because
SchemaCoder keeps a TypeDescriptor reference which will break in case of
class changes.


-Max

On 05.06.20 12:17, Ivan San Jose wrote:
> By the way, I've just tried using AvroCoder, which infers the schema
> from the Java object to be deserialized, and I got the same error when
> restoring the checkpoint :(
> 
> On Fri, 2020-06-05 at 06:24 +0000, Ivan San Jose wrote:
>> Sorry, but I'm afraid I'm not understanding the scenario well... What's
>> the point of keeping a reference to the serialized class if AVRO/Protobuf
>> are using schemas?
>>
>> I mean, I've understood that AVRO and Protobuf follow schemas in order to
>> serialize/deserialize an object in a binary way. So I'd expect to have the
>> following things saved into the checkpoint:
>>   - AVRO/Protobuf schema of the serialized object, stored in JSON or using
>> the Java serialization mechanism
>>   - Object binary serialized according to the schema
>>
>> So, when the checkpoint is restored, it would check whether the newly
>> generated schema of the object being restored is compatible with the old
>> one and, if it is, just read the schema+binary data saved into the
>> checkpoint to restore the object.
>>
>> Isn't it like that?
>>
>> Also, I don't understand what you meant when you said that a reference to
>> the Beam Coder is saved into the checkpoint, because the error I'm getting
>> references the Java model class ("Caused by:
>> java.io.InvalidClassException: internal.model.dimension.POJOModel;
>> local class incompatible: stream classdesc serialVersionUID =
>> -223148029368332375, local class serialVersionUID =
>> 4489864664852536553"), not the coder itself.
>>
>> Thanks
>>
>> On Thu, 2020-06-04 at 18:40 +0200, Maximilian Michels wrote:
>>> I was under the assumption that this should work, but ProtoCoder keeps a
>>> reference to the class used to serialize. That causes the snapshot to
>>> break.
>>>
>>> We can fix this by:
>>>
>>> a) writing/using coders which do not keep instances of evolving classes
>>> b) adding an interface to Beam for Coder serialization/deserialization
>>> c) adding a mode to Flink which allows using newly supplied coders
>>>    instead of having to load the old coder
>>>
>>> Of all the options, (a) is the most feasible for you. It looks like
>>> neither ProtoCoder nor AvroCoder falls into this category.
>>>
>>> -Max
>>>
>>> On 04.06.20 16:22, Ivan San Jose wrote:
>>>> I've changed my Java model to use ProtoCoder (with
>>>> @DefaultCoder(ProtoCoder.class)), but I'm getting the same error when
>>>> trying to restore the last checkpoint taken after adding an attribute to
>>>> that model.
>>>>
>>>> What do you think could be happening? It seems that the state saved
>>>> within the checkpoint is still using the Java serialization mechanism...
>>>>
>>>> On Thu, 2020-06-04 at 13:05 +0000, Ivan San Jose wrote:
>>>>> Thanks Max for your response. I'll try AvroCoder then. But I still
>>>>> have a question: I guess AvroCoder generates the Avro schema using
>>>>> Java reflection, and then that generated schema is saved within the
>>>>> Flink checkpoint, right?
>>>>>
>>>>> On Wed, 2020-06-03 at 18:00 +0200, Maximilian Michels wrote:
>>>>>> Hi Ivan,
>>>>>>
>>>>>> Moving to the new type serializer snapshot interface is not going to
>>>>>> solve this problem because we cannot version the coder through the
>>>>>> Beam coder interface. That is only possible through Flink. However, it
>>>>>> is usually not trivial.
>>>>>>
>>>>>> In Beam, when you evolve your data model, the only way you can
>>>>>> maintain compatibility is to use a serialization format which can
>>>>>> evolve, e.g. Avro or Protobuf.
>>>>>>
>>>>>> Cheers,
>>>>>> Max
>>>>>>
>>>>>> On 03.06.20 16:47, Ivan San Jose wrote:
>>>>>>> Hi, we have a Beam application running with the Flink runner and we
>>>>>>> are struggling with Flink checkpoints. Every time we evolve the source
>>>>>>> code by modifying a Java model, an exception is thrown when trying to
>>>>>>> restore the last checkpoint taken:
>>>>>>>
>>>>>>> Caused by: java.lang.IllegalStateException: Could not Java-deserialize TypeSerializer while restoring checkpoint metadata for serializer snapshot 'org.apache.beam.runners.flink.translation.types.CoderTypeSerializer$LegacySnapshot'. Please update to the TypeSerializerSnapshot interface that removes Java Serialization to avoid this problem in the future.
>>>>>>>     at org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot.restoreSerializer(TypeSerializerConfigSnapshot.java:138)
>>>>>>>     at org.apache.flink.runtime.state.StateSerializerProvider.previousSchemaSerializer(StateSerializerProvider.java:189)
>>>>>>>     at org.apache.flink.runtime.state.StateSerializerProvider.currentSchemaSerializer(StateSerializerProvider.java:164)
>>>>>>>     at org.apache.flink.runtime.state.RegisteredOperatorStateBackendMetaInfo.getPartitionStateSerializer(RegisteredOperatorStateBackendMetaInfo.java:113)
>>>>>>>     at org.apache.flink.runtime.state.OperatorStateRestoreOperation.restore(OperatorStateRestoreOperation.java:94)
>>>>>>>     at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:83)
>>>>>>>     ... 12 more
>>>>>>> Caused by: java.io.InvalidClassException: internal.model.dimension.Dimension; local class incompatible: stream classdesc serialVersionUID = -223148029368332375, local class serialVersionUID = 4489864664852536553
>>>>>>>
>>>>>>> As you can see, the exception complains that the class has evolved and
>>>>>>> the two versions are no longer compatible.
>>>>>>>
>>>>>>> After checking some documentation and Beam source code...
>>>>>>> https://flink.apache.org/ecosystem/2020/02/22/apache-beam-how-beam-runs-on-top-of-flink.html
>>>>>>> (Serializers vs Coders)
>>>>>>> https://github.com/apache/beam/blob/785609f22d013411b7973bbf9e2d15c3c8171fb2/runners/flink/1.8/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java
>>>>>>>
>>>>>>> It seems that Beam coders are wrapped into Flink's TypeSerializers
>>>>>>> and that, in the end, Beam coders are the ones in charge of
>>>>>>> serializing/deserializing objects.
>>>>>>>
>>>>>>> Also, reading
>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/state/custom_serialization.html#migrating-from-deprecated-serializer-snapshot-apis-before-flink-17
>>>>>>> and looking at the CoderTypeSerializer implementation, it seems Beam
>>>>>>> is implementing a deprecated interface and should move to the new
>>>>>>> TypeSerializerSnapshot one. To be honest, I don't know if that would
>>>>>>> solve my problem, but the exception clearly says "Please update to the
>>>>>>> TypeSerializerSnapshot interface that removes Java Serialization to
>>>>>>> avoid this problem in the future.", so I guess it would solve it.
>>>>>>>
>>>>>>> Can someone help me here? I could try to update the implementation if
>>>>>>> someone gives me some hints, because right now I'm a little bit lost.
>>>>>>>
>>>>>>> Thanks
>>>>>>>
>>>>>>>
>>>>>>> This email and any attachment to it are confidential. Unless you are
>>>>>>> the intended recipient, you may not use, copy or disclose either the
>>>>>>> message or any information contained in the message. If you are not
>>>>>>> the intended recipient, you should delete this email and notify the
>>>>>>> sender immediately. Any views or opinions expressed in this email are
>>>>>>> those of the sender only, unless otherwise stated. All copyright in
>>>>>>> any of the material in this email is reserved. All emails, incoming
>>>>>>> and outgoing, may be recorded and monitored for legitimate business
>>>>>>> purposes. We exclude all liability for any loss or damage arising or
>>>>>>> resulting from the receipt, use or transmission of this email to the
>>>>>>> fullest extent permitted by law.
>>>>>>>
>>>>
>>
