Hi Thias,

I considered CheckpointedFunction. In snapshotState() I would have to update the state of each key, extracting the in-memory "state" of each key and putting it in the Flink state with state.update(...). This must happen per key, but snapshotState() has no visibility of the keys, and I have no way of selectively accessing the state of a specific key to update it.

Unless I am missing something.
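To make the limitation concrete, here is a minimal sketch in plain Java. It is a toy model, not Flink's API: ToyKeyedState, KeyContextDemo and trySnapshot() are hypothetical names invented for illustration. The only assumption taken from Flink is that keyed state is scoped to the current key, which the runtime sets while processElement() runs and which is not set during a snapshot.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model (NOT Flink's API) of keyed state that is only reachable through a current key. */
public class KeyContextDemo {

    /** Hypothetical stand-in for a keyed value state: writes go to the current key's slot. */
    static class ToyKeyedState {
        private final Map<String, String> perKey = new HashMap<>();
        private String currentKey; // set by the "runtime" only while a record is processed

        void setCurrentKey(String key) {
            currentKey = key;
        }

        void update(String value) {
            if (currentKey == null) {
                throw new IllegalStateException("no current key: not in a keyed context");
            }
            perKey.put(currentKey, value);
        }
    }

    private final ToyKeyedState state = new ToyKeyedState();

    // In-memory working copy that is expensive to serialize, one entry per stream key.
    private final Map<String, StringBuilder> inMemory = new HashMap<>();

    /** Models processElement(): a key context exists only for the duration of the call. */
    public void processElement(String key, String record) {
        state.setCurrentKey(key);
        inMemory.computeIfAbsent(key, k -> new StringBuilder()).append(record);
        state.setCurrentKey(null); // key context ends with the call
    }

    /** Models snapshotState(): the keys are in the map, but no key context is active. */
    public boolean trySnapshot() {
        try {
            for (Map.Entry<String, StringBuilder> e : inMemory.entrySet()) {
                // There is no way to say "write this under e.getKey()".
                state.update(e.getValue().toString());
            }
            return true;
        } catch (IllegalStateException noKeyContext) {
            return false;
        }
    }

    public static void main(String[] args) {
        KeyContextDemo fn = new KeyContextDemo();
        fn.processElement("a", "x");
        fn.processElement("b", "y");
        System.out.println("snapshot succeeded: " + fn.trySnapshot()); // prints "snapshot succeeded: false"
    }
}
```

In this model the function knows every key from its own map, yet the state handle offers no way to target one of them outside processElement(), which is the gap described above.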
Thanks
Lorenzo

On Fri, 16 Feb 2024 at 07:21, Schwalbe Matthias <matthias.schwa...@viseca.ch> wrote:

> Good morning Lorenzo,
>
> You may want to implement the
> org.apache.flink.streaming.api.checkpoint.CheckpointedFunction interface in
> your KeyedProcessFunction.
>
> Btw., by the time initializeState(…) is called, the state backend is fully
> initialized and can be read from and written to (which is not the case when
> the open(…) function is called).
>
> In initializeState(…) you also get access to state of different operator
> key.
>
> snapshotState(…) is called as part of each checkpoint in order to
> store data.
>
> Sincere greetings
>
> Thias
>
> *From:* Lorenzo Nicora <lorenzo.nic...@gmail.com>
> *Sent:* Thursday, February 15, 2024 7:50 PM
> *To:* Flink User Group <user@flink.apache.org>
> *Subject:* Preparing keyed state before snapshot
>
> Hello everyone,
>
> I have a convoluted problem.
>
> I am implementing a KeyedProcessFunction that keeps some non-serializable
> "state" in memory, in a transient Map (key = stream key, value = the
> non-serializable "state").
>
> I can extract a serializable representation to put in Flink state, and I
> can load my in-memory "state" from the Flink state. But these operations
> are expensive.
>
> Initializing the in-memory "state" is relatively easy. I do it lazily, in
> processElement(), on the first record for the key.
>
> The problem is saving the in-memory "state" to Flink state.
> I need to do it only before the state snapshot. But KeyedProcessFunction
> has no entrypoint called before the state snapshot.
> I cannot use CheckpointedFunction.snapshotState(), because it does not
> work for keyed state.
>
> Any suggestions?
>
> Note that I cannot use operator state nor a broadcast state.
> Processing is keyed. Every processed record modifies the in-memory "state"
> of that key. If the job rescales, the state of the key must follow the
> partition.
>
> Regards
>
> Lorenzo