Good morning Lorenzo,

You may want to implement the
org.apache.flink.streaming.api.checkpoint.CheckpointedFunction interface in
your KeyedProcessFunction.
By the way, by the time initializeState(…) is called, the state backend is fully
initialized and can be read from and written to (which is not the case when the
open(…) function is called).
In initializeState(…) you also get access to the state of different operator keys.
snapshotState(…) is called as part of each checkpoint in order to store
data.

Sincere greetings

Thias

From: Lorenzo Nicora <lorenzo.nic...@gmail.com>
Sent: Thursday, February 15, 2024 7:50 PM
To: Flink User Group <user@flink.apache.org>
Subject: Preparing keyed state before snapshot

Hello everyone,

I have a convoluted problem.

I am implementing a KeyedProcessFunction that keeps some non-serializable 
"state" in memory, in a transient Map (key = stream key, value = the 
non-serializable "state").

I can extract a serializable representation to put in Flink state, and I can 
load my in-memory "state" from the Flink state. But these operations are 
expensive.

Initializing the in-memory "state" is relatively easy. I do it lazily, in 
processElement(), on the first record for the key.

The problem is saving the in-memory "state" to Flink state.
I need to do it only before the state snapshot. But KeyedProcessFunction has no 
entrypoint called before the state snapshot.
I cannot use CheckpointedFunction.snapshotState(), because it does not work for 
keyed state.
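
To make the pattern concrete, here is a minimal plain-Java sketch of what I am
doing. It is not Flink code: LazyKeyedCache, durableState and
flushBeforeSnapshot are hypothetical names, a Map<String, byte[]> stands in for
Flink keyed state, and a StringBuilder stands in for the non-serializable
"state". It only shows the lazy load on the first record of a key and a flush
that touches only the keys modified since the previous flush. Which Flink hook
could call the flush step with the right key context is exactly the open
question.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Plain-Java model of the pattern; no Flink classes are used. */
class LazyKeyedCache {
    // Stand-in for Flink keyed state: key -> serialized representation.
    final Map<String, byte[]> durableState = new HashMap<>();
    // Transient in-memory "state" per key, rebuilt lazily.
    private final Map<String, StringBuilder> inMemory = new HashMap<>();
    // Keys whose in-memory "state" changed since the last flush.
    private final Set<String> dirty = new HashSet<>();
    // Counters for the two expensive operations.
    int loads = 0;
    int saves = 0;

    /** Models processElement(): load lazily, mutate in memory, mark the key dirty. */
    void processElement(String key, String record) {
        StringBuilder state = inMemory.get(key);
        if (state == null) {
            state = load(key); // expensive, happens once per key
            inMemory.put(key, state);
        }
        state.append(record);
        dirty.add(key);
    }

    /** Models the missing hook: serialize only the dirty keys before a snapshot. */
    void flushBeforeSnapshot() {
        for (String key : dirty) {
            byte[] bytes = inMemory.get(key).toString().getBytes(StandardCharsets.UTF_8);
            durableState.put(key, bytes); // expensive, happens once per dirty key
            saves++;
        }
        dirty.clear();
    }

    /** Rebuilds the in-memory "state" from its serialized form, if one exists. */
    private StringBuilder load(String key) {
        loads++;
        byte[] bytes = durableState.get(key);
        return bytes == null
                ? new StringBuilder()
                : new StringBuilder(new String(bytes, StandardCharsets.UTF_8));
    }
}
```

With this shape, processing many records for the same key costs one load and,
per snapshot, at most one save for that key.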

Any suggestions?

Note that I cannot use operator state nor a broadcast state.
Processing is keyed. Every processed record modifies the in-memory "state" of 
that key. If the job rescales, the state of the key must follow the partition.


Regards
Lorenzo
