It is accurate to say that the "continuation trigger" is not documented in the general programming guide. As far as I can tell, it shows up only in the javadoc [1]. Technically, it is not part of the core of Beam - each language SDK is required to explicitly specify a trigger for every GroupByKey when it submits a pipeline to a runner - but, of course, that is pretty much an implementation detail.
Kenn

[1] https://www.google.com/search?q="continuation+trigger"+site%3Abeam.apache.org

On Sun, Apr 28, 2019 at 7:08 PM Reza Rokni <[email protected]> wrote:

> +1 I recall a fun afternoon a few years ago figuring this out ...
>
> On Mon, 11 Mar 2019 at 18:36, Maximilian Michels <[email protected]> wrote:
>
>> Hi,
>>
>> I have seen several users, including myself, get confused by the "default"
>> triggering behavior. I think it would be worthwhile to update the docs.
>>
>> In fact, Window.into(windowFn) does not override the existing
>> windowing/triggering. It merges the previous input WindowingStrategy with
>> the new one.
>>
>> So your w1trigger will still be set when you do not set w2trigger. The
>> default `AfterWatermark.pastEndOfWindow()` trigger will only be used
>> when windowing for the first time, or when you set it explicitly.
>>
>> Thanks,
>> Max
>>
>> On 06.03.19 00:28, Daniel Debrunner wrote:
>> > Thanks, Kenn.
>> >
>> > Is it fair to say that this continuation trigger functionality is not
>> > documented?
>> >
>> > The Javadoc has a sentence similar to the one in the programming guide:
>> >
>> >> triggering(Trigger) allows specifying a trigger to control when (in
>> >> processing time) results for the given window can be produced. If
>> >> unspecified, the default behavior is to trigger first when the watermark
>> >> passes the end of the window, and then trigger again every time there is
>> >> late-arriving data.
>> >
>> > Thanks,
>> > Dan.
>> >
>> > On Tue, Mar 5, 2019 at 1:46 PM Kenneth Knowles <[email protected]> wrote:
>> >>
>> >> The Window.into transform does not reset the trigger to the default.
>> >> So where you have w2trigger, if you leave it off, then the triggering is
>> >> left as the "continuation trigger" from w1trigger. Basically it tries to
>> >> let any output caused by w1trigger flow all the way through the pipeline
>> >> without delay.
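Max's point above can be sketched with a toy model. This is plain Java, not Beam's real WindowingStrategy class, and the trigger strings and window sizes are made up for illustration: applying a new windowing without an explicit trigger keeps the previous trigger (in Beam, as its "continuation trigger"), while an explicit .triggering(...) replaces it.

```java
import java.util.Optional;

// Toy model (NOT Beam's real WindowingStrategy) of the merge behaviour
// described above: Window.into(windowFn) replaces the window function
// but keeps the previously-set trigger unless a new one is given.
public class ContinuationTriggerDemo {

    static final class Strategy {
        final String windowFn;
        final String trigger;

        Strategy(String windowFn, String trigger) {
            this.windowFn = windowFn;
            this.trigger = trigger;
        }

        // Models applying Window.into(newWindowFn), optionally with
        // .triggering(...). Absent an explicit trigger, the old one
        // carries over.
        Strategy apply(String newWindowFn, Optional<String> newTrigger) {
            return new Strategy(newWindowFn, newTrigger.orElse(this.trigger));
        }
    }

    public static void main(String[] args) {
        Strategy w1 = new Strategy("FixedWindows(1m)", "w1trigger");
        // Second windowing without an explicit trigger: w1trigger survives.
        System.out.println(w1.apply("FixedWindows(1h)", Optional.empty()).trigger);
        // With an explicit trigger, it is replaced.
        System.out.println(
            w1.apply("FixedWindows(1h)",
                     Optional.of("AfterWatermark.pastEndOfWindow()")).trigger);
    }
}
```

This also mirrors Dan's fix further down the thread: setting AfterWatermark.pastEndOfWindow() explicitly on the second window overrides whatever carried over from the first.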
>> >>
>> >> Kenn
>> >>
>> >> On Tue, Mar 5, 2019 at 1:27 PM Daniel Debrunner <[email protected]> wrote:
>> >>>
>> >>> I discovered how to fix my issue, but I'm not sure I understand why the
>> >>> fix works.
>> >>>
>> >>> I created a complete sample here:
>> >>> https://gist.github.com/ddebrunner/5d4ef21c255c1d40a4517a0060ff8b99#file-cascadewindows-java-L104
>> >>> The link points to the area of interest.
>> >>>
>> >>> With the second window I was originally not specifying a trigger, so I
>> >>> was using the default trigger, which led to multiple triggers of the
>> >>> combine on the second window.
>> >>>
>> >>> However, changing the trigger to AfterWatermark.pastEndOfWindow()
>> >>> produced the output I expected: a single combine across all the
>> >>> elements in the window.
>> >>> The gist has comments showing the output and the two code variations.
>> >>>
>> >>> I don't understand why, since according to 8.1.1 [1] I thought
>> >>> AfterWatermark.pastEndOfWindow() was the default. Maybe it's due to
>> >>> late data in some way, but I'm not sure I understand how the data could
>> >>> be late in this case.
>> >>>
>> >>> This is with the Beam 2.7 direct runner, btw.
>> >>>
>> >>> Thanks again for your help,
>> >>> Dan.
>> >>>
>> >>> [1] https://beam.apache.org/documentation/programming-guide/#event-time-triggers
>> >>>
>> >>> On Tue, Mar 5, 2019 at 11:48 AM Daniel Debrunner <[email protected]> wrote:
>> >>>>
>> >>>> Thanks Robert, your description is what I'm expecting. I'm working on
>> >>>> a simple example to see if what I'm seeing is different, and then
>> >>>> hopefully I can use that to clarify my misunderstanding.
>> >>>>
>> >>>> Thanks,
>> >>>> Dan.
>> >>>>
>> >>>> On Tue, Mar 5, 2019 at 11:31 AM Robert Bradshaw <[email protected]> wrote:
>> >>>>>
>> >>>>> Windows are assigned to elements via the Window.into transform. They
>> >>>>> influence grouping operations such as GroupByKey, Combine.perKey, and
>> >>>>> Combine.globally.
>> >>>>> Looking at your example, you start with
>> >>>>>
>> >>>>> PCollection<KV<A,B>>
>> >>>>>
>> >>>>> presumably via a Read or a Create. These KVs are in a global window,
>> >>>>> so the elements are really triples (ignoring PaneInfo) of the form
>> >>>>>
>> >>>>> (KV<A, B>, GlobalWindow, timestamp)
>> >>>>>
>> >>>>> From what I gather, the next step you do is a
>> >>>>> Window.into(FixedWindows.of(...)), yielding a PCollection<KV<A,B>>
>> >>>>> whose elements are, implicitly,
>> >>>>>
>> >>>>> (KV<A, B>, IntervalWindow, timestamp)
>> >>>>>
>> >>>>> Now you apply a GroupByKey to get elements of the form
>> >>>>>
>> >>>>> (KV<A, Iterable<B>>, IntervalWindow, timestamp)
>> >>>>>
>> >>>>> where there is one Iterable for each distinct key and window. You
>> >>>>> apply a ParDo to get a PCollection<X> which is of the form
>> >>>>>
>> >>>>> (X, IntervalWindow, timestamp)
>> >>>>>
>> >>>>> It looks like your next step is another
>> >>>>> Window.into(FixedWindows.of(...)), yielding
>> >>>>>
>> >>>>> (X, IntervalWindow, timestamp)
>> >>>>>
>> >>>>> where the IntervalWindow here may be different if the parameters to
>> >>>>> FixedWindows were different (e.g. the first was by minute, the second
>> >>>>> by hours). If it's the same, this is a no-op. Now you apply
>> >>>>> Combine.globally(CombineFn<X, R>) to get a PCollection<R> whose
>> >>>>> elements are of the form
>> >>>>>
>> >>>>> (R, IntervalWindow, timestamp)
>> >>>>>
>> >>>>> where there is now one R per window (the elements in the same window
>> >>>>> being combined, the elements across windows not).
>> >>>>>
>> >>>>> FWIW, internally, Combine.globally is implemented as PairWithNullKey +
>> >>>>> CombinePerKey + StripNullKey.
>> >>>>>
>> >>>>> Does this help?
>> >>>>>
>> >>>>> On Tue, Mar 5, 2019 at 8:09 PM Daniel Debrunner <[email protected]> wrote:
>> >>>>>>
>> >>>>>> Thanks for the reply.
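Robert's (value, window, timestamp) walkthrough above can be modelled in plain Java without a Beam dependency. The fixed-window assignment below mirrors Beam's FixedWindows semantics for non-negative timestamps, but the Element class and helper names are invented for illustration; the key point is that grouping is per key *and* window.

```java
import java.util.*;

// Plain-Java model (no Beam dependency) of the element shapes described
// above: each element implicitly carries a window and a timestamp, and
// GroupByKey produces one group per distinct (key, window) pair.
public class WindowModelDemo {

    static final class Element {
        final String key;
        final int value;
        final long timestamp;
        Element(String key, int value, long timestamp) {
            this.key = key; this.value = value; this.timestamp = timestamp;
        }
    }

    // FixedWindows.of(size) assigns the interval [start, start + size)
    // containing the timestamp; this mirrors the Beam semantics for
    // non-negative timestamps.
    static long windowStart(long timestamp, long size) {
        return timestamp - (timestamp % size);
    }

    // One output group per distinct (key, window) pair, as in GroupByKey.
    static Map<String, List<Integer>> groupByKeyAndWindow(List<Element> input, long size) {
        Map<String, List<Integer>> groups = new TreeMap<>();
        for (Element e : input) {
            String keyAndWindow = e.key + "@" + windowStart(e.timestamp, size);
            groups.computeIfAbsent(keyAndWindow, k -> new ArrayList<>()).add(e.value);
        }
        return groups;
    }

    public static void main(String[] args) {
        List<Element> input = Arrays.asList(
            new Element("a", 1, 5),    // window [0, 60)
            new Element("a", 2, 30),   // window [0, 60)
            new Element("a", 3, 65),   // window [60, 120)
            new Element("b", 4, 10));  // window [0, 60)
        // Three groups: a@0=[1, 2], a@60=[3], b@0=[4]
        System.out.println(groupByKeyAndWindow(input, 60));
    }
}
```

Note how key "a" yields two separate groups because its elements fall in two different windows; that is the per-key-and-window behaviour Kenn describes below.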
>> >>>>>>
>> >>>>>> As for every element always being associated with a window: when an
>> >>>>>> element is produced due to a window trigger (e.g. the GroupByKey), what
>> >>>>>> window is it associated with? The window it was produced from? Maybe
>> >>>>>> the question is: when is a window assigned to an element?
>> >>>>>>
>> >>>>>> I'll see if I can come up with an example.
>> >>>>>>
>> >>>>>> Thanks,
>> >>>>>> Dan.
>> >>>>>>
>> >>>>>> On Tue, Mar 5, 2019 at 10:47 AM Kenneth Knowles <[email protected]> wrote:
>> >>>>>>>
>> >>>>>>> Two pieces to this:
>> >>>>>>>
>> >>>>>>> 1. Every element in a PCollection is always associated with a
>> >>>>>>> window, and GroupByKey (hence CombinePerKey) operates per-key-and-window
>> >>>>>>> (with window merging).
>> >>>>>>> 2. If an element is not explicitly a KV, then there is no key
>> >>>>>>> associated with it.
>> >>>>>>>
>> >>>>>>> I'm afraid I don't have any guesses at the problem based on what
>> >>>>>>> you've shared. Can you say more?
>> >>>>>>>
>> >>>>>>> Kenn
>> >>>>>>>
>> >>>>>>> On Tue, Mar 5, 2019 at 10:29 AM Daniel Debrunner <[email protected]> wrote:
>> >>>>>>>>
>> >>>>>>>> The windowing section of the Beam programming model guide shows a
>> >>>>>>>> window defined and used in the GroupByKey transform after a ParDo
>> >>>>>>>> (section 7.1.1).
>> >>>>>>>>
>> >>>>>>>> However, I couldn't find any documentation on how long the window
>> >>>>>>>> remains in scope for subsequent transforms.
>> >>>>>>>>
>> >>>>>>>> I have an application with this pipeline:
>> >>>>>>>>
>> >>>>>>>> PCollection<KV<A,B>> -> FixedWindow<KV<A,B>> -> GroupByKey ->
>> >>>>>>>> PCollection<X> -> FixedWindow<X> -> Combine<X,R>.globally ->
>> >>>>>>>> PCollection<R>
>> >>>>>>>>
>> >>>>>>>> The idea is that the first window is aggregating by key, but in the
>> >>>>>>>> second window I need to combine elements across all keys.
>> >>>>>>>>
>> >>>>>>>> With my initial app I was seeing some runtime errors in/after the
>> >>>>>>>> combine where a KV<null,R> was being seen, even though at that point
>> >>>>>>>> there should be no key for the PCollection<R>.
>> >>>>>>>>
>> >>>>>>>> In a simpler test I can apply FixedWindow<X> -> Combine<X,R>.globally
>> >>>>>>>> -> PCollection<R> to a PCollection without an upstream window, and the
>> >>>>>>>> combine correctly happens once.
>> >>>>>>>> But when adding the keyed upstream window, the combine occurs once per
>> >>>>>>>> key without any final combine across the keys.
>> >>>>>>>>
>> >>>>>>>> So it seems somehow the memory of the key persists even with the new
>> >>>>>>>> window transform.
>> >>>>>>>>
>> >>>>>>>> I'm probably misunderstanding some detail of windowing, but I couldn't
>> >>>>>>>> find any deeper documentation than the simple examples in the
>> >>>>>>>> programming model guide.
>> >>>>>>>>
>> >>>>>>>> Can anyone point me in the correct direction?
>> >>>>>>>>
>> >>>>>>>> Thanks,
>> >>>>>>>> Dan.
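Robert's note that Combine.globally is internally PairWithNullKey + CombinePerKey + StripNullKey can be sketched in plain Java; this may clarify where the KV<null,R> in the thread comes from and why the combine still happens once per window rather than once overall. The sum CombineFn and all names here are illustrative, not Beam code.

```java
import java.util.*;

// Plain-Java sketch of Combine.globally as PairWithNullKey +
// CombinePerKey + StripNullKey, using sum as a stand-in CombineFn.
// Because the combine still runs per (key, window), the output is one
// result per window, not one result for the whole PCollection.
public class GlobalCombineDemo {

    static long windowStart(long timestamp, long size) {
        return timestamp - (timestamp % size);
    }

    // Elements are {value, timestamp} pairs; every element gets the same
    // (null) key, so each window collapses to a single combined result.
    static Map<Long, Integer> combineGloballyPerWindow(List<long[]> input, long size) {
        Map<Long, Integer> perWindow = new TreeMap<>();
        for (long[] e : input) {                 // e = {value, timestamp}
            long window = windowStart(e[1], size);
            perWindow.merge(window, (int) e[0], Integer::sum);
        }
        return perWindow;                        // "strip" the null key
    }

    public static void main(String[] args) {
        List<long[]> input = Arrays.asList(
            new long[]{1, 5}, new long[]{2, 30},  // window [0, 60)
            new long[]{3, 65});                   // window [60, 120)
        System.out.println(combineGloballyPerWindow(input, 60));
        // {0=3, 60=3}: one combined value per window, across all inputs.
    }
}
```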
