OK, sorry, but now I'm totally lost... As you said, "Beam coders are wrapped in Flink's TypeSerializers", so Beam itself is not doing anything about serialization/deserialization; it delegates this to Flink's TypeSerializers, right?
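If I understand it correctly, the wrapping works roughly like the following minimal, JDK-only sketch. MiniCoder, Utf8Coder and CoderBackedSerializer are hypothetical names I made up for illustration. They are not Beam's Coder or Flink's TypeSerializer API, which are much richer. The idea is that the element bytes are produced once, by the coder, and the runner-side serializer only delegates to it, so elements would not be serialized twice.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

public class CoderWrappingSketch {

    // Hypothetical stand-in for a Beam coder: the only place that knows the byte format.
    interface MiniCoder<T> {
        void encode(T value, OutputStream out) throws IOException;

        T decode(InputStream in) throws IOException;
    }

    // Hypothetical coder for strings: a 4-byte length prefix followed by UTF-8 bytes.
    static final class Utf8Coder implements MiniCoder<String> {
        @Override
        public void encode(String value, OutputStream out) throws IOException {
            byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
            DataOutputStream data = new DataOutputStream(out);
            data.writeInt(bytes.length);
            data.write(bytes);
            data.flush();
        }

        @Override
        public String decode(InputStream in) throws IOException {
            DataInputStream data = new DataInputStream(in);
            byte[] bytes = new byte[data.readInt()];
            data.readFully(bytes);
            return new String(bytes, StandardCharsets.UTF_8);
        }
    }

    // Hypothetical runner-side serializer in the spirit of a wrapping TypeSerializer:
    // it has no byte format of its own and only delegates to the wrapped coder.
    static final class CoderBackedSerializer<T> {
        private final MiniCoder<T> coder;

        CoderBackedSerializer(MiniCoder<T> coder) {
            this.coder = coder;
        }

        byte[] serialize(T value) {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            try {
                coder.encode(value, buffer); // element bytes come from the coder only
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
            return buffer.toByteArray();
        }

        T deserialize(byte[] bytes) {
            try {
                return coder.decode(new ByteArrayInputStream(bytes));
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
    }

    public static void main(String[] args) {
        CoderBackedSerializer<String> serializer = new CoderBackedSerializer<>(new Utf8Coder());
        byte[] bytes = serializer.serialize("row-1");
        System.out.println(bytes.length + " bytes, decoded back to: " + serializer.deserialize(bytes));
    }
}
```

If that's right, the Java serialization mentioned in the error would not be about the elements themselves, but about how the wrapper serializer is stored in the checkpoint metadata ("Could not Java-deserialize TypeSerializer while restoring checkpoint metadata").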
Now let me explain my problem better... We have a Beam pipeline using Flink as the runner; we've enabled checkpointing and added a field (with a default value) to a Java model class called "internal.boot.hbase.HBaseRow". After restarting the pipeline with that change, the following error was thrown:

Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore operator state backend
    at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:86)
    at org.apache.flink.runtime.state.filesystem.FsStateBackend.createOperatorStateBackend(FsStateBackend.java:544)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:246)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
    ... 8 more
Caused by: java.lang.IllegalStateException: Could not Java-deserialize TypeSerializer while restoring checkpoint metadata for serializer snapshot 'org.apache.beam.runners.flink.translation.types.CoderTypeSerializer$LegacySnapshot'. Please update to the TypeSerializerSnapshot interface that removes Java Serialization to avoid this problem in the future.
    at org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot.restoreSerializer(TypeSerializerConfigSnapshot.java:138)
    at org.apache.flink.runtime.state.StateSerializerProvider.previousSchemaSerializer(StateSerializerProvider.java:189)
    at org.apache.flink.runtime.state.StateSerializerProvider.currentSchemaSerializer(StateSerializerProvider.java:164)
    at org.apache.flink.runtime.state.RegisteredOperatorStateBackendMetaInfo.getPartitionStateSerializer(RegisteredOperatorStateBackendMetaInfo.java:113)
    at org.apache.flink.runtime.state.OperatorStateRestoreOperation.restore(OperatorStateRestoreOperation.java:94)
    at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:83)
    ... 12 more
Caused by: java.io.InvalidClassException: internal.boot.hbase.HBaseRow; local class incompatible: stream classdesc serialVersionUID = 3720984101010230366, local class serialVersionUID = 5344777628021899455
    at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:699)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1884)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1750)
    at java.io.ObjectInputStream.readClass(ObjectInputStream.java:1715)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1555)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2210)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2210)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2210)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:430)
    at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:301)
    at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:116)
    at org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot.readSnapshot(TypeSerializerConfigSnapshot.java:113)
    at org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(TypeSerializerSnapshot.java:174)
    at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.deserializeV2(TypeSerializerSnapshotSerializationUtil.java:179)
    at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.read(TypeSerializerSnapshotSerializationUtil.java:150)
    at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(TypeSerializerSnapshotSerializationUtil.java:76)
    at org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters$CurrentReaderImpl.readStateMetaInfoSnapshot(StateMetaInfoSnapshotReadersWriters.java:219)
    at org.apache.flink.runtime.state.OperatorBackendSerializationProxy.read(OperatorBackendSerializationProxy.java:119)
    at org.apache.flink.runtime.state.OperatorStateRestoreOperation.restore(OperatorStateRestoreOperation.java:83)
    ... 13 more

I guess this is because the state saved in the checkpoint couldn't be restored, and the error says it couldn't be restored due to the change made to the internal.boot.hbase.HBaseRow class.
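Trying to understand the last "Caused by": as far as I know, a Serializable class that doesn't declare serialVersionUID gets one derived by the JVM from its structure, so adding a field to HBaseRow changes it. The frames ObjectInputStream.readClass → readClassDesc → initNonProxy make me think that what is being Java-deserialized is a Class reference to HBaseRow somewhere inside the legacy serializer snapshot, not the element data itself (that's my assumption). Below is a minimal JDK-only sketch of both effects; DerivedUidRow, PinnedUidRow and ClassHoldingConfig are hypothetical stand-ins, not our real classes or any Beam/Flink type.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class SerialVersionUidDemo {

    // Hypothetical model class WITHOUT an explicit serialVersionUID: the JVM derives
    // one from the class structure, so adding a field later changes it.
    static class DerivedUidRow implements Serializable {
        String rowKey;
    }

    // Hypothetical model class WITH a pinned serialVersionUID: adding a field
    // (here with a default value) does not change it.
    static class PinnedUidRow implements Serializable {
        private static final long serialVersionUID = 1L;
        String rowKey;
        String addedField = "default";
    }

    // Hypothetical coder/serializer-like object that keeps a reference to its element
    // class, the way a Java-serialized serializer graph might.
    static class ClassHoldingConfig implements Serializable {
        private static final long serialVersionUID = 1L;
        final Class<?> elementType;

        ClassHoldingConfig(Class<?> elementType) {
            this.elementType = elementType;
        }
    }

    // serialVersionUID as seen by Java serialization (declared or derived).
    static long uidOf(Class<?> type) {
        return ObjectStreamClass.lookup(type).getSerialVersionUID();
    }

    // Java-serializes and deserializes an object. Writing a Class reference stores its
    // class descriptor, including the serialVersionUID; on reading, Java compares it
    // with the local class and throws InvalidClassException on a mismatch.
    static Object roundTrip(Serializable value) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray()))) {
                return in.readObject();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("derived UID: " + uidOf(DerivedUidRow.class));
        System.out.println("pinned UID:  " + uidOf(PinnedUidRow.class));
        ClassHoldingConfig restored =
                (ClassHoldingConfig) roundTrip(new ClassHoldingConfig(PinnedUidRow.class));
        System.out.println("restored element type: " + restored.elementType.getSimpleName());
    }
}
```

If that reading is right, I wonder whether declaring private static final long serialVersionUID = 3720984101010230366L; (the stream value reported in the error) in HBaseRow would let the old checkpoint be restored. I haven't tried it, and it wouldn't remove the LegacySnapshot Java serialization itself.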
The error also suggests using TypeSerializerSnapshot to get rid of Java serialization. So who is using Java serialization here? I've also found the following classes, which I guess are doing the job of wrapping Beam coders into Flink TypeSerializers:

https://github.com/apache/beam/blob/785609f22d013411b7973bbf9e2d15c3c8171fb2/runners/flink/1.8/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java
https://github.com/apache/beam/blob/master/runners/flink/1.8/src/main/java/org/apache/beam/runners/flink/translation/types/EncodedValueSerializer.java

But, honestly speaking, I don't understand in which cases one or the other is used.

Thanks

On Tue, 2020-05-19 at 16:01 +0200, Maximilian Michels wrote:
> Hi Ivan,
>
> Beam does not use Java serialization for checkpoint data. It uses Beam
> coders which are wrapped in Flink's TypeSerializers. That said, Beam
> does not support serializer migration yet.
>
> I'm curious, what do you consider a "backwards-compatible" change? If
> you are attempting to upgrade the snapshot from one Beam version to
> another, there is a high chance this won't work due to coder changes
> introduced between two Beam releases.
>
> Best,
> Max
>
> PS:
> This blog post might shed some more light on the matter:
> https://flink.apache.org/ecosystem/2020/02/22/apache-beam-how-beam-runs-on-top-of-flink.html
>
> On 19.05.20 13:45, Ivan San Jose wrote:
> > Hi, I started using Apache Beam not long ago (so please bear with
> > me), and I have a question about Beam coders and how they relate to
> > the runner's serializer under the hood... Let me explain myself
> > better:
> >
> > As far as I understand from the Beam documentation, coders are used
> > to describe how PCollection elements are serialized/deserialized
> > when needed. Is that correct?
> >
> > On the other hand, we have the runner, which executes the
> > application under the hood and needs to send elements among
> > different workers, drivers, or whatever else... And it seems to be
> > using its own serializer, not the Beam coder defined for that type.
> > Is that correct?
> >
> > I'm asking this because, in our case, we are using Flink, and we are
> > having issues trying to restore checkpoints because it seems to be
> > using standard Java serialization.
> >
> > So, the first thing we would like to know is how coders relate to
> > the runner's serializer. Is Beam serializing elements and then the
> > runner serializing the result again, or what?
> >
> > The other question is how we could get rid of the Java serializer
> > when using Flink as the runner, because we are getting the following
> > error when trying to restore a checkpoint with backward-compatible
> > changes:
> >
> > Could not Java-deserialize TypeSerializer while restoring checkpoint
> > metadata for serializer snapshot
> > 'org.apache.beam.runners.flink.translation.types.CoderTypeSerializer$LegacySnapshot'.
> > Please update to the TypeSerializerSnapshot interface that removes
> > Java Serialization to avoid this problem in the future.
> >
> > But, to be honest, we don't know how to tell Beam to tell Flink to
> > use TypeSerializerSnapshot as the serializer.
> >
> > Thank you. Regards
> >
> > This email and any attachment to it are confidential. Unless you
> > are the intended recipient, you may not use, copy or disclose
> > either the message or any information contained in the message. If
> > you are not the intended recipient, you should delete this email
> > and notify the sender immediately. Any views or opinions expressed
> > in this email are those of the sender only, unless otherwise
> > stated. All copyright in any of the material in this email is
> > reserved. All emails, incoming and outgoing, may be recorded and
> > monitored for legitimate business purposes. We exclude all
> > liability for any loss or damage arising or resulting from the
> > receipt, use or transmission of this email to the fullest extent
> > permitted by law.
