Hi Dongwon,
On Sat, Aug 22, 2020 at 2:46 PM Dongwon Kim wrote:
> Hi all,
>
> I'm using Beam 2.23.0 with FlinkRunner and a part of my unbounded pipeline
> looks like below:
>
>> p.apply(WithKeys.of(...).withKeyType(...)) // (A)
>> .apply(Window.into(FixedWindows.of(...)))// (B)
>
> .apply(Combine.perKey(new MyCombinFn())) // (C)
>
> .apply(ParDo.of(new MyDoFn())) // (D)
>
>
> What I want to do is
> (1) to group data by key (A) and window (B),
> (2) to do some aggregation (C)
> (3) to perform the final computation on each group (D)
>
> I've noticed that a ValueState for a particular key is NULL whenever a new
> window for the key is arriving, which gives me a feeling that Beam seems to
> support only per-key+window state, not per-key state, after windowing.
>
> I usually work with Flink DataStream API and Flink supports both per-key
> state and per-key+window state [1].
>
> Does Beam support per-key states, not per-key+window states, after
> windowing (D)? If I miss something, please correct me.
>
You understand correctly - Beam does not include per-key state that crosses
window boundaries. If I understand your goal correctly, you can achieve the
same effect by copying the window metadata into the element and then
re-windowing into the global window before (D).
Kenn
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#using-per-window-state-in-processwindowfunction
>
> Best,
>
> Dongwon
>
>