Thanks Kenn,.

Is it fair to say that this continuation trigger functionality is not
documented?

In the Javadoc it has a similar sentence to the programming guide:

> triggering(Trigger) allows specifying a trigger to control when (in 
> processing time) results for the given window can be produced. If 
> unspecified, the default behavior is to trigger first when the watermark 
> passes the end of the window, and then trigger again every time there is late 
> arriving data.

Thanks,
Dan.

On Tue, Mar 5, 2019 at 1:46 PM Kenneth Knowles <[email protected]> wrote:
>
> The Window.into transform does not reset the trigger to the default. So where 
> you have w2trigger, if you leave it off, then the triggering is left as the 
> "continuation trigger" from w1trigger. Basically it tries to let any output 
> caused by w1trigger to flow all the way through the pipeline without delay.
>
> Kenn
>
> On Tue, Mar 5, 2019 at 1:27 PM Daniel Debrunner <[email protected]> wrote:
>>
>> I discover how to fix my issue but not sure I understand why it does.
>>
>> I created a complete sample here:
>> https://gist.github.com/ddebrunner/5d4ef21c255c1d40a4517a0060ff8b99#file-cascadewindows-java-L104
>> Link points to the area of interest.
>>
>> With the second window I was originally not specifying a trigger so
>> using the default trigger which lead to multiple triggers of the
>> combine on the second window.
>>
>> However changing the trigger to be AfterWatermark.pastEndOfWindow()
>> produced the output I expected, a single combine across all the
>> elements in the window.
>> The gist has comments showing the output and the two code variations.
>>
>> I don't understand why, since according to 8.1.1 [1] I thought
>> AfterWatermark.pastEndOfWindow() was the default. Maybe its due to
>> late data in some way but I'm not sure I understand how the data could
>> be late in this case.
>>
>> This is with Beam 2.7 direct runner btw.
>>
>> Thanks again for your help,
>> Dan.
>> [1] 
>> https://beam.apache.org/documentation/programming-guide/#event-time-triggers
>>
>> On Tue, Mar 5, 2019 at 11:48 AM Daniel Debrunner <[email protected]> wrote:
>> >
>> > Thanks Robert, your description is what I'm expecting, I'm working on
>> > a simple example to see if what I'm seeing is different and then
>> > hopefully use that to clarify my misunderstanding.
>> >
>> > Thanks,
>> > Dan.
>> >
>> > On Tue, Mar 5, 2019 at 11:31 AM Robert Bradshaw <[email protected]> 
>> > wrote:
>> > >
>> > > Windows are assigned to elements via the Window.into transform. They
>> > > influence grouping operations such as GroupByKey, Combine.perKey, and
>> > > Combine.globally. Looking at your example, you start with
>> > >
>> > >     PCollection<KV<A,B>>
>> > >
>> > > Presumably via a Read or a Create. These KVs are in a global window,
>> > > so the elements are really triples (ignoring PaneInfo) of the form
>> > >
>> > >     (KV<A, B>, GlobalWindow, timestamp)
>> > >
>> > > From what I gather, the next step you do is a
>> > > Window.into(FixedWindows.of(...)), yielding a PCollection<KV<A,B>>
>> > > whose elements are, implicitly
>> > >
>> > >     (KV<A, B>, IntervalWindow, timestamp)
>> > >
>> > > Now you apply a GroupByKey to get elements of the form
>> > >
>> > >     (KV<A, Iterable<B>>, IntervalWindow, timestamp)
>> > >
>> > > where there is one Iterable for each distinct key and window. You
>> > > apply a ParDo to get PCollection<X> which is of the form
>> > >
>> > >     (X, IntervalWindow, timestamp)
>> > >
>> > > It looks like your next step is another
>> > > Window.into(FixedWindows.of(...)), yielding
>> > >
>> > >     (X, IntervalWindow, timestamp)
>> > >
>> > > where the IntervalWindow here may be different if the parameters to
>> > > FixedWindows were different (e.g. the first was by minute, the second
>> > > by hours). If it's the same, this is a no-op. Now you apply
>> > > Combine.globally(CombineFn<X, R>) to get a PCollection<R> whose
>> > > elements are of the form
>> > >
>> > >     (R, IntervalWindow, timestamp)
>> > >
>> > > where there is now one R per window (the elements in the same window
>> > > being combined, the elements across windows not).
>> > >
>> > > FWIW, internally, Combine.globally is implemented as PariWithNullKey +
>> > > CombinePerKey + StripNullKey.
>> > >
>> > > Does this help?
>> > >
>> > >
>> > > On Tue, Mar 5, 2019 at 8:09 PM Daniel Debrunner <[email protected]> 
>> > > wrote:
>> > > >
>> > > > Thanks for the reply.
>> > > >
>> > > > As for every element is always associated with a window, when a
>> > > > element is produced due to a window trigger (e.g. the GroupByKey) what
>> > > > window is it associated with? The window it was produced from? Maybe
>> > > > the question is when is a window assigned to an element?
>> > > >
>> > > > I'll see if I can come up with an example,
>> > > >
>> > > > Thanks,
>> > > > Dan.
>> > > >
>> > > > On Tue, Mar 5, 2019 at 10:47 AM Kenneth Knowles <[email protected]> 
>> > > > wrote:
>> > > > >
>> > > > > Two pieces to this:
>> > > > >
>> > > > > 1. Every element in a PCollection is always associated with a 
>> > > > > window, and GroupByKey (hence CombinePerKey) operates 
>> > > > > per-key-and-window (w/ window merging).
>> > > > > 2. If an element is not explicitly a KV, then there is no key 
>> > > > > associated with it.
>> > > > >
>> > > > > I'm afraid I don't have any guesses at the problem based on what 
>> > > > > you've shared. Can you say more?
>> > > > >
>> > > > > Kenn
>> > > > >
>> > > > > On Tue, Mar 5, 2019 at 10:29 AM Daniel Debrunner 
>> > > > > <[email protected]> wrote:
>> > > > >>
>> > > > >> The windowing section of the Beam programming model guide shows a
>> > > > >> window defined and used in the GropyByKey transform after a ParDo.
>> > > > >> (section 7.1.1).
>> > > > >>
>> > > > >> However I couldn't find any documentation on how long the window
>> > > > >> remains in scope for subsequent transforms.
>> > > > >>
>> > > > >> I have an application with this pipeline:
>> > > > >>
>> > > > >> PCollection<KV<A,B>> -> FixedWindow<KV<A,B>> -> GroupByKey ->
>> > > > >> PCollection<X> -> FixedWindow<X> -> Combine<X,R>.globally ->
>> > > > >> PCollection<R>
>> > > > >>
>> > > > >> The idea is that the first window is aggregating by key but in the
>> > > > >> second window I need to combine elements across all keys.
>> > > > >>
>> > > > >> With my initial app I was seeing some runtime errors in/after the
>> > > > >> combine where a KV<null,R> was being seen, even though at that point
>> > > > >> there should be no key for the PCollection<R>.
>> > > > >>
>> > > > >> In a simpler test I can apply  FixedWindow<X> -> 
>> > > > >> Combine<X,R>.globally
>> > > > >> -> PCollection<R> to a PCollection without an upstream window and 
>> > > > >> the
>> > > > >> combine correctly happens once.
>> > > > >> But then adding the keyed upstream window, the combine occurs once 
>> > > > >> per
>> > > > >> key without any final combine across the keys.
>> > > > >>
>> > > > >> So it seems somehow the memory of the key exists even with the new
>> > > > >> window transform,
>> > > > >>
>> > > > >> I'm probably misunderstanding some detail of windowing, but I 
>> > > > >> couldn't
>> > > > >> find any deeper documentation than the simple examples in the
>> > > > >> programming model guide.
>> > > > >>
>> > > > >> Can anyone point me in the correct direction?
>> > > > >>
>> > > > >> Thanks,
>> > > > >> Dan.

Reply via email to