Thanks Robert, your description matches what I'm expecting. I'm working on a simple example to see whether what I'm seeing is different, and will hopefully use that to clarify my misunderstanding.
Thanks,
Dan.

On Tue, Mar 5, 2019 at 11:31 AM Robert Bradshaw <[email protected]> wrote:
>
> Windows are assigned to elements via the Window.into transform. They
> influence grouping operations such as GroupByKey, Combine.perKey, and
> Combine.globally. Looking at your example, you start with
>
>     PCollection<KV<A,B>>
>
> Presumably via a Read or a Create. These KVs are in a global window,
> so the elements are really triples (ignoring PaneInfo) of the form
>
>     (KV<A, B>, GlobalWindow, timestamp)
>
> From what I gather, the next step you do is a
> Window.into(FixedWindows.of(...)), yielding a PCollection<KV<A,B>>
> whose elements are, implicitly
>
>     (KV<A, B>, IntervalWindow, timestamp)
>
> Now you apply a GroupByKey to get elements of the form
>
>     (KV<A, Iterable<B>>, IntervalWindow, timestamp)
>
> where there is one Iterable for each distinct key and window. You
> apply a ParDo to get PCollection<X> which is of the form
>
>     (X, IntervalWindow, timestamp)
>
> It looks like your next step is another
> Window.into(FixedWindows.of(...)), yielding
>
>     (X, IntervalWindow, timestamp)
>
> where the IntervalWindow here may be different if the parameters to
> FixedWindows were different (e.g. the first was by minute, the second
> by hours). If it's the same, this is a no-op. Now you apply
> Combine.globally(CombineFn<X, R>) to get a PCollection<R> whose
> elements are of the form
>
>     (R, IntervalWindow, timestamp)
>
> where there is now one R per window (the elements in the same window
> being combined, the elements across windows not).
>
> FWIW, internally, Combine.globally is implemented as PairWithNullKey +
> CombinePerKey + StripNullKey.
>
> Does this help?
>
>
> On Tue, Mar 5, 2019 at 8:09 PM Daniel Debrunner <[email protected]> wrote:
> >
> > Thanks for the reply.
> >
> > As for every element is always associated with a window, when an
> > element is produced due to a window trigger (e.g. the GroupByKey) what
> > window is it associated with? The window it was produced from? Maybe
> > the question is when is a window assigned to an element?
> >
> > I'll see if I can come up with an example,
> >
> > Thanks,
> > Dan.
> >
> > On Tue, Mar 5, 2019 at 10:47 AM Kenneth Knowles <[email protected]> wrote:
> > >
> > > Two pieces to this:
> > >
> > > 1. Every element in a PCollection is always associated with a window, and
> > > GroupByKey (hence CombinePerKey) operates per-key-and-window (w/ window
> > > merging).
> > > 2. If an element is not explicitly a KV, then there is no key associated
> > > with it.
> > >
> > > I'm afraid I don't have any guesses at the problem based on what you've
> > > shared. Can you say more?
> > >
> > > Kenn
> > >
> > > On Tue, Mar 5, 2019 at 10:29 AM Daniel Debrunner <[email protected]>
> > > wrote:
> > >>
> > >> The windowing section of the Beam programming model guide shows a
> > >> window defined and used in the GroupByKey transform after a ParDo.
> > >> (section 7.1.1).
> > >>
> > >> However I couldn't find any documentation on how long the window
> > >> remains in scope for subsequent transforms.
> > >>
> > >> I have an application with this pipeline:
> > >>
> > >>     PCollection<KV<A,B>> -> FixedWindow<KV<A,B>> -> GroupByKey ->
> > >>     PCollection<X> -> FixedWindow<X> -> Combine<X,R>.globally ->
> > >>     PCollection<R>
> > >>
> > >> The idea is that the first window is aggregating by key but in the
> > >> second window I need to combine elements across all keys.
> > >>
> > >> With my initial app I was seeing some runtime errors in/after the
> > >> combine where a KV<null,R> was being seen, even though at that point
> > >> there should be no key for the PCollection<R>.
> > >>
> > >> In a simpler test I can apply FixedWindow<X> -> Combine<X,R>.globally
> > >> -> PCollection<R> to a PCollection without an upstream window and the
> > >> combine correctly happens once.
> > >>
> > >> But then adding the keyed upstream window, the combine occurs once per
> > >> key without any final combine across the keys.
> > >>
> > >> So it seems somehow the memory of the key exists even with the new
> > >> window transform,
> > >>
> > >> I'm probably misunderstanding some detail of windowing, but I couldn't
> > >> find any deeper documentation than the simple examples in the
> > >> programming model guide.
> > >>
> > >> Can anyone point me in the correct direction?
> > >>
> > >> Thanks,
> > >> Dan.
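
The per-key-and-window grouping described in Robert's reply can be modelled in plain Java without Beam. The sketch below is only an illustration under stated assumptions: WindowModel, Element, windowStart, groupByKeyAndWindow and combineGloballyPerWindow are hypothetical names and not Beam APIs, timestamps are non-negative seconds, the same fixed window size is used at both steps, and a simple sum stands in for both the ParDo and the CombineFn.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class WindowModel {
    /** Hypothetical stand-in for a timestamped KV element. */
    static final class Element {
        final String key;
        final int value;
        final long timestampSec;

        Element(String key, int value, long timestampSec) {
            this.key = key;
            this.value = value;
            this.timestampSec = timestampSec;
        }
    }

    /** Start of the fixed window containing the timestamp (non-negative timestamps assumed). */
    static long windowStart(long timestampSec, long sizeSec) {
        return timestampSec - (timestampSec % sizeSec);
    }

    /** Models GroupByKey: one list of values per distinct window and key. */
    static Map<Long, Map<String, List<Integer>>> groupByKeyAndWindow(
            List<Element> in, long sizeSec) {
        Map<Long, Map<String, List<Integer>>> out = new TreeMap<>();
        for (Element e : in) {
            out.computeIfAbsent(windowStart(e.timestampSec, sizeSec), w -> new TreeMap<>())
               .computeIfAbsent(e.key, k -> new ArrayList<>())
               .add(e.value);
        }
        return out;
    }

    /** Models a global combine: one summed result per window, across all keys. */
    static Map<Long, Integer> combineGloballyPerWindow(
            Map<Long, Map<String, List<Integer>>> grouped) {
        Map<Long, Integer> out = new TreeMap<>();
        for (Map.Entry<Long, Map<String, List<Integer>>> w : grouped.entrySet()) {
            int total = 0;
            for (List<Integer> values : w.getValue().values()) {
                for (int v : values) {
                    total += v;
                }
            }
            out.put(w.getKey(), total);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Element> input = List.of(
            new Element("a", 1, 5), new Element("b", 2, 10),
            new Element("a", 3, 30), new Element("a", 4, 70));
        Map<Long, Map<String, List<Integer>>> grouped = groupByKeyAndWindow(input, 60);
        System.out.println(grouped);
        System.out.println(combineGloballyPerWindow(grouped));
    }
}
```

With the sample input and 60-second windows, the grouping step keeps keys "a" and "b" separate inside the window starting at 0, while the combine step yields a single value per window (6 for the window starting at 0, 4 for the window starting at 60), which is the behaviour the reply describes for Combine.globally.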
