It is accurate to say that the "continuation trigger" is not documented in
the general programming guide. As far as I can tell, it shows up only in the
javadoc [1]. Strictly speaking, it is not part of the core of Beam - each
language SDK is required to explicitly specify a trigger for every GroupByKey
when it submits a pipeline to a runner. But, of course, this is pretty much
an implementation detail.
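
If you want to see it concretely, here is a rough sketch (the types, window
size, and trigger are invented for illustration) that prints the trigger the
Java SDK attaches downstream of a GroupByKey. At construction time the grouped
PCollection's WindowingStrategy should already carry the continuation form of
the trigger written upstream, rather than the trigger as written:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

public class ContinuationTriggerPeek {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create();

    PCollection<KV<String, Long>> input =
        p.apply(Create.of(KV.of("a", 1L), KV.of("b", 2L)));

    PCollection<KV<String, Iterable<Long>>> grouped =
        input
            .apply(
                Window.<KV<String, Long>>into(FixedWindows.of(Duration.standardMinutes(1)))
                    .triggering(
                        AfterWatermark.pastEndOfWindow()
                            .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()))
                    .withAllowedLateness(Duration.ZERO)
                    .discardingFiredPanes())
            .apply(GroupByKey.create());

    // Expected to print the continuation form of the trigger above, which the
    // SDK attaches when it expands the GroupByKey.
    System.out.println(grouped.getWindowingStrategy().getTrigger());
  }
}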

Kenn

[1]
https://www.google.com/search?q=%22continuation+trigger%22+site%3Abeam.apache.org

On Sun, Apr 28, 2019 at 7:08 PM Reza Rokni <[email protected]> wrote:

> +1 I recall a fun afternoon a few years ago figuring this out ...
>
> On Mon, 11 Mar 2019 at 18:36, Maximilian Michels <[email protected]> wrote:
>
>> Hi,
>>
>> I have seen several users including myself get confused by the "default"
>> triggering behavior. I think it would be worthwhile to update the docs.
>>
>> In fact, Window.into(windowFn) does not override the existing
>> windowing/triggering. It merges the previous input WindowingStrategy with
>> the new one.
>>
>> So your w1trigger will still be set when you do not set w2trigger. The
>> default `AfterWatermark.pastEndOfWindow()` trigger will only be used
>> when windowing for the first time, or when you set it explicitly.
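>>
>> A sketch of what I mean (the element type, window sizes, and trigger are
>> invented; assume the usual org.apache.beam.sdk imports and a constructed
>> Pipeline p):
>>
>> PCollection<Long> input = p.apply(Create.of(1L, 2L, 3L));
>>
>> // w1: explicit trigger.
>> PCollection<Long> w1 = input.apply(
>>     Window.<Long>into(FixedWindows.of(Duration.standardMinutes(1)))
>>         .triggering(AfterWatermark.pastEndOfWindow()
>>             .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()))
>>         .withAllowedLateness(Duration.ZERO)
>>         .discardingFiredPanes());
>>
>> // w2 without .triggering(...): the strategies are merged, so the upstream
>> // triggering (w1's trigger, or its continuation downstream of a
>> // GroupByKey) is kept rather than reset to the default.
>> PCollection<Long> w2Implicit = w1.apply(
>>     Window.<Long>into(FixedWindows.of(Duration.standardHours(1))));
>>
>> // w2 with an explicit trigger: fires once when the watermark passes the
>> // end of the new window.
>> PCollection<Long> w2Explicit = w1.apply(
>>     Window.<Long>into(FixedWindows.of(Duration.standardHours(1)))
>>         .triggering(AfterWatermark.pastEndOfWindow())
>>         .withAllowedLateness(Duration.ZERO)
>>         .discardingFiredPanes());
>>
>> System.out.println(w2Implicit.getWindowingStrategy().getTrigger());
>> System.out.println(w2Explicit.getWindowingStrategy().getTrigger());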
>>
>> Thanks,
>> Max
>>
>> On 06.03.19 00:28, Daniel Debrunner wrote:
>> > Thanks, Kenn.
>> >
>> > Is it fair to say that this continuation trigger functionality is not
>> > documented?
>> >
>> > The Javadoc has a sentence similar to the one in the programming guide:
>> >
>> >> triggering(Trigger) allows specifying a trigger to control when (in
>> processing time) results for the given window can be produced. If
>> unspecified, the default behavior is to trigger first when the watermark
>> passes the end of the window, and then trigger again every time there is
>> late arriving data.
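>> >
>> > If I read that right, spelling that documented behavior out explicitly
>> > would look roughly like this (my own sketch; `lines` and the durations
>> > are made up, and I believe an explicit trigger also requires allowed
>> > lateness and an accumulation mode):
>> >
>> >     lines.apply(
>> >         Window.<String>into(FixedWindows.of(Duration.standardMinutes(5)))
>> >             .triggering(AfterWatermark.pastEndOfWindow()
>> >                 .withLateFirings(AfterPane.elementCountAtLeast(1)))
>> >             .withAllowedLateness(Duration.standardMinutes(10))
>> >             .accumulatingFiredPanes());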
>> >
>> > Thanks,
>> > Dan.
>> >
>> > On Tue, Mar 5, 2019 at 1:46 PM Kenneth Knowles <[email protected]> wrote:
>> >>
>> >> The Window.into transform does not reset the trigger to the default.
>> So where you have w2trigger, if you leave it off, then the triggering is
>> left as the "continuation trigger" from w1trigger. Basically it tries to
>> let any output caused by w1trigger flow all the way through the pipeline
>> without delay.
>> >>
>> >> Kenn
>> >>
>> >> On Tue, Mar 5, 2019 at 1:27 PM Daniel Debrunner <[email protected]>
>> wrote:
>> >>>
>> >>> I discovered how to fix my issue, but I'm not sure I understand why the fix works.
>> >>>
>> >>> I created a complete sample here:
>> >>>
>> https://gist.github.com/ddebrunner/5d4ef21c255c1d40a4517a0060ff8b99#file-cascadewindows-java-L104
>> >>> Link points to the area of interest.
>> >>>
>> >>> With the second window I was originally not specifying a trigger, so
>> >>> using the default trigger, which led to multiple firings of the
>> >>> combine on the second window.
>> >>>
>> >>> However changing the trigger to be AfterWatermark.pastEndOfWindow()
>> >>> produced the output I expected, a single combine across all the
>> >>> elements in the window.
>> >>> The gist has comments showing the output and the two code variations.
>> >>>
>> >>> I don't understand why, since according to 8.1.1 [1] I thought
>> >>> AfterWatermark.pastEndOfWindow() was the default. Maybe it's due to
>> >>> late data in some way but I'm not sure I understand how the data could
>> >>> be late in this case.
>> >>>
>> >>> This is with Beam 2.7 direct runner btw.
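>> >>>
>> >>> For the archives, the two variations look roughly like the following
>> >>> (heavily simplified, with `scores` as a placeholder PCollection<Double>
>> >>> and Double/Sum standing in for my real X/CombineFn):
>> >>>
>> >>> // Variation 1: no trigger on the second window - I saw the combine
>> >>> // fire several times per window.
>> >>> scores
>> >>>     .apply(Window.<Double>into(FixedWindows.of(Duration.standardMinutes(1))))
>> >>>     .apply(Combine.globally(Sum.ofDoubles()).withoutDefaults());
>> >>>
>> >>> // Variation 2: explicit trigger - a single combine per window.
>> >>> scores
>> >>>     .apply(Window.<Double>into(FixedWindows.of(Duration.standardMinutes(1)))
>> >>>         .triggering(AfterWatermark.pastEndOfWindow())
>> >>>         .withAllowedLateness(Duration.ZERO)
>> >>>         .discardingFiredPanes())
>> >>>     .apply(Combine.globally(Sum.ofDoubles()).withoutDefaults());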
>> >>>
>> >>> Thanks again for your help,
>> >>> Dan.
>> >>> [1]
>> https://beam.apache.org/documentation/programming-guide/#event-time-triggers
>> >>>
>> >>> On Tue, Mar 5, 2019 at 11:48 AM Daniel Debrunner <[email protected]>
>> wrote:
>> >>>>
>> >>>> Thanks Robert, your description matches what I'm expecting. I'm working
>> >>>> on a simple example to see if what I'm seeing is different, and then
>> >>>> hopefully I can use that to clarify my misunderstanding.
>> >>>>
>> >>>> Thanks,
>> >>>> Dan.
>> >>>>
>> >>>> On Tue, Mar 5, 2019 at 11:31 AM Robert Bradshaw <[email protected]>
>> wrote:
>> >>>>>
>> >>>>> Windows are assigned to elements via the Window.into transform. They
>> >>>>> influence grouping operations such as GroupByKey, Combine.perKey,
>> and
>> >>>>> Combine.globally. Looking at your example, you start with
>> >>>>>
>> >>>>>      PCollection<KV<A,B>>
>> >>>>>
>> >>>>> Presumably via a Read or a Create. These KVs are in a global window,
>> >>>>> so the elements are really triples (ignoring PaneInfo) of the form
>> >>>>>
>> >>>>>      (KV<A, B>, GlobalWindow, timestamp)
>> >>>>>
>> >>>>> From what I gather, the next step you do is a
>> >>>>> Window.into(FixedWindows.of(...)), yielding a PCollection<KV<A,B>>
>> >>>>> whose elements are, implicitly
>> >>>>>
>> >>>>>      (KV<A, B>, IntervalWindow, timestamp)
>> >>>>>
>> >>>>> Now you apply a GroupByKey to get elements of the form
>> >>>>>
>> >>>>>      (KV<A, Iterable<B>>, IntervalWindow, timestamp)
>> >>>>>
>> >>>>> where there is one Iterable for each distinct key and window. You
>> >>>>> apply a ParDo to get PCollection<X> which is of the form
>> >>>>>
>> >>>>>      (X, IntervalWindow, timestamp)
>> >>>>>
>> >>>>> It looks like your next step is another
>> >>>>> Window.into(FixedWindows.of(...)), yielding
>> >>>>>
>> >>>>>      (X, IntervalWindow, timestamp)
>> >>>>>
>> >>>>> where the IntervalWindow here may be different if the parameters to
>> >>>>> FixedWindows were different (e.g. the first was by minute, the
>> second
>> >>>>> by hours). If it's the same, this is a no-op. Now you apply
>> >>>>> Combine.globally(CombineFn<X, R>) to get a PCollection<R> whose
>> >>>>> elements are of the form
>> >>>>>
>> >>>>>      (R, IntervalWindow, timestamp)
>> >>>>>
>> >>>>> where there is now one R per window (the elements in the same window
>> >>>>> being combined, the elements across windows not).
>> >>>>>
>> >>>>> FWIW, internally, Combine.globally is implemented as
>> PairWithNullKey +
>> >>>>> CombinePerKey + StripNullKey.
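>> >>>>>
>> >>>>> In code, the shape I described above would look something like this
>> >>>>> (concrete types, window sizes, and the ParDo/CombineFn bodies are
>> >>>>> invented just to make the sketch compile; assume the usual Beam
>> >>>>> imports and a constructed Pipeline p):
>> >>>>>
>> >>>>> PCollection<KV<String, Long>> kvs =
>> >>>>>     p.apply(Create.of(KV.of("a", 1L), KV.of("a", 2L), KV.of("b", 3L)));
>> >>>>>
>> >>>>> PCollection<Long> xs = kvs
>> >>>>>     // (KV<A,B>, GlobalWindow, ts) -> (KV<A,B>, IntervalWindow, ts)
>> >>>>>     .apply(Window.<KV<String, Long>>into(
>> >>>>>         FixedWindows.of(Duration.standardMinutes(1))))
>> >>>>>     // -> (KV<A, Iterable<B>>, IntervalWindow, ts), one per key and window
>> >>>>>     .apply(GroupByKey.create())
>> >>>>>     // -> (X, IntervalWindow, ts)
>> >>>>>     .apply(ParDo.of(new DoFn<KV<String, Iterable<Long>>, Long>() {
>> >>>>>       @ProcessElement
>> >>>>>       public void process(ProcessContext c) {
>> >>>>>         long sum = 0;
>> >>>>>         for (Long v : c.element().getValue()) {
>> >>>>>           sum += v;
>> >>>>>         }
>> >>>>>         c.output(sum);
>> >>>>>       }
>> >>>>>     }));
>> >>>>>
>> >>>>> PCollection<Long> rs = xs
>> >>>>>     // re-windowing; a no-op if the FixedWindows parameters match
>> >>>>>     .apply(Window.<Long>into(FixedWindows.of(Duration.standardHours(1))))
>> >>>>>     // -> (R, IntervalWindow, ts), one per window, combined across keys
>> >>>>>     .apply(Combine.globally(Sum.ofLongs()).withoutDefaults());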
>> >>>>>
>> >>>>> Does this help?
>> >>>>>
>> >>>>>
>> >>>>> On Tue, Mar 5, 2019 at 8:09 PM Daniel Debrunner <[email protected]>
>> wrote:
>> >>>>>>
>> >>>>>> Thanks for the reply.
>> >>>>>>
>> >>>>>> As for every element always being associated with a window: when an
>> >>>>>> element is produced due to a window trigger firing (e.g. from the
>> >>>>>> GroupByKey), what window is it associated with? The window it was
>> >>>>>> produced from? Maybe the question is: when is a window assigned to an
>> >>>>>> element?
>> >>>>>>
>> >>>>>> I'll see if I can come up with an example.
>> >>>>>>
>> >>>>>> Thanks,
>> >>>>>> Dan.
>> >>>>>>
>> >>>>>> On Tue, Mar 5, 2019 at 10:47 AM Kenneth Knowles <[email protected]>
>> wrote:
>> >>>>>>>
>> >>>>>>> Two pieces to this:
>> >>>>>>>
>> >>>>>>> 1. Every element in a PCollection is always associated with a
>> window, and GroupByKey (hence CombinePerKey) operates per-key-and-window
>> (w/ window merging).
>> >>>>>>> 2. If an element is not explicitly a KV, then there is no key
>> associated with it.
>> >>>>>>>
>> >>>>>>> I'm afraid I don't have any guesses at the problem based on what
>> you've shared. Can you say more?
>> >>>>>>>
>> >>>>>>> Kenn
>> >>>>>>>
>> >>>>>>> On Tue, Mar 5, 2019 at 10:29 AM Daniel Debrunner <
>> [email protected]> wrote:
>> >>>>>>>>
>> >>>>>>>> The windowing section of the Beam programming model guide shows a
>> >>>>>>>> window defined and used in the GroupByKey transform after a ParDo
>> >>>>>>>> (section 7.1.1).
>> >>>>>>>>
>> >>>>>>>> However I couldn't find any documentation on how long the window
>> >>>>>>>> remains in scope for subsequent transforms.
>> >>>>>>>>
>> >>>>>>>> I have an application with this pipeline:
>> >>>>>>>>
>> >>>>>>>> PCollection<KV<A,B>> -> FixedWindow<KV<A,B>> -> GroupByKey ->
>> >>>>>>>> PCollection<X> -> FixedWindow<X> -> Combine<X,R>.globally ->
>> >>>>>>>> PCollection<R>
>> >>>>>>>>
>> >>>>>>>> The idea is that the first window is aggregating by key but in
>> the
>> >>>>>>>> second window I need to combine elements across all keys.
>> >>>>>>>>
>> >>>>>>>> With my initial app I was seeing some runtime errors in/after the
>> >>>>>>>> combine where a KV<null,R> was being seen, even though at that
>> point
>> >>>>>>>> there should be no key for the PCollection<R>.
>> >>>>>>>>
>> >>>>>>>> In a simpler test I can apply FixedWindow<X> ->
>> Combine<X,R>.globally
>> >>>>>>>> -> PCollection<R> to a PCollection without an upstream window
>> and the
>> >>>>>>>> combine correctly happens once.
>> >>>>>>>> But then adding the keyed upstream window, the combine occurs
>> once per
>> >>>>>>>> key without any final combine across the keys.
>> >>>>>>>>
>> >>>>>>>> So it seems that somehow the memory of the key persists even with
>> >>>>>>>> the new window transform.
>> >>>>>>>>
>> >>>>>>>> I'm probably misunderstanding some detail of windowing, but I
>> couldn't
>> >>>>>>>> find any deeper documentation than the simple examples in the
>> >>>>>>>> programming model guide.
>> >>>>>>>>
>> >>>>>>>> Can anyone point me in the correct direction?
>> >>>>>>>>
>> >>>>>>>> Thanks,
>> >>>>>>>> Dan.
>>
>
>
