Hi Reuven,

You and Kenneth are right; I had assumed that GlobalWindows on an unbounded stream always needs a trigger.

> p.apply(WithKeys.of(...).withKeyType(...)) // (A)
>  .apply(Window.into(FixedWindows.of(...))) // (B)
>  .apply(Combine.perKey(new MyCombinFn())) // (C)
>  .apply(Window.into(new GlobalWindows())) // (E)
>  .apply(ParDo.of(new MyDoFn())) // (D)

So just adding (E) removes the fixed-window boundaries before (D), which makes the state defined in MyDoFn (D) per-key state rather than per-key+window state.

I hope I understood you and Kenneth correctly this time.

Best,

Dongwon

On Mon, Aug 24, 2020 at 1:51 AM Reuven Lax <[email protected]> wrote:

> You could simply window into GlobalWindows and add a stateful DoFn
> afterwards. No need for the triggering and GroupByKey.
>
> On Sun, Aug 23, 2020 at 9:45 AM Dongwon Kim <[email protected]> wrote:
>
>> Hi Kenneth,
>>
>> According to your suggestion, I modified my pipeline as follows:
>>
>>> p.apply(WithKeys.of(...).withKeyType(...)) // (A)
>>>  .apply(Window.into(FixedWindows.of(...))) // (B)
>>>  .apply(Combine.perKey(new MyCombinFn())) // (C)
>>>  .apply(
>>>    Window
>>>      .into(new GlobalWindows()) // (E1)
>>>      .triggering(
>>>        Repeatedly.forever(AfterPane.elementCountAtLeast(1)) // (E2)
>>>      )
>>>      .accumulatingFiredPanes() // (E3)
>>>  )
>>>  .apply(GroupByKey.create()) // (F)
>>>  .apply(ParDo.of(new MyDoFn())) // (D)
>>
>>
>> I had to include (E1), (E2), (E3), and (F) so that MyDoFn (D) can iterate
>> over a list of output records from (C) sharing the same key.
>> This way I can achieve the same effect without having a per-key state at
>> (D).
>>
>> Do I understand your intention correctly?
>> If not, please advise me with some hints on it.
>>
>> Thanks,
>>
>> Dongwon
>>
>>
>> On Sun, Aug 23, 2020 at 5:10 AM Kenneth Knowles <[email protected]> wrote:
>>
>>> Hi Dongwon,
>>>
>>> On Sat, Aug 22, 2020 at 2:46 PM Dongwon Kim <[email protected]> wrote:
>>>
>>>> Hi all,
>>>>
>>>> I'm using Beam 2.23.0 with FlinkRunner and a part of my unbounded
>>>> pipeline looks like below:
>>>>
>>>>> p.apply(WithKeys.of(...).withKeyType(...)) // (A)
>>>>>  .apply(Window.into(FixedWindows.of(...))) // (B)
>>>>>  .apply(Combine.perKey(new MyCombinFn())) // (C)
>>>>>  .apply(ParDo.of(new MyDoFn())) // (D)
>>>>
>>>>
>>>> What I want to do is
>>>> (1) to group data by key (A) and window (B),
>>>> (2) to do some aggregation (C)
>>>> (3) to perform the final computation on each group (D)
>>>>
>>>> I've noticed that a ValueState for a particular key is NULL whenever a
>>>> new window for the key is arriving, which gives me a feeling that Beam
>>>> seems to support only per-key+window state, not per-key state, after
>>>> windowing.
>>>>
>>>> I usually work with Flink DataStream API and Flink supports both
>>>> per-key state and per-key+window state [1].
>>>>
>>>> Does Beam support per-key states, not per-key+window states, after
>>>> windowing (D)? If I miss something, please correct me.
>>>>
>>>
>>> You understand correctly - Beam does not include per-key state that
>>> crosses window boundaries. If I understand your goal correctly, you can
>>> achieve the same effect by copying the window metadata into the element and
>>> then re-windowing into the global window before (D).
>>>
>>> Kenn
>>>
>>>
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#using-per-window-state-in-processwindowfunction
>>>>
>>>> Best,
>>>>
>>>> Dongwon
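
P.S. For anyone reading this thread later, here is a toy model in plain Java of the scoping rule discussed above. It is only a sketch and does not use the Beam API: the names `StateScopeDemo`, `Rec`, and `runningTotals` are hypothetical and made up for illustration. The one assumption it encodes is what Kenn describes, namely that a state cell belongs to a (key, window) pair, so a new fixed window starts from empty state, while re-windowing every element into the single global window leaves the key as the only thing that varies.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model only; not Beam code. All names here are hypothetical.
class StateScopeDemo {

    /** One aggregated record as it might leave (C): key, start of its fixed window, value. */
    static final class Rec {
        final String key;
        final long windowStart;
        final long value;

        Rec(String key, long windowStart, long value) {
            this.key = key;
            this.windowStart = windowStart;
            this.value = value;
        }
    }

    /**
     * Mimics a stateful step that keeps a running total in a state cell.
     * The cell is looked up by (key, window). When rewindowIntoGlobal is true,
     * every record is treated as belonging to one shared global window.
     */
    static List<Long> runningTotals(List<Rec> input, boolean rewindowIntoGlobal) {
        Map<String, Long> stateCells = new HashMap<>();
        List<Long> output = new ArrayList<>();
        for (Rec r : input) {
            String window = rewindowIntoGlobal ? "global" : "fixed@" + r.windowStart;
            String cell = r.key + "|" + window;
            // A cell that has never been written reads as empty (modelled as 0 here).
            long total = stateCells.getOrDefault(cell, 0L) + r.value;
            stateCells.put(cell, total);
            output.add(total);
        }
        return output;
    }

    public static void main(String[] args) {
        List<Rec> input = List.of(
                new Rec("k", 0L, 1L),
                new Rec("k", 60L, 2L),
                new Rec("k", 120L, 3L));
        // State resets in every fixed window: prints [1, 2, 3]
        System.out.println(runningTotals(input, false));
        // One global window, so state survives per key: prints [1, 3, 6]
        System.out.println(runningTotals(input, true));
    }
}
```

The first call corresponds to my original pipeline, where the ValueState looked empty for every new window; the second corresponds to adding (E) before (D).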
