Re: Managing state migrations with Flink and Avro

2018-04-23 Thread Petter Arvidsson
Hi Timo,

Thanks for your response. We are using the filesystem backend backed by S3.

We were looking for a good long term solution with Avro, so manually
changing the serial version id is probably not the right way to proceed for
us. I think we will wait for Flink 1.6 before trying to properly implement
state migrations in this case.

Regards,
Petter

On Fri, Apr 20, 2018 at 11:20 AM, Timo Walther  wrote:

> Hi Petter,
>
> which state backend are you using in your case? I think there is no quick
> solution for your problem because a proper schema evolution story is on the
> roadmap for Flink 1.6.
>
> Would it work to change the serial version id of the generated Avro class
> as a temporary workaround?
>
> Regards,
> Timo
>
>
> On 18.04.18 at 14:21, Timo Walther wrote:
>
> Thank you. Maybe we already identified the issue (see
> https://issues.apache.org/jira/browse/FLINK-9202). I will use your code
> to verify it.
>
> Regards,
> Timo
>
>
> On 18.04.18 at 14:07, Petter Arvidsson wrote:
>
> Hi Timo,
>
> Please find the generated class (for the second schema) attached.
>
> Regards,
> Petter
>
> On Wed, Apr 18, 2018 at 11:32 AM, Timo Walther  wrote:
>
>> Hi Petter,
>>
>> could you share the source code of the class that Avro generates out of
>> this schema?
>>
>> Thank you.
>>
>> Regards,
>> Timo
>>
>> On 18.04.18 at 11:00, Petter Arvidsson wrote:
>>
>> Hello everyone,
>>
>> I am trying to figure out how to set up Flink with Avro for state
>> management (especially the content of snapshots) to enable state migrations
>> (e.g. adding a nullable field to a class). So far, in version 1.4.2, I
>> tried to explicitly provide an instance of "new
>> AvroTypeInfo(Accumulator.getClass())" where Accumulator is a very simple
>> Avro generated SpecificRecordBase of the following schema:
>>
>> {"namespace": "io.relayr.flink",
>>  "type": "record",
>>  "name": "Accumulator",
>>  "fields": [
>>  {"name": "accumulator", "type": "int"}
>>  ]
>> }
>>
>> This successfully saves the state to the snapshot. When I then try to
>> load the snapshot with an updated schema (adding the nullable field) it
>> fails. Schema looks like this:
>>
>> {"namespace": "io.relayr.flink",
>>  "type": "record",
>>  "name": "Accumulator",
>>  "fields": [
>>  {"name": "accumulator", "type": "int"},
>>  {"name": "newStuff", "type": ["int", "null"]}
>>  ]
>> }
>>
>> When I try to restart the Job from the snapshot, I get the following
>> exception:
>> 2018-04-17 09:35:23,519 WARN  org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil  - Deserialization of serializer errored; replacing with null.
>> java.io.IOException: Unloadable class for type serializer.
>> ...
>> Caused by: java.io.InvalidClassException: io.relayr.flink.Accumulator;
>> local class incompatible: stream classdesc serialVersionUID =
>> -3555733236161157838, local class serialVersionUID = 5291033088112484292
>>
>> Which is true: Avro tools do generate a new serialVersionUID for the
>> bean. I just didn't expect it to be used; I expected the Avro schema to be
>> used instead. Did anyone get this working? What am I getting wrong?
>>
>> Best regards,
>> Petter
>>
>>
>>
>
>
>

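The InvalidClassException above comes from plain Java serialization, not from Avro's schema resolution: the snapshot carries the serializer written with ObjectOutputStream, and ObjectInputStream compares the serialVersionUID recorded in the stream against the local class before reading any field data. A minimal JDK-only sketch of that mechanism (the class name, UID value, and bit-flip are illustrative, not from the thread):

```java
import java.io.*;

public class UidMismatchDemo {
  // Stand-in for the generated Avro bean; the UID value is arbitrary.
  static class Acc implements Serializable {
    private static final long serialVersionUID = 1L;
    int accumulator = 42;
  }

  /**
   * Serializes Acc, tampers with the serialVersionUID recorded in the
   * stream to simulate a class regenerated with a different UID, and
   * reports whether deserialization fails with InvalidClassException.
   */
  static boolean mismatchDetected() throws Exception {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
      oos.writeObject(new Acc());
    }
    byte[] bytes = bos.toByteArray();
    // Stream layout: magic(2) + version(2) + TC_OBJECT(1) + TC_CLASSDESC(1)
    // + UTF length(2) + class name bytes, then the 8-byte serialVersionUID.
    int uidOffset = 8 + Acc.class.getName().length();
    bytes[uidOffset + 7] ^= 0x01; // flip one bit of the recorded UID
    try (ObjectInputStream ois =
             new ObjectInputStream(new ByteArrayInputStream(bytes))) {
      ois.readObject();
      return false; // mismatch went unnoticed (should not happen)
    } catch (InvalidClassException e) {
      // Message resembles the one in the thread:
      // "local class incompatible: stream classdesc serialVersionUID = ..."
      return true;
    }
  }

  public static void main(String[] args) throws Exception {
    System.out.println("mismatch detected: " + mismatchDetected());
  }
}
```

This is why changing the UID of the generated class back to the stream's value works as a stopgap: the check is purely on that one long field.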

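One observation about Avro itself, independent of the serializer issue discussed in the thread: Avro's schema resolution can only read old records against a new reader schema if the added field carries a default value, and a union's default must match the union's first branch. A backward-compatible variant of the second schema would therefore put "null" first and declare a default:

```json
{"namespace": "io.relayr.flink",
 "type": "record",
 "name": "Accumulator",
 "fields": [
  {"name": "accumulator", "type": "int"},
  {"name": "newStuff", "type": ["null", "int"], "default": null}
 ]
}
```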
/**
 * Autogenerated by Avro
 *
 * DO NOT EDIT DIRECTLY
 */
package io.relayr.flink;

import org.apache.avro.specific.SpecificData;
import org.apache.avro.message.BinaryMessageEncoder;
import org.apache.avro.message.BinaryMessageDecoder;
import org.apache.avro.message.SchemaStore;

@SuppressWarnings("all")
@org.apache.avro.specific.AvroGenerated
public class Accumulator extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
  private static final long serialVersionUID = 5291033088112484292L;
  public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"Accumulator\",\"namespace\":\"io.relayr.flink\",\"fields\":[{\"name\":\"accumulator\",\"type\":\"int\"},{\"name\":\"newStuff\",\"type\":[\"int\",\"null\"]}]}");
  public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }

  private static SpecificData MODEL$ = new SpecificData();

  private static final BinaryMessageEncoder<Accumulator> ENCODER =
      new BinaryMessageEncoder<Accumulator>(MODEL$, SCHEMA$);

  private static final BinaryMessageDecoder<Accumulator> DECODER =
      new BinaryMessageDecoder<Accumulator>(MODEL$, SCHEMA$);

  /**
   * Return the BinaryMessageDecoder instance used by this class.
   */
  public static BinaryMessageDecoder<Accumulator> getDecoder() {
    return DECODER;
  }

  /**
   * Create a new BinaryMessageDecoder instance for this class that uses the specified {@link SchemaStore}.
   * @param resolver a {@link SchemaStore} used to find schemas by fingerprint
   */
  public static BinaryMessageDecoder<Accumulator> createDecoder(SchemaStore resolver) {
    return new BinaryMessageDecoder<Accumulator>(MODEL$, SCHEMA$, resolver);
  }

  /** Serializes this Accumulator to a ByteBuffer. */
  public java.nio.ByteBuffer toByteBuffer() throws java.io.IOException {
    return ENCODER.encode(this);
  }

  /** Deserializes a Accumulator from a ByteBuffer. */
  public static Accumulator fromByteBuffer(
      java.nio.ByteBuffer b) throws java.io.IOException {
    return DECODER.decode(b);
  }

  @Deprecated public int accumulator;
  @Deprecated public java.lang.Integer newStuff;

  /**
   * Default constructor.  Note that this does not initialize fields
   * to their default values from the schema.  If that is desired then
   * one should use newBuilder().
   */
  public Accumulator() {}

  /**
   * All-args constructor.
   * @param accumulator The new value for accumulator
   * @param newStuff The new value for newStuff
   */
  public Accumulator(java.lang.Integer accumulator, java.lang.Integer newStuff) {
    this.accumulator = accumulator;
    this.newStuff = newStuff;
  }

  public org.apache.avro.Schema getSchema() { return SCHEMA$; }
  // Used by DatumWriter.  Applications should not call.
  public java.lang.Object get(int field$) {
    switch (field$) {
    case 0: return accumulator;
    case 1: return newStuff;
    default: throw new org.apache.avro.AvroRuntimeException("Bad index");
    }
  }
}