Are you also saying that even in the first example (Source -> CombineByKey (Sum) -> Sink) there's no guarantee that events are delivered in order from the Combine to the Sink transform? This seems like a pretty big gotcha for correctness if you ever use accumulating triggering.
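To make the concern concrete, here's a minimal simulation of the hazard (plain Python, not Beam; the pane indices and sums are made-up values): with accumulating panes, each firing carries the full running sum, so a sink that simply overwrites its stored value silently ends up stale if a later firing is delivered first.

```python
# Minimal simulation of the hazard: with accumulating panes, each firing
# carries the full running sum, and a naive sink just overwrites the
# stored value for the key. Illustrative Python only, not Beam code.

def naive_sink(store, key, pane_index, value):
    # Overwrites unconditionally -- implicitly assumes panes arrive
    # in firing order.
    store[key] = value

store = {}
# Firing F (pane 0, sum=7) and firing F+1 (pane 1, sum=10) for key "a".
# If a downstream GBK-style step re-orders them, pane 1 arrives first:
for pane_index, value in [(1, 10), (0, 7)]:
    naive_sink(store, "a", pane_index, value)

print(store["a"])  # 7 -- the stale pane 0 result, not the latest sum 10
```

If pane 1 happened to be the final (on-time) firing, nothing would ever correct the stored value, which is exactly the "possibly forever" case described below.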
I'd also like to point out that I'm not talking about a global ordering across the entire PCollection; I'm talking about ordering within the same key after a GBK transform.

On Tue, Feb 12, 2019 at 11:35 AM Robert Bradshaw <[email protected]> wrote:

> Due to the nature of distributed processing, order is not preserved. You
> can, however, inspect the PaneInfo to determine if an element was early,
> on-time, or late and act accordingly.
>
> On Tue, Feb 12, 2019 at 5:15 PM Juan Carlos Garcia <[email protected]> wrote:
>
>> In my experience ordering is not guaranteed; you may need to apply a
>> transformation that sorts the elements and then dispatches them in order.
>>
>> Or use the Sorter extension for this:
>>
>> https://github.com/apache/beam/tree/master/sdks/java/extensions/sorter
>>
>> Steve Niemitz <[email protected]> wrote on Tue, Feb 12, 2019, 16:31:
>>
>>> Hi everyone, I have some questions I want to ask about how windowing,
>>> triggering, and panes work together, and how to ensure correctness
>>> throughout a pipeline.
>>>
>>> Let's assume I have a very simple streaming pipeline that looks like:
>>> Source -> CombineByKey (Sum) -> Sink
>>>
>>> Given fixed windows of 1 hour, early firings every minute, and
>>> accumulating panes, this is pretty straightforward. However, this can
>>> get more complicated if we add steps after the CombineByKey, for
>>> instance (using the same windowing strategy):
>>>
>>> Say I want to buffer the results of the CombineByKey into batches of N
>>> elements. I can do this with the built-in GroupIntoBatches [1]
>>> transform; now my pipeline looks like:
>>>
>>> Source -> CombineByKey (Sum) -> GroupIntoBatches -> Sink
>>>
>>> *This leads to my main question:*
>>> Is ordering preserved somehow here? i.e., is it possible that the
>>> result from early firing F+1 now comes BEFORE firing F (because it was
>>> re-ordered in GroupIntoBatches)? This would mean that the sink then
>>> gets F+1 before F, which means my resulting store has incorrect data
>>> (possibly forever, if F+1 was the final firing).
>>>
>>> If ordering is not preserved, it seems as if I'd need to introduce my
>>> own ordering back in after GroupIntoBatches. GIB is an example here,
>>> but I imagine this could happen with any GBK-type operation.
>>>
>>> Am I thinking about this the correct way? Thanks!
>>>
>>> [1]
>>> https://beam.apache.org/releases/javadoc/2.10.0/org/apache/beam/sdk/transforms/GroupIntoBatches.html
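One way to "introduce my own ordering back in," along the lines suggested in the thread, is to carry the pane index with each result and have the sink apply a pane only if it is newer than what's already stored. This is a hedged sketch in plain Python, not Beam code; in the Beam Java SDK the index would come from PaneInfo.getIndex() on the fired pane, and the key/value names here are illustrative:

```python
# Sketch of a last-writer-wins sink keyed on pane index: a pane is only
# applied if its index is greater than the one already stored, so a
# re-ordered earlier firing can no longer clobber a later one.
# Illustrative Python; in Beam (Java) the index would come from
# PaneInfo.getIndex() for each fired pane.

def guarded_sink(store, key, pane_index, value):
    stored_index, _ = store.get(key, (-1, None))
    if pane_index > stored_index:
        store[key] = (pane_index, value)
    # else: drop the stale pane -- with accumulating panes, the newer
    # firing already contains everything this one would contribute.

store = {}
# Out-of-order delivery: pane 1 (sum=10) arrives before pane 0 (sum=7).
for pane_index, value in [(1, 10), (0, 7)]:
    guarded_sink(store, "a", pane_index, value)

print(store["a"])  # (1, 10): the stale pane 0 was discarded
```

Note this only works because accumulating panes make each firing a superset of the previous one, so dropping an out-of-order older pane loses nothing; with discarding panes you would need to merge rather than drop.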
