OK, sorry, but now I'm totally lost...

So, as you said, "Beam coders are wrapped in Flink's TypeSerializers",
which means Beam is not doing any serialization/deserialization itself
and is delegating it to Flink's TypeSerializers, right?

Now let me explain my problem better...

We have a Beam pipeline using Flink as the runner. We enabled
checkpointing and then added a field (with a default value) to a Java
model class called "internal.boot.hbase.HBaseRow". After restarting the
pipeline with that change, the following error was thrown:

Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore operator state backend
    at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:86)
    at org.apache.flink.runtime.state.filesystem.FsStateBackend.createOperatorStateBackend(FsStateBackend.java:544)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:246)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
    ... 8 more
Caused by: java.lang.IllegalStateException: Could not Java-deserialize TypeSerializer while restoring checkpoint metadata for serializer snapshot 'org.apache.beam.runners.flink.translation.types.CoderTypeSerializer$LegacySnapshot'. Please update to the TypeSerializerSnapshot interface that removes Java Serialization to avoid this problem in the future.
    at org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot.restoreSerializer(TypeSerializerConfigSnapshot.java:138)
    at org.apache.flink.runtime.state.StateSerializerProvider.previousSchemaSerializer(StateSerializerProvider.java:189)
    at org.apache.flink.runtime.state.StateSerializerProvider.currentSchemaSerializer(StateSerializerProvider.java:164)
    at org.apache.flink.runtime.state.RegisteredOperatorStateBackendMetaInfo.getPartitionStateSerializer(RegisteredOperatorStateBackendMetaInfo.java:113)
    at org.apache.flink.runtime.state.OperatorStateRestoreOperation.restore(OperatorStateRestoreOperation.java:94)
    at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:83)
    ... 12 more
Caused by: java.io.InvalidClassException: internal.boot.hbase.HBaseRow; local class incompatible: stream classdesc serialVersionUID = 3720984101010230366, local class serialVersionUID = 5344777628021899455
    at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:699)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1884)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1750)
    at java.io.ObjectInputStream.readClass(ObjectInputStream.java:1715)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1555)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2210)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2210)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2210)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:430)
    at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:301)
    at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:116)
    at org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot.readSnapshot(TypeSerializerConfigSnapshot.java:113)
    at org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(TypeSerializerSnapshot.java:174)
    at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.deserializeV2(TypeSerializerSnapshotSerializationUtil.java:179)
    at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.read(TypeSerializerSnapshotSerializationUtil.java:150)
    at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(TypeSerializerSnapshotSerializationUtil.java:76)
    at org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters$CurrentReaderImpl.readStateMetaInfoSnapshot(StateMetaInfoSnapshotReadersWriters.java:219)
    at org.apache.flink.runtime.state.OperatorBackendSerializationProxy.read(OperatorBackendSerializationProxy.java:119)
    at org.apache.flink.runtime.state.OperatorStateRestoreOperation.restore(OperatorStateRestoreOperation.java:83)
    ... 13 more

I guess this is because the state saved in the checkpoint couldn't be
restored, and the error says the cause is the change made to the
internal.boot.hbase.HBaseRow class. The error also suggests using
TypeSerializerSnapshot to get rid of Java serialization.
So, who is using Java serialization here?
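
For context on the last "Caused by" in the trace, here is a minimal
sketch of how Java serialization picks a serialVersionUID. It assumes
that HBaseRow implements Serializable without declaring that field
itself, which is not confirmed anywhere in this thread. ImplicitRow and
PinnedRow are hypothetical stand-ins, not the real model class. When the
field is not declared, the JVM computes the identifier from the class
structure (its fields included), so adding a field changes the value;
declaring it pins the value.

```java
import java.io.ObjectStreamClass;
import java.io.Serializable;

public class SerialVersionUidDemo {

    // Hypothetical stand-in for a model class that does NOT declare
    // serialVersionUID: the JVM derives one from the class structure.
    static class ImplicitRow implements Serializable {
        String rowKey;
    }

    // Hypothetical stand-in that pins the identifier, so adding a field
    // (here: addedLater) does not change it.
    static class PinnedRow implements Serializable {
        private static final long serialVersionUID = 1L;
        String rowKey;
        String addedLater = "default";
    }

    // Returns the serialVersionUID that Java serialization associates
    // with the given Serializable class.
    static long uidOf(Class<?> type) {
        return ObjectStreamClass.lookup(type).getSerialVersionUID();
    }

    public static void main(String[] args) {
        System.out.println("implicit: " + uidOf(ImplicitRow.class));
        System.out.println("pinned:   " + uidOf(PinnedRow.class));
    }
}
```

Pinning the identifier only affects classes compiled from now on; it
would not, by itself, make a checkpoint written with the old computed
value readable.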


Also, I've found the following classes, which I guess do the job of
wrapping Beam coders into Flink TypeSerializers:
https://github.com/apache/beam/blob/785609f22d013411b7973bbf9e2d15c3c8171fb2/runners/flink/1.8/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java
https://github.com/apache/beam/blob/master/runners/flink/1.8/src/main/java/org/apache/beam/runners/flink/translation/types/EncodedValueSerializer.java

But, honestly speaking, I don't understand in which cases one or the
other is used.
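
To make the "coders wrapped in serializers" idea concrete, here is a
minimal sketch of the wrapping pattern using only the JDK. SimpleCoder,
WrappingSerializer and Utf8Coder are hypothetical names invented for
illustration; they are not the Beam Coder or Flink TypeSerializer APIs
and say nothing about how CoderTypeSerializer is actually implemented.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;

public class CoderWrapperSketch {

    // Hypothetical, simplified stand-in for a coder: it knows how to
    // write one element to a stream and read it back.
    interface SimpleCoder<T> {
        void encode(T value, OutputStream out) throws IOException;
        T decode(InputStream in) throws IOException;
    }

    // Hypothetical stand-in for a runner-side serializer that has no
    // encoding logic of its own and delegates every call to the coder.
    static final class WrappingSerializer<T> {
        private final SimpleCoder<T> coder;

        WrappingSerializer(SimpleCoder<T> coder) {
            this.coder = coder;
        }

        byte[] serialize(T value) throws IOException {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            coder.encode(value, out);
            return out.toByteArray();
        }

        T deserialize(byte[] bytes) throws IOException {
            return coder.decode(new ByteArrayInputStream(bytes));
        }
    }

    // Example coder: a length-prefixed modified-UTF-8 string encoding.
    static final class Utf8Coder implements SimpleCoder<String> {
        @Override
        public void encode(String value, OutputStream out) throws IOException {
            new DataOutputStream(out).writeUTF(value);
        }

        @Override
        public String decode(InputStream in) throws IOException {
            return new DataInputStream(in).readUTF();
        }
    }

    // Encodes and decodes a value through the wrapper.
    static String roundTrip(String value) {
        WrappingSerializer<String> serializer =
                new WrappingSerializer<>(new Utf8Coder());
        try {
            return serializer.deserialize(serializer.serialize(value));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("row-1"));
    }
}
```

In this sketch the element bytes never go through Java serialization.
My reading of the trace above, which may well be wrong, is that the
Java-deserialized object is the serializer itself (and whatever coder it
holds) stored in the checkpoint metadata, not the elements.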

Thanks



On Tue, 2020-05-19 at 16:01 +0200, Maximilian Michels wrote:
> Hi Ivan,
>
> Beam does not use Java serialization for checkpoint data. It uses
> Beam coders which are wrapped in Flink's TypeSerializers. That said,
> Beam does not support serializer migration yet.
>
> I'm curious, what do you consider a "backwards-compatible" change? If
> you are attempting to upgrade the snapshot from one Beam version to
> another, there is a high chance this won't work due to coder changes
> introduced between two Beam releases.
>
> Best,
> Max
>
> PS:
> This blog post might shed some more light on the matter:
> https://flink.apache.org/ecosystem/2020/02/22/apache-beam-how-beam-runs-on-top-of-flink.html
>
> On 19.05.20 13:45, Ivan San Jose wrote:
> > Hi, I started using Apache Beam not so long ago (so bear with me,
> > please) and I have a question about Beam coders and how they relate
> > to the runner's serializer under the hood... Let me explain myself
> > better:
> >
> > As far as I've understood from the Beam documentation, coders are
> > used to describe how PCollection elements are going to be
> > serialized/deserialized when needed. Is that correct?
> >
> > On the other hand, we have the runner, which is executing the
> > application under the hood and needs to send elements among
> > different workers, drivers, or whatever else... And it seems to be
> > using its own serializer, not the Beam coder defined for that type.
> > Is that correct?
> >
> > I'm asking this because, in our case, we are using Flink, and we
> > are having issues trying to restore checkpoints because it seems to
> > be using standard Java serialization.
> >
> > So, the first thing we would like to know is how coders relate to
> > the runner's serializer. Is Beam serializing elements and then the
> > runner serializing the result again, or what?
> >
> > And the other question is how we could get rid of the Java
> > serializer when using Flink as the runner, because we are getting
> > the following error when trying to restore a checkpoint with
> > backward-compatible changes:
> >
> > Could not Java-deserialize TypeSerializer while restoring
> > checkpoint metadata for serializer snapshot
> > 'org.apache.beam.runners.flink.translation.types.CoderTypeSerializer$LegacySnapshot'.
> > Please update to the TypeSerializerSnapshot interface that removes
> > Java Serialization to avoid this problem in the future.
> >
> > But we don't know how to tell Beam to tell Flink to use
> > TypeSerializerSnapshot as serializer, to be honest.
> >
> > Thank you. Regards
> >
> >
> > This email and any attachment to it are confidential. Unless you
> > are the intended recipient, you may not use, copy or disclose
> > either the message or any information contained in the message. If
> > you are not the intended recipient, you should delete this email
> > and notify the sender immediately. Any views or opinions expressed
> > in this email are those of the sender only, unless otherwise
> > stated. All copyright in any of the material in this email is
> > reserved. All emails, incoming and outgoing, may be recorded and
> > monitored for legitimate business purposes. We exclude all
> > liability for any loss or damage arising or resulting from the
> > receipt, use or transmission of this email to the fullest extent
> > permitted by law.
> >

