The Window.into transform does not reset the trigger to the default. So
where you have w2trigger, if you leave it off, then the triggering is left
as the "continuation trigger" from w1trigger. Basically, it tries to let any
output caused by w1trigger flow all the way through the pipeline without
delay.

Kenn
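Kenn's explanation can be illustrated with a toy model in plain Java. This is not Beam code. `Strategy`, `into`, and `afterGroupByKey` are hypothetical names, and triggers are just string labels. The model assumes only what is stated in this thread: a GroupByKey leaves its output with the continuation of the upstream trigger, and a later Window.into that names no trigger keeps that inherited trigger rather than resetting it to the default. In the actual Beam Java SDK, Dan's fix corresponds to calling `.triggering(AfterWatermark.pastEndOfWindow())` on the second `Window.into(...)`. When a trigger is set on non-global windows, Beam also expects an allowed lateness and an accumulation mode to be specified (for example `.withAllowedLateness(...)` and `.discardingFiredPanes()`).

```java
import java.util.Optional;

// Toy model (hypothetical names, not the Beam API): a windowing strategy is a
// window function plus a trigger, both represented as plain strings.
public class TriggerInheritance {

    public static final class Strategy {
        public final String windowFn;
        public final String trigger;

        public Strategy(String windowFn, String trigger) {
            this.windowFn = windowFn;
            this.trigger = trigger;
        }

        @Override
        public String toString() {
            return windowFn + " / " + trigger;
        }
    }

    // Models Window.into(windowFn), optionally with an explicit trigger.
    // Without one, the upstream trigger is carried over, not reset to a default.
    public static Strategy into(Strategy upstream, String windowFn, Optional<String> trigger) {
        return new Strategy(windowFn, trigger.orElse(upstream.trigger));
    }

    // Models the strategy a GroupByKey leaves on its output: the same windows,
    // with the continuation of the upstream trigger.
    public static Strategy afterGroupByKey(Strategy upstream) {
        return new Strategy(upstream.windowFn, "Continuation(" + upstream.trigger + ")");
    }

    public static void main(String[] args) {
        Strategy source = new Strategy("GlobalWindows", "Default");
        Strategy w1 = into(source, "FixedWindows(1 min)", Optional.of("w1trigger"));
        Strategy grouped = afterGroupByKey(w1);

        // Second Window.into without a trigger: inherits the continuation trigger.
        Strategy implicitW2 = into(grouped, "FixedWindows(1 h)", Optional.empty());
        // Second Window.into with an explicit trigger, as in Dan's fix.
        Strategy explicitW2 = into(grouped, "FixedWindows(1 h)",
                Optional.of("AfterWatermark.pastEndOfWindow()"));

        System.out.println(implicitW2); // FixedWindows(1 h) / Continuation(w1trigger)
        System.out.println(explicitW2); // FixedWindows(1 h) / AfterWatermark.pastEndOfWindow()
    }
}
```

The implicit case ends up with the continuation of w1trigger, while the explicit case gets the watermark trigger, which roughly mirrors the two variations in Dan's gist.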

On Tue, Mar 5, 2019 at 1:27 PM Daniel Debrunner wrote:

> I discovered how to fix my issue, but I'm not sure I understand why the
> fix works.
>
> I created a complete sample here:
>
> https://gist.github.com/ddebrunner/5d4ef21c255c1d40a4517a0060ff8b99#file-cascadewindows-java-L104
> The link points to the area of interest.
>
> With the second window I was originally not specifying a trigger, so I
> assumed it was using the default trigger; that led to multiple firings
> of the combine on the second window.
>
> However, changing the trigger to AfterWatermark.pastEndOfWindow()
> produced the output I expected: a single combine across all the
> elements in the window.
> The gist has comments showing the output and the two code variations.
>
> I don't understand why, since according to section 8.1.1 [1] I thought
> AfterWatermark.pastEndOfWindow() was the default. Maybe it's due to
> late data in some way, but I don't see how the data could be late in
> this case.
>
> This is with Beam 2.7 direct runner btw.
>
> Thanks again for your help,
> Dan.
> [1]
> https://beam.apache.org/documentation/programming-guide/#event-time-triggers
>
> On Tue, Mar 5, 2019 at 11:48 AM Daniel Debrunner wrote:
> >
> > Thanks Robert, your description matches what I'm expecting. I'm working
> > on a simple example to see whether what I'm seeing differs, and then
> > hopefully use that to clarify my misunderstanding.
> >
> > Thanks,
> > Dan.
> >
> > On Tue, Mar 5, 2019 at 11:31 AM Robert Bradshaw wrote:
> > >
> > > Windows are assigned to elements via the Window.into transform. They
> > > influence grouping operations such as GroupByKey, Combine.perKey, and
> > > Combine.globally. Looking at your example, you start with
> > >
> > >     PCollection<KV<A,B>>
> > >
> > > presumably produced by a Read or a Create. These KVs are in the
> > > global window, so the elements are really triples (ignoring PaneInfo)
> > > of the form
> > >
> > >     (KV<A, B>, GlobalWindow, timestamp)
> > >
> > > From what I gather, the next step you do is a
> > > Window.into(FixedWindows.of(...)), yielding a PCollection<KV<A,B>>
> > > whose elements are, implicitly
> > >
> > >     (KV<A, B>, IntervalWindow, timestamp)
> > >
> > > Now you apply a GroupByKey to get elements of the form
> > >
> > >     (KV<A, Iterable<B>>, IntervalWindow, timestamp)
> > >
> > > where there is one Iterable for each distinct key and window. You
> > > apply a ParDo to get PCollection<X> which is of the form
> > >
> > >     (X, IntervalWindow, timestamp)
> > >
> > > It looks like your next step is another
> > > Window.into(FixedWindows.of(...)), yielding
> > >
> > >     (X, IntervalWindow, timestamp)
> > >
> > > where the IntervalWindow here may be different if the parameters to
> > > FixedWindows were different (e.g. the first was by minute, the second
> > > by hours). If it's the same, this is a no-op. Now you apply
> > > Combine.globally(CombineFn<X, R>) to get a PCollection<R> whose
> > > elements are of the form
> > >
> > >     (R, IntervalWindow, timestamp)
> > >
> > > where there is now one R per window (the elements in the same window
> > > being combined, the elements across windows not).
> > >
> > > FWIW, internally, Combine.globally is implemented as PairWithNullKey +
> > > CombinePerKey + StripNullKey.
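The element triples and the Combine.globally decomposition above can be sketched with a small plain-Java model. It is an illustration under simplifying assumptions, not Beam internals: `Windowed`, `fixedWindowStart`, and `combineGloballySum` are hypothetical names, a fixed window is identified by its start in milliseconds, PaneInfo is ignored, and the CombineFn is a plain sum. `combineGloballySum` pairs each element with a null key, sums per (key, window), and then strips the key, so values that came from different upstream keys but share a window are combined into one result, while different windows stay separate.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model (hypothetical names, not Beam internals) of windowed elements and of
// Combine.globally expressed as PairWithNullKey + CombinePerKey + StripNullKey.
public class GlobalCombineModel {

    // A windowed element (value, window, timestamp); the window is identified
    // by its start in milliseconds. PaneInfo is ignored.
    public static final class Windowed<T> {
        public final T value;
        public final long windowStart;
        public final long timestamp;

        public Windowed(T value, long windowStart, long timestamp) {
            this.value = value;
            this.windowStart = windowStart;
            this.timestamp = timestamp;
        }
    }

    // Start of the fixed window of the given size that contains the timestamp.
    public static long fixedWindowStart(long timestamp, long size) {
        return Math.floorDiv(timestamp, size) * size;
    }

    // Combine.globally with a summing CombineFn, modeled in three steps.
    public static Map<Long, Long> combineGloballySum(List<Windowed<Long>> input) {
        // Steps 1 and 2: pair each element with a null key, then sum per (key, window).
        Map<List<Object>, Long> perKeyAndWindow = new LinkedHashMap<>();
        for (Windowed<Long> e : input) {
            List<Object> nullKeyAndWindow = Arrays.<Object>asList(null, e.windowStart);
            perKeyAndWindow.merge(nullKeyAndWindow, e.value, Long::sum);
        }
        // Step 3: strip the null key, leaving one result per window.
        Map<Long, Long> perWindow = new LinkedHashMap<>();
        for (Map.Entry<List<Object>, Long> entry : perKeyAndWindow.entrySet()) {
            perWindow.put((Long) entry.getKey().get(1), entry.getValue());
        }
        return perWindow;
    }

    public static void main(String[] args) {
        long size = 60_000L; // one-minute fixed windows
        // X values produced per key upstream (3 and 4 from different keys), with
        // timestamps at the last millisecond of their upstream window.
        long[][] valueAndTimestamp = {{3, 59_999}, {4, 59_999}, {10, 119_999}};
        List<Windowed<Long>> xs = new ArrayList<>();
        for (long[] vt : valueAndTimestamp) {
            // Re-window with the same fixed windows: each element keeps its window.
            xs.add(new Windowed<Long>(vt[0], fixedWindowStart(vt[1], size), vt[1]));
        }
        System.out.println(combineGloballySum(xs)); // {0=7, 60000=10}
    }
}
```

The intermediate null-keyed pairs show where a null key can appear inside a Combine.globally, even though neither its input nor its output is keyed.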
> > >
> > > Does this help?
> > >
> > >
> > > On Tue, Mar 5, 2019 at 8:09 PM Daniel Debrunner wrote:
> > > >
> > > > Thanks for the reply.
> > > >
> > > > As for every element always being associated with a window: when an
> > > > element is produced due to a window trigger (e.g. by the GroupByKey),
> > > > what window is it associated with? The window it was produced from?
> > > > Maybe the real question is: when is a window assigned to an element?
> > > >
> > > > I'll see if I can come up with an example.
> > > >
> > > > Thanks,
> > > > Dan.
> > > >
> > > > On Tue, Mar 5, 2019 at 10:47 AM Kenneth Knowles wrote:
> > > > >
> > > > > Two pieces to this:
> > > > >
> > > > > 1. Every element in a PCollection is always associated with a
> > > > >    window, and GroupByKey (hence CombinePerKey) operates
> > > > >    per-key-and-window (with window merging).
> > > > > 2. If an element is not explicitly a KV, then there is no key
> > > > >    associated with it.
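Point 1 can be sketched the same way. In this plain-Java model (hypothetical names, not Beam internals, and covering only non-merging windows such as FixedWindows), `groupByKey` produces one group per distinct (key, window) pair, so the same key in two windows yields two groups, and two keys in one window also yield two groups.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model (hypothetical names, not Beam internals): GroupByKey groups by the
// pair (key, window), never by key alone. Non-merging windows only.
public class PerKeyAndWindow {

    // A windowed KV element; the window is identified by its start in ms.
    public static final class WindowedKv {
        public final String key;
        public final int value;
        public final long windowStart;

        public WindowedKv(String key, int value, long windowStart) {
            this.key = key;
            this.value = value;
            this.windowStart = windowStart;
        }
    }

    // Returns one group per distinct (key, window) pair, in first-seen order.
    public static Map<List<Object>, List<Integer>> groupByKey(List<WindowedKv> input) {
        Map<List<Object>, List<Integer>> groups = new LinkedHashMap<>();
        for (WindowedKv e : input) {
            List<Object> keyAndWindow = Arrays.<Object>asList(e.key, e.windowStart);
            groups.computeIfAbsent(keyAndWindow, k -> new ArrayList<>()).add(e.value);
        }
        return groups;
    }

    public static void main(String[] args) {
        List<WindowedKv> input = Arrays.asList(
                new WindowedKv("a", 1, 0),
                new WindowedKv("a", 2, 0),
                new WindowedKv("b", 5, 0),
                new WindowedKv("a", 7, 60_000));
        System.out.println(groupByKey(input));
        // {[a, 0]=[1, 2], [b, 0]=[5], [a, 60000]=[7]}
    }
}
```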
> > > > >
> > > > > I'm afraid I don't have any guesses at the problem based on what
> > > > > you've shared. Can you say more?
> > > > >
> > > > > Kenn
> > > > >
> > > > > On Tue, Mar 5, 2019 at 10:29 AM Daniel Debrunner wrote:
> > > > >>
> > > > > >> The windowing section of the Beam programming model guide shows a
> > > > > >> window defined and used in the GroupByKey transform after a ParDo
> > > > > >> (section 7.1.1).
> > > > >>
> > > > >> However I couldn't find any documentation on how long the window
> > > > >> remains in scope for subsequent transforms.
> > > > >>
> > > > >> I have an application with this pipeline:
> > > > >>
> > > > >> PCollection<KV<A,B>> -> FixedWindow<KV<A,B>> -> GroupByKey ->
> > > > >> PCollection<X> -> FixedWindow<X> -> Combine<X,R>.globally ->
> > > > >> PCollection<R>
> > > > >>
> > > > > >> The idea is that the first window is aggregating by key, but in the
> > > > > >> second window I need to combine elements across all keys.
> > > > >>
> > > > > >> With my initial app I was seeing some runtime errors in/after the
> > > > > >> combine where a KV<null,R> was being seen, even though at that
> > > > > >> point there should be no key for the PCollection<R>.
> > > > >>
> > > > > >> In a simpler test I can apply FixedWindow<X> -> Combine<X,R>.globally
> > > > > >> -> PCollection<R> to a PCollection without an upstream window, and the
> > > > > >> combine correctly happens once.
> > > > > >> But when I add the keyed upstream window, the combine occurs once per
> > > > > >> key, without any final combine across the keys.
> > > > >>
> > > > > >> So it seems that somehow the memory of the key persists even with the
> > > > > >> new window transform.
> > > > >>
> > > > > >> I'm probably misunderstanding some detail of windowing, but I couldn't
> > > > > >> find any deeper documentation than the simple examples in the
> > > > > >> programming model guide.
> > > > >>
> > > > >> Can anyone point me in the correct direction?
> > > > >>
> > > > >> Thanks,
> > > > >> Dan.
>
