Windows are assigned to elements via the Window.into transform. They
influence grouping operations such as GroupByKey, Combine.perKey, and
Combine.globally. Looking at your example, you start with
PCollection<KV<A,B>>
presumably via a Read or a Create. These KVs are in the global window,
so the elements are really triples (ignoring PaneInfo) of the form
(KV<A, B>, GlobalWindow, timestamp)
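Here is a toy model of that triple in plain Java. It is not Beam's API. `Triple`, `GlobalWindow` and `create` are hypothetical names; Beam's own wrapper for this is WindowedValue, which also carries the PaneInfo ignored here. `Map.Entry` plays the role of KV. The timestamps are passed in by hand, whereas a real Read or Create decides them itself.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Toy model, not Beam's API: an element is implicitly a (value, window, timestamp) triple.
class WindowedTriples {
  interface Window {}

  // Hypothetical stand-in for Beam's GlobalWindow: one window covering all time.
  enum GlobalWindow implements Window { INSTANCE }

  // PaneInfo is ignored, as in the explanation above.
  record Triple<T>(T value, Window window, long timestampMillis) {}

  // Hypothetical stand-in for Read/Create: every element starts in the global window.
  static <T> List<Triple<T>> create(List<T> values, List<Long> timestampsMillis) {
    List<Triple<T>> out = new ArrayList<>();
    for (int i = 0; i < values.size(); i++) {
      out.add(new Triple<>(values.get(i), GlobalWindow.INSTANCE, timestampsMillis.get(i)));
    }
    return out;
  }

  public static void main(String[] args) {
    // Map.Entry plays the role of KV<A, B>.
    List<Triple<Map.Entry<String, Integer>>> input =
        create(List.of(Map.entry("a", 1), Map.entry("b", 2)), List.of(1_000L, 2_000L));
    input.forEach(System.out::println);
  }
}
```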
From what I gather, your next step is a
Window.into(FixedWindows.of(...)), yielding a PCollection<KV<A,B>>
whose elements are, implicitly
(KV<A, B>, IntervalWindow, timestamp)
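This plain-Java sketch shows what the Window.into step does to each triple. It assumes a FixedWindows offset of zero and millisecond timestamps. `IntervalWindow` is a hypothetical record mirroring Beam's half-open [start, end) IntervalWindow, and `assign` and `windowInto` are illustrative names, not Beam methods. Only the window component changes; the value and timestamp pass through untouched.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of Window.into(FixedWindows.of(size)), not Beam's API: only the window
// part of each (value, window, timestamp) triple changes.
class FixedWindowsToy {
  // Half-open interval [startMillis, endMillis), mirroring Beam's IntervalWindow.
  record IntervalWindow(long startMillis, long endMillis) {}

  // The window slot is Object so any previous window (e.g. a global one) fits.
  record Triple<T>(T value, Object window, long timestampMillis) {}

  // Fixed window containing the timestamp, assuming an offset of 0.
  static IntervalWindow assign(long timestampMillis, long sizeMillis) {
    long start = timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
    return new IntervalWindow(start, start + sizeMillis);
  }

  static <T> List<Triple<T>> windowInto(List<Triple<T>> input, long sizeMillis) {
    List<Triple<T>> out = new ArrayList<>();
    for (Triple<T> t : input) {
      // The previous window is discarded; the new one depends only on the timestamp.
      out.add(new Triple<>(t.value(), assign(t.timestampMillis(), sizeMillis), t.timestampMillis()));
    }
    return out;
  }

  public static void main(String[] args) {
    List<Triple<String>> input = List.of(
        new Triple<>("a=1", "GlobalWindow", 30_000L),
        new Triple<>("b=2", "GlobalWindow", 90_000L));
    windowInto(input, 60_000L).forEach(System.out::println);
  }
}
```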
Now you apply a GroupByKey to get elements of the form
(KV<A, Iterable<B>>, IntervalWindow, timestamp)
where there is one Iterable for each distinct key and window. Next you
apply a ParDo to get a PCollection<X> whose elements are of the form
(X, IntervalWindow, timestamp)
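The next sketch models two things: grouping happens per key and window, and a ParDo leaves each element's window alone. Everything here is hypothetical toy code. A List stands in for the Iterable. The GroupByKey output timestamp is assumed to be the end of the window (end minus 1 ms). We believe that is Beam's default timestamp combiner, but this thread does not state it.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Toy model (not Beam's API): GroupByKey groups per (key, window); a ParDo keeps each window.
class GroupByKeyToy {
  record IntervalWindow(long startMillis, long endMillis) {}
  record Triple<T>(T value, IntervalWindow window, long timestampMillis) {}
  record KV<K, V>(K key, V value) {}
  // The grouping key is the pair (key, window), never the key alone.
  record KeyAndWindow<K>(K key, IntervalWindow window) {}

  static <K, V> List<Triple<KV<K, List<V>>>> groupByKey(List<Triple<KV<K, V>>> input) {
    Map<KeyAndWindow<K>, List<V>> groups = new LinkedHashMap<>();
    for (Triple<KV<K, V>> t : input) {
      groups.computeIfAbsent(new KeyAndWindow<>(t.value().key(), t.window()), kw -> new ArrayList<>())
          .add(t.value().value());
    }
    List<Triple<KV<K, List<V>>>> out = new ArrayList<>();
    // Each output stays in the window it was grouped in; its timestamp is assumed
    // to be the end of that window (end minus 1 ms).
    groups.forEach((kw, values) -> out.add(
        new Triple<>(new KV<>(kw.key(), values), kw.window(), kw.window().endMillis() - 1)));
    return out;
  }

  // A one-to-one ParDo: only the value changes; window and timestamp ride along.
  static <A, B> List<Triple<B>> parDo(List<Triple<A>> input, Function<A, B> fn) {
    List<Triple<B>> out = new ArrayList<>();
    for (Triple<A> t : input) {
      out.add(new Triple<>(fn.apply(t.value()), t.window(), t.timestampMillis()));
    }
    return out;
  }

  public static void main(String[] args) {
    IntervalWindow w1 = new IntervalWindow(0L, 60_000L);
    IntervalWindow w2 = new IntervalWindow(60_000L, 120_000L);
    List<Triple<KV<String, Integer>>> input = List.of(
        new Triple<>(new KV<>("a", 1), w1, 10_000L),
        new Triple<>(new KV<>("a", 2), w1, 20_000L),
        new Triple<>(new KV<>("a", 3), w2, 70_000L),
        new Triple<>(new KV<>("b", 4), w1, 30_000L));
    // Three groups: (a, w1) -> [1, 2], (a, w2) -> [3], (b, w1) -> [4].
    List<Triple<KV<String, List<Integer>>>> grouped = groupByKey(input);
    // The ParDo drops the key: here X is simply the sum of each group.
    List<Triple<Integer>> xs = parDo(grouped, kv -> kv.value().stream().mapToInt(Integer::intValue).sum());
    xs.forEach(System.out::println);
  }
}
```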
It looks like your next step is another
Window.into(FixedWindows.of(...)), yielding
(X, IntervalWindow, timestamp)
where the IntervalWindow here may be different if the parameters to
FixedWindows were different (e.g. the first was by minute, the second
by hour). If it's the same, this is a no-op. Now you apply
Combine.globally(CombineFn<X, AccumT, R>) to get a PCollection<R> whose
elements are of the form
(R, IntervalWindow, timestamp)
where there is now one R per window: elements in the same window are
combined, elements in different windows are not.
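Here is a toy version of these last two steps, again plain Java and not Beam's API. First, each element gets a second fixed window computed from its timestamp. Then values are combined per window. A BinaryOperator stands in for the CombineFn (a real CombineFn also has an accumulator type), the result is a map from window to R instead of a PCollection, and all names are hypothetical. With the same window size the reassignment changes nothing. With an hourly size, elements from different minute windows end up in one window and are combined together.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

// Toy model (not Beam's API) of a second Window.into(FixedWindows) followed by
// Combine.globally. Only timestamps decide the new windows.
class RewindowAndCombineToy {
  record IntervalWindow(long startMillis, long endMillis) {}
  record Triple<T>(T value, IntervalWindow window, long timestampMillis) {}

  // Fixed window containing the timestamp, assuming an offset of 0.
  static IntervalWindow fixedWindow(long timestampMillis, long sizeMillis) {
    long start = timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
    return new IntervalWindow(start, start + sizeMillis);
  }

  // Window.into: the old window is discarded and a new one is derived from the timestamp.
  static <X> Triple<X> rewindow(Triple<X> t, long sizeMillis) {
    return new Triple<>(t.value(), fixedWindow(t.timestampMillis(), sizeMillis), t.timestampMillis());
  }

  // Combine.globally: one result per window, never combining across windows.
  // A BinaryOperator stands in for the CombineFn.
  static <X> Map<IntervalWindow, X> combineGlobally(List<Triple<X>> input, BinaryOperator<X> combine) {
    Map<IntervalWindow, X> perWindow = new LinkedHashMap<>();
    for (Triple<X> t : input) {
      perWindow.merge(t.window(), t.value(), combine);
    }
    return perWindow;
  }

  public static void main(String[] args) {
    long minute = 60_000L;
    long hour = 60 * minute;
    // X values from two minute windows of the same hour, timestamped at each window's end.
    List<Triple<Integer>> xs = List.of(
        new Triple<>(3, new IntervalWindow(0L, minute), minute - 1),
        new Triple<>(4, new IntervalWindow(minute, 2 * minute), 2 * minute - 1));
    // Same size again: every element keeps its window (a no-op).
    System.out.println(xs.stream().allMatch(t -> rewindow(t, minute).equals(t)));
    // Hourly windows: both elements now share one window, so they are combined together.
    List<Triple<Integer>> hourly = xs.stream().map(t -> rewindow(t, hour)).toList();
    System.out.println(combineGlobally(hourly, Integer::sum));
  }
}
```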
FWIW, internally, Combine.globally is implemented as PairWithNullKey +
CombinePerKey + StripNullKey.
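The same toy style can mimic that expansion. This illustrates the idea only and is not Beam's implementation. The names are hypothetical, a BinaryOperator stands in for the CombineFn, and the end-of-window output timestamp is an assumption. Between the first and last steps, every element is a KV whose key is null. Because CombinePerKey works per key and window, a single null key means one result per window.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

// Toy model (not Beam's internals) of Combine.globally expressed as
// PairWithNullKey + CombinePerKey + StripNullKey.
class GloballyViaPerKeyToy {
  record IntervalWindow(long startMillis, long endMillis) {}
  record Triple<T>(T value, IntervalWindow window, long timestampMillis) {}
  record KV<K, V>(K key, V value) {}
  record KeyAndWindow<K>(K key, IntervalWindow window) {}

  // Step 1: give every element the same key, null (typed as Void).
  static <X> List<Triple<KV<Void, X>>> pairWithNullKey(List<Triple<X>> input) {
    List<Triple<KV<Void, X>>> out = new ArrayList<>();
    for (Triple<X> t : input) {
      out.add(new Triple<>(new KV<Void, X>(null, t.value()), t.window(), t.timestampMillis()));
    }
    return out;
  }

  // Step 2: combine per (key, window); with a single key this is just per window.
  static <K, X> List<Triple<KV<K, X>>> combinePerKey(List<Triple<KV<K, X>>> input, BinaryOperator<X> fn) {
    Map<KeyAndWindow<K>, X> acc = new LinkedHashMap<>();
    for (Triple<KV<K, X>> t : input) {
      acc.merge(new KeyAndWindow<>(t.value().key(), t.window()), t.value().value(), fn);
    }
    List<Triple<KV<K, X>>> out = new ArrayList<>();
    // Output timestamp assumed to be the end of the window (end minus 1 ms).
    acc.forEach((kw, x) -> out.add(new Triple<>(new KV<>(kw.key(), x), kw.window(), kw.window().endMillis() - 1)));
    return out;
  }

  // Step 3: drop the null key again, leaving one value per window.
  static <X> List<Triple<X>> stripNullKey(List<Triple<KV<Void, X>>> input) {
    List<Triple<X>> out = new ArrayList<>();
    for (Triple<KV<Void, X>> t : input) {
      out.add(new Triple<>(t.value().value(), t.window(), t.timestampMillis()));
    }
    return out;
  }

  static <X> List<Triple<X>> combineGlobally(List<Triple<X>> input, BinaryOperator<X> fn) {
    return stripNullKey(combinePerKey(pairWithNullKey(input), fn));
  }

  public static void main(String[] args) {
    IntervalWindow w1 = new IntervalWindow(0L, 60_000L);
    IntervalWindow w2 = new IntervalWindow(60_000L, 120_000L);
    List<Triple<Integer>> xs = List.of(
        new Triple<>(1, w1, 1_000L), new Triple<>(2, w2, 61_000L), new Triple<>(3, w1, 2_000L));
    // Intermediate elements carry a null key; the final ones carry none.
    System.out.println(pairWithNullKey(xs));
    System.out.println(combineGlobally(xs, Integer::sum));
  }
}
```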
Does this help?
On Tue, Mar 5, 2019 at 8:09 PM Daniel Debrunner wrote:
Thanks for the reply.
As for every element always being associated with a window: when an
element is produced due to a window trigger (e.g. by the GroupByKey),
what window is it associated with? The window it was produced from?
Maybe the question is: when is a window assigned to an element?
I'll see if I can come up with an example.
Thanks,
Dan.
On Tue, Mar 5, 2019 at 10:47 AM Kenneth Knowles wrote:
Two pieces to this:
1. Every element in a PCollection is always associated with a window, and
GroupByKey (hence CombinePerKey) operates per-key-and-window (w/ window
merging).
2. If an element is not explicitly a KV, then there is no key associated with
it.
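To illustrate "per-key-and-window (w/ window merging)", here is a toy of how merging windows could be collapsed for one key before its values are grouped. Session windowing is one source of such windows: it assigns overlapping [t, t + gap) windows. This is a plain-Java sketch, not Beam's merging code. The names are hypothetical. Treating windows that merely touch, like [0, 10) and [10, 20), as separate is an assumption about the boundary.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Toy sketch (not Beam's implementation): merge overlapping windows for ONE key.
// Merging happens separately for each key, before that key's values are grouped.
class MergingToy {
  // Half-open interval [startMillis, endMillis).
  record IntervalWindow(long startMillis, long endMillis) {
    boolean intersects(IntervalWindow other) {
      return startMillis < other.endMillis && other.startMillis < endMillis;
    }

    IntervalWindow span(IntervalWindow other) {
      return new IntervalWindow(
          Math.min(startMillis, other.startMillis), Math.max(endMillis, other.endMillis));
    }
  }

  static List<IntervalWindow> mergeOverlapping(List<IntervalWindow> windows) {
    List<IntervalWindow> sorted = new ArrayList<>(windows);
    sorted.sort(Comparator.comparingLong(IntervalWindow::startMillis));
    List<IntervalWindow> merged = new ArrayList<>();
    for (IntervalWindow w : sorted) {
      int last = merged.size() - 1;
      if (last >= 0 && merged.get(last).intersects(w)) {
        merged.set(last, merged.get(last).span(w)); // grow the current merged window
      } else {
        merged.add(w); // start a new merged window
      }
    }
    return merged;
  }

  public static void main(String[] args) {
    // Session-style windows [t, t + 10 s) for elements of key "a" at t = 0 s, 5 s and 30 s.
    List<IntervalWindow> keyA = List.of(
        new IntervalWindow(0L, 10_000L),
        new IntervalWindow(5_000L, 15_000L),
        new IntervalWindow(30_000L, 40_000L));
    System.out.println(mergeOverlapping(keyA));
  }
}
```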
I'm afraid I don't have any guesses at the problem based on what you've shared.
Can you say more?
Kenn
On Tue, Mar 5, 2019 at 10:29 AM Daniel Debrunner wrote:
The windowing section of the Beam programming model guide shows a
window defined and used in the GroupByKey transform after a ParDo
(section 7.1.1).
However I couldn't find any documentation on how long the window
remains in scope for subsequent transforms.
I have an application with this pipeline:
PCollection<KV<A,B>> -> FixedWindow<KV<A,B>> -> GroupByKey ->
PCollection<X> -> FixedWindow<X> -> Combine<X,R>.globally ->
PCollection<R>
The idea is that the first window is aggregating by key, but in the
second window I need to combine elements across all keys.
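This plain-Java toy of the pipeline (not Beam's API) checks the intended semantics. It assumes String keys and int values, X as the per-group sum, and R as the overall sum. All names are hypothetical, and grouped elements are timestamped at the end of their window. Under these assumptions the result has one entry per second-level window, however many keys fed into it. That matches the behaviour described in the replies above.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy simulation (not Beam's API) of:
// KV -> FixedWindows(first) -> GroupByKey -> ParDo (drops key) -> FixedWindows(second) -> Combine.globally.
class PipelineToy {
  record IntervalWindow(long startMillis, long endMillis) {}
  record KeyAndWindow(String key, IntervalWindow window) {}
  record Triple(int value, IntervalWindow window, long timestampMillis) {}

  // Fixed window containing the timestamp, assuming an offset of 0.
  static IntervalWindow fixedWindow(long timestampMillis, long sizeMillis) {
    long start = timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
    return new IntervalWindow(start, start + sizeMillis);
  }

  static Map<IntervalWindow, Integer> run(
      List<String> keys, List<Integer> values, List<Long> timestamps, long firstSize, long secondSize) {
    // Window.into(first) + GroupByKey: values grouped per (key, window).
    Map<KeyAndWindow, List<Integer>> groups = new LinkedHashMap<>();
    for (int i = 0; i < keys.size(); i++) {
      IntervalWindow w = fixedWindow(timestamps.get(i), firstSize);
      groups.computeIfAbsent(new KeyAndWindow(keys.get(i), w), kw -> new ArrayList<>()).add(values.get(i));
    }
    // ParDo: X = sum of each group; the key is dropped, the window is kept.
    // Assumed: the grouped element is timestamped at the end of its window.
    List<Triple> xs = new ArrayList<>();
    groups.forEach((kw, vs) -> xs.add(new Triple(
        vs.stream().mapToInt(Integer::intValue).sum(), kw.window(), kw.window().endMillis() - 1)));
    // Window.into(second) + Combine.globally(sum): one result per window, across all keys.
    Map<IntervalWindow, Integer> results = new LinkedHashMap<>();
    for (Triple x : xs) {
      results.merge(fixedWindow(x.timestampMillis(), secondSize), x.value(), Integer::sum);
    }
    return results;
  }

  public static void main(String[] args) {
    long minute = 60_000L;
    // Three keys spread over the first two minutes of the same hour.
    Map<IntervalWindow, Integer> r = run(
        List.of("a", "a", "b", "c"), List.of(1, 2, 3, 4),
        List.of(10_000L, 20_000L, 30_000L, 70_000L), minute, 60 * minute);
    System.out.println(r);
  }
}
```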
With my initial app, I was seeing runtime errors in or after the
combine where a KV<null,R> appeared, even though at that point
there should be no key for the PCollection<R>.
In a simpler test I can apply FixedWindow<X> -> Combine<X,R>.globally
-> PCollection<R> to a PCollection without an upstream window and the
combine correctly happens once.
But when I add the keyed upstream window, the combine occurs once per
key, without any final combine across the keys.
So it seems that somehow the memory of the key persists even after the
new window transform.
I'm probably misunderstanding some detail of windowing, but I couldn't
find any deeper documentation than the simple examples in the
programming model guide.
Can anyone point me in the correct direction?
Thanks,
Dan.