Hi Thias,

I considered CheckpointedFunction.
In snapshotState() I would have to update the state of each key, extracting
the in-memory "state" of each key and putting it into Flink state with
state.update(...).
This must happen per key, but snapshotState() has no visibility of the keys,
and I have no way of selectively accessing the state of a specific key to
update it.
Unless I am missing something
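
To make the problem concrete, here is a minimal plain-Java model. It is not the Flink API: KeyScopedStore, runtimeSetCurrentKey and KeyScopeDemo are hypothetical names. It assumes only two things: a keyed ValueState read or write always targets the key the runtime has set as current, and KeyedProcessFunction code cannot switch that key itself. In this model the current key simply stays at the last key set. I am not claiming this is what Flink does during snapshotState().

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical model, NOT Flink's API: keyed reads/writes always target the
// key that the "runtime" has set as current.
final class KeyScopedStore {
    private final Map<String, String> values = new HashMap<>();
    private String currentKey; // owned by the runtime, not by user code

    // The runtime switches the key before each element; in this model user
    // code in a KeyedProcessFunction has no equivalent call.
    void runtimeSetCurrentKey(String key) { currentKey = key; }

    // Analogous to ValueState.update(): only touches the current key's entry.
    void update(String v) { values.put(currentKey, v); }

    String valueOf(String key) { return values.get(key); }
}

final class KeyScopeDemo {
    static KeyScopedStore run() {
        KeyScopedStore store = new KeyScopedStore();
        // Transient per-key objects, kept in insertion order for a deterministic demo.
        Map<String, String> inMemory = new LinkedHashMap<>();

        for (String key : new String[] {"a", "b"}) {
            store.runtimeSetCurrentKey(key);      // runtime sets the key...
            inMemory.put(key, "state-of-" + key); // ...processElement touches memory only
        }

        // Naive flush from a snapshot-like hook: nobody sets the key per entry,
        // so both writes land on the last current key ("b").
        for (String serialized : inMemory.values()) {
            store.update(serialized);
        }
        return store;
    }
}
```

After KeyScopeDemo.run(), key "a" is still empty and key "b" holds the last write. A per-key flush therefore needs a key context for each key, and user code does not control that context.
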

Thanks
Lorenzo


On Fri, 16 Feb 2024 at 07:21, Schwalbe Matthias <matthias.schwa...@viseca.ch>
wrote:

> Good morning Lorenzo,
>
> You may want to implement the
> org.apache.flink.streaming.api.checkpoint.CheckpointedFunction interface in
> your KeyedProcessFunction.
>
> Btw, by the time initializeState(…) is called, the state backend is fully
> initialized and can be read and written to (which is not the case when
> the open(…) function is called).
>
> In initializeState(…) you also get access to the state of the different
> operator keys.
>
> snapshotState(…) is called as part of each checkpoint in order to
> store data.
>
> Sincere greetings
>
> Thias
>
> *From:* Lorenzo Nicora <lorenzo.nic...@gmail.com>
> *Sent:* Thursday, February 15, 2024 7:50 PM
> *To:* Flink User Group <user@flink.apache.org>
> *Subject:* Preparing keyed state before snapshot
>
> Hello everyone,
>
> I have a convoluted problem.
>
> I am implementing a KeyedProcessFunction that keeps some non-serializable
> "state" in memory, in a transient Map (key = stream key, value = the
> non-serializable "state").
>
> I can extract a serializable representation to put in Flink state, and I
> can load my in-memory "state" from the Flink state. But these operations
> are expensive.
>
> Initializing the in-memory "state" is relatively easy. I do it lazily, in
> processElement(), on the first record for the key.
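
The lazy initialization described above can be sketched in plain Java. This is a model, not Flink code. InMemoryState and LazyPerKeyCache are hypothetical, a String stands in for the serializable representation, and the `persisted` argument stands in for what valueState.value() would return for the current key inside processElement().

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the non-serializable, expensive in-memory "state".
final class InMemoryState {
    long count;

    InMemoryState(long count) { this.count = count; }

    // Expensive in real life: rebuild the object from its serialized form.
    static InMemoryState fromSerialized(String s) {
        return new InMemoryState(Long.parseLong(s));
    }
}

// Plain-Java model of lazy per-key initialization in processElement().
final class LazyPerKeyCache {
    private final Map<String, InMemoryState> cache = new HashMap<>(); // the transient Map
    int restores = 0; // how often the expensive restore actually ran

    // 'persisted' models what keyed state holds for this key (null if nothing yet).
    InMemoryState getOrLoad(String key, String persisted) {
        return cache.computeIfAbsent(key, k -> {
            if (persisted == null) {
                return new InMemoryState(0); // first time this key is ever seen
            }
            restores++;
            return InMemoryState.fromSerialized(persisted);
        });
    }

    // Every record mutates the in-memory state of its key.
    void processElement(String key, String persisted) {
        getOrLoad(key, persisted).count++;
    }
}
```

The expensive restore runs at most once per key, on the first record for that key. Later records for the same key are served from the transient map.
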
>
> The problem is saving the in-memory "state" to Flink state.
>
> I need to do it only before the state snapshot. But KeyedProcessFunction
> has no entry point called before the state snapshot.
>
> I cannot use CheckpointedFunction.snapshotState(), because it does not
> work for keyed state.
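
A small cost model shows why a pre-snapshot hook matters. It is plain Java with hypothetical names. A StringBuilder stands in for the mutable in-memory object, and a Map<String, String> stands in for per-key ValueState. The eager variant serializes on every record. The lazy variant serializes each key once in beforeSnapshot(), which is the per-key hook KeyedProcessFunction does not expose.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java cost model with hypothetical names (NOT Flink API).
final class FlushModel {
    // Transient in-memory state per key; StringBuilder is only a mutable stand-in.
    final Map<String, StringBuilder> inMemory = new HashMap<>();
    // Stand-in for per-key ValueState<String> holding the serialized form.
    final Map<String, String> keyedState = new HashMap<>();
    int serializations = 0; // counts the expensive conversions

    private void writeKeyed(String key) {
        serializations++;
        keyedState.put(key, inMemory.get(key).toString());
    }

    // Eager: serialize into keyed state after every record.
    void processEagerly(String key, char c) {
        inMemory.computeIfAbsent(key, k -> new StringBuilder()).append(c);
        writeKeyed(key);
    }

    // Lazy: only mutate the in-memory object per record...
    void processLazily(String key, char c) {
        inMemory.computeIfAbsent(key, k -> new StringBuilder()).append(c);
    }

    // ...and serialize each key once, just before a snapshot (the per-key hook
    // that KeyedProcessFunction does not offer).
    void beforeSnapshot() {
        for (String key : inMemory.keySet()) {
            writeKeyed(key);
        }
    }
}
```

With four records for one key, the eager strategy serializes four times and the lazy one serializes once. Both end with the same keyed state.
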
>
> Any suggestions?
>
> Note that I cannot use operator state or broadcast state.
>
> Processing is keyed. Every processed record modifies the in-memory "state"
> of that key. If the job rescales, the state of each key must follow its
> partition.
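
A simplified model of Flink's key-group assignment may help with the rescaling requirement. Keyed state is partitioned into key groups, and the number of key groups equals the max parallelism. Each subtask owns a contiguous range of key groups, so on rescale whole groups move together with their state. Assumption: Flink's KeyGroupRangeAssignment applies a murmur hash to hashCode() before taking the modulo. This sketch uses a plain floorMod instead, so its concrete group numbers will differ from Flink's.

```java
// Simplified model of Flink-style key-group assignment (assumption: a plain
// floorMod of hashCode() replaces Flink's murmur-hashed key-group computation).
final class KeyGroupModel {
    // Key -> key group: fixed for the job's lifetime (depends only on maxParallelism).
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Key group -> subtask index for the current parallelism (contiguous ranges).
    static int subtaskFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    static int subtaskForKey(Object key, int maxParallelism, int parallelism) {
        return subtaskFor(keyGroupFor(key, maxParallelism), maxParallelism, parallelism);
    }
}
```

For example, with a max parallelism of 128, key group 40 sits on subtask 0 at parallelism 2 and on subtask 1 at parallelism 4. The keyed state snapshotted for that group moves with it, which is why per-key data belongs in keyed state.
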
>
> Regards
>
> Lorenzo
>
> This message is intended only for the named recipient and may contain
> confidential or privileged information. As the confidentiality of email
> communication cannot be guaranteed, we do not accept any responsibility for
> the confidentiality and the intactness of this message. If you have
> received it in error, please advise the sender by return e-mail and delete
> this message and any attachments. Any unauthorised use or dissemination of
> this information is strictly prohibited.
>
