Hi Petter,

Which state backend are you using? I don't think there is a quick solution for your problem, because a proper schema evolution story is still on the roadmap for Flink 1.6.

Would it work to change the serial version id of the generated Avro class as a temporary workaround?
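
A minimal sketch of what that workaround could look like, assuming the regenerated class is edited by hand after code generation. The nested Accumulator below is only a hypothetical stand-in for the Avro-generated class, and the pinned value is the stream serialVersionUID reported in the exception further down in this thread. Whether this is enough to restore the snapshot is not verified here; the sketch only shows that an explicitly declared serialVersionUID is the value Java serialization compares.

```java
import java.io.ObjectStreamClass;
import java.io.Serializable;

public class SerialVersionUidCheck {

    // Hypothetical stand-in for the regenerated Avro class (second schema).
    // The real generated class extends SpecificRecordBase; only the pinned
    // serialVersionUID matters for this illustration.
    static class Accumulator implements Serializable {
        // Assumption: pin the UID to the value stored in the snapshot
        // ("stream classdesc serialVersionUID" in the exception).
        private static final long serialVersionUID = -3555733236161157838L;

        int accumulator;
        Integer newStuff; // the newly added nullable field
    }

    // Returns the serialVersionUID that Java serialization uses for cls.
    static long uidOf(Class<?> cls) {
        return ObjectStreamClass.lookup(cls).getSerialVersionUID();
    }

    public static void main(String[] args) {
        // Prints the UID Java serialization will compare against the stream.
        System.out.println(uidOf(Accumulator.class));
    }
}
```

Note that the Avro tools would overwrite a hand-edited value the next time the class is regenerated, so this can only be a temporary measure.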

Regards,
Timo


On 18.04.18 at 14:21, Timo Walther wrote:
Thank you. We may have already identified the issue (see https://issues.apache.org/jira/browse/FLINK-9202). I will use your code to verify it.

Regards,
Timo


On 18.04.18 at 14:07, Petter Arvidsson wrote:
Hi Timo,

Please find the generated class (for the second schema) attached.

Regards,
Petter

On Wed, Apr 18, 2018 at 11:32 AM, Timo Walther <twal...@apache.org> wrote:

    Hi Petter,

    could you share the source code of the class that Avro generates
    out of this schema?

    Thank you.

    Regards,
    Timo

    On 18.04.18 at 11:00, Petter Arvidsson wrote:
    Hello everyone,

    I am trying to figure out how to set up Flink with Avro for
    state management (especially the content of snapshots) to enable
    state migrations (e.g. adding a nullable field to a class). So
    far, in version 1.4.2, I have tried explicitly providing an instance
    of "new AvroTypeInfo(Accumulator.getClass())", where Accumulator
    is a very simple Avro-generated SpecificRecordBase with the
    following schema:

    {"namespace": "io.relayr.flink",
     "type": "record",
     "name": "Accumulator",
     "fields": [
         {"name": "accumulator", "type": "int"}
     ]
    }

    This successfully saves the state to the snapshot. When I then
    try to load the snapshot with an updated schema (adding the
    nullable field), it fails. The updated schema looks like this:

    {"namespace": "io.relayr.flink",
     "type": "record",
     "name": "Accumulator",
     "fields": [
         {"name": "accumulator", "type": "int"},
         {"name": "newStuff", "type": ["int", "null"]}
     ]
    }

    When I try to restart the Job from the snapshot, I get the
    following exception:
    2018-04-17 09:35:23,519 WARN
    org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil
    - Deserialization of serializer errored; replacing with null.
    java.io.IOException: Unloadable class for type serializer.
    ...
    Caused by: java.io.InvalidClassException:
    io.relayr.flink.Accumulator; local class incompatible: stream
    classdesc serialVersionUID = -3555733236161157838, local class
    serialVersionUID = 5291033088112484292

    That is true: the Avro tools do generate a new serialVersionUID
    for the bean. I just didn't expect it to be used; I expected the
    Avro schema to be used instead. Has anyone got this working?
    What am I getting wrong?

    Best regards,
    Petter




