While it might be debatable whether "continuation triggers" are part of the model, the goal should be to provide a consistent experience across SDKs. I don't see a reason why the Java SDK would use continuation triggers while the Python SDK doesn't.

This makes me think that trigger behavior across transforms should actually be part of the model. Or at least be standardized for SDK authors. This would also imply that it is documented for end users.

In the end, users do not care about whether it's part of the model or not, but they like having no surprises :)

On 29.04.19 09:20, Robert Bradshaw wrote:
I would say that the triggering done in stacked GBKs, with windowings
in between, is part of the model (at least in the sense that it's not
something that we'd want different SDKs to do separately.)

OTOH, I'm not sure the continuation trigger should be part of the
model. Much easier to either let WindowInto with no trigger specified
either keep the existing one or reset it to the default. A runner can
mutate this to a continuation trigger under the hood, which should be
strictly looser (triggers are a promise about the earliest possible
firing, they don't force firings to happen).

On Mon, Apr 29, 2019 at 4:34 AM Kenneth Knowles <k...@apache.org> wrote:

It is accurate to say that the "continuation trigger" is not documented in the 
general programming guide. It shows up in the javadoc only, as far as I can tell [1]. 
Technically, this is accurate. It is not part of the core of Beam - each language SDK is 
required to explicitly specify a trigger for every GroupByKey when they submit a pipeline 
to a runner. But, of course, this is pretty much an implementation detail.

Kenn

[1] 
https://www.google.com/search?q="continuation+trigger"+site%3Abeam.apache.org

On Sun, Apr 28, 2019 at 7:08 PM Reza Rokni <r...@google.com> wrote:

+1 I recall a fun afternoon a few years ago figuring this out ...

On Mon, 11 Mar 2019 at 18:36, Maximilian Michels <m...@apache.org> wrote:

Hi,

I have seen several users including myself get confused by the "default"
triggering behavior. I think it would be worthwhile to update the docs.

In fact, Window.into(windowFn) does not override the existing
windowing/triggering. It merges the previous input WindowStrategy with
the new one.

So your w1trigger will still be set when you do not set w2trigger. The
default `AfterWatermark.pastEndOfWindow()` trigger will only be used
when windowing for the first time, or when you set it explicitly.

Thanks,
Max

On 06.03.19 00:28, Daniel Debrunner wrote:
Thanks Kenn,.

Is it fair to say that this continuation trigger functionality is not
documented?

In the Javadoc it has a similar sentence to the programming guide:

triggering(Trigger) allows specifying a trigger to control when (in processing 
time) results for the given window can be produced. If unspecified, the default 
behavior is to trigger first when the watermark passes the end of the window, 
and then trigger again every time there is late arriving data.

Thanks,
Dan.

On Tue, Mar 5, 2019 at 1:46 PM Kenneth Knowles <k...@apache.org> wrote:

The Window.into transform does not reset the trigger to the default. So where you have 
w2trigger, if you leave it off, then the triggering is left as the "continuation 
trigger" from w1trigger. Basically it tries to let any output caused by w1trigger to 
flow all the way through the pipeline without delay.

Kenn

On Tue, Mar 5, 2019 at 1:27 PM Daniel Debrunner <d...@debrunners.com> wrote:

I discover how to fix my issue but not sure I understand why it does.

I created a complete sample here:
https://gist.github.com/ddebrunner/5d4ef21c255c1d40a4517a0060ff8b99#file-cascadewindows-java-L104
Link points to the area of interest.

With the second window I was originally not specifying a trigger so
using the default trigger which lead to multiple triggers of the
combine on the second window.

However changing the trigger to be AfterWatermark.pastEndOfWindow()
produced the output I expected, a single combine across all the
elements in the window.
The gist has comments showing the output and the two code variations.

I don't understand why, since according to 8.1.1 [1] I thought
AfterWatermark.pastEndOfWindow() was the default. Maybe its due to
late data in some way but I'm not sure I understand how the data could
be late in this case.

This is with Beam 2.7 direct runner btw.

Thanks again for your help,
Dan.
[1] https://beam.apache.org/documentation/programming-guide/#event-time-triggers

On Tue, Mar 5, 2019 at 11:48 AM Daniel Debrunner <d...@debrunners.com> wrote:

Thanks Robert, your description is what I'm expecting, I'm working on
a simple example to see if what I'm seeing is different and then
hopefully use that to clarify my misunderstanding.

Thanks,
Dan.

On Tue, Mar 5, 2019 at 11:31 AM Robert Bradshaw <rober...@google.com> wrote:

Windows are assigned to elements via the Window.into transform. They
influence grouping operations such as GroupByKey, Combine.perKey, and
Combine.globally. Looking at your example, you start with

      PCollection<KV<A,B>>

Presumably via a Read or a Create. These KVs are in a global window,
so the elements are really triples (ignoring PaneInfo) of the form

      (KV<A, B>, GlobalWindow, timestamp)

  From what I gather, the next step you do is a
Window.into(FixedWindows.of(...)), yielding a PCollection<KV<A,B>>
whose elements are, implicitly

      (KV<A, B>, IntervalWindow, timestamp)

Now you apply a GroupByKey to get elements of the form

      (KV<A, Iterable<B>>, IntervalWindow, timestamp)

where there is one Iterable for each distinct key and window. You
apply a ParDo to get PCollection<X> which is of the form

      (X, IntervalWindow, timestamp)

It looks like your next step is another
Window.into(FixedWindows.of(...)), yielding

      (X, IntervalWindow, timestamp)

where the IntervalWindow here may be different if the parameters to
FixedWindows were different (e.g. the first was by minute, the second
by hours). If it's the same, this is a no-op. Now you apply
Combine.globally(CombineFn<X, R>) to get a PCollection<R> whose
elements are of the form

      (R, IntervalWindow, timestamp)

where there is now one R per window (the elements in the same window
being combined, the elements across windows not).

FWIW, internally, Combine.globally is implemented as PariWithNullKey +
CombinePerKey + StripNullKey.

Does this help?


On Tue, Mar 5, 2019 at 8:09 PM Daniel Debrunner <d...@debrunners.com> wrote:

Thanks for the reply.

As for every element is always associated with a window, when a
element is produced due to a window trigger (e.g. the GroupByKey) what
window is it associated with? The window it was produced from? Maybe
the question is when is a window assigned to an element?

I'll see if I can come up with an example,

Thanks,
Dan.

On Tue, Mar 5, 2019 at 10:47 AM Kenneth Knowles <k...@apache.org> wrote:

Two pieces to this:

1. Every element in a PCollection is always associated with a window, and 
GroupByKey (hence CombinePerKey) operates per-key-and-window (w/ window 
merging).
2. If an element is not explicitly a KV, then there is no key associated with 
it.

I'm afraid I don't have any guesses at the problem based on what you've shared. 
Can you say more?

Kenn

On Tue, Mar 5, 2019 at 10:29 AM Daniel Debrunner <d...@debrunners.com> wrote:

The windowing section of the Beam programming model guide shows a
window defined and used in the GropyByKey transform after a ParDo.
(section 7.1.1).

However I couldn't find any documentation on how long the window
remains in scope for subsequent transforms.

I have an application with this pipeline:

PCollection<KV<A,B>> -> FixedWindow<KV<A,B>> -> GroupByKey ->
PCollection<X> -> FixedWindow<X> -> Combine<X,R>.globally ->
PCollection<R>

The idea is that the first window is aggregating by key but in the
second window I need to combine elements across all keys.

With my initial app I was seeing some runtime errors in/after the
combine where a KV<null,R> was being seen, even though at that point
there should be no key for the PCollection<R>.

In a simpler test I can apply  FixedWindow<X> -> Combine<X,R>.globally
-> PCollection<R> to a PCollection without an upstream window and the
combine correctly happens once.
But then adding the keyed upstream window, the combine occurs once per
key without any final combine across the keys.

So it seems somehow the memory of the key exists even with the new
window transform,

I'm probably misunderstanding some detail of windowing, but I couldn't
find any deeper documentation than the simple examples in the
programming model guide.

Can anyone point me in the correct direction?

Thanks,
Dan.



--

This email may be confidential and privileged. If you received this 
communication by mistake, please don't forward it to anyone else, please erase 
all copies and attachments, and please let me know that it has gone to the 
wrong person.

The above terms reflect a potential business arrangement, are provided solely 
as a basis for further discussion, and are not intended to be and do not 
constitute a legally binding obligation. No legally binding obligations will be 
created, implied, or inferred until an agreement in final form is executed in 
writing by all parties involved.

Reply via email to