Hi Timo,

Thanks for your response. We are using the filesystem backend backed by S3.
We were looking for a good long-term solution with Avro, so manually changing the serial version id is probably not the right way to proceed for us. I think we will wait for Flink 1.6 before trying to properly implement state migrations in this case.

Regards,
Petter

On Fri, Apr 20, 2018 at 11:20 AM, Timo Walther <twal...@apache.org> wrote:

> Hi Petter,
>
> which state backend are you using in your case? I think there is no quick
> solution for your problem, because a proper schema evolution story is on
> the roadmap for Flink 1.6.
>
> Would it work to change the serial version id of the generated Avro class
> as a temporary workaround?
>
> Regards,
> Timo
>
>
> On 18.04.18 at 14:21, Timo Walther wrote:
>
> Thank you. Maybe we already identified the issue (see
> https://issues.apache.org/jira/browse/FLINK-9202). I will use your code
> to verify it.
>
> Regards,
> Timo
>
>
> On 18.04.18 at 14:07, Petter Arvidsson wrote:
>
> Hi Timo,
>
> Please find the generated class (for the second schema) attached.
>
> Regards,
> Petter
>
> On Wed, Apr 18, 2018 at 11:32 AM, Timo Walther <twal...@apache.org> wrote:
>
>> Hi Petter,
>>
>> could you share the source code of the class that Avro generates out of
>> this schema?
>>
>> Thank you.
>>
>> Regards,
>> Timo
>>
>> On 18.04.18 at 11:00, Petter Arvidsson wrote:
>>
>> Hello everyone,
>>
>> I am trying to figure out how to set up Flink with Avro for state
>> management (especially the content of snapshots) to enable state
>> migrations (e.g. adding a nullable field to a class). So far, in version
>> 1.4.2, I tried to explicitly provide an instance of "new
>> AvroTypeInfo(Accumulator.getClass())", where Accumulator is a very
>> simple Avro-generated SpecificRecordBase with the following schema:
>>
>> {"namespace": "io.relayr.flink",
>>  "type": "record",
>>  "name": "Accumulator",
>>  "fields": [
>>    {"name": "accumulator", "type": "int"}
>>  ]
>> }
>>
>> This successfully saves the state to the snapshot.
>> When I then try to load the snapshot with an updated schema (adding the
>> nullable field), it fails. The schema looks like this:
>>
>> {"namespace": "io.relayr.flink",
>>  "type": "record",
>>  "name": "Accumulator",
>>  "fields": [
>>    {"name": "accumulator", "type": "int"},
>>    {"name": "newStuff", "type": ["int", "null"]}
>>  ]
>> }
>>
>> When I try to restart the job from the snapshot, I get the following
>> exception:
>>
>> 2018-04-17 09:35:23,519 WARN org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil
>>   - Deserialization of serializer errored; replacing with null.
>> java.io.IOException: Unloadable class for type serializer.
>> ...
>> Caused by: java.io.InvalidClassException: io.relayr.flink.Accumulator;
>> local class incompatible: stream classdesc serialVersionUID =
>> -3555733236161157838, local class serialVersionUID = 5291033088112484292
>>
>> Which is true: the Avro tools do generate a new serialization ID for the
>> bean. I just didn't expect it to be used; I expected the Avro schema to
>> be used instead. Did anyone get this working? What am I getting wrong?
>>
>> Best regards,
>> Petter
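[Editor's note] The InvalidClassException in the thread is a plain Java serialization check, not an Avro schema-resolution failure: the snapshot contains the serializer itself, and regenerating the Avro class changes the bean's implicitly computed serialVersionUID. Timo's temporary workaround amounts to pinning the old UID in the regenerated class. Below is a minimal, self-contained sketch of that idea; the `Accumulator` class here is a hypothetical stand-in for the Avro-generated bean, not the actual generated code, and the UID value is the one reported in the exception above.

```java
import java.io.ObjectStreamClass;
import java.io.Serializable;

public class PinnedUidDemo {
    // Stand-in for the Avro-generated bean. Declaring serialVersionUID
    // explicitly keeps Java serialization compatible even after avro-tools
    // regenerates the class with new fields.
    static class Accumulator implements Serializable {
        // UID copied from the old snapshot (see the InvalidClassException above).
        private static final long serialVersionUID = -3555733236161157838L;
        int accumulator;
        Integer newStuff; // the field added in the second schema
    }

    public static void main(String[] args) {
        // Java serialization now reports the pinned UID instead of a freshly
        // computed one, so it matches the snapshot's stream classdesc.
        long uid = ObjectStreamClass.lookup(Accumulator.class).getSerialVersionUID();
        System.out.println(uid);
    }
}
```

Note that this only silences the class-compatibility check; whether the restored serializer can actually read the old records is a separate question, which is why waiting for the proper schema evolution support planned for Flink 1.6 is the safer long-term path.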