Thanks Zakelly. I would need to do something similar, with a map holding my non-serializable "state", like the kvCache in FastTop1Function.
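Here is a minimal sketch of such a map, using only the Java standard library. `Model`, `LazyCache`, and the `restoredState` byte-array map are hypothetical stand-ins, not Flink classes: they represent the non-serializable object, the function, and the keyed state the object is restored from. The sketch covers only the read path. The expensive deserialization happens once per key, on the first record for that key after a restore.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Hypothetical non-serializable per-key object (stand-in, not a Flink class).
final class Model {
    private long total;

    void add(long v) { total += v; }
    long total() { return total; }

    // Assumed expensive conversions to and from a serializable form.
    byte[] toBytes() { return Long.toString(total).getBytes(StandardCharsets.UTF_8); }

    static Model fromBytes(byte[] b) {
        Model m = new Model();
        m.total = Long.parseLong(new String(b, StandardCharsets.UTF_8));
        return m;
    }
}

final class LazyCache {
    // Stand-in for keyed state restored from a checkpoint: key -> serialized Model.
    final Map<String, byte[]> restoredState;
    // Transient in-memory map, analogous in spirit to kvCache in FastTop1Function.
    final Map<String, Model> kvCache = new HashMap<>();
    // Counts the expensive fromBytes calls.
    int deserializations = 0;

    LazyCache(Map<String, byte[]> restoredState) { this.restoredState = restoredState; }

    // Mirrors processElement(): build the in-memory object on the first record for a key.
    long process(String key, long value) {
        Model m = kvCache.get(key);
        if (m == null) {
            byte[] stored = restoredState.get(key);
            if (stored != null) {
                m = Model.fromBytes(stored);
                deserializations++;
            } else {
                m = new Model();
            }
            kvCache.put(key, m);
        }
        m.add(value);
        return m.total();
    }
}
```

In a real KeyedProcessFunction, the cache would be a transient field and the lookup would read a `ValueState` for the current key. That wiring is assumed here and not shown.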
But I am not sure I understand how to set the keyed state for a specific key in snapshotState(). FastTop1Function seems to rely on the keyContext set via setKeyContext(). That method is not part of the public API; I see it is set specifically for AbstractTopNFunction in StreamExecRank. How can I do something similar without modifying the Flink runtime?

Lorenzo

On Sun, 18 Feb 2024 at 03:42, Zakelly Lan <zakelly....@gmail.com> wrote:
> Hi Lorenzo,
>
> It is not recommended to do this with keyed state. However, there is an
> example in the Flink code (FastTop1Function#snapshotState) [1] of setting keys
> in snapshotState().
>
> Hope this helps.
>
> [1]
> https://github.com/apache/flink/blob/050503c65f5c5c18bb573748ccbf5aecce4ec1a5/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/FastTop1Function.java#L165
>
> Best,
> Zakelly
>
> On Sat, Feb 17, 2024 at 1:48 AM Lorenzo Nicora <lorenzo.nic...@gmail.com>
> wrote:
>
>> Hi Thias
>>
>> I considered CheckpointedFunction.
>> In snapshotState() I would have to update the state of each key,
>> extracting the in-memory "state" of each key and putting it in the state
>> with state.update(...).
>> This must happen per key,
>> but snapshotState() has no visibility of the keys, and I have no way of
>> selectively accessing the state of a specific key to update it.
>> Unless I am missing something.
>>
>> Thanks
>> Lorenzo
>>
>>
>> On Fri, 16 Feb 2024 at 07:21, Schwalbe Matthias <
>> matthias.schwa...@viseca.ch> wrote:
>>
>>> Good morning Lorenzo,
>>>
>>> You may want to implement the
>>> org.apache.flink.streaming.api.checkpoint.CheckpointedFunction interface in
>>> your KeyedProcessFunction.
>>>
>>> By the way, by the time initializeState(…) is called, the state backend is
>>> fully initialized and can be read and written to (which is not the case
>>> when open(…) is called).
>>>
>>> In initializeState(…) you also get access to the state of different
>>> operator keys.
>>>
>>> snapshotState(…) is called as part of each checkpoint in order to
>>> store data.
>>>
>>> Sincere greetings
>>>
>>> Thias
>>>
>>> *From:* Lorenzo Nicora <lorenzo.nic...@gmail.com>
>>> *Sent:* Thursday, February 15, 2024 7:50 PM
>>> *To:* Flink User Group <user@flink.apache.org>
>>> *Subject:* Preparing keyed state before snapshot
>>>
>>> Hello everyone,
>>>
>>> I have a convoluted problem.
>>>
>>> I am implementing a KeyedProcessFunction that keeps some
>>> non-serializable "state" in memory, in a transient Map (key = stream key,
>>> value = the non-serializable "state").
>>>
>>> I can extract a serializable representation to put in Flink state, and I
>>> can load my in-memory "state" from the Flink state. But these operations
>>> are expensive.
>>>
>>> Initializing the in-memory "state" is relatively easy. I do it lazily,
>>> in processElement(), on the first record for the key.
>>>
>>> The problem is saving the in-memory "state" to Flink state.
>>> I need to do it only before the state snapshot, but KeyedProcessFunction
>>> has no entry point that is called before the state snapshot.
>>> I cannot use CheckpointedFunction.snapshotState(), because it does not
>>> work for keyed state.
>>>
>>> Any suggestions?
>>>
>>> Note that I can use neither operator state nor broadcast state.
>>> Processing is keyed. Every processed record modifies the in-memory
>>> "state" of that key. If the job rescales, the state of the key must follow
>>> the partition.
>>>
>>> Regards
>>>
>>> Lorenzo
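The flush that FastTop1Function#snapshotState performs can be sketched without Flink dependencies. `FakeKeyedValueState` and `SnapshotFlushSketch` are hypothetical stand-ins, not Flink classes. The sketch shows one thing: keyed state reads and writes are scoped to a "current key". A snapshot hook that owns the cache map can therefore iterate over the map's keys, switch the key context for each one, and write its value. In Flink itself, that switch is done through `KeyContext#setCurrentKey`. As noted in the thread, FastTop1Function reaches it through an internal `setKeyContext()` call, not through the public `KeyedProcessFunction` API.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for keyed value state: every read/write is scoped to
// the "current key", mirroring how keyed state behaves inside a keyed operator.
class FakeKeyedValueState<K, V> {
    private final Map<K, V> byKey = new HashMap<>();
    private K currentKey;

    // Plays the role that KeyContext#setCurrentKey plays in FastTop1Function.
    void setCurrentKey(K key) { this.currentKey = key; }
    K getCurrentKey() { return currentKey; }

    V value() { return byKey.get(currentKey); }
    void update(V v) { byKey.put(currentKey, v); }
}

class SnapshotFlushSketch {
    final FakeKeyedValueState<String, String> state = new FakeKeyedValueState<>();
    // Transient cache of mutable in-memory objects (a StringBuilder stands in here).
    final Map<String, StringBuilder> kvCache = new LinkedHashMap<>();

    void processElement(String key, String value) {
        // In Flink the keyed operator sets the current key before each record.
        state.setCurrentKey(key);
        // Lazily build the in-memory object from state on the first record for a key.
        kvCache.computeIfAbsent(key, k -> {
            String stored = state.value();
            return new StringBuilder(stored == null ? "" : stored);
        }).append(value);
    }

    // Called before the checkpoint: the cache knows the keys, so iterate it,
    // switch the key context, and write the serialized form for each key.
    void snapshotState() {
        String previous = state.getCurrentKey();
        for (Map.Entry<String, StringBuilder> e : kvCache.entrySet()) {
            state.setCurrentKey(e.getKey());
            state.update(e.getValue().toString());
        }
        // Defensive: restore whichever key was active before the flush.
        state.setCurrentKey(previous);
    }
}
```

Restoring the previously active key at the end is a defensive choice in this sketch. Whether a real operator needs it is an assumption, not something the thread confirms.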