Hi Krzysztof again,

Just to clarify: your sample code [1] counts the number of events per key.
I assume that is your intention?

Your previous implementation initialized the keyed state keyCounterState in 
the open() function, which is the right place to do this. You just must not 
read or store values in the state from within open(), because no key is in 
scope at that point.
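
A minimal sketch of that pattern, assuming Flink 1.x (where open() takes a 
Configuration) and a stream of String events keyed by a String. The class name 
CountPerKey, the element types and the descriptor name "keyCounter" are made up 
for illustration; only keyCounterState is taken from your code.

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Hypothetical example class: emits the running event count for each key.
public class CountPerKey extends KeyedProcessFunction<String, String, Long> {

    private transient ValueState<Long> keyCounterState;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Only obtain the state handle here. No key is active in open(),
        // so value() and update() must not be called at this point.
        ValueStateDescriptor<Long> descriptor =
                new ValueStateDescriptor<>("keyCounter", Long.class);
        keyCounterState = getRuntimeContext().getState(descriptor);
    }

    @Override
    public void processElement(String event, Context ctx, Collector<Long> out)
            throws Exception {
        // The state is implicitly scoped to the key of the current element.
        Long current = keyCounterState.value(); // null the first time a key is seen
        long updated = (current == null) ? 1L : current + 1L;
        keyCounterState.update(updated);
        out.collect(updated);
    }
}
```

It would be applied after the keyBy, e.g. stream.keyBy(...).process(new CountPerKey()).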

initializeState() and snapshotState() are mainly used to initialize and 
checkpoint operator state, not keyed state; please refer to the relevant 
documentation.
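
For comparison, a rough sketch of operator state handled through 
CheckpointedFunction, again assuming Flink 1.x. The class name CountPerSubtask 
and the descriptor name "subtaskCount" are hypothetical. It counts all events 
seen by one parallel instance, independent of any key.

```java
import java.util.Collections;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;

// Hypothetical example class: passes events through and counts them per subtask.
public class CountPerSubtask implements MapFunction<String, String>, CheckpointedFunction {

    private transient ListState<Long> checkpointedCount;
    private long count;

    @Override
    public String map(String event) {
        count++; // plain field, no keyed context needed
        return event;
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        // Operator state is registered via the operator state store.
        checkpointedCount = context.getOperatorStateStore()
                .getListState(new ListStateDescriptor<>("subtaskCount", Long.class));
        if (context.isRestored()) {
            // After rescaling, a subtask may receive several partial counts.
            for (Long part : checkpointedCount.get()) {
                count += part;
            }
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // Replace the checkpointed list with the current count of this subtask.
        checkpointedCount.update(Collections.singletonList(count));
    }
}
```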


Thias


From: Krzysztof Chmielewski <krzysiek.chmielew...@gmail.com>
Sent: Thursday, 7 September 2023 09:59
To: user <user@flink.apache.org>
Subject: using CheckpointedFunction on a keyed state


Hi,
I have a toy Flink job [1] with a KeyedProcessFunction implementation [2] that 
also implements CheckpointedFunction. My stream definition has a .keyBy(...) 
call, as you can see in [1].

However, when I try to run this toy job, I get an exception from the 
CheckpointedFunction::initializeState method that says:
"Keyed state can only be used on a 'keyed stream', i.e., after a 'keyBy()' 
operation."

I got the impression from the docs that CheckpointedFunction can be used on a 
keyed stream and that CheckpointedFunction::initializeState is for initializing 
the state object.
Are my assumptions wrong? Is initializeState only meant to set an initial value 
of a state per key, while the state object must be initialized in the open() 
method?

Thanks,
Krzysztof
[1] https://github.com/kristoffSC/FlinkSimpleStreamingJob/blob/CheckpointedFunction_issueKeyedStream/src/main/java/org/example/DataStreamJob.java

[2] https://github.com/kristoffSC/FlinkSimpleStreamingJob/blob/CheckpointedFunction_issueKeyedStream/src/main/java/org/example/KeyCounter.java