RE: Using state processor API to read state defined with a TypeHint

2022-04-08 Thread Alexis Sarda-Espinosa
Hi Roman,

Thanks for the quick response. It wasn't that, but your comment about erasure 
made me realize I should have debugged the code and looked at the types. 
Apparently setting TTL changes the serializer, so I also had to add TTL in the 
WindowReaderFunction.

Regards,
Alexis.
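
The fix described above can be sketched roughly as follows: the descriptor used inside the reader function enables TTL just like the descriptor in the running job, so both sides end up with the same TTL-aware serializer. This is only a sketch against the Flink 1.14 API. The class name, the String value type, and the one-hour TTL are placeholders, because the thread shows neither the real value type nor the job's TTL settings.

```java
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.Types;

// Hypothetical helper class; not taken from the thread.
public class TtlDescriptorSketch {

    // Builds the descriptor the same way in the job and in the reader.
    static MapStateDescriptor<Long, String> descriptor() {
        // Placeholder TTL; the job's actual TTL configuration is not shown in the thread.
        StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.hours(1)).build();

        // Placeholder value type (String); the real one was declared via a TypeHint.
        MapStateDescriptor<Long, String> msd =
                new MapStateDescriptor<>("descriptorName", Types.LONG, Types.STRING);

        // With TTL enabled, Flink stores each value together with a timestamp,
        // which is why the state serializer differs from the non-TTL case.
        msd.enableTimeToLive(ttlConfig);
        return msd;
    }
}
```

Sharing one such factory method between the ProcessWindowFunction and the WindowReaderFunction is one way to keep the two descriptors from drifting apart.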

-----Original Message-----
From: Roman Khachatryan
Sent: Friday, 8 April 2022 11:48
To: Alexis Sarda-Espinosa 
Cc: user@flink.apache.org
Subject: Re: Using state processor API to read state defined with a TypeHint

Hi Alexis,

I think your setup is fine, but probably Java type erasure makes Flink consider 
the two serializers as different.
Could you try creating a MapStateDescriptor by explicitly providing serializers 
(constructed manually)?

Regards,
Roman
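
The erasure point can be illustrated with plain Java, without Flink. The sketch below shows that two differently parameterized lists share one runtime class, while an anonymous subclass keeps its type argument readable through reflection, which is the general idea a type hint relies on. The names ErasureSketch and Hint are made up for this illustration and are not Flink classes.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.List;

public class ErasureSketch {

    // Hypothetical stand-in for a type hint: an anonymous subclass records T
    // in its generic superclass signature, which survives erasure.
    abstract static class Hint<T> {
        Type captured() {
            ParameterizedType superType =
                    (ParameterizedType) getClass().getGenericSuperclass();
            return superType.getActualTypeArguments()[0];
        }
    }

    // After erasure, both lists are plain ArrayList instances at runtime.
    static boolean sameRuntimeClass() {
        List<String> strings = new ArrayList<>();
        List<Long> longs = new ArrayList<>();
        return strings.getClass() == longs.getClass();
    }

    // The anonymous subclass ({}) is what makes the type argument recoverable.
    static String capturedTypeName() {
        return new Hint<List<String>>() {}.captured().getTypeName();
    }

    public static void main(String[] args) {
        System.out.println(sameRuntimeClass());   // prints "true"
        System.out.println(capturedTypeName());   // prints "java.util.List<java.lang.String>"
    }
}
```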


On Fri, Apr 8, 2022 at 10:01 AM Alexis Sarda-Espinosa wrote:
>
> Hi everyone,
>
> I have a ProcessWindowFunction that uses Global window state. It uses
> MapState with a descriptor defined like this:
>
> MapStateDescriptor> msd = new MapStateDescriptor<>(
>     "descriptorName",
>     TypeInformation.of(Long.class),
>     TypeInformation.of(new TypeHint>() {})
> );
>
> Now I’m trying to access a checkpoint’s state data to read that (created with
> RocksDB as backend in Flink 1.14.4). I have a WindowReaderFunction Integer,
> String, TimeWindow> that defines the same descriptor and calls this in
> readWindow:
>
> MapState> mapState =
>     context.globalState().getMapState(msd);
>
> After loading the savepoint with EmbeddedRocksDBStateBackend(true), I try to
> configure the reader function like this:
>
> savepoint
>     .window(SlidingEventTimeWindows.of(Time.minutes(11L), Time.minutes(1L)))
>     .process(
>         "my-uid",
>         new StateReaderFunction(),
>         Types.STRING,
>         TypeInformation.of(MyPojo.class),
>         Types.INT
>     )
>     .print();
>
> But I am getting this exception:
>
> Caused by: org.apache.flink.util.StateMigrationException: The new state
> serializer
> (org.apache.flink.api.common.typeutils.base.MapSerializer@a07c9263) must not
> be incompatible with the old state serializer
> (org.apache.flink.api.common.typeutils.base.MapSerializer@706b3103).
>
> Does someone know what I’m doing wrong in my setup?
>
> Regards,
> Alexis.

