Kenn, shouldn't the Reify happen before the re-window?

On Sun, Aug 23, 2020 at 11:08 AM Kenneth Knowles wrote:
>
> On Sun, Aug 23, 2020 at 1:12 PM Dongwon Kim wrote:
>
>> Hi Reuven,
>>
>> You and Kenneth are right; I thought GlobalWindows in unbounded streams
>> needed triggers.
>>
>> p.apply(WithKeys.of(...).withKeyType(...))      // (A)
>>     .apply(Window.into(FixedWindows.of(...)))    // (B)
>>     .apply(Combine.perKey(new MyCombinFn()))     // (C)
>>     .apply(Window.into(new GlobalWindows()))     // (E)
>>     .apply(ParDo.of(new MyDoFn()))               // (D)
>>
>> So just adding (E) puts everything into the single global window and
>> makes the state defined in MyDoFn (D) a per-key state.
>> I hope I understand you and Kenneth correctly this time.
>>
>
> That is correct. However, I think you may want:
>
> p.apply(WithKeys.of(...).withKeyType(...))      // (A)
>     .apply(Window.into(FixedWindows.of(...)))    // (B)
>     .apply(Combine.perKey(new MyCombinFn()))     // (C)
>     .apply(Window.into(new GlobalWindows()))     // (E)
>     .apply(Reify.windowsInValue())               // (G)
>     .apply(ParDo.of(new MyDoFn()))               // (D)
>
> Reify: <https://beam.apache.org/releases/javadoc/2.23.0/index.html?org/apache/beam/sdk/transforms/Reify.html>
>
> This will make the window information from (B) & (C) available to MyDoFn
> in (D).
>
> Kenn
>
>>
>> Best,
>>
>> Dongwon
>>
>> On Mon, Aug 24, 2020 at 1:51 AM Reuven Lax wrote:
>>
>>> You could simply window into GlobalWindows and add a stateful DoFn
>>> afterwards. No need for the triggering and GroupByKey.
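The ordering question at the top of this thread (whether Reify should come before the re-window) can be illustrated with a toy model. The sketch below is plain Java, not Beam code: `Elem`, `reify`, and `rewindowGlobal` are hypothetical stand-ins that only mimic what `Reify.windowsInValue()` and `Window.into(new GlobalWindows())` do to the window attached to an element. Under that assumption, reifying first keeps the fixed window from (B), while re-windowing first means the reified window is already the global one.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model only, not the Beam API: each element carries the window it is
// currently assigned to, the way Beam attaches a window to every element.
class ReifyOrderModel {
    static final String GLOBAL = "GlobalWindow";

    // Hypothetical stand-in for a windowed KV element.
    static final class Elem {
        final String key;
        final String value;
        final String window;

        Elem(String key, String value, String window) {
            this.key = key;
            this.value = value;
            this.window = window;
        }
    }

    // Mimics Reify.windowsInValue(): copy the element's *current* window into its value.
    static Elem reify(Elem e) {
        return new Elem(e.key, e.value + " @ " + e.window, e.window);
    }

    // Mimics Window.into(new GlobalWindows()): the original window assignment is replaced.
    static Elem rewindowGlobal(Elem e) {
        return new Elem(e.key, e.value, GLOBAL);
    }

    // Reify (G) applied before the re-window (E).
    static List<String> reifyThenRewindow(List<Elem> in) {
        List<String> out = new ArrayList<>();
        for (Elem e : in) {
            out.add(rewindowGlobal(reify(e)).value);
        }
        return out;
    }

    // Re-window (E) applied before Reify (G), as in the quoted pipeline.
    static List<String> rewindowThenReify(List<Elem> in) {
        List<String> out = new ArrayList<>();
        for (Elem e : in) {
            out.add(reify(rewindowGlobal(e)).value);
        }
        return out;
    }

    public static void main(String[] args) {
        // Hypothetical outputs of Combine.perKey (C) for key "k1" in two fixed windows.
        List<Elem> combined = Arrays.asList(
                new Elem("k1", "sum=3", "[00:00, 00:05)"),
                new Elem("k1", "sum=4", "[00:05, 00:10)"));
        // prints [sum=3 @ [00:00, 00:05), sum=4 @ [00:05, 00:10)]
        System.out.println(reifyThenRewindow(combined));
        // prints [sum=3 @ GlobalWindow, sum=4 @ GlobalWindow]
        System.out.println(rewindowThenReify(combined));
    }
}
```

In the real pipeline the first ordering corresponds to applying `Reify.windowsInValue()` right after (C) and before (E). Note that `Reify.windowsInValue()` turns a `KV<K, V>` into a `KV<K, ValueInSingleWindow<V>>`, so MyDoFn's input type would change accordingly.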
>>>
>>> On Sun, Aug 23, 2020 at 9:45 AM Dongwon Kim wrote:
>>>
>>>> Hi Kenneth,
>>>>
>>>> Following your suggestion, I modified my pipeline as follows:
>>>>
>>>> p.apply(WithKeys.of(...).withKeyType(...))              // (A)
>>>>     .apply(Window.into(FixedWindows.of(...)))            // (B)
>>>>     .apply(Combine.perKey(new MyCombinFn()))             // (C)
>>>>     .apply(
>>>>         Window
>>>>             .into(new GlobalWindows())                   // (E1)
>>>>             .triggering(
>>>>                 Repeatedly.forever(AfterPane.elementCountAtLeast(1)))  // (E2)
>>>>             .accumulatingFiredPanes())                   // (E3)
>>>>     .apply(GroupByKey.create())                          // (F)
>>>>     .apply(ParDo.of(new MyDoFn()))                       // (D)
>>>>
>>>> I had to include (E1), (E2), (E3), and (F) so that MyDoFn (D) can
>>>> iterate over a list of output records from (C) sharing the same key.
>>>> This way I can achieve the same effect without having a per-key state
>>>> at (D).
>>>>
>>>> Do I understand your intention correctly?
>>>> If not, please give me some hints.
>>>>
>>>> Thanks,
>>>>
>>>> Dongwon
>>>>
>>>> On Sun, Aug 23, 2020 at 5:10 AM Kenneth Knowles wrote:
>>>>
>>>>> Hi Dongwon,
>>>>>
>>>>> On Sat, Aug 22, 2020 at 2:46 PM Dongwon Kim wrote:
>>>>>
>>>>>> Hi all,
>>>>>>
>>>>>> I'm using Beam 2.23.0 with FlinkRunner, and part of my unbounded
>>>>>> pipeline looks like this:
>>>>>>
>>>>>> p.apply(WithKeys.of(...).withKeyType(...))      // (A)
>>>>>>     .apply(Window.into(FixedWindows.of(...)))    // (B)
>>>>>>     .apply(Combine.perKey(new MyCombinFn()))     // (C)
>>>>>>     .apply(ParDo.of(new MyDoFn()))               // (D)
>>>>>>
>>>>>> What I want to do is:
>>>>>> (1) group data by key (A) and window (B),
>>>>>> (2) do some aggregation (C), and
>>>>>> (3) perform the final computation on each group (D).
>>>>>>
>>>>>> I've noticed that a ValueState for a particular key is null whenever
>>>>>> a new window for that key arrives, which suggests that Beam supports
>>>>>> only per-key+window state, not per-key state, after windowing.
>>>>>>
>>>>>> I usually work with the Flink DataStream API, and Flink supports both
>>>>>> per-key state and per-key+window state [1].
>>>>>>
>>>>>> Does Beam support per-key state, rather than per-key+window state,
>>>>>> after windowing (D)? If I am missing something, please correct me.
>>>>>>
>>>>>
>>>>> You understand correctly: Beam does not include per-key state that
>>>>> crosses window boundaries. If I understand your goal correctly, you can
>>>>> achieve the same effect by copying the window metadata into the element
>>>>> and then re-windowing into the global window before (D).
>>>>>
>>>>> Kenn
>>>>>
>>>>>>
>>>>>> [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#using-per-window-state-in-processwindowfunction
>>>>>>
>>>>>> Best,
>>>>>>
>>>>>> Dongwon
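The behavior Dongwon observed (a `ValueState` that reads as null whenever a new window arrives for the same key) follows from state being scoped to key and window together. The toy model below is plain Java rather than Beam code: it keeps a running sum in a map whose state key either includes the window (as downstream of FixedWindows) or does not (as after re-windowing into the global window). `Output` and `readsBeforeWrite` are hypothetical names, and `Map.get` returning null stands in for reading a `ValueState` that was never written.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model only (not Beam): shows why state downstream of FixedWindows starts
// empty in every new window, while state in a single global window persists.
class StateScopeModel {
    // Hypothetical output of Combine.perKey (C): key, the fixed window it came from, value.
    static final class Output {
        final String key;
        final String window;
        final long value;

        Output(String key, String window, long value) {
            this.key = key;
            this.window = window;
            this.value = value;
        }
    }

    // Returns what a running-sum "ValueState" would read for each element
    // before updating it. includeWindow == true models per-key+window state;
    // false models per-key state (all elements in one global window).
    static List<Long> readsBeforeWrite(List<Output> outputs, boolean includeWindow) {
        Map<String, Long> state = new HashMap<>();
        List<Long> reads = new ArrayList<>();
        for (Output o : outputs) {
            String stateKey = includeWindow ? o.key + "|" + o.window : o.key;
            Long previous = state.get(stateKey); // null if never written, like an unset ValueState
            reads.add(previous);
            state.put(stateKey, previous == null ? o.value : previous + o.value);
        }
        return reads;
    }

    public static void main(String[] args) {
        List<Output> outputs = Arrays.asList(
                new Output("k1", "w1", 3),
                new Output("k1", "w2", 4),
                new Output("k1", "w3", 5));
        // prints per key+window: [null, null, null]
        System.out.println("per key+window: " + readsBeforeWrite(outputs, true));
        // prints per key: [null, 3, 7]
        System.out.println("per key: " + readsBeforeWrite(outputs, false));
    }
}
```

In Beam terms, the `false` case is what re-windowing into GlobalWindows before (D) gives: every element for a key lands in the one global window, so a `ValueState` declared in MyDoFn persists across what used to be separate fixed windows.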
