Windows are assigned to elements via the Window.into transform. They
influence grouping operations such as GroupByKey, Combine.perKey, and
Combine.globally. Looking at your example, you start with

    PCollection<KV<A,B>>

presumably produced by a Read or a Create. These KVs are in the global window,
so the elements are really triples (ignoring PaneInfo) of the form

    (KV<A, B>, GlobalWindow, timestamp)

From what I gather, the next step you do is a
Window.into(FixedWindows.of(...)), yielding a PCollection<KV<A,B>>
whose elements are, implicitly

    (KV<A, B>, IntervalWindow, timestamp)
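
To make the window assignment concrete, here is a rough plain-Java model of how a fixed window could be derived from an element's timestamp. It is only a sketch and does not use the Beam API: the class name FixedWindowModel and the method windowStart are made up for illustration, timestamps are plain epoch milliseconds, and the windows are assumed to be aligned to the epoch with no offset.

```java
// Plain-Java model of fixed-window assignment; hypothetical names, not the Beam API.
final class FixedWindowModel {

    // Returns the start of the fixed window [start, start + size) that contains
    // the timestamp. Assumes windows are aligned to the epoch (no offset).
    static long windowStart(long timestampMillis, long sizeMillis) {
        // floorMod keeps the result correct for timestamps before the epoch.
        return timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        long hour = 60 * minute;
        long timestamp = 3_725_000L; // 1h 2m 5s after the epoch

        System.out.println(windowStart(timestamp, minute)); // 3720000
        System.out.println(windowStart(timestamp, hour));   // 3600000
    }
}
```

The same timestamp lands in a different interval depending on the window size, which is why the window attached to an element can change when a later Window.into uses different parameters.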

Now you apply a GroupByKey to get elements of the form

    (KV<A, Iterable<B>>, IntervalWindow, timestamp)

where there is one Iterable for each distinct key and window. You
apply a ParDo to get PCollection<X> which is of the form

    (X, IntervalWindow, timestamp)

It looks like your next step is another
Window.into(FixedWindows.of(...)), yielding

    (X, IntervalWindow, timestamp)

where the IntervalWindow here may be different if the parameters to
FixedWindows were different (e.g. the first was by minute, the second
by hours). If it's the same, this is a no-op. Now you apply
Combine.globally(CombineFn<X, R>) to get a PCollection<R> whose
elements are of the form

    (R, IntervalWindow, timestamp)

where there is now one R per window (the elements in the same window
being combined, the elements across windows not).
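
The whole chain can be modeled in plain Java to show what "one R per window" means. This is a sketch under several assumptions, not Beam code: the class and method names are hypothetical, both Window.into steps use the same window size (so the second one is a no-op), the ParDo simply sums each Iterable<B>, and the CombineFn is also a sum.

```java
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of the semantics described above; hypothetical names, not the Beam API.
final class PerWindowCombineModel {

    // Start of the epoch-aligned fixed window containing the timestamp.
    static long windowStart(long timestamp, long size) {
        return timestamp - Math.floorMod(timestamp, size);
    }

    // Models Window.into + GroupByKey + a ParDo that sums each group:
    // one X per distinct (window, key).
    static Map<Long, Map<String, Long>> sumPerKeyAndWindow(
            String[] keys, long[] values, long[] timestamps, long size) {
        Map<Long, Map<String, Long>> result = new TreeMap<>();
        for (int i = 0; i < keys.length; i++) {
            long window = windowStart(timestamps[i], size);
            result.computeIfAbsent(window, w -> new TreeMap<>())
                  .merge(keys[i], values[i], Long::sum);
        }
        return result;
    }

    // Models Combine.globally with a summing CombineFn: one R per window,
    // combining every X in that window regardless of which key produced it.
    static Map<Long, Long> sumPerWindow(Map<Long, Map<String, Long>> perKeyAndWindow) {
        Map<Long, Long> result = new TreeMap<>();
        perKeyAndWindow.forEach((window, sums) ->
                sums.values().forEach(x -> result.merge(window, x, Long::sum)));
        return result;
    }

    public static void main(String[] args) {
        String[] keys = {"a", "a", "b", "a"};
        long[] values = {1, 2, 10, 100};
        long[] timestamps = {0, 30, 59, 60};

        Map<Long, Map<String, Long>> xs = sumPerKeyAndWindow(keys, values, timestamps, 60);
        System.out.println(xs);               // {0={a=3, b=10}, 60={a=100}}
        System.out.println(sumPerWindow(xs)); // {0=13, 60=100}
    }
}
```

In this model the first window yields two X values (one per key) that are folded into a single R, while the value in the second window is never mixed with them.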

FWIW, internally, Combine.globally is implemented as PairWithNullKey +
CombinePerKey + StripNullKey.
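
As an illustration of that three-step shape, here is a small plain-Java model for the elements of a single window. It is a sketch only: the method names mirror the step names above but are hypothetical, the real transforms inside Beam may be named and structured differently, and a java.util.HashMap stands in for the per-key grouping.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

// Plain-Java model of "pair with null key, combine per key, strip key";
// hypothetical names, not the Beam API. Models one window only.
final class GlobalCombineModel {

    // Step 1: give every element the same (null) key.
    static <X> List<Map.Entry<Void, X>> pairWithNullKey(List<X> xs) {
        List<Map.Entry<Void, X>> out = new ArrayList<>();
        for (X x : xs) {
            out.add(new SimpleEntry<Void, X>(null, x));
        }
        return out;
    }

    // Step 2: combine the values of each key (HashMap permits a null key).
    static <K, X> Map<K, X> combinePerKey(List<Map.Entry<K, X>> kvs, BinaryOperator<X> fn) {
        Map<K, X> out = new HashMap<>();
        for (Map.Entry<K, X> kv : kvs) {
            out.merge(kv.getKey(), kv.getValue(), fn);
        }
        return out;
    }

    // Step 3: drop the key again, leaving only the combined values.
    static <X> List<X> stripNullKey(Map<Void, X> byKey) {
        return new ArrayList<>(byKey.values());
    }

    static <X> List<X> combineGlobally(List<X> xs, BinaryOperator<X> fn) {
        return stripNullKey(combinePerKey(pairWithNullKey(xs), fn));
    }

    public static void main(String[] args) {
        System.out.println(combineGlobally(Arrays.asList(1, 2, 3), Integer::sum)); // [6]
    }
}
```

Because every element shares the single null key, the per-key combine collapses them into one value, and between steps 1 and 3 the data really is a collection of pairs whose key is null.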

Does this help?


On Tue, Mar 5, 2019 at 8:09 PM Daniel Debrunner <[email protected]> wrote:
>
> Thanks for the reply.
>
> As for every element always being associated with a window: when an
> element is produced due to a window trigger (e.g. the GroupByKey) what
> window is it associated with? The window it was produced from? Maybe
> the question is when is a window assigned to an element?
>
> I'll see if I can come up with an example,
>
> Thanks,
> Dan.
>
> On Tue, Mar 5, 2019 at 10:47 AM Kenneth Knowles <[email protected]> wrote:
> >
> > Two pieces to this:
> >
> > 1. Every element in a PCollection is always associated with a window, and 
> > GroupByKey (hence CombinePerKey) operates per-key-and-window (w/ window 
> > merging).
> > 2. If an element is not explicitly a KV, then there is no key associated 
> > with it.
> >
> > I'm afraid I don't have any guesses at the problem based on what you've 
> > shared. Can you say more?
> >
> > Kenn
> >
> > On Tue, Mar 5, 2019 at 10:29 AM Daniel Debrunner <[email protected]> 
> > wrote:
> >>
> >> The windowing section of the Beam programming model guide shows a
> >> window defined and used in the GroupByKey transform after a ParDo.
> >> (section 7.1.1).
> >>
> >> However I couldn't find any documentation on how long the window
> >> remains in scope for subsequent transforms.
> >>
> >> I have an application with this pipeline:
> >>
> >> PCollection<KV<A,B>> -> FixedWindow<KV<A,B>> -> GroupByKey ->
> >> PCollection<X> -> FixedWindow<X> -> Combine<X,R>.globally ->
> >> PCollection<R>
> >>
> >> The idea is that the first window is aggregating by key but in the
> >> second window I need to combine elements across all keys.
> >>
> >> With my initial app I was seeing some runtime errors in/after the
> >> combine where a KV<null,R> was being seen, even though at that point
> >> there should be no key for the PCollection<R>.
> >>
> >> In a simpler test I can apply  FixedWindow<X> -> Combine<X,R>.globally
> >> -> PCollection<R> to a PCollection without an upstream window and the
> >> combine correctly happens once.
> >> But then adding the keyed upstream window, the combine occurs once per
> >> key without any final combine across the keys.
> >>
> >> So it seems somehow the memory of the key exists even with the new
> >> window transform,
> >>
> >> I'm probably misunderstanding some detail of windowing, but I couldn't
> >> find any deeper documentation than the simple examples in the
> >> programming model guide.
> >>
> >> Can anyone point me in the correct direction?
> >>
> >> Thanks,
> >> Dan.
