Hi Krzysztof,

You cannot access keyed state in open().
Keyed state holds one value per key, and open() runs once per operator 
instance before any element arrives, so there is no current key at that point.
In theory you would have to initialize the state for every possible key, which 
is quite impractical.
However, you don't need to initialize the state: the initial state for each key 
defaults to the default value of the type (null for objects).
Just drop the initializer [1].
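
To make the reasoning concrete, here is a toy model in plain Java. It is only a sketch and does not use Flink's actual classes: ToyKeyedState and ToyKeyCounter are made-up names, and the explicit setCurrentKey() call stands in for what the runtime does before each element. It shows that state is looked up under the current key, that reading without a key fails, and that a key never written reads as null, which the processing method can treat as "start from zero".

```java
import java.util.HashMap;
import java.util.Map;

/** Toy stand-in for keyed value state (hypothetical, NOT a Flink class). */
class ToyKeyedState<K, V> {
    private final Map<K, V> values = new HashMap<>();
    private K currentKey; // null until the "runtime" sets a key for an element

    void setCurrentKey(K key) {
        currentKey = key;
    }

    /** Returns the value for the current key, or null if it was never written. */
    V value() {
        requireKey();
        return values.get(currentKey);
    }

    void update(V newValue) {
        requireKey();
        values.put(currentKey, newValue);
    }

    private void requireKey() {
        if (currentKey == null) {
            throw new NullPointerException(
                "No key set. This method should not be called outside of a keyed context.");
        }
    }
}

/** Toy counterpart of the KeyCounter function (hypothetical). */
class ToyKeyCounter {
    final ToyKeyedState<String, Integer> count = new ToyKeyedState<>();

    // An open()-style method would run here with no key set,
    // so it must not call count.value() or count.update().

    int processElement(String key) {
        count.setCurrentKey(key);            // assumption: models the runtime setting the key
        Integer current = count.value();     // null the first time this key is seen
        int updated = (current == null ? 0 : current) + 1;
        count.update(updated);
        return updated;
    }
}
```

In the real job, open() is still the usual place to obtain the state handle from the runtime context; only reading and updating the value has to move into processElement().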

Hope this helps

Thias


[1] https://github.com/kristoffSC/FlinkSimpleStreamingJob/blob/033f74c427553fbfe0aaffe7d2af4382c09734ad/src/main/java/org/example/KeyCounter.java#L26




From: Krzysztof Chmielewski <krzysiek.chmielew...@gmail.com>
Sent: Thursday, 7 September 2023 09:38
To: user <user@flink.apache.org>
Subject: updating keyed state in open method.


Hi,
I'm having a problem with my toy Flink job, where I would like to access a 
ValueState of a keyed stream. The job setup can be found here [1]; it is fairly 
simple:
env
        .addSource(new CheckpointCountingSource(100, 60))
        .keyBy(value -> value)
        .process(new KeyCounter())
        .addSink(new ConsoleSink());

As you can see, I'm using keyBy, and KeyCounter extends 
KeyedProcessFunction.
It seems that keyed state cannot be updated from the RichFunction::open() 
method. Is that intended?

When I run this example, I get an exception that says:

Caused by: java.lang.NullPointerException: No key set. This method should not be called outside of a keyed context.
at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:76)
at org.apache.flink.runtime.state.heap.StateTable.checkKeyNamespacePreconditions(StateTable.java:270)
at org.apache.flink.runtime.state.heap.StateTable.get(StateTable.java:260)
at org.apache.flink.runtime.state.heap.StateTable.get(StateTable.java:143)
at org.apache.flink.runtime.state.heap.HeapValueState.value(HeapValueState.java:72)
at org.example.KeyCounter.open(KeyCounter.java:26)


[1] https://github.com/kristoffSC/FlinkSimpleStreamingJob/blob/KeyBayIssue/src/main/java/org/example/DataStreamJob.java