On Wed, Feb 13, 2019 at 5:07 PM Steve Niemitz <[email protected]> wrote:
> Thanks again for the answers so far! I really appreciate it. As for my
> specific use-case, we're using Bigtable as the final sink, and I'd prefer
> to keep our writes fully idempotent for other reasons (i.e. no
> read-modify-write). We actually do track tentative vs. final values
> already, but checking that at write time would impose a pretty big
> overhead in the write path.
>
> After this I actually instrumented one of my running pipelines to detect
> these "time traveling" panes, and did see it occurring pretty frequently,
> particularly when Dataflow decides to scale the job up or down, so that
> was interesting.
>
> From all this, it seems like using a stateful DoFn to prevent time
> traveling panes from overwriting newer ones is the best solution for now.

Note that you can't "filter out" these time traveling panes, because at
the next fusion break they might get re-ordered again.

> My last question / statement is just around general education and
> documentation about this. I think the fact that PCollections are
> unordered makes sense and is pretty intuitive, but fired panes being
> delivered out-of-order seems very surprising. I'm curious how many other
> pipelines exist that run into this (and produce incorrect results!) but
> people are unaware of. Is there a way we can call this behavior out? For
> example, many of the sample Beam projects use early firings, but there's
> never any mention that the output may be out-of-order.

+1 to improving the documentation here. Basically, multiple firings
become independent elements of the resulting PCollection; they don't
retain any association or ordering. Multiply-triggered windows are
difficult to reason about (and not just in this case);
https://s.apache.org/beam-sink-triggers is IMHO the right answer.
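The stateful-DoFn guard described above boils down to a per-key rule: remember the highest pane index written so far, and drop anything at or below it. A minimal sketch of that logic in plain Java (no Beam dependency; `PaneGuard` and its method names are illustrative, not Beam API — in a real pipeline the map would be a per-key `ValueState` and the index would come from `PaneInfo.getIndex()`):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the per-key guard a stateful DoFn would hold: remember the
// highest pane index seen for each key and drop any pane that arrives
// with a lower-or-equal index. Illustrative only, not Beam API.
class PaneGuard {
  private final Map<String, Long> lastPaneIndex = new HashMap<>();

  /** Returns true iff this pane is newer than anything seen for the key. */
  public boolean shouldWrite(String key, long paneIndex) {
    long last = lastPaneIndex.getOrDefault(key, -1L);
    if (paneIndex <= last) {
      return false; // a "time traveling" earlier pane; ignore it
    }
    lastPaneIndex.put(key, paneIndex);
    return true;
  }
}
```

Note this only suppresses stale writes at the point where the state lives; as mentioned above, elements can be re-ordered again at any later fusion break, so the guard belongs immediately before the sink.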
> On Wed, Feb 13, 2019 at 3:11 AM Robert Bradshaw <[email protected]> wrote:
>> On Tue, Feb 12, 2019 at 7:38 PM Steve Niemitz <[email protected]> wrote:
>> >
>> > wow, that's super unexpected and dangerous, thanks for clarifying!
>> > Time to go re-think how we do some of our writes w/ early firings
>> > then.
>> >
>> > Are there any workarounds to make things happen in-order in Dataflow?
>> > eg if the sink gets fused to the output of the GBK operation, will it
>> > always happen effectively in order (per key) even though it's not a
>> > guarantee?
>>
>> If things get fused, yes. Note that sinks themselves sometimes have
>> fusion barriers though.
>>
>> > I also guess I could keep track of the last pane index my sink has
>> > seen, and ignore earlier ones (using state to keep track)?
>>
>> Yes, that would work.
>>
>> What kind of sink are you using? If it supports read-modify-write or
>> some kind of transaction, you may be able to mark early results as
>> tentative (which would be useful anyway) and only overwrite tentative
>> ones.
>>
>> > On Tue, Feb 12, 2019 at 1:28 PM Robert Bradshaw <[email protected]> wrote:
>> >>
>> >> Correct, even within the same key there's no promise that the event
>> >> time ordering of panes maps to real time ordering, because the
>> >> downstream operations may happen on a different machine. Multiply
>> >> triggered windows add an element of non-determinism to the process.
>> >>
>> >> You're also correct that triggering with multiple panes requires
>> >> lots of care, especially when it comes to operations with side
>> >> effects (like sinks). Safest is to only write the final pane to the
>> >> sink, and handle early triggering in a different way.
>> >> https://s.apache.org/beam-sink-triggers is a proposal to make this
>> >> easier to reason about.
>> >>
>> >> On Tue, Feb 12, 2019 at 7:19 PM Steve Niemitz <[email protected]> wrote:
>> >>>
>> >>> Also to clarify here (I re-read this and realized it could be
>> >>> slightly unclear).
>> >>> My question is only about in-order delivery of panes, ie: will
>> >>> pane P always be delivered before P+1?
>> >>>
>> >>> I realize the use of "in-order" before could be confusing. I don't
>> >>> care about the ordering of the elements per se, just the ordering
>> >>> of the pane delivery.
>> >>>
>> >>> I want to make sure that given a GBK that produces 3 panes (P0, P1,
>> >>> P2) for a key, a downstream PCollection could never see P0, P2, P1.
>> >>> OR at least, that the final firing is always guaranteed to be
>> >>> delivered after all early firings (eg we could have P0, P2, P1, but
>> >>> then always PLast).
>> >>>
>> >>> On Tue, Feb 12, 2019 at 11:48 AM Steve Niemitz <[email protected]> wrote:
>> >>>>
>> >>>> Are you also saying that even in the first example (Source ->
>> >>>> CombineByKey (Sum) -> Sink) there's no guarantee that events would
>> >>>> be delivered in-order from the Combine -> Sink transforms? This
>> >>>> seems like a pretty big "gotcha" for correctness if you ever use
>> >>>> accumulating triggering.
>> >>>>
>> >>>> I'd also like to point out I'm not talking about a global ordering
>> >>>> across the entire PCollection, I'm talking about within the same
>> >>>> key after a GBK transform.
>> >>>>
>> >>>> On Tue, Feb 12, 2019 at 11:35 AM Robert Bradshaw <[email protected]> wrote:
>> >>>>>
>> >>>>> Due to the nature of distributed processing, order is not
>> >>>>> preserved. You can, however, inspect the PaneInfo to determine if
>> >>>>> an element was early, on-time, or late and act accordingly.
>> >>>>>
>> >>>>> On Tue, Feb 12, 2019 at 5:15 PM Juan Carlos Garcia <[email protected]> wrote:
>> >>>>>>
>> >>>>>> In my experience ordering is not guaranteed; you may need to
>> >>>>>> apply a transformation that sorts the elements and then
>> >>>>>> dispatches them in sorted order.
>> >>>>>>
>> >>>>>> Or use the Sorter extension for this:
>> >>>>>>
>> >>>>>> https://github.com/apache/beam/tree/master/sdks/java/extensions/sorter
>> >>>>>>
>> >>>>>> Steve Niemitz <[email protected]> wrote on Tue, Feb 12,
>> >>>>>> 2019, 16:31:
>> >>>>>>>
>> >>>>>>> Hi everyone, I have some questions I want to ask about how
>> >>>>>>> windowing, triggering, and panes work together, and how to
>> >>>>>>> ensure correctness throughout a pipeline.
>> >>>>>>>
>> >>>>>>> Let's assume I have a very simple streaming pipeline that looks
>> >>>>>>> like:
>> >>>>>>> Source -> CombineByKey (Sum) -> Sink
>> >>>>>>>
>> >>>>>>> Given fixed windows of 1 hour, early firings every minute, and
>> >>>>>>> accumulating panes, this is pretty straightforward. However,
>> >>>>>>> this can get more complicated if we add steps after the
>> >>>>>>> CombineByKey, for instance (using the same windowing strategy):
>> >>>>>>>
>> >>>>>>> Say I want to buffer the results of the CombineByKey into
>> >>>>>>> batches of N elements. I can do this with the built-in
>> >>>>>>> GroupIntoBatches [1] transform; now my pipeline looks like:
>> >>>>>>>
>> >>>>>>> Source -> CombineByKey (Sum) -> GroupIntoBatches -> Sink
>> >>>>>>>
>> >>>>>>> This leads to my main question:
>> >>>>>>> Is ordering preserved somehow here? ie: is it possible that the
>> >>>>>>> result from early firing F+1 now comes BEFORE the firing F
>> >>>>>>> (because it was re-ordered in the GroupIntoBatches)? This would
>> >>>>>>> mean that the sink then gets F+1 before F, which means my
>> >>>>>>> resulting store has incorrect data (possibly forever if F+1 was
>> >>>>>>> the final firing).
>> >>>>>>>
>> >>>>>>> If ordering is not preserved, it seems as if I'd need to
>> >>>>>>> introduce my own ordering back in after GroupIntoBatches. GIB
>> >>>>>>> is an example here, but I imagine this could happen with any
>> >>>>>>> GBK-type operation.
>> >>>>>>>
>> >>>>>>> Am I thinking about this the correct way? Thanks!
>> >>>>>>>
>> >>>>>>> [1]
>> >>>>>>> https://beam.apache.org/releases/javadoc/2.10.0/org/apache/beam/sdk/transforms/GroupIntoBatches.html
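The re-ordering hazard discussed throughout this thread can be shown concretely. The sketch below (plain Java, no Beam dependency; the `Pane` class, method names, and all values are made up for illustration) simulates accumulating-mode panes P0, P2, P1 arriving at a sink in that order: a naive last-write-wins sink ends up holding the stale early firing P1, while a sink that tracks the highest pane index seen keeps P2.

```java
// Simulation of out-of-order pane delivery to a sink. Illustrative
// only: in real Beam code the index would come from PaneInfo.getIndex().
class OutOfOrderPanes {
  static final class Pane {
    final long index; // PaneInfo-style firing index: 0, 1, 2, ...
    final long sum;   // accumulated sum as of this firing
    Pane(long index, long sum) { this.index = index; this.sum = sum; }
  }

  /** Naive sink: whatever pane arrives last wins. */
  static long naiveSink(Pane... arrivals) {
    long value = 0;
    for (Pane p : arrivals) value = p.sum;
    return value;
  }

  /** Guarded sink: only accept a pane newer than the last one written. */
  static long guardedSink(Pane... arrivals) {
    long lastIndex = -1, value = 0;
    for (Pane p : arrivals) {
      if (p.index > lastIndex) { lastIndex = p.index; value = p.sum; }
    }
    return value;
  }
}
```

With arrivals P0(sum=3), P2(sum=10), P1(sum=7), the naive sink is left holding 7 instead of the latest accumulated value 10, which is exactly the "incorrect data, possibly forever" case raised in the original question.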
