Good morning Lorenzo, You may want to implement org.apache.flink.streaming.api.checkpoint.CheckpointedFunction interface in your KeyedProcessFunction. Btw. By the time initializeState(…) is called, the state backend is fully initialized and can be read and written to (which is not the case for when the open(…) function is called. In initializeState(…) you also get access to state of different operator key. SnapshotState(…) is called as part of the (each) checkpoint in order to store data.
Sincere greetings Thias From: Lorenzo Nicora <lorenzo.nic...@gmail.com> Sent: Thursday, February 15, 2024 7:50 PM To: Flink User Group <user@flink.apache.org> Subject: Preparing keyed state before snapshot Hello everyone, I have a convoluted problem. I am implementing a KeyedProcessFunction that keeps some non-serializable "state" in memory, in a transient Map (key = stream key, value = the non-serializable "state"). I can extract a serializable representation to put in Flink state, and I can load my in-memory "state" from the Flink state. But these operations are expensive. Initializing the in-memory "state" is relatively easy. I do it lazily, in processElement(), on the first record for the key. The problem is saving the in-memory "state" to Flink state. I need to do it only before the state snapshot. But KeyedProcessFunction has no entrypoint called before the state snapshot. I cannot use CheckpointedFunction.snapshotState(), because it does not work for keyed state. Any suggestions? Note that I cannot use operator state nor a broadcast state. Processing is keyed. Every processed record modifies the in-memory "state" of that key. If the job rescale, the state of the key must follow the partition. Regards Lorenzo Diese Nachricht ist ausschliesslich für den Adressaten bestimmt und beinhaltet unter Umständen vertrauliche Mitteilungen. Da die Vertraulichkeit von e-Mail-Nachrichten nicht gewährleistet werden kann, übernehmen wir keine Haftung für die Gewährung der Vertraulichkeit und Unversehrtheit dieser Mitteilung. Bei irrtümlicher Zustellung bitten wir Sie um Benachrichtigung per e-Mail und um Löschung dieser Nachricht sowie eventueller Anhänge. Jegliche unberechtigte Verwendung oder Verbreitung dieser Informationen ist streng verboten. This message is intended only for the named recipient and may contain confidential or privileged information. As the confidentiality of email communication cannot be guaranteed, we do not accept any responsibility for the confidentiality and the intactness of this message. If you have received it in error, please advise the sender by return e-mail and delete this message and any attachments. Any unauthorised use or dissemination of this information is strictly prohibited.