Thanks Thias and Zakelly,

I probably muddied the waters by saying that my use case was similar to
kvCache.
What I was calling "non serializable state" is actually a Random Cut Forest
ML model. The model itself cannot be serialized, but you can extract a
serializable state from it. That extracted state is serializable, but
definitely not a primitive type.
To be specific, I am trying to implement a keyed version of this RCF
operator [1]. I need one RCF model (a separate "forest") per key. Key
cardinality is not very high, and the size of the state should not be
a problem.
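
To make the shape of this concrete, here is a minimal sketch in plain Java,
with no Flink dependency: one live model per key in an in-memory map,
restored lazily from its serialized state and written back only when a flush
is requested. All names (PerKeyModelCache, modelFor, flushTo) are
hypothetical, and the `persisted` map is only a stand-in for Flink keyed
state. In a real operator the flush is exactly the step that needs the
current key set for each entry.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/**
 * Hypothetical sketch: K = stream key, M = live (non-serializable) model,
 * S = serializable state extracted from the model.
 */
public class PerKeyModelCache<K, M, S> {

    // Live models, one per key. Never serialized as-is.
    private final Map<K, M> liveModels = new HashMap<>();

    // Rebuilds a model from its extracted state; receives null for a new key.
    private final Function<S, M> restore;

    // Extracts the serializable state from a live model (the expensive step).
    private final Function<M, S> extract;

    public PerKeyModelCache(Function<S, M> restore, Function<M, S> extract) {
        this.restore = restore;
        this.extract = extract;
    }

    /** Returns the live model for a key, restoring it on first access only. */
    public M modelFor(K key, Map<K, S> persisted) {
        return liveModels.computeIfAbsent(key, k -> restore.apply(persisted.get(k)));
    }

    /** Writes the extracted state of every live model to the persisted store. */
    public void flushTo(Map<K, S> persisted) {
        liveModels.forEach((key, model) -> persisted.put(key, extract.apply(model)));
    }
}
```

The expensive extract and restore steps run once per key per flush or
restore, not once per record, which is the point of keeping the forest in
memory.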

I guess the only feasible way is what Zakelly is suggesting, using
reflection to extract and set the keyContext from within processElement().
I will explore this option.
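
The reflection part could look roughly like the sketch below. It does not
use Flink classes: FakeOperator and FakeContext are hypothetical stand-ins
for the operator and for the Context passed to processElement(), and
findEnclosing is a hypothetical helper. It assumes the context is a
non-static inner class holding a reference to its enclosing operator, which
is an implementation detail that would need checking against the Flink
version in use.

```java
import java.lang.reflect.Field;

public class EnclosingOperatorLookup {

    /** Hypothetical stand-in for an operator that tracks a current key. */
    public static class FakeOperator {
        private Object currentKey;

        public void setCurrentKey(Object key) {
            this.currentKey = key;
        }

        public Object getCurrentKey() {
            return currentKey;
        }

        /** Non-static inner class: each instance references its enclosing operator. */
        public class FakeContext {
            public Object currentKey() {
                return FakeOperator.this.currentKey;
            }
        }

        public FakeContext newContext() {
            return new FakeContext();
        }
    }

    /**
     * Finds the first field declared on the inner object whose type is
     * compatible with outerType and returns its value. Matching by type
     * avoids hard-coding a compiler-generated field name.
     */
    public static <T> T findEnclosing(Object inner, Class<T> outerType) {
        for (Field field : inner.getClass().getDeclaredFields()) {
            if (outerType.isAssignableFrom(field.getType())) {
                try {
                    field.setAccessible(true);
                    return outerType.cast(field.get(inner));
                } catch (IllegalAccessException e) {
                    throw new IllegalStateException("cannot read " + field.getName(), e);
                }
            }
        }
        throw new IllegalStateException("no field of type " + outerType.getName());
    }
}
```

Anything built this way depends on internals that may change between
versions, and the previous key should be restored after each change, as
Thias pointed out.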

Thanks again
Lorenzo

[1]
https://github.com/aws-samples/amazon-kinesis-data-analytics-examples/blob/master/AnomalyDetection/RandomCutForest/src/main/java/software/amazon/flink/example/operator/RandomCutForestOperator.java


On Wed, 21 Feb 2024 at 08:13, Schwalbe Matthias <matthias.schwa...@viseca.ch>
wrote:

> Good morning all,
>
>
>
> Let me loop myself in …
>
>
>
>    1. Another, even more convenient way to enable caching is to configure
>    RocksDB to use more off-heap memory for its cache. You might also
>    consider enabling bloom filters. It all depends on how large your
>    key space is (thousands/millions/billions/…).
>
> Within the technological limits, RocksDB is hard to top. If keeping all
> data in memory is not an option, this is the path I usually follow.
>
>    2. On the other question, how to control the current key from within
>    snapshotState(): you can acquire a reference to the underlying state
>    backend, e.g. from within open(), then get hold of a reference to the
>    specific state primitive and set the current key directly.
>
> To find out how to do that, put a breakpoint in the debugger and walk
> up a couple of call stack frames, and/or step into the value setters and
> model your code after how it is done there.
>
> Remember, though, to restore the current key if you happen to change it to
> another key.
>
> Doing this e.g. in initializeState() is not time-critical, because this
> happens outside the ‘hot’ code paths.
>
>    3. If the number of elements to store is small, you can store them in
>    operator state and initialize your local structure from it in
>    initializeState(). You would probably want to keep the data in serialized
>    form in operator state, since you mentioned that serialization is expensive.
>    4. There is another API (whose name I don’t remember) that
>    allows you to store operator state as a BLOB directly, if that would be a
>    workable option for you.
>
>
>
> Sincere greetings
>
>
>
> Thias
>
>
>
>
>
>
>
>
>
> *From:* Zakelly Lan <zakelly....@gmail.com>
> *Sent:* Wednesday, February 21, 2024 8:04 AM
> *To:* Lorenzo Nicora <lorenzo.nic...@gmail.com>
> *Cc:* Flink User Group <user@flink.apache.org>
> *Subject:* Re: Preparing keyed state before snapshot
>
>
>
>
> Hi Lorenzo,
>
>
>
> I think the most convenient way is to modify the code of the state
> backend, adding a k-v cache as you want.
>
>
>
> Otherwise, IIUC, there is no public interface to get the keyContext. But
> you may try something hacky: use the `Context` instance passed in to
> processElement, and leverage Java reflection to get the
> KeyedProcessOperator instance, on which you can call setCurrentKey().
>
>
>
>
>
> Best,
>
> Zakelly
>
>
>
> On Wed, Feb 21, 2024 at 1:05 AM Lorenzo Nicora <lorenzo.nic...@gmail.com>
> wrote:
>
> Thanks Zakelly,
>
>
>
> I'd need to do something similar, with a map containing my
> non-serializable "state", similar to the kvCache in FastTop1Function.
>
>
>
> But I am not sure I understand how I can set the keyed state for a
> specific key, in snapshotState().
>
> FastTop1Function seems to rely on keyContext set via setKeyContext(). This
> method is not part of the API. I see it's set specifically for
> AbstractTopNFunction in StreamExecRank.
>
> How can I do something similar without modifying the Flink runtime?
>
>
>
> Lorenzo
>
>
>
>
>
> On Sun, 18 Feb 2024 at 03:42, Zakelly Lan <zakelly....@gmail.com> wrote:
>
> Hi Lorenzo,
>
>
>
> It is not recommended to do this with keyed state. However, there is an
> example in the Flink code (FastTop1Function#snapshotState) [1] of setting
> keys in snapshotState().
>
>
>
> Hope this helps.
>
>
>
> [1]
> https://github.com/apache/flink/blob/050503c65f5c5c18bb573748ccbf5aecce4ec1a5/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/FastTop1Function.java#L165
>
>
>
> Best,
>
> Zakelly
>
>
>
> On Sat, Feb 17, 2024 at 1:48 AM Lorenzo Nicora <lorenzo.nic...@gmail.com>
> wrote:
>
> Hi Thias,
>
> I considered CheckpointedFunction.
> In snapshotState() I would have to update the state of each key,
> extracting the in-memory "state" of each key and putting it in the Flink
> state with state.update(...).
> This must happen per key.
>
> But snapshotState() has no visibility of the keys, and I have no way of
> selectively accessing the state of a specific key to update it.
> Unless I am missing something.
>
>
>
> Thanks
>
> Lorenzo
>
>
>
>
>
> On Fri, 16 Feb 2024 at 07:21, Schwalbe Matthias <
> matthias.schwa...@viseca.ch> wrote:
>
> Good morning Lorenzo,
>
>
>
> You may want to implement the
> org.apache.flink.streaming.api.checkpoint.CheckpointedFunction interface in
> your KeyedProcessFunction.
>
> Btw., by the time initializeState(…) is called, the state backend is fully
> initialized and can be read from and written to (which is not the case when
> the open(…) function is called).
>
> In initializeState(…) you also get access to the state of different operator
> keys.
>
> snapshotState(…) is called as part of each checkpoint in order to
> store data.
>
>
>
> Sincere greetings
>
>
>
> Thias
>
>
>
> *From:* Lorenzo Nicora <lorenzo.nic...@gmail.com>
> *Sent:* Thursday, February 15, 2024 7:50 PM
> *To:* Flink User Group <user@flink.apache.org>
> *Subject:* Preparing keyed state before snapshot
>
>
>
> Hello everyone,
>
>
>
> I have a convoluted problem.
>
>
>
> I am implementing a KeyedProcessFunction that keeps some non-serializable
> "state" in memory, in a transient Map (key = stream key, value = the
> non-serializable "state").
>
>
>
> I can extract a serializable representation to put in Flink state, and I
> can load my in-memory "state" from the Flink state. But these operations
> are expensive.
>
>
>
> Initializing the in-memory "state" is relatively easy. I do it lazily, in
> processElement(), on the first record for the key.
>
>
>
> The problem is saving the in-memory "state" to Flink state.
>
> I need to do it only before the state snapshot. But KeyedProcessFunction
> has no entrypoint called before the state snapshot.
>
> I cannot use CheckpointedFunction.snapshotState(), because it does not
> work for keyed state.
>
>
>
> Any suggestions?
>
>
>
> Note that I cannot use operator state nor a broadcast state.
>
> Processing is keyed. Every processed record modifies the in-memory "state"
> of that key. If the job rescales, the state of the key must follow the
> partition.
>
>
>
>
>
> Regards
>
> Lorenzo
>
> This message is intended only for the named recipient and may contain
> confidential or privileged information. As the confidentiality of email
> communication cannot be guaranteed, we do not accept any responsibility for
> the confidentiality and the intactness of this message. If you have
> received it in error, please advise the sender by return e-mail and delete
> this message and any attachments. Any unauthorised use or dissemination of
> this information is strictly prohibited.
>
