Re: Support of per-key state after windowing

2020-08-22 Thread Kenneth Knowles
Hi Dongwon,

On Sat, Aug 22, 2020 at 2:46 PM Dongwon Kim wrote:

> Hi all,
>
> I'm using Beam 2.23.0 with FlinkRunner and a part of my unbounded pipeline
> looks like below:
>
> p.apply(WithKeys.of(...).withKeyType(...))   // (A)
>   .apply(Window.into(FixedWindows.of(...)))  // (B)
>   .apply(Combine.perKey(new MyCombinFn()))   // (C)
>   .apply(ParDo.of(new MyDoFn()))             // (D)
>
> What I want to do is:
> (1) group data by key (A) and window (B),
> (2) do some aggregation (C), and
> (3) perform the final computation on each group (D).
>
> I've noticed that the ValueState for a particular key is NULL whenever a new
> window for that key arrives, which suggests that Beam supports only
> per-key+window state, not per-key state, after windowing.
>
> I usually work with Flink DataStream API and Flink supports both per-key
> state and per-key+window state [1].
>
> Does Beam support per-key state, as opposed to per-key+window state, after
> windowing, i.e. in (D)? If I'm missing something, please correct me.
>

You understand correctly - Beam does not include per-key state that crosses
window boundaries. If I understand your goal correctly, you can achieve the
same effect by copying the window metadata into the element and then
re-windowing into the global window before (D).
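
To illustrate the scoping difference, here is a minimal plain-Java model. It
is not Beam API: the class StateScopeModel, its Aggregate type, and the
"GLOBAL" label are hypothetical names made up for this sketch. It assumes
state is looked up by key plus the element's current window, and that the
original window has already been copied into the element, so that information
survives once every element sits in a single global window.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of state scoping; all names here are illustrative, not Beam classes.
final class StateScopeModel {

    /** One per-key, per-window aggregate, i.e. what step (C) would emit. */
    static final class Aggregate {
        final String key;
        final String window; // window metadata copied into the element
        final long value;

        Aggregate(String key, String window, long value) {
            this.key = key;
            this.window = window;
            this.value = value;
        }
    }

    /**
     * Stand-in for a stateful step (D): for each element, report the value
     * previously stored in its state cell, then overwrite the cell.
     * The cell is scoped by key plus the window the element currently lives in.
     */
    static List<String> runStep(List<Aggregate> input, boolean rewindowToGlobal) {
        Map<String, Long> state = new HashMap<>();
        List<String> out = new ArrayList<>();
        for (Aggregate a : input) {
            // After re-windowing, every element shares the single global window.
            String currentWindow = rewindowToGlobal ? "GLOBAL" : a.window;
            String scope = a.key + "|" + currentWindow;
            Long previous = state.get(scope); // null if this scope is new
            out.add(a.key + "@" + a.window + " prev=" + previous);
            state.put(scope, a.value);
        }
        return out;
    }

    /** Two windows of data for key "a", one window for key "b". */
    static List<Aggregate> sample() {
        return Arrays.asList(
                new Aggregate("a", "w1", 3),
                new Aggregate("b", "w1", 5),
                new Aggregate("a", "w2", 4));
    }

    public static void main(String[] args) {
        // Scoped by key+window: each new window starts with empty state.
        System.out.println(runStep(sample(), false));
        // Scoped by key only: the value from w1 is visible when w2 arrives.
        System.out.println(runStep(sample(), true));
    }
}
```

With the sample input, the per-window run reports prev=null for a@w2, while
the global run reports prev=3, which is the per-key behavior you are after.
The original window is still available to (D) because it travels inside the
element.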

Kenn


>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#using-per-window-state-in-processwindowfunction
>
> Best,
>
> Dongwon
>
>


Support of per-key state after windowing

2020-08-22 Thread Dongwon Kim
Hi all,

I'm using Beam 2.23.0 with FlinkRunner and a part of my unbounded pipeline
looks like below:

p.apply(WithKeys.of(...).withKeyType(...))   // (A)
  .apply(Window.into(FixedWindows.of(...)))  // (B)
  .apply(Combine.perKey(new MyCombinFn()))   // (C)
  .apply(ParDo.of(new MyDoFn()))             // (D)

What I want to do is:
(1) group data by key (A) and window (B),
(2) do some aggregation (C), and
(3) perform the final computation on each group (D).

I've noticed that the ValueState for a particular key is NULL whenever a new
window for that key arrives, which suggests that Beam supports only
per-key+window state, not per-key state, after windowing.

I usually work with Flink DataStream API and Flink supports both per-key
state and per-key+window state [1].

Does Beam support per-key state, as opposed to per-key+window state, after
windowing, i.e. in (D)? If I'm missing something, please correct me.

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#using-per-window-state-in-processwindowfunction

Best,

Dongwon