Hi Roman,

Thanks for the quick response. It wasn't that, but your comment about erasure 
made me realize I should have debugged the code and looked at the types. 
Apparently setting TTL changes the serializer, so I also had to add TTL in the 
WindowReaderFunction.
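
For anyone hitting the same exception, a minimal sketch of that change might look like the following. It assumes the descriptor from the original question; the class name, the method name and the one-day TTL are placeholders, since the real TTL settings are not given in this thread.

```java
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;

import java.util.List;

// Hypothetical helper class; not part of the original job.
public class TtlDescriptorSketch {

    // Builds the descriptor once so that the job and the reader function
    // can share exactly the same definition, including TTL.
    static MapStateDescriptor<Long, List<String>> descriptorWithTtl() {
        MapStateDescriptor<Long, List<String>> msd = new MapStateDescriptor<>(
                "descriptorName",
                TypeInformation.of(Long.class),
                TypeInformation.of(new TypeHint<List<String>>() {}));

        // Assumption: Time.days(1L) is a placeholder duration.
        StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.days(1L)).build();

        // Enabling TTL on the descriptor is the step that was missing in the reader.
        msd.enableTimeToLive(ttlConfig);
        return msd;
    }
}
```

Sharing one factory method like this between the streaming job and the WindowReaderFunction is one way to keep the two descriptors from drifting apart.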

Regards,
Alexis.

-----Original Message-----
From: Roman Khachatryan <ro...@apache.org> 
Sent: Friday, 8 April 2022 11:48
To: Alexis Sarda-Espinosa <alexis.sarda-espin...@microfocus.com>
Cc: user@flink.apache.org
Subject: Re: Using state processor API to read state defined with a TypeHint

Hi Alexis,

I think your setup is fine, but probably Java type erasure makes Flink consider 
the two serializers as different.
Could you try creating a MapStateDescriptor by explicitly providing serializers 
(constructed manually)?
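
A rough sketch of what that could look like is below. The class and method names are hypothetical, and the serializers chosen here are an assumption: they have to match whatever the job actually wrote, which may be a different serializer if Flink treated the List type generically.

```java
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeutils.base.ListSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;

import java.util.List;

// Hypothetical helper class for illustration only.
public class ExplicitSerializerSketch {

    // Passes serializers directly instead of TypeInformation, so no
    // type extraction is involved when the descriptor is created.
    static MapStateDescriptor<Long, List<String>> descriptorWithExplicitSerializers() {
        return new MapStateDescriptor<>(
                "descriptorName",
                LongSerializer.INSTANCE,                           // key serializer
                new ListSerializer<>(StringSerializer.INSTANCE));  // value serializer (assumed)
    }
}
```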

Regards,
Roman


On Fri, Apr 8, 2022 at 10:01 AM Alexis Sarda-Espinosa 
<alexis.sarda-espin...@microfocus.com> wrote:
>
> Hi everyone,
>
>
>
> I have a ProcessWindowFunction that uses Global window state. It uses 
> MapState with a descriptor defined like this:
>
>
>
> MapStateDescriptor<Long, List<String>> msd = new MapStateDescriptor<>(
>         "descriptorName",
>         TypeInformation.of(Long.class),
>         TypeInformation.of(new TypeHint<List<String>>() {})
> );
>
>
>
> Now I’m trying to read that state from a checkpoint (created with RocksDB 
> as backend in Flink 1.14.4). I have a WindowReaderFunction<MyPojo, 
> Integer, String, TimeWindow> that defines the same descriptor and calls this 
> in readWindow:
>
>
>
> MapState<Long, List<String>> mapState = context.globalState().getMapState(msd);
>
>
>
> After loading the savepoint with EmbeddedRocksDBStateBackend(true), I try to 
> configure the reader function like this:
>
>
>
> savepoint
>         .window(SlidingEventTimeWindows.of(Time.minutes(11L), Time.minutes(1L)))
>         .process(
>                 "my-uid",
>                 new StateReaderFunction(),
>                 Types.STRING,
>                 TypeInformation.of(MyPojo.class),
>                 Types.INT
>         )
>         .print();
>
>
>
> But I am getting this exception:
>
>
>
> Caused by: org.apache.flink.util.StateMigrationException: The new state 
> serializer 
> (org.apache.flink.api.common.typeutils.base.MapSerializer@a07c9263) must not 
> be incompatible with the old state serializer 
> (org.apache.flink.api.common.typeutils.base.MapSerializer@706b3103).
>
>
>
> Does someone know what I’m doing wrong in my setup?
>
>
>
> Regards,
>
> Alexis.
>
>
