Hi Reuven,

You and Kenneth are right; I thought GlobalWindows in unbounded streams
needed triggers.

p.apply(WithKeys.of(...).withKeyType(...))         // (A)
  .apply(Window.into(FixedWindows.of(...)))        // (B)
  .apply(Combine.perKey(new MyCombinFn()))         // (C)
  .apply(Window.into(new GlobalWindows()))         // (E)
  .apply(ParDo.of(new MyDoFn()))                   // (D)


So simply adding (E) re-windows the output of (C) into the single global
window, which makes the state defined in MyDoFn (D) per-key state rather
than per-key+window state.
I hope I've understood you and Kenneth correctly this time.

Best,

Dongwon

On Mon, Aug 24, 2020 at 1:51 AM Reuven Lax <[email protected]> wrote:

> You could simply window into GlobalWindows and add a stateful DoFn
> afterwards. No need for the triggering and GroupByKey.
>
> On Sun, Aug 23, 2020 at 9:45 AM Dongwon Kim <[email protected]> wrote:
>
>> Hi Kenneth,
>>
>> Following your suggestion, I modified my pipeline as follows:
>>
>> p.apply(WithKeys.of(...).withKeyType(...))                      // (A)
>>   .apply(Window.into(FixedWindows.of(...)))                      // (B)
>>   .apply(Combine.perKey(new MyCombinFn()))                       // (C)
>>   .apply(
>>     Window
>>       .into(new GlobalWindows())                                 // (E1)
>>       .triggering(
>>         Repeatedly.forever(AfterPane.elementCountAtLeast(1)))    // (E2)
>>       .accumulatingFiredPanes()                                  // (E3)
>>   )
>>   .apply(GroupByKey.create())                                    // (F)
>>   .apply(ParDo.of(new MyDoFn()))                                 // (D)
>>
>>
>> I had to include (E1), (E2), (E3), and (F) so that MyDoFn (D) can iterate
>> over all output records from (C) that share the same key.
>> This way I can achieve the same effect without needing per-key state in
>> (D).
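A rough way to see what (E2) and (E3) hand to (F): in accumulating mode, every pane for a key repeats all the elements seen so far. The following toy, stdlib-only Java model is not Beam code; the class and method names are hypothetical, and it assumes exactly one element arrives per firing, which a real runner does not guarantee.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model of accumulatingFiredPanes() with a fire-per-element trigger, for one key.
// Not Beam code; names are hypothetical, and one element per firing is an assumption.
final class AccumulatingPanes {
  /** Returns the contents of every pane emitted, one pane per arriving element. */
  static List<List<Long>> panes(List<Long> arrivals) {
    List<Long> accumulated = new ArrayList<>();
    List<List<Long>> emitted = new ArrayList<>();
    for (Long value : arrivals) {
      accumulated.add(value);                    // accumulating mode keeps earlier elements
      emitted.add(new ArrayList<>(accumulated)); // each pane repeats everything seen so far
    }
    return emitted;
  }

  public static void main(String[] args) {
    System.out.println(panes(Arrays.asList(5L, 7L, 9L))); // [[5], [5, 7], [5, 7, 9]]
  }
}
```

Under that assumption, n aggregates for a key produce n(n+1)/2 elements across all panes, so the iterable that (D) receives keeps growing for as long as the key is alive.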
>>
>> Have I understood your intention correctly?
>> If not, could you give me some hints?
>>
>> Thanks,
>>
>> Dongwon
>>
>>
>> On Sun, Aug 23, 2020 at 5:10 AM Kenneth Knowles <[email protected]> wrote:
>>
>>> Hi Dongwon,
>>>
>>> On Sat, Aug 22, 2020 at 2:46 PM Dongwon Kim <[email protected]> wrote:
>>>
>>>> Hi all,
>>>>
>>>> I'm using Beam 2.23.0 with the FlinkRunner, and part of my unbounded
>>>> pipeline looks like this:
>>>>
>>>> p.apply(WithKeys.of(...).withKeyType(...))       // (A)
>>>>   .apply(Window.into(FixedWindows.of(...)))      // (B)
>>>>   .apply(Combine.perKey(new MyCombinFn()))       // (C)
>>>>   .apply(ParDo.of(new MyDoFn()))                 // (D)
>>>>
>>>> What I want to do is:
>>>> (1) to group data by key (A) and window (B),
>>>> (2) to do some aggregation (C), and
>>>> (3) to perform the final computation on each group (D).
>>>>
>>>> I've noticed that a ValueState for a particular key is null whenever a
>>>> new window for that key arrives, which suggests that, after windowing,
>>>> Beam supports only per-key+window state, not per-key state.
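To make that observation concrete, here is a toy, stdlib-only Java model in which each state cell is addressed by the key together with the start of the fixed window the element falls into. It is not Beam code and not how any runner actually stores state; the class and method names are hypothetical. A value written in one window is invisible from the next window, which matches the null ValueState described above.

```java
import java.util.HashMap;
import java.util.Map;

// Toy, stdlib-only model of per-key+window state; not Beam code. Names are hypothetical.
final class WindowScopedState {
  private final long windowSizeMillis;
  // Each cell is addressed by key AND window start, like a ValueState read after FixedWindows.
  private final Map<String, Long> cells = new HashMap<>();

  WindowScopedState(long windowSizeMillis) {
    this.windowSizeMillis = windowSizeMillis;
  }

  /** Start of the fixed window that contains the given timestamp. */
  long windowStart(long timestampMillis) {
    return timestampMillis - Math.floorMod(timestampMillis, windowSizeMillis);
  }

  private String address(String key, long timestampMillis) {
    return key + "@" + windowStart(timestampMillis);
  }

  /** Returns null if nothing was written for this key in this window yet. */
  Long read(String key, long timestampMillis) {
    return cells.get(address(key, timestampMillis));
  }

  void write(String key, long timestampMillis, long value) {
    cells.put(address(key, timestampMillis), value);
  }

  public static void main(String[] args) {
    WindowScopedState state = new WindowScopedState(60_000L); // 1-minute fixed windows
    state.write("sensor-1", 10_000L, 42L);                    // lands in window [0s, 60s)
    System.out.println(state.read("sensor-1", 50_000L));      // same window: 42
    System.out.println(state.read("sensor-1", 70_000L));      // window [60s, 120s): null
  }
}
```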
>>>>
>>>> I usually work with the Flink DataStream API, which supports both
>>>> per-key state and per-key+window state [1].
>>>>
>>>> Does Beam support per-key state, rather than per-key+window state, after
>>>> windowing (D)? If I'm missing something, please correct me.
>>>>
>>>
>>> You understand correctly: Beam does not include per-key state that
>>> crosses window boundaries. If I understand your goal correctly, you can
>>> achieve the same effect by copying the window metadata into the element and
>>> then re-windowing into the global window before (D).
>>>
>>> Kenn
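A toy, stdlib-only Java model of that recipe follows; it is not Beam code, and the class, field, and method names are hypothetical. Each aggregate from (C) carries the start of its fixed window as an ordinary field, and once everything is in the single global window the state is addressed by the key alone, so a running total survives from one fixed window to the next.

```java
import java.util.HashMap;
import java.util.Map;

// Toy, stdlib-only model of "copy window metadata into the element, then re-window
// into the global window". Not Beam code; all names are hypothetical.
final class GlobalWindowRecipe {
  /** Output of (C), with the start of its fixed window copied into the element. */
  static final class Aggregate {
    final String key;
    final long windowStartMillis;
    final long value;

    Aggregate(String key, long windowStartMillis, long value) {
      this.key = key;
      this.windowStartMillis = windowStartMillis;
      this.value = value;
    }
  }

  // After re-windowing there is only one (global) window, so state is keyed by the key alone.
  private final Map<String, Long> runningTotalByKey = new HashMap<>();

  /** Models stateful step (D): the running total carries over across fixed windows. */
  String process(Aggregate a) {
    long total = runningTotalByKey.getOrDefault(a.key, 0L) + a.value;
    runningTotalByKey.put(a.key, total);
    return a.key + " window@" + a.windowStartMillis + " total=" + total;
  }

  public static void main(String[] args) {
    GlobalWindowRecipe d = new GlobalWindowRecipe();
    System.out.println(d.process(new Aggregate("sensor-1", 0L, 5L)));      // sensor-1 window@0 total=5
    System.out.println(d.process(new Aggregate("sensor-1", 60_000L, 7L))); // sensor-1 window@60000 total=12
  }
}
```

Carrying the window start in the element lets (D) still tell which fixed window an aggregate came from. The model processes aggregates in window order only for readability; Beam does not guarantee the order in which elements reach a stateful DoFn.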
>>>
>>>
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#using-per-window-state-in-processwindowfunction
>>>>
>>>> Best,
>>>>
>>>> Dongwon
>>>>
>>>>
