Thanks Thias and Zakelly,

I probably muddied the waters by saying that my use case was similar to kvCache. What I was calling "non-serializable state" is actually a Random Cut Forest (RCF) ML model. The model itself cannot be serialized, but you can extract a serializable state from it. That state is serializable, but it is definitely not a primitive type. To be specific, I am trying to implement a keyed version of this RCF operator [1]. I need one RCF model (a separate "forest") per key. Key cardinality is not very high, and the size of the state should not be a problem.
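To illustrate the setup described above (one model per key, rebuilt from an extracted serializable state), here is a minimal sketch in plain Java. `Forest`, `toState()`, `fromState()` and `PerKeyModels` are hypothetical names and are not the Random Cut Forest library's API. A plain `Map` stands in for Flink keyed state.

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a model that is not Serializable itself
// but can export/import a serializable snapshot of its state.
final class Forest {
    private long pointsSeen;

    // Toy "training" step: just counts the points seen.
    void update(double point) { pointsSeen++; }

    long pointsSeen() { return pointsSeen; }

    // Extract a serializable representation (assumed to be expensive in a real model).
    byte[] toState() {
        return ByteBuffer.allocate(Long.BYTES).putLong(pointsSeen).array();
    }

    // Rebuild a model from its serialized representation.
    static Forest fromState(byte[] state) {
        Forest f = new Forest();
        f.pointsSeen = ByteBuffer.wrap(state).getLong();
        return f;
    }
}

// One model per key, created or restored lazily on the first record for that key.
final class PerKeyModels<K> {
    private final Map<K, Forest> models = new HashMap<>(); // transient, in memory only
    private final Map<K, byte[]> persisted;                 // stand-in for keyed Flink state

    PerKeyModels(Map<K, byte[]> persisted) { this.persisted = persisted; }

    Forest modelFor(K key) {
        return models.computeIfAbsent(key, k -> {
            byte[] stored = persisted.get(k);
            return stored == null ? new Forest() : Forest.fromState(stored);
        });
    }

    // Copy every in-memory model back into its persisted form, e.g. right before a snapshot.
    void persistAll() {
        models.forEach((k, m) -> persisted.put(k, m.toState()));
    }
}
```

In a real Flink job, `modelFor()` would be called from processElement(). The `persistAll()` step is the hard part: Flink keyed state can only be written for the current key, which is the problem this thread is about.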
I guess the only feasible way is what Zakelly suggests: using reflection to extract the keyContext from within processElement() and set the current key through it. I will explore this option.

Thanks again
Lorenzo

[1] https://github.com/aws-samples/amazon-kinesis-data-analytics-examples/blob/master/AnomalyDetection/RandomCutForest/src/main/java/software/amazon/flink/example/operator/RandomCutForestOperator.java

On Wed, 21 Feb 2024 at 08:13, Schwalbe Matthias <matthias.schwa...@viseca.ch> wrote:
> Good morning all,
>
> Let me loop myself in …
>
> 1. Another, even more convenient, way to enable caching is to configure RocksDB to use more off-heap memory for its cache. You might also consider enabling Bloom filters (it all depends on how large your key space is: thousands/millions/billions/…).
>
> Within its technological limits, RocksDB is hard to beat; if keeping all data in memory is not an option, this is the path I usually follow.
>
> 2. On the other question, how to control the current key from within snapshotState(): you can acquire a pointer to the underlying state backend, e.g. from within open(), then get hold of a pointer to the specific state primitive and set the current key directly.
>
> To find out how to do that, put a breakpoint in the debugger and walk up a couple of call-stack frames, and/or step into the value setters and model your code after how it is done there.
>
> Mind, though, that you need to restore the current key if you happen to change it to another key.
>
> Doing this, e.g. in initializeState(), is not time-critical, because it happens outside the 'hot' code paths.
>
> 3. If the number of elements to store is small, you can store them in operator state and initialize your local structure from it in initializeState(). You would probably want to keep the data in serialized form in operator state since, as you mentioned, serialization would be expensive.
>
> 4. There is another API (whose name I don't remember) that lets you store operator state directly as a BLOB, if that would be a viable option for you.
>
> Sincere greetings
>
> Thias
>
> *From:* Zakelly Lan <zakelly....@gmail.com>
> *Sent:* Wednesday, February 21, 2024 8:04 AM
> *To:* Lorenzo Nicora <lorenzo.nic...@gmail.com>
> *Cc:* Flink User Group <user@flink.apache.org>
> *Subject:* Re: Preparing keyed state before snapshot
>
> Hi Lorenzo,
>
> I think the most convenient way is to modify the code of the state backend, adding a k-v cache as you want.
>
> Otherwise, IIUC, there's no public interface to get keyContext. But you may try something hacky: use the passed-in `Context` instance in processElement() and leverage Java reflection to get the KeyedProcessOperator instance, on which you can call setCurrentKey().
>
> Best,
> Zakelly
>
> On Wed, Feb 21, 2024 at 1:05 AM Lorenzo Nicora <lorenzo.nic...@gmail.com> wrote:
>
> Thanks Zakelly,
>
> I'd need to do something similar, with a map containing my non-serializable "state", similar to the kvCache in FastTop1Function.
>
> But I am not sure I understand how I can set the keyed state for a specific key in snapshotState().
>
> FastTop1Function seems to rely on keyContext set via setKeyContext(). This method is not part of the API. I see it's set specifically for AbstractTopNFunction in StreamExecRank.
>
> How can I do something similar without modifying the Flink runtime?
>
> Lorenzo
>
> On Sun, 18 Feb 2024 at 03:42, Zakelly Lan <zakelly....@gmail.com> wrote:
>
> Hi Lorenzo,
>
> It is not recommended to do this with keyed state. However, there is an example in the Flink code (FastTop1Function#snapshotState) [1] of setting keys in snapshotState().
>
> Hope this helps.
>
> [1] https://github.com/apache/flink/blob/050503c65f5c5c18bb573748ccbf5aecce4ec1a5/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/FastTop1Function.java#L165
>
> Best,
> Zakelly
>
> On Sat, Feb 17, 2024 at 1:48 AM Lorenzo Nicora <lorenzo.nic...@gmail.com> wrote:
>
> Hi Thias,
>
> I considered CheckpointedFunction.
> In snapshotState() I would have to update the state of each key, extracting the in-memory "state" of each key and putting it in the state with state.update(...). This must happen per key.
>
> But snapshotState() has no visibility of the keys, and I have no way of selectively accessing the state of a specific key to update it. Unless I am missing something.
>
> Thanks
> Lorenzo
>
> On Fri, 16 Feb 2024 at 07:21, Schwalbe Matthias <matthias.schwa...@viseca.ch> wrote:
>
> Good morning Lorenzo,
>
> You may want to implement the org.apache.flink.streaming.api.checkpoint.CheckpointedFunction interface in your KeyedProcessFunction.
>
> Btw, by the time initializeState(…) is called, the state backend is fully initialized and can be read and written to (which is not the case when the open(…) function is called).
>
> In initializeState(…) you also get access to the state of different operator keys.
>
> snapshotState(…) is called as part of each checkpoint in order to store data.
>
> Sincere greetings
>
> Thias
>
> *From:* Lorenzo Nicora <lorenzo.nic...@gmail.com>
> *Sent:* Thursday, February 15, 2024 7:50 PM
> *To:* Flink User Group <user@flink.apache.org>
> *Subject:* Preparing keyed state before snapshot
>
> Hello everyone,
>
> I have a convoluted problem.
>
> I am implementing a KeyedProcessFunction that keeps some non-serializable "state" in memory, in a transient Map (key = stream key, value = the non-serializable "state").
>
> I can extract a serializable representation to put in Flink state, and I can load my in-memory "state" from the Flink state. But these operations are expensive.
>
> Initializing the in-memory "state" is relatively easy. I do it lazily, in processElement(), on the first record for the key.
>
> The problem is saving the in-memory "state" to Flink state.
>
> I need to do it only before the state snapshot. But KeyedProcessFunction has no entry point that is called before the state snapshot.
>
> I cannot use CheckpointedFunction.snapshotState(), because it does not work for keyed state.
>
> Any suggestions?
>
> Note that I cannot use operator state or broadcast state.
>
> Processing is keyed. Every processed record modifies the in-memory "state" of that key. If the job rescales, the state of the key must follow the partition.
>
> Regards
> Lorenzo
>
> This message is intended only for the named recipient and may contain confidential or privileged information. As the confidentiality of email communication cannot be guaranteed, we do not accept any responsibility for the confidentiality and the intactness of this message. If you have received it in error, please advise the sender by return e-mail and delete this message and any attachments. Any unauthorised use or dissemination of this information is strictly prohibited.
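The FastTop1Function#snapshotState pattern referenced in this thread, combined with Thias's advice to restore the current key afterwards, comes down to this: for each cached key, make it the current key, write its state, and finally switch back to the key that was current before. Below is a minimal, self-contained sketch of that loop. `KeyContext` is a local interface modeled on the shape of Flink's internal `KeyContext` (setCurrentKey()/getCurrentKey()), not the real import, and `ToyKeyedBackend` and `SnapshotFlush` are hypothetical toy classes.

```java
import java.util.HashMap;
import java.util.Map;

// Local stand-in modeled on Flink's internal KeyContext (assumption: not the real class).
interface KeyContext {
    void setCurrentKey(Object key);
    Object getCurrentKey();
}

// Toy keyed backend: a single "value state" whose reads and writes are scoped to the current key.
final class ToyKeyedBackend implements KeyContext {
    private final Map<Object, String> values = new HashMap<>();
    private Object currentKey;

    @Override public void setCurrentKey(Object key) { currentKey = key; }
    @Override public Object getCurrentKey() { return currentKey; }

    // Like ValueState#update / ValueState#value: always operates on the current key.
    void update(String value) { values.put(currentKey, value); }
    String value() { return values.get(currentKey); }
}

final class SnapshotFlush {
    // Writes each cached entry under its own key, then restores the previously current key.
    static void flushCache(Map<Object, String> cache, KeyContext keyContext, ToyKeyedBackend state) {
        Object previousKey = keyContext.getCurrentKey();
        try {
            for (Map.Entry<Object, String> entry : cache.entrySet()) {
                keyContext.setCurrentKey(entry.getKey());
                state.update(entry.getValue());
            }
        } finally {
            // Restore the original key even if one of the writes throws.
            keyContext.setCurrentKey(previousKey);
        }
    }
}
```

In this toy the key context and the state are the same object; in Flink they are separate objects that share the keyed state backend, which is why the sketch passes them separately.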
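Zakelly's "hacky" suggestion, which Lorenzo plans to try, is to reach the operator from the `Context` passed to processElement() via reflection. The generic technique is to walk the class hierarchy looking for a private field of a given type and make it accessible. It can be sketched in plain Java as follows. `ToyOperator`, `ToyContext` and `Reflect` are hypothetical toy classes. The real Flink field names and class layout are not assumed here; they would have to be checked in the debugger, as Thias suggests. Because this relies on internals (there is no public interface, as Zakelly notes), it may break across Flink versions.

```java
import java.lang.reflect.Field;

// Hypothetical operator exposing a current-key setter, as a keyed operator would.
class ToyOperator {
    private Object currentKey;
    public void setCurrentKey(Object key) { currentKey = key; }
    public Object getCurrentKey() { return currentKey; }
}

// Hypothetical context that keeps its operator in a private field with no getter.
class ToyContext {
    private final ToyOperator owner;
    ToyContext(ToyOperator owner) { this.owner = owner; }
}

final class Reflect {
    // Returns the value of the first field (declared on the class or a superclass)
    // whose type is assignable to `type`; fails loudly if there is none.
    static <T> T findFieldOfType(Object target, Class<T> type) {
        for (Class<?> c = target.getClass(); c != null; c = c.getSuperclass()) {
            for (Field field : c.getDeclaredFields()) {
                if (type.isAssignableFrom(field.getType())) {
                    // Can throw if the class lives in a module not opened to the caller.
                    field.setAccessible(true);
                    try {
                        return type.cast(field.get(target));
                    } catch (IllegalAccessException e) {
                        throw new IllegalStateException(e);
                    }
                }
            }
        }
        throw new IllegalStateException(
                "no field of type " + type.getName() + " in " + target.getClass().getName());
    }
}
```

Matching by field type rather than by name avoids hard-coding a private field name, although the type itself is still an internal detail.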