Hi Kenneth,

Following your suggestion, I modified my pipeline as follows:

p.apply(WithKeys.of(...).withKeyType(...))                    // (A)
 .apply(Window.into(FixedWindows.of(...)))                    // (B)
 .apply(Combine.perKey(new MyCombinFn()))                     // (C)
 .apply(
   Window
     .into(new GlobalWindows())                               // (E1)
     .triggering(
       Repeatedly.forever(AfterPane.elementCountAtLeast(1))   // (E2)
     )
     .accumulatingFiredPanes()                                // (E3)
 )
 .apply(GroupByKey.create())                                  // (F)
 .apply(ParDo.of(new MyDoFn()))                               // (D)

I had to include (E1), (E2), (E3), and (F) so that MyDoFn (D) can iterate over the list of output records from (C) that share the same key. This way I can achieve the same effect without keeping per-key state at (D).

Do I understand your intention correctly? If not, I would appreciate some hints.

Thanks,

Dongwon

On Sun, Aug 23, 2020 at 5:10 AM Kenneth Knowles <[email protected]> wrote:

> Hi Dongwon,
>
> On Sat, Aug 22, 2020 at 2:46 PM Dongwon Kim <[email protected]> wrote:
>
>> Hi all,
>>
>> I'm using Beam 2.23.0 with FlinkRunner and a part of my unbounded
>> pipeline looks like below:
>>
>>> p.apply(WithKeys.of(...).withKeyType(...))     // (A)
>>>  .apply(Window.into(FixedWindows.of(...)))     // (B)
>>>  .apply(Combine.perKey(new MyCombinFn()))      // (C)
>>>  .apply(ParDo.of(new MyDoFn()))                // (D)
>>
>> What I want to do is
>> (1) to group data by key (A) and window (B),
>> (2) to do some aggregation (C)
>> (3) to perform the final computation on each group (D)
>>
>> I've noticed that a ValueState for a particular key is NULL whenever a
>> new window for the key is arriving, which gives me a feeling that Beam
>> seems to support only per-key+window state, not per-key state, after
>> windowing.
>>
>> I usually work with Flink DataStream API and Flink supports both per-key
>> state and per-key+window state [1].
>>
>> Does Beam support per-key states, not per-key+window states, after
>> windowing (D)? If I miss something, please correct me.
>>
>
> You understand correctly - Beam does not include per-key state that
> crosses window boundaries. If I understand your goal correctly, you can
> achieve the same effect by copying the window metadata into the element and
> then re-windowing into the global window before (D).
>
> Kenn
>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#using-per-window-state-in-processwindowfunction
>>
>> Best,
>>
>> Dongwon
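
To make the scoping question in this thread concrete, here is a minimal plain-Java sketch. It is not Beam API and not the code from either pipeline above: `StateScopeSketch`, `Aggregate`, and `stateSeenBeforeEach` are hypothetical names, and a `HashMap` stands in for the runner's state backend. It models one "state cell" per scope and records what the cell holds when each output of (C) arrives at (D). With per-key+window scoping every new window starts from an empty cell, which matches the NULL `ValueState` described above. With a single global scope, and the window end copied into the element as metadata, the cell survives across the original windows.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model of state scoping. Hypothetical names; this is not Beam API. */
public class StateScopeSketch {

    /** One output of the per-window aggregation (C). */
    public static final class Aggregate {
        public final String key;
        public final long windowEndMillis; // window metadata copied into the element
        public final long value;

        public Aggregate(String key, long windowEndMillis, long value) {
            this.key = key;
            this.windowEndMillis = windowEndMillis;
            this.value = value;
        }
    }

    /**
     * Feeds the aggregates through a running-sum state cell and returns what the
     * cell held just before each element arrived (null means "no state yet").
     *
     * scopeByWindow == true  models per-key+window state.
     * scopeByWindow == false models per-key state after re-windowing into one global window.
     */
    public static List<Long> stateSeenBeforeEach(List<Aggregate> input, boolean scopeByWindow) {
        Map<String, Long> cells = new HashMap<>(); // stand-in for the state backend
        List<Long> seen = new ArrayList<>();
        for (Aggregate a : input) {
            String cellId = scopeByWindow
                    ? a.key + "@" + a.windowEndMillis
                    : a.key + "@global";
            Long previous = cells.get(cellId);
            seen.add(previous);
            cells.put(cellId, (previous == null ? 0L : previous) + a.value);
        }
        return seen;
    }

    public static void main(String[] args) {
        List<Aggregate> input = new ArrayList<>();
        input.add(new Aggregate("k1", 60_000L, 3));
        input.add(new Aggregate("k1", 120_000L, 5));
        input.add(new Aggregate("k1", 180_000L, 7));

        System.out.println(stateSeenBeforeEach(input, true));  // [null, null, null]
        System.out.println(stateSeenBeforeEach(input, false)); // [null, 3, 8]
    }
}
```

In the model, the only difference between the two runs is the scope used to look up the cell. The element still carries `windowEndMillis`, so the final computation can tell which original window each aggregate came from.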
