Re: Preparing keyed state before snapshot

2024-02-21 Thread Lorenzo Nicora
Thanks Thias and Zakelly,

I probably muddied the waters by saying that my use case was similar to
kvCache.
What I was calling "non-serializable state" is actually a Random Cut Forest
ML model. The model itself cannot be serialized, but you can extract a
serializable state from it. That state is serializable, but it is
definitely not a primitive type.
To be specific, I am trying to implement a keyed version of this RCF
operator [1]. I need one RCF model (a separate "forest") per key. Key
cardinality is not very high, and the size of the state should not be
a problem.
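A minimal, framework-free sketch of the distinction above, assuming the model offers some way to export and re-import its internal state: the live model object does not implement `Serializable`, but it can produce a plain, serializable state object and be rebuilt from one. `ForestModel` and `ForestState` are hypothetical names, not the Random Cut Forest library's API, and the "model" here is only a running mean.

```java
import java.io.Serializable;

// Hypothetical serializable snapshot of a model: plain fields only.
final class ForestState implements Serializable {
    private static final long serialVersionUID = 1L;
    final long count;
    final double mean;

    ForestState(long count, double mean) {
        this.count = count;
        this.mean = mean;
    }
}

// Hypothetical stand-in for a live model that does NOT implement Serializable.
final class ForestModel {
    private long count;
    private double mean;

    // Incrementally updates the running mean with a new observation.
    void update(double value) {
        count++;
        mean += (value - mean) / count;
    }

    // Toy "anomaly score": distance from the running mean.
    double score(double value) {
        return Math.abs(value - mean);
    }

    // Extracts a serializable representation (expensive in a real model).
    ForestState toState() {
        return new ForestState(count, mean);
    }

    // Rebuilds a live model from its serializable representation.
    static ForestModel fromState(ForestState state) {
        ForestModel model = new ForestModel();
        model.count = state.count;
        model.mean = state.mean;
        return model;
    }
}
```

In a keyed function, the `ForestState` (or its serialized bytes) is what would go into Flink keyed state, while the `ForestModel` stays in the transient per-key map.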

I guess the only feasible way is what Zakelly is suggesting, using
reflection to extract and set the keyContext from within processElement().
I will explore this option.

Thanks again
Lorenzo

[1]
https://github.com/aws-samples/amazon-kinesis-data-analytics-examples/blob/master/AnomalyDetection/RandomCutForest/src/main/java/software/amazon/flink/example/operator/RandomCutForestOperator.java



RE: Preparing keyed state before snapshot

2024-02-21 Thread Schwalbe Matthias
Good morning all,

Let me loop myself in …


  1.  An even more convenient way to get a cache is to configure RocksDB to use
more off-heap memory for its cache. You might also consider enabling bloom
filters (it all depends on how large your key space is:
thousands/millions/billions/…).
Within the technological limits, RocksDB is hard to beat. If keeping all data
in memory is not an option, this is the path I usually follow.

  2.  On the other question, how to control the current key from within
snapshotState(): you can acquire a pointer to the underlying state backend,
e.g. from within open(), then get hold of a pointer to the specific state
primitive, and set the current key directly.
To find out how to do that, put a breakpoint in the debugger and walk up a
couple of call stack frames, and/or step into the value setters and model your
code after how it is done there.
Mind, though, that you must restore the current key if you change it to
another key.
Doing this e.g. in initializeState() is not time-critical, because it happens
outside the ‘hot’ code paths.

  3.  If the number of elements to store is small, you can store them in
operator state and initialize your local structure from it in
initializeState(). You would probably want to keep the data in serialized form
in operator state since, as you mentioned, serialization would be expensive.
  4.  There is another API (whose name I don’t remember) that allows you to
store operator state directly as a BLOB, if that is a viable option for you.
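A framework-free sketch of point 3, assuming the per-key values are Java-serializable: the data is kept as serialized bytes in entries of the kind that would be stored in operator list state, and is decoded into the local map only when initializing. `SerializedEntry` and `OperatorStateCodec` are hypothetical names, and Java serialization stands in for whatever serializer is actually used.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical entry as it might be stored in operator list state: a key plus its serialized value.
final class SerializedEntry implements Serializable {
    private static final long serialVersionUID = 1L;
    final String key;
    final byte[] bytes;

    SerializedEntry(String key, byte[] bytes) {
        this.key = key;
        this.bytes = bytes;
    }
}

final class OperatorStateCodec {
    static byte[] toBytes(Serializable value) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(value);
        }
        return buffer.toByteArray();
    }

    static Object fromBytes(byte[] bytes) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return in.readObject();
        }
    }

    // Snapshot side: turn the local map into a list of serialized entries.
    static <V extends Serializable> List<SerializedEntry> snapshot(Map<String, V> local) throws IOException {
        List<SerializedEntry> entries = new ArrayList<>();
        for (Map.Entry<String, V> e : local.entrySet()) {
            entries.add(new SerializedEntry(e.getKey(), toBytes(e.getValue())));
        }
        return entries;
    }

    // initializeState() side: rebuild the local map from the stored entries.
    static <V> Map<String, V> restore(List<SerializedEntry> entries, Class<V> type)
            throws IOException, ClassNotFoundException {
        Map<String, V> local = new HashMap<>();
        for (SerializedEntry e : entries) {
            local.put(e.key, type.cast(fromBytes(e.bytes)));
        }
        return local;
    }
}
```

One caveat, which is why the original question rules this option out: with the default even-split redistribution, operator list state is redistributed by list element on rescaling, not by key, so an entry is not guaranteed to land on the subtask that will receive that key's records.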

Sincere greetings

Thias





Re: Preparing keyed state before snapshot

2024-02-20 Thread Zakelly Lan
Hi Lorenzo,

I think the most convenient way is to modify the code of the state backend,
adding a k-v cache as you want.

Otherwise, IIUC, there's no public interface to get the keyContext. But you
could try something hacky: use the `Context` instance passed to
processElement(), and leverage Java reflection to get the KeyedProcessOperator
instance, on which you can call setCurrentKey().
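A generic sketch of the reflection approach: find a non-public field by walking up the class hierarchy, make it accessible, and read it. The names of the fields inside Flink's `Context` implementation are not given in this thread, so the example runs against hypothetical toy classes (`ToyOperator`, `ToyBaseContext`, `ToyContext`); with the real runtime the field names would have to be discovered in a debugger, and they may change between Flink versions.

```java
import java.lang.reflect.Field;

final class ReflectionUtil {
    // Finds a declared field by name on the object's class or any superclass,
    // makes it accessible, and returns its current value.
    static Object readField(Object target, String fieldName) throws ReflectiveOperationException {
        for (Class<?> c = target.getClass(); c != null; c = c.getSuperclass()) {
            try {
                Field f = c.getDeclaredField(fieldName);
                f.setAccessible(true);
                return f.get(target);
            } catch (NoSuchFieldException ignored) {
                // Not declared on this class; keep searching in the superclass.
            }
        }
        throw new NoSuchFieldException(fieldName);
    }
}

// Hypothetical toy classes: a context object holding a private reference to its operator.
class ToyOperator {
    private Object currentKey;

    void setCurrentKey(Object key) { this.currentKey = key; }

    Object getCurrentKey() { return currentKey; }
}

class ToyBaseContext {
    private final ToyOperator operator;

    ToyBaseContext(ToyOperator operator) { this.operator = operator; }
}

class ToyContext extends ToyBaseContext {
    ToyContext(ToyOperator operator) { super(operator); }
}
```

The lookup starts at `ToyContext`, does not find `operator` there, and finds it on `ToyBaseContext`; the returned reference is the live operator, so calling a setter on it changes the real object.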


Best,
Zakelly


Re: Preparing keyed state before snapshot

2024-02-20 Thread Lorenzo Nicora
Thanks Zakelly,

I'd need to do something similar, with a map containing my non-serializable
"state", like the kvCache in FastTop1Function.

But I am not sure I understand how I can set the keyed state for a specific
key in snapshotState().
FastTop1Function seems to rely on a keyContext set via setKeyContext(). This
method is not part of the public API. I see it's set specifically for
AbstractTopNFunction in StreamExecRank.
How can I do something similar without modifying the Flink runtime?

Lorenzo




Re: Preparing keyed state before snapshot

2024-02-17 Thread Zakelly Lan
Hi Lorenzo,

It is not recommended to do this with keyed state. However, there is an
example in the Flink code (FastTop1Function#snapshotState) [1] of setting
keys in snapshotState().

Hope this helps.

[1]
https://github.com/apache/flink/blob/050503c65f5c5c18bb573748ccbf5aecce4ec1a5/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/FastTop1Function.java#L165
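A framework-free model of the pattern discussed in this thread: keyed state is only addressable through a "current key", so a snapshot-time hook iterates the in-memory cache, switches the current key for each entry, writes the converted value, and restores the previously active key afterwards (the point Thias also makes about restoring the key). `ToyKeyedBackend` and `SnapshotFlush` are hypothetical stand-ins, not Flink's API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-in for a keyed state backend: values are scoped by the current key.
final class ToyKeyedBackend<K, V> {
    private final Map<K, V> store = new HashMap<>();
    private K currentKey;

    void setCurrentKey(K key) { currentKey = key; }

    K getCurrentKey() { return currentKey; }

    void update(V value) { store.put(currentKey, value); } // plays the role of ValueState.update()

    V value() { return store.get(currentKey); }            // plays the role of ValueState.value()
}

final class SnapshotFlush {
    // Writes every cached entry into keyed state under its own key, then restores the key.
    static <K, M, S> void flush(Map<K, M> cache, ToyKeyedBackend<K, S> backend, Function<M, S> toState) {
        K previousKey = backend.getCurrentKey();
        try {
            for (Map.Entry<K, M> e : cache.entrySet()) {
                backend.setCurrentKey(e.getKey());
                backend.update(toState.apply(e.getValue()));
            }
        } finally {
            backend.setCurrentKey(previousKey); // put back whatever key was active before
        }
    }
}
```

The `try`/`finally` restores the previously active key even if one of the conversions throws.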

Best,
Zakelly



Re: Preparing keyed state before snapshot

2024-02-16 Thread Lorenzo Nicora
Hi Thias

I considered CheckpointedFunction.
In snapshotState() I would have to update the state of each key, extracting
the in-memory "state" of each key and putting it into Flink state with
state.update(...).
This must happen per key, but snapshotState() has no visibility of the keys,
and I have no way of selectively accessing the state of a specific key to
update it.
Unless I am missing something?

Thanks
Lorenzo




RE: Preparing keyed state before snapshot

2024-02-15 Thread Schwalbe Matthias
Good morning Lorenzo,

You may want to implement the
org.apache.flink.streaming.api.checkpoint.CheckpointedFunction interface in
your KeyedProcessFunction.
Btw, by the time initializeState(…) is called, the state backend is fully
initialized and can be read and written to (which is not the case when the
open(…) function is called).
In initializeState(…) you also get access to the state of different operator
keys.
snapshotState(…) is called as part of each checkpoint in order to store data.

Sincere greetings

Thias

From: Lorenzo Nicora 
Sent: Thursday, February 15, 2024 7:50 PM
To: Flink User Group 
Subject: Preparing keyed state before snapshot

Hello everyone,

I have a convoluted problem.

I am implementing a KeyedProcessFunction that keeps some non-serializable 
"state" in memory, in a transient Map (key = stream key, value = the 
non-serializable "state").

I can extract a serializable representation to put in Flink state, and I can 
load my in-memory "state" from the Flink state. But these operations are 
expensive.

Initializing the in-memory "state" is relatively easy. I do it lazily, in 
processElement(), on the first record for the key.
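A minimal sketch of the lazy initialization described above, assuming a serialized snapshot may already exist in Flink state for the key (for example after a restore): on the first record for a key the live object is rebuilt from that snapshot, or created fresh if none exists, and then cached. `LazyModelCache` and its three functions are hypothetical; in a real KeyedProcessFunction the lookup would read a ValueState for the current key.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical transient per-key cache of live (non-serializable) objects.
final class LazyModelCache<K, M, S> {
    private final Map<K, M> live = new HashMap<>();
    private final Function<K, Optional<S>> storedState; // looks up serialized state for a key, if any
    private final Function<S, M> restore;               // expensive: serialized state -> live object
    private final Supplier<M> create;                   // used when a key has no stored state yet
    private int restoreCount = 0;                       // how many expensive restores happened

    LazyModelCache(Function<K, Optional<S>> storedState, Function<S, M> restore, Supplier<M> create) {
        this.storedState = storedState;
        this.restore = restore;
        this.create = create;
    }

    // Meant to be called for every record: only the first record of a key pays the restore cost.
    M get(K key) {
        M model = live.get(key);
        if (model == null) {
            Optional<S> stored = storedState.apply(key);
            if (stored.isPresent()) {
                restoreCount++;
                model = restore.apply(stored.get());
            } else {
                model = create.get();
            }
            live.put(key, model);
        }
        return model;
    }

    int restoreCount() {
        return restoreCount;
    }
}
```

Later records for the same key reuse the cached object, so the expensive conversion happens at most once per key per task lifetime.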

The problem is saving the in-memory "state" to Flink state.
I only need to do it right before the state snapshot, but KeyedProcessFunction
has no entry point that is called before the state snapshot.
I cannot use CheckpointedFunction.snapshotState(), because it does not work for
keyed state.

Any suggestions?

Note that I can use neither operator state nor broadcast state.
Processing is keyed: every processed record modifies the in-memory "state" of
its key. If the job rescales, the state of each key must follow its partition.


Regards
Lorenzo
This message is intended only for the named recipient and may contain 
confidential or privileged information. As the confidentiality of email 
communication cannot be guaranteed, we do not accept any responsibility for the 
confidentiality and the intactness of this message. If you have received it in 
error, please advise the sender by return e-mail and delete this message and 
any attachments. Any unauthorised use or dissemination of this information is 
strictly prohibited.