Hi Kenneth,

Following your suggestion, I modified my pipeline as follows:

p.apply(WithKeys.of(...).withKeyType(...))                      // (A)
  .apply(Window.into(FixedWindows.of(...)))                     // (B)
  .apply(Combine.perKey(new MyCombinFn()))                      // (C)
  .apply(
    Window
      .into(new GlobalWindows())                                // (E1)
      .triggering(
        Repeatedly.forever(AfterPane.elementCountAtLeast(1)))   // (E2)
      .accumulatingFiredPanes()                                 // (E3)
  )
  .apply(GroupByKey.create())                                   // (F)
  .apply(ParDo.of(new MyDoFn()));                               // (D)


I had to include (E1), (E2), (E3), and (F) so that MyDoFn (D) can iterate
over the list of output records from (C) that share the same key.
This way I can achieve the same effect without needing per-key state at
(D).
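To make the effect of (E1) through (F) concrete, here is a small, runner-free Java model of what MyDoFn (D) would receive. It does not use the Beam API; the String keys, the int combined values, and the assumption that the trigger fires after exactly one element are all hypothetical simplifications (with elementCountAtLeast(1) a real runner may put several elements into one pane).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model (NOT Beam code) of steps (E1)-(F): in the global window with
// accumulatingFiredPanes(), each pane that GroupByKey emits for a key contains
// every element seen so far for that key.
class AccumulatingPaneModel {

    // Per-key buffer; in accumulating mode it is never cleared after a firing,
    // and the global window never closes, so it is never discarded.
    private final Map<String, List<Integer>> buffers = new HashMap<>();

    // Feeds one output of (C) and returns the pane (D) would receive, assuming
    // the trigger fires right after this single element.
    List<Integer> onElement(String key, int combinedValue) {
        List<Integer> buffer = buffers.computeIfAbsent(key, k -> new ArrayList<>());
        buffer.add(combinedValue);
        return new ArrayList<>(buffer); // snapshot: the emitted pane's contents
    }

    public static void main(String[] args) {
        AccumulatingPaneModel gbk = new AccumulatingPaneModel();
        System.out.println(gbk.onElement("k1", 10)); // [10]
        System.out.println(gbk.onElement("k2", 7));  // [7]
        System.out.println(gbk.onElement("k1", 20)); // [10, 20]
        System.out.println(gbk.onElement("k1", 30)); // [10, 20, 30]
    }
}
```

As far as I understand accumulating panes, one consequence of this design is that each iterable (D) receives for a key also contains all earlier outputs of (C) for that key, so it keeps growing for as long as the pipeline runs.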

Do I understand your intention correctly?
If not, could you give me some hints?

Thanks,

Dongwon


On Sun, Aug 23, 2020 at 5:10 AM Kenneth Knowles wrote:

> Hi Dongwon,
>
> On Sat, Aug 22, 2020 at 2:46 PM Dongwon Kim wrote:
>
>> Hi all,
>>
>> I'm using Beam 2.23.0 with the FlinkRunner, and part of my unbounded
>> pipeline looks like this:
>>
>> p.apply(WithKeys.of(...).withKeyType(...))     // (A)
>>   .apply(Window.into(FixedWindows.of(...)))    // (B)
>>   .apply(Combine.perKey(new MyCombinFn()))     // (C)
>>   .apply(ParDo.of(new MyDoFn()))               // (D)
>>
>> What I want to do is to
>> (1) group data by key (A) and window (B),
>> (2) perform some aggregation (C), and
>> (3) perform the final computation on each group (D).
>>
>> I've noticed that a ValueState for a particular key is null whenever a
>> new window for that key arrives, which suggests that Beam supports only
>> per-key+window state, not per-key state, after windowing.
>>
>> I usually work with the Flink DataStream API, and Flink supports both
>> per-key state and per-key+window state [1].
>>
>> Does Beam support per-key state, rather than per-key+window state, after
>> windowing (D)? If I'm missing something, please correct me.
>>
>
> You understand correctly: Beam does not include per-key state that
> crosses window boundaries. If I understand your goal correctly, you can
> achieve the same effect by copying the window metadata into the element and
> then re-windowing into the global window before (D).
>
> Kenn
>
>
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#using-per-window-state-in-processwindowfunction
>>
>> Best,
>>
>> Dongwon
>>
>>
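
As a small illustration of the two state scopes discussed in this thread, and of the suggestion to copy the window metadata into the element before re-windowing into the global window, here is a runner-free Java model. It does not use the Beam API; the WindowedAggregate class, identifying a window by its start time in milliseconds, and the "previous sum" lookup are hypothetical choices made only for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model (NOT Beam code) contrasting per-key+window state with
// per-key state after re-windowing into the global window.
class StateScopeModel {

    // Hypothetical output of (C) with the window start copied into the element.
    static final class WindowedAggregate {
        final String key;
        final long windowStartMillis;
        final long sum;

        WindowedAggregate(String key, long windowStartMillis, long sum) {
            this.key = key;
            this.windowStartMillis = windowStartMillis;
            this.sum = sum;
        }
    }

    // One cell per (key, window): models state at (D) while still in FixedWindows,
    // where a new window starts with an empty cell.
    private final Map<String, Long> perKeyAndWindow = new HashMap<>();

    // One cell per key: models state at (D) after re-windowing into the global
    // window, where every element shares the same single window.
    private final Map<String, Long> perKey = new HashMap<>();

    // Returns the sum previously stored in the same (key, window) cell, or null.
    Long previousInSameWindow(WindowedAggregate agg) {
        String cell = agg.key + "@" + agg.windowStartMillis;
        Long previous = perKeyAndWindow.get(cell);
        perKeyAndWindow.put(cell, agg.sum);
        return previous;
    }

    // Returns the sum remembered for this key from any earlier window, or null.
    // windowStartMillis is still available in the element if (D) needs it.
    Long previousForKey(WindowedAggregate agg) {
        Long previous = perKey.get(agg.key);
        perKey.put(agg.key, agg.sum);
        return previous;
    }

    public static void main(String[] args) {
        StateScopeModel model = new StateScopeModel();
        WindowedAggregate w1 = new WindowedAggregate("k1", 0L, 5L);
        WindowedAggregate w2 = new WindowedAggregate("k1", 60_000L, 8L);

        System.out.println(model.previousInSameWindow(w1)); // null
        System.out.println(model.previousInSameWindow(w2)); // null: new window, empty cell
        System.out.println(model.previousForKey(w1));       // null
        System.out.println(model.previousForKey(w2));       // 5: carried across windows
    }
}
```

In the Beam Java SDK itself, my understanding is that the per-key variant would correspond to a stateful DoFn (declaring a ValueState with @StateId and StateSpecs.value(...)) applied after Window.into(new GlobalWindows()), with the window copied into the element by an earlier DoFn whose @ProcessElement method takes a BoundedWindow parameter. Because the global window never closes, such state is kept until it is explicitly cleared, and keeping the window start in the element lets (D) tell which fixed window each aggregate came from.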
