Thanks, that helped.

Regards,
Krzysztof Chmielewski

On Thu, 7 Sep 2023 at 09:52, Schwalbe Matthias <matthias.schwa...@viseca.ch>
wrote:

> Hi Krzysztof,
>
> You cannot read or update keyed state in open(), because no key is set
> there.
>
> Keyed state has one value per key.
>
> In theory you would have to initialize it for every possible key, which is
> quite impractical.
>
> However, you don't need to initialize the state: the initial state per key
> defaults to the default value of the type (null for objects).
>
> Just drop the initializer [1]
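
To illustrate the point above without pulling in Flink, here is a small
plain-Java model of why keyed state needs a current key. Everything in it
(ToyValueState, ToyKeyCounter, setCurrentKey) is made up for illustration and
is not Flink API. In Flink the runtime sets the current key before each
processElement() call, and the real check throws the NullPointerException
quoted further down rather than the IllegalStateException used here.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy stand-in for a keyed ValueState: one slot per key, selected by the current key. */
final class ToyValueState<K, V> {
    private final Map<K, V> valuesByKey = new HashMap<>();
    private K currentKey; // stays null until a record is being processed

    /** Assumption: models what the runtime does before handing over each record. */
    void setCurrentKey(K key) {
        this.currentKey = key;
    }

    /** Returns the value stored for the current key, or null if that key has none yet. */
    V value() {
        requireKey();
        return valuesByKey.get(currentKey);
    }

    /** Stores a value for the current key only; other keys are untouched. */
    void update(V newValue) {
        requireKey();
        valuesByKey.put(currentKey, newValue);
    }

    private void requireKey() {
        if (currentKey == null) {
            throw new IllegalStateException("No key set: state accessed outside of a keyed context");
        }
    }
}

/** Toy counterpart of KeyCounter: counts records per key with no initializer. */
final class ToyKeyCounter {
    private ToyValueState<String, Long> count;

    /** Like open(): only create the state handle, never read or write it here. */
    void open() {
        count = new ToyValueState<>();
    }

    /** Like processElement(): a key is in scope, so value() and update() are safe. */
    long processElement(String key) {
        count.setCurrentKey(key);            // done by the runtime in a real job
        Long current = count.value();        // null the first time a key is seen
        long next = (current == null) ? 1L : current + 1L;
        count.update(next);
        return next;
    }
}
```

In the actual job the equivalent change would presumably be to keep only the
getRuntimeContext().getState(...) call in open() and to do the value() and
update() calls in processElement(), treating a null value as "not seen yet".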
>
> Hope this helps
>
> Thias
>
> [1]
> https://github.com/kristoffSC/FlinkSimpleStreamingJob/blob/033f74c427553fbfe0aaffe7d2af4382c09734ad/src/main/java/org/example/KeyCounter.java#L26
>
> *From:* Krzysztof Chmielewski <krzysiek.chmielew...@gmail.com>
> *Sent:* Thursday, 7 September 2023 09:38
> *To:* user <user@flink.apache.org>
> *Subject:* updating keyed state in open method.
>
> Hi,
> I'm having a problem with my toy Flink job, where I would like to access a
> ValueState of a keyed stream. The job setup can be found here [1]; it is
> fairly simple:
>
> env
>         .addSource(new CheckpointCountingSource(100, 60))
>         .keyBy(value -> value)
>         .process(new KeyCounter())
>         .addSink(new ConsoleSink());
>
>
> As you can see, I'm using keyBy, and KeyCounter extends
> KeyedProcessFunction.
> It seems that keyed state cannot be updated from the RichFunction::open()
> method. Is that intended?
>
> When I run this example I get an exception that says:
>
>
> Caused by: java.lang.NullPointerException: No key set. This method should not be called outside of a keyed context.
>     at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:76)
>     at org.apache.flink.runtime.state.heap.StateTable.checkKeyNamespacePreconditions(StateTable.java:270)
>     at org.apache.flink.runtime.state.heap.StateTable.get(StateTable.java:260)
>     at org.apache.flink.runtime.state.heap.StateTable.get(StateTable.java:143)
>     at org.apache.flink.runtime.state.heap.HeapValueState.value(HeapValueState.java:72)
>     at org.example.KeyCounter.open(KeyCounter.java:26)
>
>
> [1]
> https://github.com/kristoffSC/FlinkSimpleStreamingJob/blob/KeyBayIssue/src/main/java/org/example/DataStreamJob.java
>
