I also think that we should improve our documentation. Multiply-fired triggers are an advanced and dangerous feature, and care needs to be taken that the downstream portions of the pipeline handle them correctly.
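[Editor's note: to make Kenn's "(isFinal, actual key)" advice below concrete, here is a minimal sketch. A toy in-memory table stands in for a real Bigtable client, and all names (`row_key`, `FakeTable`, etc.) are made up for illustration; the point is only the key encoding, which keeps writes blind and idempotent.]

```python
def row_key(is_final: bool, key: str) -> str:
    # Prefix the logical key with the pane's finality so tentative and
    # final values land in different rows; no read-modify-write needed.
    return ("F" if is_final else "T") + "#" + key

class FakeTable:
    """Stand-in for a key/value store such as Bigtable."""
    def __init__(self):
        self.rows = {}

    def put(self, key, value):
        # Blind, idempotent write: re-delivering the same pane is harmless.
        self.rows[key] = value

    def get_final(self, key):
        # Query-time choice: look only at final values.
        return self.rows.get(row_key(True, key))

    def get_latest(self, key):
        # Prefer the final value, fall back to the tentative one.
        final = self.rows.get(row_key(True, key))
        return final if final is not None else self.rows.get(row_key(False, key))

table = FakeTable()
table.put(row_key(False, "user1"), 10)  # early firing (tentative)
table.put(row_key(False, "user1"), 25)  # later early firing
table.put(row_key(True, "user1"), 30)   # final pane
```

As Kenn notes, variations of the same encoding can add "on-time" as a third category to fetch.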
I filed https://issues.apache.org/jira/browse/BEAM-6716

On Wed, Feb 20, 2019 at 5:40 AM Kenneth Knowles <[email protected]> wrote:
>
> This is a very valid concern so I want to offer advice you can apply today.
>
> "We actually do track tentative vs final values already, but checking that at
> write-time would impose a pretty big overhead in the write path."
>
> You mention you are writing to BigTable. If your key is conceptually the
> tuple of (isFinal, actual key) then you can avoid read-modify-write, and at
> query time (of your BigTable) you can choose to only look at final values.
> There are probably variations of this so you can get "on-time" as another
> category to fetch.
>
> One reason we have not taken on the large-scale re-architecting is that if
> you control the whole pipeline it is usually possible to meet your business
> need with some combination of accumulation mode, existing sinks, and behavior
> of your downstream client.
>
> I also wanted to respond to this bit:
>
> >> I was reading this yesterday, but couldn't see how it solved the
> >> out-of-order delivery problem here...
>
> > It moves the responsibility of doing things in the right order (and even
> > defining what order is "correct enough") to the runner (and sinks) such
> > that the side effects happen in order...
>
> Again, emphasis on sinks. Many of the ideas there just apply to sinks in
> general. For example, the sink that writes deltas and the sink that writes
> whole elements are _different sinks_. The sink that treats retractions as
> "delete" side effects and the sink that writes a row representing the
> retraction are _different sinks_. These are steps we could take in Beam today.
>
> Kenn
>
> On Tue, Feb 19, 2019 at 6:55 AM Steve Niemitz <[email protected]> wrote:
>>
>> Thanks again for all the replies everyone. Just as a final follow up here,
>> are there any concrete plans on addressing these issues I could start
>> following?
>> The sink trigger doc seems like a start, but also seems like just a
>> starting point in a larger re-architecture of sinks.
>>
>> On Fri, Feb 15, 2019 at 4:34 PM Kenneth Knowles <[email protected]> wrote:
>>>
>>> On Wed, Feb 13, 2019 at 3:11 PM Robert Bradshaw <[email protected]> wrote:
>>>>
>>>> On Wed, Feb 13, 2019 at 11:39 PM Steve Niemitz <[email protected]> wrote:
>>>>>
>>>>> On Wed, Feb 13, 2019 at 5:01 PM Robert Bradshaw <[email protected]> wrote:
>>>>>>
>>>>>> On Wed, Feb 13, 2019 at 5:07 PM Steve Niemitz <[email protected]> wrote:
>>>>>>>
>>>>>>> Thanks again for the answers so far! I really appreciate it. As for
>>>>>>> my specific use-case, we're using Bigtable as the final sink, and I'd
>>>>>>> prefer to keep our writes fully idempotent for other reasons (ie no
>>>>>>> read-modify-write). We actually do track tentative vs final values
>>>>>>> already, but checking that at write-time would impose a pretty big
>>>>>>> overhead in the write path.
>>>>>>>
>>>>>>> After this I actually instrumented one of my running pipelines to
>>>>>>> detect these "time traveling" panes, and did see it occurring pretty
>>>>>>> frequently, particularly when dataflow decides to scale up/down the
>>>>>>> job, so that was interesting.
>>>>>>>
>>>>>>> From all this, it seems like using a stateful DoFn to prevent time
>>>>>>> traveling panes from overwriting newer ones is the best solution for
>>>>>>> now.
>>>>>>
>>>>>> Note that you can't "filter out" these time traveling panes, because at
>>>>>> the next fusion break they might get re-ordered again.
>>>>>
>>>>> Ack, in a general sense. To solve my specific problem my plan was to
>>>>> ensure the final writer sink would be fused to this filter step (or even
>>>>> build it directly into the DoFn itself that does the write), which would
>>>>> work in my specific case (it seems like at least).
>>>>>>> My last question / statement is just around general education and
>>>>>>> documentation about this. I think the fact that PCollections are
>>>>>>> unordered makes sense and is pretty intuitive, but fired panes being
>>>>>>> delivered out-of-order seems very surprising. I'm curious how many
>>>>>>> other pipelines exist that run into this (and produce incorrect
>>>>>>> results!) but people are unaware of. Is there a way we can call this
>>>>>>> behavior out? For example, many of the sample beam projects use early
>>>>>>> firings, but there's never any mention that the output may be
>>>>>>> out-of-order.
>>>>>>
>>>>>> +1 to improving the documentation here. Basically multiple firings
>>>>>> become independent elements of the resulting PCollection; they don't
>>>>>> retain any association/ordering.
>>>>>>
>>>>>> Multiply-triggered windows are difficult to reason about (and not just
>>>>>> in this case); https://s.apache.org/beam-sink-triggers is IMHO the
>>>>>> right answer.
>>>>>
>>>>> I was reading this yesterday, but couldn't see how it solved the
>>>>> out-of-order delivery problem here. I do like the overall direction it's
>>>>> proposing though; from my work with triggers so far I have found them
>>>>> very difficult to reason about (like you said).
>>>>
>>>> It moves the responsibility of doing things in the right order (and even
>>>> defining what order is "correct enough") to the runner (and sinks) such
>>>> that the side effects happen in order, even if all the processing did not.
>>>> To be clear there's still a fair amount of design to make that doc into a
>>>> workable system...
>>>
>>> With or without sink triggers, transforms that write need to be
>>> pane-index-aware. The outputs themselves may be out of order, but they
>>> have sequence numbers on them, so sinks likely need to be made stateful so
>>> they can be idempotent in the face of reordering.
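[Editor's note: the "track the last pane index per key and drop older panes" guard discussed above can be sketched as a pure-Python simulation. In a real pipeline this state would live in a stateful DoFn keyed by the element's key; here a plain dict stands in for that per-key state, so this is illustrative logic, not Beam SDK code.]

```python
def dedupe_time_travelers(events):
    """events: iterable of (key, pane_index, value) tuples in arrival order.

    Yields only elements whose pane index is newer than anything already
    seen for that key, making the downstream write idempotent in the face
    of reordering.
    """
    last_seen = {}  # key -> highest pane index emitted so far
    for key, pane_index, value in events:
        if pane_index <= last_seen.get(key, -1):
            continue  # a "time traveling" pane: an older firing arriving late
        last_seen[key] = pane_index
        yield key, pane_index, value

# Panes for key "a" arrive out of order: 0, then 2, then the stale 1.
arrived = [("a", 0, 5), ("a", 2, 30), ("a", 1, 17)]
written = list(dedupe_time_travelers(arrived))
```

Per Robert's caveat above, this only helps if the guard is fused with (or built into) the write itself; another fusion break after the filter could reorder the panes again.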
>>> Kenn
>>>
>>>>>>> On Wed, Feb 13, 2019 at 3:11 AM Robert Bradshaw <[email protected]>
>>>>>>> wrote:
>>>>>>>>
>>>>>>>> On Tue, Feb 12, 2019 at 7:38 PM Steve Niemitz <[email protected]>
>>>>>>>> wrote:
>>>>>>>> >
>>>>>>>> > wow, that's super unexpected and dangerous, thanks for clarifying!
>>>>>>>> > Time to go re-think how we do some of our writes w/ early firings
>>>>>>>> > then.
>>>>>>>> >
>>>>>>>> > Are there any workarounds to make things happen in-order in
>>>>>>>> > dataflow? eg if the sink gets fused to the output of the GBK
>>>>>>>> > operation, will it always happen effectively in order (per key) even
>>>>>>>> > though it's not a guarantee?
>>>>>>>>
>>>>>>>> If things get fused, yes. Note that sinks themselves sometimes have
>>>>>>>> fusion barriers though.
>>>>>>>>
>>>>>>>> > I also guess I could keep track of the last pane index my sink has
>>>>>>>> > seen, and ignore earlier ones (using state to keep track)?
>>>>>>>>
>>>>>>>> Yes, that would work.
>>>>>>>>
>>>>>>>> What kind of sink are you using? If it supports read-modify-write or
>>>>>>>> some kind of transaction you may be able to mark early results as
>>>>>>>> tentative (which would be useful anyway) and only overwrite tentative
>>>>>>>> ones.
>>>>>>>>
>>>>>>>> > On Tue, Feb 12, 2019 at 1:28 PM Robert Bradshaw
>>>>>>>> > <[email protected]> wrote:
>>>>>>>> >>
>>>>>>>> >> Correct: even within the same key there's no promise that the
>>>>>>>> >> event-time ordering of panes maps to real-time ordering, because
>>>>>>>> >> the downstream operations may happen on a different machine.
>>>>>>>> >> Multiply-triggered windows add an element of non-determinism to
>>>>>>>> >> the process.
>>>>>>>> >>
>>>>>>>> >> You're also correct that triggering with multiple panes requires
>>>>>>>> >> lots of care, especially when it comes to operations with side
>>>>>>>> >> effects (like sinks).
>>>>>>>> >> Safest is to write only the final pane to the sink, and handle
>>>>>>>> >> early triggering in a different way.
>>>>>>>> >> https://s.apache.org/beam-sink-triggers is a proposal to make this
>>>>>>>> >> easier to reason about.
>>>>>>>> >>
>>>>>>>> >> On Tue, Feb 12, 2019 at 7:19 PM Steve Niemitz <[email protected]>
>>>>>>>> >> wrote:
>>>>>>>> >>>
>>>>>>>> >>> Also to clarify here (I re-read this and realized it could be
>>>>>>>> >>> slightly unclear): my question is only about in-order delivery of
>>>>>>>> >>> panes, ie will pane P always be delivered before P+1.
>>>>>>>> >>>
>>>>>>>> >>> I realize the use of "in-order" before could be confusing. I don't
>>>>>>>> >>> care about the ordering of the elements per se, just the ordering
>>>>>>>> >>> of the pane delivery.
>>>>>>>> >>>
>>>>>>>> >>> I want to make sure that given a GBK that produces 3 panes (P0,
>>>>>>>> >>> P1, P2) for a key, a downstream PCollection could never see P0,
>>>>>>>> >>> P2, P1. OR at least, the final firing is always guaranteed to be
>>>>>>>> >>> delivered after all early firings (eg we could have P0, P2, P1,
>>>>>>>> >>> but then always PLast).
>>>>>>>> >>>
>>>>>>>> >>> On Tue, Feb 12, 2019 at 11:48 AM Steve Niemitz
>>>>>>>> >>> <[email protected]> wrote:
>>>>>>>> >>>>
>>>>>>>> >>>> Are you also saying that even in the first example (Source ->
>>>>>>>> >>>> CombineByKey (Sum) -> Sink) there's no guarantee that events
>>>>>>>> >>>> would be delivered in-order from the Combine -> Sink transforms?
>>>>>>>> >>>> This seems like a pretty big "gotcha" for correctness if you
>>>>>>>> >>>> ever use accumulating triggering.
>>>>>>>> >>>>
>>>>>>>> >>>> I'd also like to point out I'm not talking about a global
>>>>>>>> >>>> ordering across the entire PCollection; I'm talking about within
>>>>>>>> >>>> the same key after a GBK transform.
>>>>>>>> >>>> On Tue, Feb 12, 2019 at 11:35 AM Robert Bradshaw
>>>>>>>> >>>> <[email protected]> wrote:
>>>>>>>> >>>>>
>>>>>>>> >>>>> Due to the nature of distributed processing, order is not
>>>>>>>> >>>>> preserved. You can, however, inspect the PaneInfo to determine
>>>>>>>> >>>>> if an element was early, on-time, or late and act accordingly.
>>>>>>>> >>>>>
>>>>>>>> >>>>> On Tue, Feb 12, 2019 at 5:15 PM Juan Carlos Garcia
>>>>>>>> >>>>> <[email protected]> wrote:
>>>>>>>> >>>>>>
>>>>>>>> >>>>>> In my experience ordering is not guaranteed; you may need to
>>>>>>>> >>>>>> apply a transformation that sorts the elements and then
>>>>>>>> >>>>>> dispatches them in order.
>>>>>>>> >>>>>>
>>>>>>>> >>>>>> Or use the Sorter extension for this:
>>>>>>>> >>>>>>
>>>>>>>> >>>>>> https://github.com/apache/beam/tree/master/sdks/java/extensions/sorter
>>>>>>>> >>>>>>
>>>>>>>> >>>>>> Steve Niemitz <[email protected]> wrote on Tue, Feb 12,
>>>>>>>> >>>>>> 2019, 16:31:
>>>>>>>> >>>>>>>
>>>>>>>> >>>>>>> Hi everyone, I have some questions I want to ask about how
>>>>>>>> >>>>>>> windowing, triggering, and panes work together, and how to
>>>>>>>> >>>>>>> ensure correctness throughout a pipeline.
>>>>>>>> >>>>>>>
>>>>>>>> >>>>>>> Let's assume I have a very simple streaming pipeline that
>>>>>>>> >>>>>>> looks like:
>>>>>>>> >>>>>>> Source -> CombineByKey (Sum) -> Sink
>>>>>>>> >>>>>>>
>>>>>>>> >>>>>>> Given fixed windows of 1 hour, early firings every minute, and
>>>>>>>> >>>>>>> accumulating panes, this is pretty straightforward. However,
>>>>>>>> >>>>>>> this can get more complicated if we add steps after the
>>>>>>>> >>>>>>> CombineByKey, for instance (using the same windowing
>>>>>>>> >>>>>>> strategy):
>>>>>>>> >>>>>>>
>>>>>>>> >>>>>>> Say I want to buffer the results of the CombineByKey into
>>>>>>>> >>>>>>> batches of N elements.
>>>>>>>> >>>>>>> I can do this with the built-in GroupIntoBatches [1]
>>>>>>>> >>>>>>> transform; now my pipeline looks like:
>>>>>>>> >>>>>>>
>>>>>>>> >>>>>>> Source -> CombineByKey (Sum) -> GroupIntoBatches -> Sink
>>>>>>>> >>>>>>>
>>>>>>>> >>>>>>> This leads to my main question:
>>>>>>>> >>>>>>> Is ordering preserved somehow here? ie: is it possible that
>>>>>>>> >>>>>>> the result from early firing F+1 now comes BEFORE the firing F
>>>>>>>> >>>>>>> (because it was re-ordered in the GroupIntoBatches)? This
>>>>>>>> >>>>>>> would mean that the sink then gets F+1 before F, which means
>>>>>>>> >>>>>>> my resulting store has incorrect data (possibly forever if F+1
>>>>>>>> >>>>>>> was the final firing).
>>>>>>>> >>>>>>>
>>>>>>>> >>>>>>> If ordering is not preserved, it seems as if I'd need to
>>>>>>>> >>>>>>> introduce my own ordering back in after GroupIntoBatches. GIB
>>>>>>>> >>>>>>> is an example here, but I imagine this could happen with any
>>>>>>>> >>>>>>> GBK-type operation.
>>>>>>>> >>>>>>>
>>>>>>>> >>>>>>> Am I thinking about this the correct way? Thanks!
>>>>>>>> >>>>>>>
>>>>>>>> >>>>>>> [1] https://beam.apache.org/releases/javadoc/2.10.0/org/apache/beam/sdk/transforms/GroupIntoBatches.html
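[Editor's note: the failure mode Steve describes above (a stale firing arriving after the final one and sticking "possibly forever") can be demonstrated with a small pure-Python simulation, contrasting a naive last-write-wins sink with a pane-index-aware one. This is illustrative only, not Beam code; with accumulating panes each firing carries the running total, so the highest pane index always holds the most complete aggregate.]

```python
def naive_sink(arrivals):
    """Applies panes in arrival order: last write wins, stale data sticks."""
    store = {}
    for key, pane_index, value in arrivals:
        store[key] = value
    return store

def pane_aware_sink(arrivals):
    """Keeps the value from the highest pane index seen per key."""
    store = {}  # key -> (pane_index, value)
    for key, pane_index, value in arrivals:
        if pane_index > store.get(key, (-1, None))[0]:
            store[key] = (pane_index, value)
    return {key: value for key, (_, value) in store.items()}

# Pane 0 sums to 5, pane 1 to 17, pane 2 (the final firing) to 30, but a
# downstream GBK reorders delivery so the final pane arrives in the middle.
arrivals = [("a", 0, 5), ("a", 2, 30), ("a", 1, 17)]
```

Here the naive sink is left holding 17 while the pane-aware sink converges to the correct final sum of 30, which is exactly the pane-index-aware statefulness Kenn recommends earlier in the thread.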
