Hello all,

I have posted this on the Flink group, but I am posting it here too because
we write our jobs in Beam and run them on the Flink Runner, and I don't fully
understand the interactions between Beam Coders and Flink type serializers.

We are seeing the following error when restarting the job from a savepoint:

> Caused by: java.io.InvalidClassException: com.xxx.xxx; local class
> incompatible: stream classdesc serialVersionUID = -5544933308624767500,
> local class serialVersionUID = -7822035950742261370


Here is what happened:

   - The type in question is an Avro type, so we have a
   `PCollection<OurAvroType>` in the job.
   - We updated the Avro schema, and by default the regenerated Avro class
   gets a new serialVersionUID (the serialVersionUIDs above match the ones
   in the generated Avro classes).
   - We did not use any custom serializers for this type, so I believe it
   was handled by Flink's POJO serializer (through Beam), and that is what
   breaks because of the serialVersionUID change.
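The exception type and message above are the ones `java.io.ObjectInputStream`
produces when the serialVersionUID recorded in a serialized stream differs
from the UID of the class on the local classpath. The JDK-only sketch below
reproduces that check. `OurAvroType` is a hypothetical stand-in for the
generated Avro class, and the schema change is simulated by rewriting the UID
bytes in the serialized stream rather than by regenerating the class. It does
not show which Beam or Flink component performs the Java deserialization in
our job; that part is still an open question.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InvalidClassException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;
import java.nio.ByteBuffer;

public class SerialVersionUidDemo {

    // Hypothetical stand-in for the generated Avro class. Its UID matches the
    // "local class" value from the error above.
    static class OurAvroType implements Serializable {
        private static final long serialVersionUID = -7822035950742261370L;
        String name = "example";
    }

    // UID of the older class version that (in this simulation) wrote the bytes.
    static final long STREAM_UID = -5544933308624767500L;

    static byte[] serialize(Object o) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(o);
        }
        return bytes.toByteArray();
    }

    static Object deserialize(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        }
    }

    // Replaces the first occurrence of oldUid (written big-endian in the class
    // descriptor, right after the class name) with newUid.
    static byte[] withStreamUid(byte[] data, long oldUid, long newUid) {
        byte[] needle = ByteBuffer.allocate(8).putLong(oldUid).array();
        byte[] replacement = ByteBuffer.allocate(8).putLong(newUid).array();
        byte[] copy = data.clone();
        for (int i = 0; i + needle.length <= copy.length; i++) {
            int j = 0;
            while (j < needle.length && copy[i + j] == needle[j]) {
                j++;
            }
            if (j == needle.length) {
                System.arraycopy(replacement, 0, copy, i, replacement.length);
                return copy;
            }
        }
        throw new IllegalArgumentException("serialVersionUID not found in stream");
    }

    // Serializes with the local class, pretends the bytes came from the old
    // class version, and returns the message of the resulting exception.
    static String demo() {
        try {
            long localUid = ObjectStreamClass.lookup(OurAvroType.class).getSerialVersionUID();
            byte[] saved = withStreamUid(serialize(new OurAvroType()), localUid, STREAM_UID);
            deserialize(saved);
            return "no error";
        } catch (InvalidClassException e) {
            return e.getMessage();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Running `main` prints a message with the same shape as the one in our logs
(class name, then "local class incompatible: stream classdesc
serialVersionUID = ..., local class serialVersionUID = ..."), which suggests
the savepoint bytes for this type were written with plain Java serialization.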


I am wondering how to work around this without losing my savepoint. We are
going to try the following and would like to know if the community has any
suggestions:

   - Add flink-avro to the job jar, as mentioned in
   https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html#avro.
   I am not sure this will work, because the original bytes were written by
   the POJO serializer, and that serializer will probably also be used for
   deserialization. There must be some record of which serializer wrote the
   bytes, and I am not sure how to override it.
   - For future use cases, I also want to confirm that putting the
   flink-avro jar on the classpath only affects Avro types by default.
   - Specifically in the context of Beam, will the first bullet point ever
   work? Will Beam detect the jar on the classpath and use the Avro
   serializer the way Flink does (or, more likely, does Flink scan the
   classes after Beam converts my code into a Flink topology)? Or will I
   have to explicitly specify the `AvroCoder`?

Thanks,
Yash
