Hi Arvid,

Did you check out the most recent AvroSerializer code? 
https://github.com/apache/flink/blob/f3a2197a23524048200ae2b4712d6ed833208124/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java#L185
 
 I think this does what you're suggesting.
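
In case it helps to pin down what "read schema" versus "write schema" means in this thread: Avro matches fields by name, fills fields that are missing from the written data with the reader's defaults, and ignores written fields that the reader does not declare. The sketch below is a plain-Java toy model of that rule only. It does not use the Avro library or the AvroSerializer, the class and method names are made up for illustration, and it assumes every reader field has a default.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model only: hypothetical names, not Avro's or Flink's API. */
final class SchemaResolutionSketch {

    /**
     * Maps values written in writer-schema field order onto the reader schema.
     * The reader schema is modelled as field name -> default value.
     */
    static Map<String, Object> resolve(List<String> writerFields,
                                       List<Object> writtenValues,
                                       Map<String, Object> readerFieldsWithDefaults) {
        // Pair each written value with the field name from the writer schema.
        Map<String, Object> written = new LinkedHashMap<>();
        for (int i = 0; i < writerFields.size(); i++) {
            written.put(writerFields.get(i), writtenValues.get(i));
        }
        // Build the result in reader-schema shape: take the written value if the
        // writer had the field, otherwise fall back to the reader's default.
        Map<String, Object> result = new LinkedHashMap<>();
        for (Map.Entry<String, Object> field : readerFieldsWithDefaults.entrySet()) {
            String name = field.getKey();
            result.put(name, written.containsKey(name) ? written.get(name) : field.getValue());
        }
        return result;
    }

    public static void main(String[] args) {
        // schema1 (writer): id, name. schema2 (reader): id, email with a default.
        Map<String, Object> schema2 = new LinkedHashMap<>();
        schema2.put("id", 0L);
        schema2.put("email", "unknown");
        // "name" is dropped, "email" is filled from the reader's default.
        System.out.println(resolve(List.of("id", "name"), List.of(42L, "arvid"), schema2));
    }
}
```

Without the write schema, the reader has no way to know which positions in the stored bytes belong to which field, which is why the schema the data was written with has to be available at restore time.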

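Regarding the integration tests: if the StateDescriptor really hands out copies, 
so that #ensureCompatibility runs on a different instance than #deserialize, that 
is not good, and I would be very happy about a Jira issue/PR for it.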
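
If the behaviour is as Arvid describes in his point 2, it could be reduced to something like the toy model below. This is a sketch under that assumption, not Flink code: ToySerializer and ToyDescriptor are hypothetical stand-ins, and the real classes may behave differently.

```java
/** Toy model only: these classes are hypothetical and are not Flink's API. */
final class SerializerCopyDemo {

    /** Stand-in for a serializer that is configured during a compatibility check. */
    static final class ToySerializer {
        private String readSchema; // null until ensureCompatibility is called

        void ensureCompatibility(String schemaFromSnapshot) {
            this.readSchema = schemaFromSnapshot;
        }

        String readSchema() {
            return readSchema;
        }

        /** Copies whatever configuration exists at the moment of the call. */
        ToySerializer duplicate() {
            ToySerializer copy = new ToySerializer();
            copy.readSchema = this.readSchema;
            return copy;
        }
    }

    /** Stand-in for a descriptor whose getter always returns a copy. */
    static final class ToyDescriptor {
        private final ToySerializer original = new ToySerializer();

        ToySerializer getSerializer() {
            return original.duplicate();
        }

        ToySerializer original() {
            return original;
        }
    }

    public static void main(String[] args) {
        ToyDescriptor descriptor = new ToyDescriptor();

        // Configuring a copy: the next copy handed out knows nothing about it.
        descriptor.getSerializer().ensureCompatibility("schema1");
        System.out.println(descriptor.getSerializer().readSchema()); // null

        // Configuring the original: every later copy inherits the read schema.
        descriptor.original().ensureCompatibility("schema1");
        System.out.println(descriptor.getSerializer().readSchema()); // schema1
    }
}
```

In this model the read schema only reaches the instance that deserializes if the compatibility check runs on the original, which is the change Arvid proposes.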

Regarding your last point, I think that the RocksDB backend stores the metadata, 
which includes the type serializer snapshot, once, and not for all keys or key 
groups.

Best,
Aljoscha

> On 20. Feb 2018, at 11:40, Arvid Heise <arvid.he...@gmail.com> wrote:
> 
> Hi guys,
> 
> just wanted to write about that topic on my own.
> 
> Tzu-Li's FF talk also gave me the impression that by just using
> AvroSerializer, we get some kind of state evolution for free.
> https://www.slideshare.net/FlinkForward/flink-forward-berlin-2017-tzuli-gordon-tai-managing-state-in-apache-flink
> 
> However, I discovered two issues on 1.3.2:
> 
> 1. The AvroSerializer does not use read/write schema. The snapshot
> stores type information instead of the more plausible schema
> information.
> However, the actual type should not matter as long as a compatible
> type is used for state restoration.
> I have rewritten the AvroSerializer to store the schema in the
> snapshot config and to actually use it as a read schema during the
> initialization of the DatumReader.
> 
> 2. During integration tests, it turns out that the current
> implementation of the StateDescriptor always returns copies of the
> serializer through #getSerializer. So #ensureCompatibility is invoked
> on a different serializer than the actual #deserialize method. So
> although my AvroSerializer sets the correct read schema, it is not
> used, since it is set on the wrong instance.
> I propose to make sure that #ensureCompatibility is invoked on the
> original serializer in the state descriptor. Otherwise all adjustments
> to the serializer are lost.
> 
> I can provide tests and patches if needed.
> 
> One related question:
> 
> If I do an incremental snapshot with RocksDB backend and keyed state
> backend, is the snapshot config attached to all keys? So would the
> following work:
> * Write (key1, value1) and (key2, value2) with schema1. Cancel with
> snapshot.
> * Read (key1, value1) with schema1->schema2 and write (key1, value1)
> with schema2. Cancel with snapshot.
> <Now we have two different schemas in the snapshots>
> * Read (key1, value1) with schema2 and read (key2, value2) with
> schema1->schema2.
> 
> Thanks for any feedback
> 
> Arvid
> 
> On Mon, Feb 19, 2018 at 7:17 PM, Niels Denissen <nielsdenis...@gmail.com> 
> wrote:
>> Hi Till,
>> 
>> Thanks for the quick reply, I'm using 1.3.2 atm.
>> 
>> Cheers,
>> Niels
>> 
>> On Feb 19, 2018 19:10, "Till Rohrmann" <trohrm...@apache.org> wrote:
>>> 
>>> Hi Niels,
>>> 
>>> which version of Flink are you using? Currently, Flink does not support
>>> upgrading the TypeSerializer itself, if I'm not mistaken. As you've described,
>>> it will try to use the old serializer stored in the checkpoint stream to
>>> restore state.
>>> 
>>> I've pulled Gordon into the conversation who can tell you a little bit
>>> more about the current capability and limitations of state evolution.
>>> 
>>> Cheers,
>>> Till
>>> 
>>> On Mon, Feb 19, 2018 at 4:14 PM, Niels <[hidden email]> wrote:
>>>> 
>>>> Hi all,
>>>> 
>>>> I'm currently trying to use Avro in order to evolve our data present in
>>>> Flink's Managed State. I've extended the TypeSerializer class successfully
>>>> for this purpose, but still have issues using Schema Evolution.
>>>> 
>>>> *The problem:*
>>>> When we try to read data (deserialize from savepoint) with a new serializer
>>>> and a new schema, Flink seems to use the old schema of the old serializer
>>>> (written to the savepoint). This results in an old GenericRecord that
>>>> doesn't adhere to the new Avro schema.
>>>> 
>>>> *What seems to happen to me is the following* (Say we evolve from dataV1 to
>>>> dataV2):
>>>> - State containing dataV1 is serialized with avro schema V1 to a
>>>> check/savepoint. Along with the data, the serializer itself is written.
>>>> - Upon restore, the old serializer is retrieved from the data (therefore
>>>> needs to be on the classpath). Data is restored using this old serializer.
>>>> The new serializer provided is only used for writes.
>>>> 
>>>> If this is indeed the case it explains our aforementioned problem. If you
>>>> have any pointers as to whether this is true and what a possible solution
>>>> would be that would be very much appreciated!
>>>> 
>>>> Thanks!
>>>> Niels
