Wow, that's super unexpected and dangerous, thanks for clarifying! Time to go re-think how we do some of our writes with early firings, then.
Are there any workarounds to make things happen in order in Dataflow? E.g., if the sink gets fused to the output of the GBK operation, will it always happen effectively in order (per key), even though it's not a guarantee? I also guess I could keep track of the last pane index my sink has seen, and ignore earlier ones (using state to keep track)?

On Tue, Feb 12, 2019 at 1:28 PM Robert Bradshaw <[email protected]> wrote:

> Correct: even within the same key, there's no promise that the event-time
> ordering of panes maps to the real-time order in which they're delivered,
> because the downstream operations *may* happen on a different machine.
> Multiply-triggered windows add an element of non-determinism to the
> process.
>
> You're also correct that triggering with multiple panes requires lots of
> care, especially when it comes to operations with side effects (like
> sinks). Safest is to only write the final pane to the sink, and handle
> early triggering in a different way.
> https://s.apache.org/beam-sink-triggers is a proposal to make this easier
> to reason about.
>
> On Tue, Feb 12, 2019 at 7:19 PM Steve Niemitz <[email protected]> wrote:
>
>> Also to clarify here (I re-read this and realized it could be slightly
>> unclear): my question is only about in-order delivery of panes, i.e.,
>> will pane P always be delivered before P+1?
>>
>> I realize the use of "in-order" before could be confusing. I don't care
>> about the ordering of the elements per se, just the ordering of the pane
>> delivery.
>>
>> I want to make sure that, given a GBK that produces 3 panes (P0, P1, P2)
>> for a key, a downstream PCollection could never see P0, P2, P1. Or, at
>> least, that the final firing is always guaranteed to be delivered after
>> all early firings (e.g., we could have P0, P2, P1, but then always
>> PLast).
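The state-based workaround floated at the top of this message ("keep track of the last pane index my sink has seen, and ignore earlier ones") might look something like the sketch below. This is a plain-Python simulation of the per-key state cell a stateful DoFn would hold, not Beam API code; `filter_stale_panes` and the tuple shapes are invented for illustration:

```python
def filter_stale_panes(panes):
    """panes: iterable of (pane_index, value) pairs in *arrival* order
    for a single key and window.

    Yields only panes whose index is higher than any index seen so far,
    so a late-arriving early firing can never overwrite a newer pane.
    """
    max_seen = -1  # would live in per-key, per-window state in a real DoFn
    for index, value in panes:
        if index > max_seen:
            max_seen = index
            yield index, value
        # else: a pane with a higher index already passed through; drop this one


# Arrival order P0, P2, P1: the stale P1 is dropped, so the sink's
# last write still reflects the newest pane (P2).
arrivals = [(0, 10), (2, 30), (1, 20)]
print(list(filter_stale_panes(arrivals)))  # [(0, 10), (2, 30)]
```

Note the caveat: this suppresses stale *writes*, but anything already written from an earlier pane stays written, so it only helps if each pane's write fully supersedes the previous one (as with accumulating panes).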
>>
>> On Tue, Feb 12, 2019 at 11:48 AM Steve Niemitz <[email protected]>
>> wrote:
>>
>>> Are you also saying that even in the first example (Source ->
>>> CombineByKey (Sum) -> Sink) there's no guarantee that events would be
>>> delivered in order from the Combine -> Sink transforms? This seems
>>> like a pretty big gotcha for correctness if you ever use accumulating
>>> triggering.
>>>
>>> I'd also like to point out that I'm not talking about a global
>>> ordering across the entire PCollection; I'm talking about within the
>>> same key after a GBK transform.
>>>
>>> On Tue, Feb 12, 2019 at 11:35 AM Robert Bradshaw <[email protected]>
>>> wrote:
>>>
>>>> Due to the nature of distributed processing, order is not preserved.
>>>> You can, however, inspect the PaneInfo to determine if an element was
>>>> early, on-time, or late, and act accordingly.
>>>>
>>>> On Tue, Feb 12, 2019 at 5:15 PM Juan Carlos Garcia <[email protected]>
>>>> wrote:
>>>>
>>>>> In my experience, ordering is not guaranteed; you may need to apply
>>>>> a transformation that sorts the elements and then dispatches them in
>>>>> order.
>>>>>
>>>>> Or use the Sorter extension for this:
>>>>> https://github.com/apache/beam/tree/master/sdks/java/extensions/sorter
>>>>>
>>>>> Steve Niemitz <[email protected]> wrote on Tue., Feb. 12, 2019,
>>>>> 16:31:
>>>>>
>>>>>> Hi everyone, I have some questions about how windowing, triggering,
>>>>>> and panes work together, and how to ensure correctness throughout a
>>>>>> pipeline.
>>>>>>
>>>>>> Let's assume I have a very simple streaming pipeline that looks
>>>>>> like: Source -> CombineByKey (Sum) -> Sink
>>>>>>
>>>>>> Given fixed windows of 1 hour, early firings every minute, and
>>>>>> accumulating panes, this is pretty straightforward.
>>>>>> However, this can get more complicated if we add steps after the
>>>>>> CombineByKey, for instance (using the same windowing strategy):
>>>>>>
>>>>>> Say I want to buffer the results of the CombineByKey into batches
>>>>>> of N elements. I can do this with the built-in GroupIntoBatches [1]
>>>>>> transform, so now my pipeline looks like:
>>>>>>
>>>>>> Source -> CombineByKey (Sum) -> GroupIntoBatches -> Sink
>>>>>>
>>>>>> *This leads to my main question:*
>>>>>> Is ordering preserved somehow here? I.e., is it possible that the
>>>>>> result from early firing F+1 now comes BEFORE firing F (because it
>>>>>> was re-ordered in the GroupIntoBatches)? This would mean that the
>>>>>> sink then gets F+1 before F, which means my resulting store has
>>>>>> incorrect data (possibly forever, if F+1 was the final firing).
>>>>>>
>>>>>> If ordering is not preserved, it seems as if I'd need to introduce
>>>>>> my own ordering back in after GroupIntoBatches. GIB is an example
>>>>>> here, but I imagine this could happen with any GBK-type operation.
>>>>>>
>>>>>> Am I thinking about this the correct way? Thanks!
>>>>>>
>>>>>> [1]
>>>>>> https://beam.apache.org/releases/javadoc/2.10.0/org/apache/beam/sdk/transforms/GroupIntoBatches.html
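For reference, Robert's "only write the final pane" suggestion boils down to having the sink ignore every early firing and act only on the pane marked last (what `PaneInfo.isLast()` reports in the Java SDK). A plain-Python sketch with invented stand-in types, not Beam API code:

```python
from collections import namedtuple

# Stand-in for the pane metadata Beam attaches to each element;
# the real Java API is PaneInfo (getIndex(), isLast(), etc.).
Pane = namedtuple("Pane", ["index", "is_last", "value"])

def writes_to_sink(panes):
    """Return only the values the sink should persist: the final pane,
    regardless of the order in which panes happened to arrive."""
    return [p.value for p in panes if p.is_last]


# Even if early firings arrive reordered (P0, P2, P1), the single
# write is the final pane's accumulated sum.
panes = [
    Pane(index=0, is_last=False, value=5),
    Pane(index=2, is_last=True, value=17),
    Pane(index=1, is_last=False, value=11),
]
print(writes_to_sink(panes))  # [17]
```

The trade-off is latency: nothing reaches the sink until the window closes, so early results would have to surface through some other, order-insensitive path.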
