Thanks Zakelly,

I'd need to do something similar, with a map containing my non-serializable
"state", similar to the kvCache in FastTop1Function.

But I am not sure I understand how I can set the keyed state for a specific
key in snapshotState().
FastTop1Function seems to rely on the keyContext set via setKeyContext(). This
method is not part of the public API; I see it is set specifically for
AbstractTopNFunction in StreamExecRank.
How can I do something similar without modifying the Flink runtime?
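A plain-Java model of the mechanics in question may help frame it. It has no Flink dependency and every name in it is hypothetical: `ScopedStore` stands in for keyed `ValueState`, whose `value()`/`update()` calls act on whichever key is current, and its `setCurrentKey` plays the role of the key context that, as described above, FastTop1Function switches in snapshotState(). `CachingKeyedProcessor` and its dirty-key set are illustrative assumptions, not Flink classes. The sketch suggests that the transient map already knows every key, so the only missing piece in a real KeyedProcessFunction is a supported way to switch the key context before each write.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for Flink keyed ValueState: value()/update() act on
// whichever key is "current". In Flink the runtime owns the current key.
final class ScopedStore<K, V> {
    private final Map<K, V> byKey = new HashMap<>();
    private K currentKey;

    void setCurrentKey(K key) { this.currentKey = key; }
    V value() { return byKey.get(currentKey); }
    void update(V value) { byKey.put(currentKey, value); }

    // Inspection helper for tests; not modeled on any Flink method.
    V peek(K key) { return byKey.get(key); }
}

// Hypothetical function with a transient per-key cache, flushed on demand.
final class CachingKeyedProcessor {
    // StringBuilder stands in for the expensive, non-serializable in-memory "state".
    private final Map<String, StringBuilder> cache = new HashMap<>();
    private final Set<String> dirty = new HashSet<>();
    private final ScopedStore<String, String> store;

    CachingKeyedProcessor(ScopedStore<String, String> store) { this.store = store; }

    // Counterpart of processElement(): in Flink the runtime would set the key first.
    void process(String key, String element) {
        store.setCurrentKey(key);
        StringBuilder inMemory = cache.get(key);
        if (inMemory == null) {
            // Expensive "rebuild from state" step, done once per key.
            String persisted = store.value();
            inMemory = new StringBuilder(persisted == null ? "" : persisted);
            cache.put(key, inMemory);
        }
        inMemory.append(element);
        dirty.add(key);
    }

    // Counterpart of snapshotState(): switch the key before each write. This
    // is the step the thread says a plain KeyedProcessFunction has no public hook for.
    void flushBeforeSnapshot() {
        for (String key : dirty) {
            store.setCurrentKey(key);
            // Expensive "serialize to state" step, only for keys changed since the last flush.
            store.update(cache.get(key).toString());
        }
        dirty.clear();
    }
}
```

Creating a second `CachingKeyedProcessor` over the same store and processing key "a" again reloads the flushed value lazily, which mirrors what would happen after a restore.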

Lorenzo


On Sun, 18 Feb 2024 at 03:42, Zakelly Lan <zakelly....@gmail.com> wrote:

> Hi Lorenzo,
>
> It is not recommended to do this with keyed state. However, there is an
> example in the Flink code (FastTop1Function#snapshotState) [1] of setting
> the current key in snapshotState().
>
> Hope this helps.
>
> [1]
> https://github.com/apache/flink/blob/050503c65f5c5c18bb573748ccbf5aecce4ec1a5/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/FastTop1Function.java#L165
>
> Best,
> Zakelly
>
> On Sat, Feb 17, 2024 at 1:48 AM Lorenzo Nicora <lorenzo.nic...@gmail.com>
> wrote:
>
>> Hi Thias
>>
>> I considered CheckpointedFunction.
>> In snapshotState() I would have to update the state of each key,
>> extracting the in-memory "state" of each key and putting it in the state
>> with state.update(...).
>> This must happen per key, but snapshotState() has no visibility of the
>> keys, and I have no way of selectively accessing the state of a specific
>> key to update it.
>> Unless I am missing something.
>>
>> Thanks
>> Lorenzo
>>
>>
>> On Fri, 16 Feb 2024 at 07:21, Schwalbe Matthias <
>> matthias.schwa...@viseca.ch> wrote:
>>
>>> Good morning Lorenzo,
>>>
>>>
>>>
>>> You may want to implement the
>>> org.apache.flink.streaming.api.checkpoint.CheckpointedFunction interface in
>>> your KeyedProcessFunction.
>>>
>>> Btw, by the time initializeState(…) is called, the state backend is
>>> fully initialized and can be read and written to (which is not the case
>>> when the open(…) function is called).
>>>
>>> In initializeState(…) you also get access to the state of different
>>> operator keys.
>>>
>>> snapshotState(…) is called as part of each checkpoint in order to
>>> store data.
>>>
>>>
>>>
>>> Sincere greetings
>>>
>>>
>>>
>>> Thias
>>>
>>>
>>>
>>> *From:* Lorenzo Nicora <lorenzo.nic...@gmail.com>
>>> *Sent:* Thursday, February 15, 2024 7:50 PM
>>> *To:* Flink User Group <user@flink.apache.org>
>>> *Subject:* Preparing keyed state before snapshot
>>>
>>>
>>>
>>> Hello everyone,
>>>
>>>
>>>
>>> I have a convoluted problem.
>>>
>>>
>>>
>>> I am implementing a KeyedProcessFunction that keeps some
>>> non-serializable "state" in memory, in a transient Map (key = stream key,
>>> value = the non-serializable "state").
>>>
>>>
>>>
>>> I can extract a serializable representation to put in Flink state, and I
>>> can load my in-memory "state" from the Flink state. But these operations
>>> are expensive.
>>>
>>>
>>>
>>> Initializing the in-memory "state" is relatively easy. I do it lazily,
>>> in processElement(), on the first record for the key.
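The lazy, once-per-key initialization described here can be sketched as a cache-aside map. `LazyKeyedCache` and its `loader` function are hypothetical names: `loader` stands for the expensive step of reading the serializable form from Flink state and rebuilding the in-memory object. In a real KeyedProcessFunction the call would sit inside processElement(), where the key context is already set, so the loader's state read sees the right key.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Sketch of per-key lazy initialization of an in-memory, non-serializable object.
// `loader` is a hypothetical stand-in for "read from Flink state and rebuild";
// it runs at most once per key as long as it returns a non-null value.
final class LazyKeyedCache<K, S> {
    // Would be a transient field of the function; it is empty again after a restore.
    private final Map<K, S> cache = new HashMap<>();
    private final Function<K, S> loader;
    private int loads = 0;

    LazyKeyedCache(Function<K, S> loader) { this.loader = loader; }

    // Call from processElement() for the current key.
    S getOrLoad(K key) {
        return cache.computeIfAbsent(key, k -> {
            loads++;  // counts expensive rebuilds, for illustration only
            return loader.apply(k);
        });
    }

    int loadCount() { return loads; }
}
```

If `loader` returns null, `computeIfAbsent` stores nothing, so the load would be retried on the next record for that key.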
>>>
>>>
>>>
>>> The problem is saving the in-memory "state" to Flink state.
>>>
>>> I need to do it only before the state snapshot, but KeyedProcessFunction
>>> has no entry point that is called before the state snapshot.
>>>
>>> I cannot use CheckpointedFunction.snapshotState(), because it does not
>>> work for keyed state.
>>>
>>>
>>>
>>> Any suggestions?
>>>
>>>
>>>
>>> Note that I can use neither operator state nor broadcast state.
>>>
>>> Processing is keyed. Every processed record modifies the in-memory
>>> "state" of its key. If the job rescales, the state of each key must
>>> follow its partition.
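For the rescaling requirement, it may help to recall how keyed state moves: Flink hashes each key into one of maxParallelism key groups, and each parallel subtask owns a contiguous range of key groups, so a key's state travels with its key group. The sketch below reproduces only the group-to-subtask step, assuming the formula used in Flink's `KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup` (`keyGroup * parallelism / maxParallelism`); `KeyGroupModel` is a hypothetical name, and the key-to-group hashing is deliberately left out.

```java
// Sketch of the key-group -> subtask assignment, assuming the formula used by
// Flink's KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup.
// The key -> key-group step is omitted; keyGroup is taken as given.
final class KeyGroupModel {
    static int operatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroup) {
        if (parallelism < 1 || parallelism > maxParallelism) {
            throw new IllegalArgumentException("parallelism must be in [1, maxParallelism]");
        }
        if (keyGroup < 0 || keyGroup >= maxParallelism) {
            throw new IllegalArgumentException("keyGroup must be in [0, maxParallelism)");
        }
        // Integer division splits [0, maxParallelism) into contiguous ranges.
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128;
        int keyGroup = 100;
        for (int parallelism : new int[] {2, 3, 4}) {
            System.out.println("parallelism " + parallelism + " -> subtask "
                + operatorIndexForKeyGroup(maxParallelism, parallelism, keyGroup));
        }
        // prints:
        // parallelism 2 -> subtask 1
        // parallelism 3 -> subtask 2
        // parallelism 4 -> subtask 3
    }
}
```

Because the transient in-memory map is rebuilt lazily from keyed state after a restore, the map itself never has to move; only what was written to keyed state before the checkpoint travels with the key group.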
>>>
>>>
>>>
>>>
>>>
>>> Regards
>>>
>>> Lorenzo
>>>
>>> This message is intended only for the named recipient and may contain
>>> confidential or privileged information. As the confidentiality of email
>>> communication cannot be guaranteed, we do not accept any responsibility for
>>> the confidentiality and the intactness of this message. If you have
>>> received it in error, please advise the sender by return e-mail and delete
>>> this message and any attachments. Any unauthorised use or dissemination of
>>> this information is strictly prohibited.
>>>
>>
