Hi,

You cannot access the keyed state within #open(). It can only be
accessed under a keyed context ( a key is selected while processing an
element, e.g. #processElement).

Best,
Zakelly

On Thu, Sep 7, 2023 at 4:55 PM Krzysztof Chmielewski
<krzysiek.chmielew...@gmail.com> wrote:
>
> Hi,
> I'm having a problem with my toy flink job where I would like to access a 
> ValueState of a keyed stream. The Job setup can be found here [1], it is 
> fairly simple
>
> env
> .addSource(new CheckpointCountingSource(100, 60))
> .keyBy(value -> value)
> .process(new KeyCounter())
> .addSink(new ConsoleSink());
>
> As you can see I'm using a keyBay and KeyCounter is extending 
> KeyedProcessFunction.
> It seems that keyed state cannot be update from RichFunction::open() method. 
> Is that intended?
>
> When I ran this example I have an exception that says:
>
> Caused by: java.lang.NullPointerException: No key set. This method should not 
> be called outside of a keyed context.
> at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:76)
> at 
> org.apache.flink.runtime.state.heap.StateTable.checkKeyNamespacePreconditions(StateTable.java:270)
> at org.apache.flink.runtime.state.heap.StateTable.get(StateTable.java:260)
> at org.apache.flink.runtime.state.heap.StateTable.get(StateTable.java:143)
> at 
> org.apache.flink.runtime.state.heap.HeapValueState.value(HeapValueState.java:72)
> at org.example.KeyCounter.open(KeyCounter.java:26)
>
>
> [1] 
> https://github.com/kristoffSC/FlinkSimpleStreamingJob/blob/KeyBayIssue/src/main/java/org/example/DataStreamJob.java

Reply via email to