Thanks Robert, your description matches what I'm expecting. I'm working on
a simple example to see whether what I'm seeing is different, and then
hopefully use that to clarify my misunderstanding.

Thanks,
Dan.

On Tue, Mar 5, 2019 at 11:31 AM Robert Bradshaw wrote:
>
> Windows are assigned to elements via the Window.into transform. They
> influence grouping operations such as GroupByKey, Combine.perKey, and
> Combine.globally. Looking at your example, you start with
>
>     PCollection<KV<A,B>>
>
> Presumably via a Read or a Create. These KVs are in a global window,
> so the elements are really triples (ignoring PaneInfo) of the form
>
>     (KV<A, B>, GlobalWindow, timestamp)
>
> From what I gather, the next step you do is a
> Window.into(FixedWindows.of(...)), yielding a PCollection<KV<A,B>>
> whose elements are, implicitly
>
>     (KV<A, B>, IntervalWindow, timestamp)
>
> Now you apply a GroupByKey to get elements of the form
>
>     (KV<A, Iterable<B>>, IntervalWindow, timestamp)
>
> where there is one Iterable for each distinct key and window. You
> apply a ParDo to get PCollection<X> which is of the form
>
>     (X, IntervalWindow, timestamp)
>
> It looks like your next step is another
> Window.into(FixedWindows.of(...)), yielding
>
>     (X, IntervalWindow, timestamp)
>
> where the IntervalWindow here may be different if the parameters to
> FixedWindows were different (e.g. the first was by minute, the second
> by hours). If it's the same, this is a no-op. Now you apply
> Combine.globally(CombineFn<X, R>) to get a PCollection<R> whose
> elements are of the form
>
>     (R, IntervalWindow, timestamp)
>
> where there is now one R per window (elements in the same window are
> combined; elements in different windows are not).
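Robert's last two steps can be sketched with a toy model in plain Java. It has no Beam dependency, and `Window`, `Windowed`, `assignFixed` and `combineGlobally` are hypothetical stand-ins, not Beam classes. Unkeyed X values are moved from minute windows into hour windows, then everything sharing a window is combined, so values that originally came from different keys land in the same result:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BinaryOperator;

// Toy model in plain Java, NOT the Beam API. Each element is a
// (value, window, timestamp) triple; PaneInfo is ignored.
public class CombineGloballyModel {
    // Hypothetical stand-in for IntervalWindow: [start, end) in milliseconds.
    record Window(long start, long end) {}

    record Windowed<T>(T value, Window window, long timestamp) {}

    // Mimics Window.into(FixedWindows.of(size)): only the window changes.
    static <T> Windowed<T> assignFixed(Windowed<T> e, long sizeMillis) {
        long start = Math.floorDiv(e.timestamp(), sizeMillis) * sizeMillis;
        return new Windowed<>(e.value(), new Window(start, start + sizeMillis), e.timestamp());
    }

    // Mimics Combine.globally: there is no key, so elements are combined
    // within a window and never across windows.
    static <T> Map<Window, T> combineGlobally(List<Windowed<T>> input, BinaryOperator<T> fn) {
        Comparator<Window> byStart = Comparator.comparingLong(Window::start);
        Map<Window, T> perWindow = new TreeMap<>(byStart);
        for (Windowed<T> e : input) {
            perWindow.merge(e.window(), e.value(), fn);
        }
        return perWindow;
    }

    static Map<Window, Integer> demo() {
        long minute = 60_000L;
        long hour = 60 * minute;
        // Hypothetical X values, one per key per minute (e.g. the ParDo output
        // after GroupByKey); the timestamps are made up for illustration.
        List<Windowed<Integer>> perMinute = List.of(
                new Windowed<>(3, new Window(0, minute), minute - 1),                    // key "a"
                new Windowed<>(5, new Window(0, minute), minute - 1),                    // key "b"
                new Windowed<>(7, new Window(minute, 2 * minute), 2 * minute - 1),       // key "a"
                new Windowed<>(11, new Window(hour, hour + minute), hour + minute - 1)); // key "b"
        // Second Window.into: each minute window is replaced by an hour window.
        List<Windowed<Integer>> perHour = new ArrayList<>();
        for (Windowed<Integer> x : perMinute) {
            perHour.add(assignFixed(x, hour));
        }
        return combineGlobally(perHour, Integer::sum);
    }

    public static void main(String[] args) {
        // One combined result per hour window, regardless of the original keys.
        demo().forEach((w, r) -> System.out.println(w + " -> " + r));
    }
}
```

Because the second `Window.into` only replaces each element's window, nothing of the earlier key is involved in the combine; in this model the sample input yields one sum per hour window.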
>
> FWIW, internally, Combine.globally is implemented as PairWithNullKey +
> CombinePerKey + StripNullKey.
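The expansion Robert mentions can be modeled in plain Java. The helper names below mirror his description and are hypothetical; they are not Beam's internal classes. The model also shows that the intermediate elements between the per-key combine and the key stripping are `KV<null, R>` values, one per window, which is one place such a value can legitimately appear inside a pipeline:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

// Toy model in plain Java, NOT the Beam API: Combine.globally expressed as
// pairWithNullKey -> combinePerKey -> stripNullKey (hypothetical helper names).
public class GloballyAsPerKeyModel {
    record Window(long start, long end) {}

    record Windowed<T>(T value, Window window) {} // timestamp omitted for brevity

    record KV<K, V>(K key, V value) {}

    record KeyAndWindow(Object key, Window window) {}

    // Step 1: give every element the same (null) key, keeping its window.
    static <T> List<Windowed<KV<Void, T>>> pairWithNullKey(List<Windowed<T>> input) {
        List<Windowed<KV<Void, T>>> out = new ArrayList<>();
        for (Windowed<T> e : input) {
            out.add(new Windowed<>(new KV<Void, T>(null, e.value()), e.window()));
        }
        return out;
    }

    // Step 2: combine per (key, window); with a single null key this
    // yields exactly one KV<null, R> per window.
    static <T> List<Windowed<KV<Void, T>>> combinePerKey(
            List<Windowed<KV<Void, T>>> input, BinaryOperator<T> fn) {
        Map<KeyAndWindow, T> acc = new LinkedHashMap<>();
        for (Windowed<KV<Void, T>> e : input) {
            acc.merge(new KeyAndWindow(e.value().key(), e.window()), e.value().value(), fn);
        }
        List<Windowed<KV<Void, T>>> out = new ArrayList<>();
        acc.forEach((kw, r) -> out.add(new Windowed<>(new KV<Void, T>(null, r), kw.window())));
        return out;
    }

    // Step 3: drop the null key, leaving one R per window.
    static <T> List<Windowed<T>> stripNullKey(List<Windowed<KV<Void, T>>> input) {
        List<Windowed<T>> out = new ArrayList<>();
        for (Windowed<KV<Void, T>> e : input) {
            out.add(new Windowed<>(e.value().value(), e.window()));
        }
        return out;
    }

    static <T> List<Windowed<T>> combineGlobally(List<Windowed<T>> input, BinaryOperator<T> fn) {
        return stripNullKey(combinePerKey(pairWithNullKey(input), fn));
    }

    public static void main(String[] args) {
        Window w0 = new Window(0, 60_000);
        Window w1 = new Window(60_000, 120_000);
        List<Windowed<Integer>> xs = List.of(
                new Windowed<>(1, w0), new Windowed<>(2, w0), new Windowed<>(4, w1));
        // Intermediate elements: KV<null, R>, one per window.
        System.out.println(combinePerKey(pairWithNullKey(xs), Integer::sum));
        // Final elements: one R per window, with no key.
        System.out.println(combineGlobally(xs, Integer::sum));
    }
}
```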
>
> Does this help?
>
>
> On Tue, Mar 5, 2019 at 8:09 PM Daniel Debrunner wrote:
> >
> > Thanks for the reply.
> >
> > As for every element always being associated with a window: when an
> > element is produced by a window trigger (e.g. by the GroupByKey), what
> > window is it associated with? The window it was produced from? Maybe
> > the question is: when is a window assigned to an element?
> >
> > I'll see if I can come up with an example,
> >
> > Thanks,
> > Dan.
> >
> > On Tue, Mar 5, 2019 at 10:47 AM Kenneth Knowles wrote:
> > >
> > > Two pieces to this:
> > >
> > > 1. Every element in a PCollection is always associated with a window, and 
> > > GroupByKey (hence CombinePerKey) operates per-key-and-window (w/ window 
> > > merging).
> > > 2. If an element is not explicitly a KV, then there is no key associated 
> > > with it.
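Kenn's first point can be made concrete with a toy model in plain Java. It has no Beam dependency, and `Window`, `Windowed`, `KeyAndWindow`, `assignFixed` and `groupByKey` are hypothetical stand-ins, not Beam classes. Each element carries its window, and grouping uses the pair (key, window), so the same key in two fixed windows produces two groups. Merging windows such as sessions are not modeled:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model in plain Java, NOT the Beam API: every element carries a window,
// and GroupByKey groups on the pair (key, window). Window merging is not modeled.
public class GroupByKeyModel {
    record Window(long start, long end) {
        // Stand-in for the single global window of a freshly read/created PCollection.
        static final Window GLOBAL = new Window(Long.MIN_VALUE, Long.MAX_VALUE);
    }

    record Windowed<T>(T value, Window window, long timestamp) {}

    record KV<K, V>(K key, V value) {}

    record KeyAndWindow(Object key, Window window) {}

    // Mimics Window.into(FixedWindows.of(size)): only the window changes.
    static <T> Windowed<T> assignFixed(Windowed<T> e, long sizeMillis) {
        long start = Math.floorDiv(e.timestamp(), sizeMillis) * sizeMillis;
        return new Windowed<>(e.value(), new Window(start, start + sizeMillis), e.timestamp());
    }

    // One output group per distinct (key, window) pair.
    static <K, V> Map<KeyAndWindow, List<V>> groupByKey(List<Windowed<KV<K, V>>> input) {
        Map<KeyAndWindow, List<V>> groups = new LinkedHashMap<>();
        for (Windowed<KV<K, V>> e : input) {
            KeyAndWindow gk = new KeyAndWindow(e.value().key(), e.window());
            groups.computeIfAbsent(gk, unused -> new ArrayList<>()).add(e.value().value());
        }
        return groups;
    }

    static Map<KeyAndWindow, List<Integer>> demo() {
        long minute = 60_000L;
        // Hypothetical input: key "a" appears in two different minutes.
        List<Windowed<KV<String, Integer>>> global = List.of(
                new Windowed<>(new KV<>("a", 1), Window.GLOBAL, 10_000L),
                new Windowed<>(new KV<>("a", 2), Window.GLOBAL, 20_000L),
                new Windowed<>(new KV<>("a", 3), Window.GLOBAL, 70_000L),
                new Windowed<>(new KV<>("b", 4), Window.GLOBAL, 30_000L));
        List<Windowed<KV<String, Integer>>> windowed = new ArrayList<>();
        for (Windowed<KV<String, Integer>> e : global) {
            windowed.add(assignFixed(e, minute));
        }
        return groupByKey(windowed);
    }

    public static void main(String[] args) {
        // Three groups: ("a", first minute), ("a", second minute), ("b", first minute).
        demo().forEach((k, v) -> System.out.println(k + " -> " + v));
    }
}
```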
> > >
> > > I'm afraid I don't have any guesses at the problem based on what you've 
> > > shared. Can you say more?
> > >
> > > Kenn
> > >
> > > On Tue, Mar 5, 2019 at 10:29 AM Daniel Debrunner wrote:
> > >>
> > >> The windowing section of the Beam programming model guide shows a
> > >> window defined and used in the GroupByKey transform after a ParDo.
> > >> (section 7.1.1).
> > >>
> > >> However, I couldn't find any documentation on how long the window
> > >> remains in scope for subsequent transforms.
> > >>
> > >> I have an application with this pipeline:
> > >>
> > >> PCollection<KV<A,B>> -> FixedWindow<KV<A,B>> -> GroupByKey ->
> > >> PCollection<X> -> FixedWindow<X> -> Combine<X,R>.globally ->
> > >> PCollection<R>
> > >>
> > >> The idea is that the first window aggregates by key, but in the
> > >> second window I need to combine elements across all keys.
> > >>
> > >> With my initial app I was seeing some runtime errors in/after the
> > >> combine where a KV<null,R> was being seen, even though at that point
> > >> there should be no key for the PCollection<R>.
> > >>
> > >> In a simpler test I can apply FixedWindow<X> -> Combine<X,R>.globally
> > >> -> PCollection<R> to a PCollection without an upstream window, and the
> > >> combine correctly happens once.
> > >> But when I add the keyed upstream window, the combine occurs once per
> > >> key, without any final combine across the keys.
> > >>
> > >> So it seems that somehow the key is still remembered, even after the
> > >> new window transform.
> > >>
> > >> I'm probably misunderstanding some detail of windowing, but I couldn't
> > >> find any deeper documentation than the simple examples in the
> > >> programming model guide.
> > >>
> > >> Can anyone point me in the correct direction?
> > >>
> > >> Thanks,
> > >> Dan.
