Re: Error restoring Flink checkpoint

2020-06-23 Thread Maximilian Michels
> Yes, I agree that serializing coders into the checkpoint creates problems. I'm wondering whether it is possible to serialize the coder URN + args instead.
I wish that were possible, but Beam does not have a dedicated interface to snapshot serializers. It is not possible to restore serializer

Re: Error restoring Flink checkpoint

2020-06-22 Thread Ivan San Jose
I don't really know; my knowledge of the Beam source code is not that deep, but I managed to modify AvroCoder to store a string containing the class name (and the class parameters, in case it was a parameterized class) instead of references to Class. But, as I said, AvroCoder is using AVRO's ReflectDat
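A minimal sketch of the idea described above, assuming plain Java serialization is what ends up in the checkpoint: the holder keeps only the fully qualified class name in its serialized form and re-resolves the Class lazily after deserialization. ClassNameRefDemo, ClassNameRef and roundTrip are hypothetical names used for illustration; this is not the actual AvroCoder patch.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical sketch, not the actual AvroCoder change: keep only the class
// *name* in the Java-serialized form and resolve the Class lazily.
public class ClassNameRefDemo {

    static final class ClassNameRef<T> implements Serializable {
        private static final long serialVersionUID = 1L;

        // Only this String is written when the holder is Java-serialized.
        private final String className;
        // Transient: never written, re-resolved after deserialization.
        private transient Class<T> clazz;

        ClassNameRef(Class<T> clazz) {
            this.className = clazz.getName();
            this.clazz = clazz;
        }

        String className() {
            return className;
        }

        @SuppressWarnings("unchecked")
        Class<T> resolve() {
            if (clazz == null) {
                try {
                    // Looks the class up by name; a runner would likely pass
                    // its user-code class loader here instead.
                    clazz = (Class<T>) Class.forName(className, false, ClassNameRef.class.getClassLoader());
                } catch (ClassNotFoundException e) {
                    throw new IllegalStateException("Cannot resolve " + className, e);
                }
            }
            return clazz;
        }
    }

    /** Java-serializes and deserializes a value, like a checkpoint round trip. */
    @SuppressWarnings("unchecked")
    static <T> T roundTrip(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        ClassNameRef<StringBuilder> restored = roundTrip(new ClassNameRef<>(StringBuilder.class));
        System.out.println(restored.className());                      // java.lang.StringBuilder
        System.out.println(restored.resolve() == StringBuilder.class); // true
    }
}
```

This only removes the Class reference from the serialized form; as the thread notes, AvroCoder's schema inference through ReflectData is a separate issue.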

Re: Error restoring Flink checkpoint

2020-06-22 Thread Reuven Lax
Yes, I agree that serializing coders into the checkpoint creates problems. I'm wondering whether it is possible to serialize the coder URN + args instead.
On Mon, Jun 22, 2020 at 11:00 PM Ivan San Jose wrote:
> Hi again, just replying here in case this is useful for someone, as using Flin

Re: Error restoring Flink checkpoint

2020-06-22 Thread Ivan San Jose
Hi again, just replying here in case this is useful for someone, as using Flink checkpoints with Beam is not reliable at all right now... Even after I removed the class references to the serialized object in AvroCoder, I ultimately couldn't make AvroCoder work, as it infers the schema using ReflectData cla

Re: Error restoring Flink checkpoint

2020-06-08 Thread Reuven Lax
Maybe instead of wrapping the serialized Coder in the TypeSerializer, we could wrap the Coder URN?
On Mon, Jun 8, 2020 at 7:37 AM Ivan San Jose wrote:
> Hi Reuven, as far as I've understood, Apache Beam coders are wrapped into Flink's TypeSerializers, so they are being serialized as part o
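A sketch of what wrapping a coder URN instead of the serialized coder could look like, assuming each coder can describe itself as a URN plus string arguments and that a registry can rebuild it from those on restore. DescribableCoder, SchemaCoder, REGISTRY and the URN example:coder:schema:v1 are all hypothetical; they are not Beam's Coder API or its standard coder URNs.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch, not Beam or Flink API: the checkpoint stores only a
// coder URN plus string arguments, never a Java-serialized coder instance.
public class CoderUrnSnapshot {

    /** Minimal stand-in for a coder that can describe itself by URN + args. */
    interface DescribableCoder {
        String urn();
        List<String> args();
    }

    /** Hypothetical coder parameterized by a schema string. */
    static final class SchemaCoder implements DescribableCoder {
        static final String URN = "example:coder:schema:v1"; // made-up URN
        final String schemaJson;

        SchemaCoder(String schemaJson) { this.schemaJson = schemaJson; }

        public String urn() { return URN; }

        public List<String> args() { return Collections.singletonList(schemaJson); }
    }

    // Registry from URN to a factory that rebuilds the coder from its args.
    static final Map<String, Function<List<String>, DescribableCoder>> REGISTRY = new HashMap<>();
    static {
        REGISTRY.put(SchemaCoder.URN, args -> new SchemaCoder(args.get(0)));
    }

    /** Writes URN + args; writeUTF limits each string to 65535 encoded bytes. */
    static byte[] snapshot(DescribableCoder coder) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeUTF(coder.urn());
            out.writeInt(coder.args().size());
            for (String arg : coder.args()) {
                out.writeUTF(arg);
            }
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Reads URN + args back and asks the registry to rebuild the coder. */
    static DescribableCoder restore(byte[] snapshot) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(snapshot))) {
            String urn = in.readUTF();
            int n = in.readInt();
            List<String> args = new ArrayList<>(n);
            for (int i = 0; i < n; i++) {
                args.add(in.readUTF());
            }
            Function<List<String>, DescribableCoder> factory = REGISTRY.get(urn);
            if (factory == null) {
                throw new IllegalStateException("No coder registered for URN " + urn);
            }
            return factory.apply(args);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Because only strings are persisted, the restored coder is built by whatever code is on the classpath at restore time, so changes to the model classes do not affect whether the snapshot itself can be read.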

Re: Error restoring Flink checkpoint

2020-06-08 Thread Ivan San Jose
Hi Reuven, as far as I've understood, Apache Beam coders are wrapped into Flink's TypeSerializers, so they are being serialized as part of the checkpoint according to https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerConfigSnapsho

Re: Error restoring Flink checkpoint

2020-06-08 Thread Ivan San Jose
This simple test breaks:

    import java.time.Instant;

    public class PacoTest {
        private static class Pojo {
            Instant instant;

            public Pojo() {
                this.instant = Instant.now();
            }
        }

        @Test
        public void paco() throws IOException {
            Coder coder = AvroC

Re: Error restoring Flink checkpoint

2020-06-08 Thread Reuven Lax
Max, can you explain why Flink serializes the coders in the checkpoint? Dataflow uses the new graph on update, so it doesn't hit this problem.
On Mon, Jun 8, 2020 at 7:21 AM Ivan San Jose wrote:
> Finally I've managed to modify Beam's AvroCoder so that it doesn't serialize any reference to the Class of the

Re: Error restoring Flink checkpoint

2020-06-08 Thread Ivan San Jose
Finally I've managed to modify Beam's AvroCoder so that it doesn't serialize any reference to the Class of the object to be encoded/decoded, and I could successfully restore a checkpoint after adding a field to the POJO model. I think it would be useful for everyone, as the current AvroCoder is not really useful wh

Re: Error restoring Flink checkpoint

2020-06-05 Thread Ivan San Jose
Thank you so much for your detailed answers, Max. I will try what you've suggested: creating a custom coder that doesn't have non-transient fields referencing the serialized Java model. My skills with Beam are not that advanced, but I will try my best hehehe
On Fri, 2020-06-05 a

Re: Error restoring Flink checkpoint

2020-06-05 Thread Maximilian Michels
See my answers inline.
> Sorry, but I'm afraid I don't understand the scenario well... What's the point of keeping a reference to the serialized class if AVRO/Protobuf are using schemas?
They keep a reference to the class because they produce that type when they deserialize data.
> I mean,

Re: Error restoring Flink checkpoint

2020-06-05 Thread Ivan San Jose
By the way, I've just tried using AvroCoder, which infers the schema from the Java object to be deserialized, and I got the same error when restoring the checkpoint :(
On Fri, 2020-06-05 at 06:24 +, Ivan San Jose wrote:
> Sorry, but I'm afraid I don't understand the scenario well... What'

Re: Error restoring Flink checkpoint

2020-06-04 Thread Ivan San Jose
Sorry, but I'm afraid I don't understand the scenario well... What's the point of keeping a reference to the serialized class if AVRO/Protobuf are using schemas? I mean, my understanding is that AVRO and Protobuf follow schemas in order to serialize/deserialize an object in a binary format. So I'd expe

Re: Error restoring Flink checkpoint

2020-06-04 Thread Reuven Lax
Could we try using schemas?
On Thu, Jun 4, 2020 at 9:40 AM Maximilian Michels wrote:
> I was under the assumption that this should work, but ProtoCoder keeps a reference to the class used to serialize. That causes the snapshot to break.
> We can fix this by:
> a) writing/using coders whic

Re: Error restoring Flink checkpoint

2020-06-04 Thread Maximilian Michels
I was under the assumption that this should work, but ProtoCoder keeps a reference to the class used to serialize. That causes the snapshot to break. We can fix this by:
a) writing/using coders which do not keep instances of evolving classes
b) adding an interface to Beam for Coder serialization/d
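For option b), a rough sketch of a versioned snapshot interface, assuming the coder's configuration can be written to a byte stream. CoderSnapshot, SchemaCoderSnapshot, persist, restore and legacyV1 are hypothetical names that do not exist in Beam; the pattern of writing the current version and reading with the stored version is loosely modeled on Flink's TypeSerializerSnapshot.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical sketch of option b): a versioned snapshot interface for coder
// configuration. Names are made up; the shape (write the current version,
// read with the stored version) loosely mirrors Flink's TypeSerializerSnapshot.
public class VersionedCoderSnapshot {

    interface CoderSnapshot {
        int currentVersion();
        void write(DataOutputStream out) throws IOException;
        void read(int storedVersion, DataInputStream in) throws IOException;
    }

    /** Version 1 stored only a schema string; version 2 also stores a schema id. */
    static final class SchemaCoderSnapshot implements CoderSnapshot {
        String schemaJson = "";
        int schemaId; // hypothetical field introduced in version 2

        public int currentVersion() {
            return 2;
        }

        public void write(DataOutputStream out) throws IOException {
            out.writeUTF(schemaJson);
            out.writeInt(schemaId);
        }

        public void read(int storedVersion, DataInputStream in) throws IOException {
            schemaJson = in.readUTF();
            // Version-1 snapshots have no id, so fall back to a default.
            schemaId = storedVersion >= 2 ? in.readInt() : 0;
        }
    }

    /** Prefixes the payload with the writer's version number. */
    static byte[] persist(CoderSnapshot snapshot) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeInt(snapshot.currentVersion());
            snapshot.write(out);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Reads the stored version first, then lets the snapshot migrate older layouts. */
    static <S extends CoderSnapshot> S restore(byte[] data, S target) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int storedVersion = in.readInt();
            if (storedVersion > target.currentVersion()) {
                throw new IllegalStateException("Snapshot version " + storedVersion + " is newer than this code");
            }
            target.read(storedVersion, in);
            return target;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Bytes laid out as a hypothetical version-1 writer would have produced them. */
    static byte[] legacyV1(String schemaJson) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeInt(1);
            out.writeUTF(schemaJson);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Passing the stored version into read() is the key design choice: code that has moved on to version 2 can still migrate a version-1 snapshot instead of failing to deserialize it.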

Re: Error restoring Flink checkpoint

2020-06-04 Thread Maximilian Michels
I think Avro has two modes of operation: either you supply a schema upfront, or you let it auto-extract one from a given type. I don't know how the auto-extraction works, but if you specify the schema and version in your Beam Avro coder, Avro will automatically resolve the checkpoint data against t
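To illustrate the resolution step, here is a toy model (not the Avro library) of Avro's record schema-resolution rules: fields are matched by name, fields present only in the writer schema are skipped, and fields present only in the reader schema take their default, with a missing default being an error. Because matching needs the writer's field list, the writer schema has to be known when old checkpoint data is read. Field, resolve and the sample fields are hypothetical; null is used here to mean "no default", unlike real Avro, where null can be a legitimate default.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of Avro-style record schema resolution; this is NOT the Avro API.
// Data written under an old "writer schema" is read with a newer "reader schema".
public class SchemaResolutionToy {

    /** A reader-schema field: a name plus an optional default value. */
    static final class Field {
        final String name;
        final Object defaultValue; // toy convention: null means "no default"

        Field(String name, Object defaultValue) {
            this.name = name;
            this.defaultValue = defaultValue;
        }
    }

    /**
     * Resolves one record by matching fields by name: writer-only fields are
     * ignored, reader-only fields take their default, and a reader-only field
     * without a default is an error.
     */
    static Map<String, Object> resolve(
            List<String> writerFields, List<Object> writtenValues, List<Field> readerFields) {
        if (writerFields.size() != writtenValues.size()) {
            throw new IllegalArgumentException("Writer schema and record do not match");
        }
        Map<String, Object> written = new HashMap<>();
        for (int i = 0; i < writerFields.size(); i++) {
            written.put(writerFields.get(i), writtenValues.get(i));
        }
        Map<String, Object> result = new LinkedHashMap<>();
        for (Field field : readerFields) {
            if (written.containsKey(field.name)) {
                result.put(field.name, written.get(field.name));
            } else if (field.defaultValue != null) {
                result.put(field.name, field.defaultValue);
            } else {
                throw new IllegalStateException("No value or default for field " + field.name);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Old data, written before the hypothetical "source" field was added.
        List<String> writer = Arrays.asList("id", "name");
        List<Object> values = Arrays.<Object>asList(42L, "a");
        List<Field> reader = Arrays.asList(
            new Field("id", null), new Field("name", null), new Field("source", "unknown"));
        System.out.println(resolve(writer, values, reader)); // {id=42, name=a, source=unknown}
    }
}
```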

Re: Error restoring Flink checkpoint

2020-06-04 Thread Ivan San Jose
I've changed my Java model to use ProtoCoder (with @DefaultCoder(ProtoCoder.class)), but I'm getting the same error when trying to restore the last checkpoint taken after adding an attribute to that model. What do you think could be happening? It seems that the state saved within the checkpoint is st

Re: Error restoring Flink checkpoint

2020-06-04 Thread Ivan San Jose
Thanks for your response, Max. I'll try AvroCoder then. But I still have a question: I guess AvroCoder generates the AVRO schema using Java reflection, and then that generated schema is saved within the Flink checkpoint, right?
On Wed, 2020-06-03 at 18:00 +0200, Maximilian Michels wrote:
>

Re: Error restoring Flink checkpoint

2020-06-03 Thread Maximilian Michels
> KafkaIO or Protobuf.
*I meant to say "Avro or Protobuf".
On 03.06.20 18:00, Maximilian Michels wrote:
> Hi Ivan,
> Moving to the new type serializer snapshot interface is not going to solve this problem because we cannot version the coder through the Beam coder interface. That is only po

Re: Error restoring Flink checkpoint

2020-06-03 Thread Maximilian Michels
Hi Ivan,
Moving to the new type serializer snapshot interface is not going to solve this problem because we cannot version the coder through the Beam coder interface. That is only possible through Flink. However, it is usually not trivial. In Beam, when you evolve your data model, the only way yo

Error restoring Flink checkpoint

2020-06-03 Thread Ivan San Jose
Hi, we have a Beam application running with the Flink runner, and we are struggling with Flink checkpoints. Every time we evolve the source code by modifying a Java model, an exception is thrown when trying to restore the last checkpoint taken:

    Caused by: java.lang.IllegalStateException: Could not Java-deseri