I also think that we should improve our documentation. Multiply-fired
triggers are an advanced and dangerous feature, and care needs to be
taken that the downstream portions of the pipeline handle them
correctly.

I filed https://issues.apache.org/jira/browse/BEAM-6716

On Wed, Feb 20, 2019 at 5:40 AM Kenneth Knowles <[email protected]> wrote:
>
> This is a very valid concern so I want to offer advice you can apply today.
>
> "We actually do track tentative vs final values already, but checking that at 
> write-time would impose a pretty big overhead in the write path."
>
> You mention you are writing to BigTable. If your key is conceptually the 
> tuple of (isFinal, actual key) then you can avoid read-modify-write, and at 
> query time (of your BigTable) you can choose to only look at final values. 
> There are probably variations of this so you can get "on-time" as another 
> category to fetch.
>
> One reason we have not taken on the large-scale re-architecting is that if 
> you control the whole pipeline it is usually possible to meet your business 
> need with some combination of accumulation mode, existing sinks, and behavior 
> of your downstream client.
>
> I also wanted to respond to this bit:
>
> >> I was reading this yesterday, but couldn't see how it solved the 
> >> out-of-order delivery problem here...
>
> > It moves the responsibility of doing things in the right order (and even 
> > defining what order is "correct enough") to the runner (and sinks) such 
> > that the side effects happen in order...
>
> Again, emphasis on sinks. Many of the ideas there just apply to sinks in 
> general. For example, the sink that writes deltas and the sink that writes 
> whole elements are _different sinks_. The sink that treats retractions as 
> "delete" side effects and the sink that writes a row representing the 
> retraction are _different sinks_. These are steps we could take in Beam today.
>
> Kenn
>
>
> On Tue, Feb 19, 2019 at 6:55 AM Steve Niemitz <[email protected]> wrote:
>>
>> Thanks again for all the replies everyone.  Just as a final follow up here, 
>> are there any concrete plans on addressing these issues I could start 
>> following?  The sink trigger doc seems like a start, but also like just 
>> the first step in a larger re-architecture of sinks.
>>
>> On Fri, Feb 15, 2019 at 4:34 PM Kenneth Knowles <[email protected]> wrote:
>>>
>>> On Wed, Feb 13, 2019 at 3:11 PM Robert Bradshaw <[email protected]> wrote:
>>>>
>>>> On Wed, Feb 13, 2019 at 11:39 PM Steve Niemitz <[email protected]> 
>>>> wrote:
>>>>>
>>>>>
>>>>> On Wed, Feb 13, 2019 at 5:01 PM Robert Bradshaw <[email protected]> 
>>>>> wrote:
>>>>>>
>>>>>> On Wed, Feb 13, 2019 at 5:07 PM Steve Niemitz <[email protected]> 
>>>>>> wrote:
>>>>>>>
>>>>>>> Thanks again for the answers so far!  I really appreciate it.  As for 
>>>>>>> my specific use-case, we're using Bigtable as the final sink, and I'd 
>>>>>>> prefer to keep our writes fully idempotent for other reasons (ie no 
>>>>>>> read-modify-write).  We actually do track tentative vs final values 
>>>>>>> already, but checking that at write-time would impose a pretty big 
>>>>>>> overhead in the write path.
>>>>>>>
>>>>>>> After this I actually instrumented one of my running pipelines to 
>>>>>>> detect these "time traveling" panes, and did see it occurring pretty 
>>>>>>> frequently, particularly when Dataflow decides to scale up/down the 
>>>>>>> job, so that was interesting.
>>>>>>>
>>>>>>> From all this, it seems like using a stateful DoFn to prevent time 
>>>>>>> traveling panes from overwriting newer ones is the best solution for 
>>>>>>> now.
>>>>>>
>>>>>>
>>>>>> Note that you can't "filter out" these time traveling panes, because at 
>>>>>> the next fusion break they might get re-ordered again.
>>>>>
>>>>>
>>>>> Ack, in a general sense.  To solve my specific problem my plan was to 
>>>>> ensure the final writer sink would be fused to this filter step (or even 
>>>>> build it directly into the DoFn itself that does the write), which would 
>>>>> work in my specific case (or so it seems, at least).
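A minimal sketch of that filter, with a plain dict simulating Beam's per-key-and-window state (in a real pipeline this would be a stateful DoFn holding the highest pane index seen, fused to, or built into, the writing DoFn). The function names are mine, not Beam's.

```python
# Sketch of a "drop stale panes" filter. A plain dict simulates the
# per-key, per-window state a stateful DoFn would hold.

last_seen = {}  # (key, window) -> highest pane index accepted so far

def accept_pane(key, window, pane_index):
    """Return True iff this pane is newer than anything already seen."""
    state_key = (key, window)
    prev = last_seen.get(state_key, -1)
    if pane_index <= prev:
        return False  # a time-traveling (reordered) pane: drop it
    last_seen[state_key] = pane_index
    return True

# Panes for one key/window arriving out of order:
arrivals = [0, 2, 1, 3]
written = [p for p in arrivals if accept_pane("k", "w0", p)]
print(written)  # [0, 2, 3] -- pane 1 arrived after pane 2, so it is dropped
```

As Robert notes above, this only helps if nothing downstream of the filter can reorder panes again, i.e. the filter must share a fusion unit with the write itself.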
>>>>>
>>>>>>
>>>>>>
>>>>>>>
>>>>>>> My last question / statement is just around general education and 
>>>>>>> documentation about this.  I think the fact that PCollections are 
>>>>>>> unordered makes sense and is pretty intuitive, but fired panes being 
>>>>>>> delivered out-of-order seems very surprising.  I'm curious how many 
>>>>>>> other pipelines exist that run into this (and produce incorrect 
>>>>>>> results!) but people are unaware of.  Is there a way we can call this 
>>>>>>> behavior out?  For example, many of the sample Beam projects use early 
>>>>>>> firings, but there's never any mention that the output may be 
>>>>>>> out-of-order.
>>>>>>
>>>>>>
>>>>>> +1 to improving the documentation here. Basically multiple firings 
>>>>>> become independent elements of the resulting PCollection, they don't 
>>>>>> retain any association/ordering.
>>>>>>
>>>>>> Multiply-triggered windows are difficult to reason about (and not just in 
>>>>>> this case), https://s.apache.org/beam-sink-triggers is IMHO the right 
>>>>>> answer.
>>>>>
>>>>>
>>>>> I was reading this yesterday, but couldn't see how it solved the 
>>>>> out-of-order delivery problem here.  I do like the overall direction it's 
>>>>> proposing though, from my work with triggers so far I have found them 
>>>>> very difficult to reason about (like you said).
>>>>
>>>>
>>>> It moves the responsibility of doing things in the right order (and even 
>>>> defining what order is "correct enough") to the runner (and sinks) such 
>>>> that the side effects happen in order, even if all the processing did not. 
>>>> To be clear there's still a fair amount of design to make that doc into a 
>>>> workable system...
>>>
>>>
>>> With or without sink triggers, transforms that write need to be 
>>> pane-index-aware. The outputs themselves may be out of order, but they have 
>>> sequence numbers on them, so sinks likely need to be made stateful so they 
>>> can be idempotent in the face of reordering.
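One possible shape for such a pane-index-aware sink, sketched in plain Python with a dict standing in for the store: each cell keeps (pane index, value), and a write only lands when its pane index is strictly higher than the stored one, making the write idempotent under both reordering and retries. The function name is hypothetical.

```python
# Sketch of a pane-index-aware, idempotent sink. Each cell stores
# (pane_index, value); a write lands only if its pane index beats the
# stored one. A plain dict stands in for the real storage system.

store = {}  # key -> (pane_index, value)

def conditional_write(key, pane_index, value):
    current = store.get(key)
    if current is not None and current[0] >= pane_index:
        return False  # stale or duplicate pane: the write is a no-op
    store[key] = (pane_index, value)
    return True

conditional_write("k", 0, 10)
conditional_write("k", 2, 30)   # the final pane arrives early
conditional_write("k", 1, 20)   # stale pane: ignored
print(store["k"])  # (2, 30)
```

Against a store with compare-and-set (or Bigtable's CheckAndMutateRow), the same check can be pushed into the storage layer instead of held in DoFn state.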
>>>
>>> Kenn
>>>
>>>>>>>
>>>>>>> On Wed, Feb 13, 2019 at 3:11 AM Robert Bradshaw <[email protected]> 
>>>>>>> wrote:
>>>>>>>>
>>>>>>>> On Tue, Feb 12, 2019 at 7:38 PM Steve Niemitz <[email protected]> 
>>>>>>>> wrote:
>>>>>>>> >
>>>>>>>> > wow, that's super unexpected and dangerous, thanks for clarifying!  
>>>>>>>> > Time to go re-think how we do some of our writes w/ early firings 
>>>>>>>> > then.
>>>>>>>> >
>>>>>>>> > Are there any workarounds to make things happen in-order in 
>>>>>>>> > dataflow?  eg if the sink gets fused to the output of the GBK 
>>>>>>>> > operation, will it always happen effectively in order (per key) even 
>>>>>>>> > though it's not a guarantee?
>>>>>>>>
>>>>>>>> If things get fused, yes. Note that sinks themselves sometimes have 
>>>>>>>> fusion barriers though.
>>>>>>>>
>>>>>>>> > I also guess I could keep track of the last pane index my sink has 
>>>>>>>> > seen, and ignore earlier ones (using state to keep track)?
>>>>>>>>
>>>>>>>> Yes, that would work.
>>>>>>>>
>>>>>>>> What kind of sink are you using? If it supports read-modify-write or 
>>>>>>>> some kind of transaction you may be able to mark early results as 
>>>>>>>> tentative (which would be useful anyway) and only overwrite tentative 
>>>>>>>> ones.
>>>>>>>>
>>>>>>>>
>>>>>>>> > On Tue, Feb 12, 2019 at 1:28 PM Robert Bradshaw 
>>>>>>>> > <[email protected]> wrote:
>>>>>>>> >>
>>>>>>>> >> Correct, even within the same key there's no promise that the 
>>>>>>>> >> event-time ordering of panes maps to real-time delivery order, 
>>>>>>>> >> because the downstream operations may happen on a different machine. Multiply 
>>>>>>>> >> triggered windows add an element of non-determinism to the process.
>>>>>>>> >>
>>>>>>>> >> You're also correct that triggering with multiple panes requires 
>>>>>>>> >> lots of care, especially when it comes to operations with side 
>>>>>>>> >> effects (like sinks). Most safe is to only write the final pane to 
>>>>>>>> >> the sink, and handle early triggering in a different way. 
>>>>>>>> >> https://s.apache.org/beam-sink-triggers is a proposal to make this 
>>>>>>>> >> easier to reason about.
>>>>>>>> >>
>>>>>>>> >>
>>>>>>>> >> On Tue, Feb 12, 2019 at 7:19 PM Steve Niemitz <[email protected]> 
>>>>>>>> >> wrote:
>>>>>>>> >>>
>>>>>>>> >>> Also to clarify here (I re-read this and realized it could be 
>>>>>>>> >>> slightly unclear).  My question is only about in-order delivery of 
>>>>>>>> >>> panes.  ie: will pane P always be delivered before P+1.
>>>>>>>> >>>
>>>>>>>> >>> I realize the use of "in-order" before could be confusing, I don't 
>>>>>>>> >>> care about the ordering of the elements per-se, just the ordering 
>>>>>>>> >>> of the pane delivery.
>>>>>>>> >>>
>>>>>>>> >>> I want to make sure that given a GBK that produces 3 panes (P0, 
>>>>>>>> >>> P1, P2) for a key, a downstream PCollection could never see P0, 
>>>>>>>> >>> P2, P1.  OR at least, the final firing is always guaranteed to be 
>>>>>>>> >>> delivered after all early-firings (eg we could have P0, P2, P1, 
>>>>>>>> >>> but then always PLast).
>>>>>>>> >>>
>>>>>>>> >>> On Tue, Feb 12, 2019 at 11:48 AM Steve Niemitz 
>>>>>>>> >>> <[email protected]> wrote:
>>>>>>>> >>>>
>>>>>>>> >>>> Are you also saying also that even in the first example (Source 
>>>>>>>> >>>> -> CombineByKey (Sum) -> Sink) there's no guarantee that events 
>>>>>>>> >>>> would be delivered in-order from the Combine -> Sink transforms?  
>>>>>>>> >>>> This seems like a pretty big "gotcha" for correctness if you 
>>>>>>>> >>>> ever use accumulating triggering.
>>>>>>>> >>>>
>>>>>>>> >>>> I'd also like to point out I'm not talking about a global 
>>>>>>>> >>>> ordering across the entire PCollection, I'm talking about within 
>>>>>>>> >>>> the same key after a GBK transform.
>>>>>>>> >>>>
>>>>>>>> >>>> On Tue, Feb 12, 2019 at 11:35 AM Robert Bradshaw 
>>>>>>>> >>>> <[email protected]> wrote:
>>>>>>>> >>>>>
>>>>>>>> >>>>> Due to the nature of distributed processing, order is not 
>>>>>>>> >>>>> preserved. You can, however, inspect the PaneInfo to determine 
>>>>>>>> >>>>> if an element was early, on-time, or late and act accordingly.
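A small sketch of acting on that per-pane information. The Timing enum here mirrors the early/on-time/late categories of Beam's PaneInfo but is a local stand-in, not the Beam API, and the routing policy is just one hypothetical choice.

```python
# Sketch of branching on a pane's timing. Timing is a local stand-in
# for the categories exposed by Beam's PaneInfo, not the real API.
from enum import Enum

class Timing(Enum):
    EARLY = 1    # fired before the watermark passed the window end
    ON_TIME = 2  # fired as the watermark passed the window end
    LATE = 3     # fired after the watermark, for late data

def route(value, timing):
    # One possible policy: treat early panes as tentative (display
    # only), and persist only on-time/late panes to the sink.
    if timing is Timing.EARLY:
        return ("dashboard", value)
    return ("sink", value)

print(route(10, Timing.EARLY))    # ('dashboard', 10)
print(route(30, Timing.ON_TIME))  # ('sink', 30)
```

Note that, per the rest of this thread, inspecting timing tells you what a pane *is* but not the order it *arrives* in; ordering still needs the index-based guards discussed above.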
>>>>>>>> >>>>>
>>>>>>>> >>>>> On Tue, Feb 12, 2019 at 5:15 PM Juan Carlos Garcia 
>>>>>>>> >>>>> <[email protected]> wrote:
>>>>>>>> >>>>>>
>>>>>>>> >>>>>> In my experience ordering is not guaranteed; you may need to 
>>>>>>>> >>>>>> apply a transformation that sorts the elements and then 
>>>>>>>> >>>>>> dispatches them in order.
>>>>>>>> >>>>>>
>>>>>>>> >>>>>> Or use the Sorter extension for this:
>>>>>>>> >>>>>>
>>>>>>>> >>>>>> https://github.com/apache/beam/tree/master/sdks/java/extensions/sorter
>>>>>>>> >>>>>>
>>>>>>>> >>>>>> Steve Niemitz <[email protected]> wrote on Tue, Feb 12, 
>>>>>>>> >>>>>> 2019, 16:31:
>>>>>>>> >>>>>>>
>>>>>>>> >>>>>>> Hi everyone, I have some questions I want to ask about how 
>>>>>>>> >>>>>>> windowing, triggering, and panes work together, and how to 
>>>>>>>> >>>>>>> ensure correctness throughout a pipeline.
>>>>>>>> >>>>>>>
>>>>>>>> >>>>>>> Let's assume I have a very simple streaming pipeline that looks 
>>>>>>>> >>>>>>> like:
>>>>>>>> >>>>>>> Source -> CombineByKey (Sum) -> Sink
>>>>>>>> >>>>>>>
>>>>>>>> >>>>>>> Given fixed windows of 1 hour, early firings every minute, and 
>>>>>>>> >>>>>>> accumulating panes, this is pretty straightforward.  However, 
>>>>>>>> >>>>>>> this can get more complicated if we add steps after the 
>>>>>>>> >>>>>>> CombineByKey, for instance (using the same windowing strategy):
>>>>>>>> >>>>>>>
>>>>>>>> >>>>>>> Say I want to buffer the results of the CombineByKey into 
>>>>>>>> >>>>>>> batches of N elements.  I can do this with the built-in 
>>>>>>>> >>>>>>> GroupIntoBatches [1] transform, now my pipeline looks like:
>>>>>>>> >>>>>>>
>>>>>>>> >>>>>>> Source -> CombineByKey (Sum) -> GroupIntoBatches -> Sink
>>>>>>>> >>>>>>>
>>>>>>>> >>>>>>> This leads to my main question:
>>>>>>>> >>>>>>> Is ordering preserved somehow here?  ie: is it possible that 
>>>>>>>> >>>>>>> the result from early firing F+1 now comes BEFORE the firing F 
>>>>>>>> >>>>>>> (because it was re-ordered in the GroupIntoBatches).  This 
>>>>>>>> >>>>>>> would mean that the sink then gets F+1 before F, which means 
>>>>>>>> >>>>>>> my resulting store has incorrect data (possibly forever if F+1 
>>>>>>>> >>>>>>> was the final firing).
>>>>>>>> >>>>>>>
>>>>>>>> >>>>>>> If ordering is not preserved, it seems as if I'd need to 
>>>>>>>> >>>>>>> introduce my own ordering back in after GroupIntoBatches.  GIB 
>>>>>>>> >>>>>>> is an example here, but I imagine this could happen with any 
>>>>>>>> >>>>>>> GBK type operation.
>>>>>>>> >>>>>>>
>>>>>>>> >>>>>>> Am I thinking about this the correct way?  Thanks!
>>>>>>>> >>>>>>>
>>>>>>>> >>>>>>> [1] 
>>>>>>>> >>>>>>> https://beam.apache.org/releases/javadoc/2.10.0/org/apache/beam/sdk/transforms/GroupIntoBatches.html
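The hazard raised in this opening question can be shown in a few lines of plain Python (no Beam): against a naive last-write-wins store, a reordered early firing silently overwrites the final one, and the wrong value sticks forever.

```python
# Demonstration of the failure mode: panes for one key fired in order
# 0, 1, 2 (pane 2 being the final firing with the complete sum), but
# delivered reordered to a naive last-write-wins store.

delivered = [(0, 10), (2, 60), (1, 30)]  # (pane_index, sum), reordered

naive = {}
for pane_index, value in delivered:
    naive["k"] = value  # last write wins, regardless of pane index

print(naive["k"])  # 30 -- the stale pane 1 overwrote the final pane 2
```

The final sum (60) is lost, which is exactly the "incorrect data, possibly forever" case described above; the pane-index guards discussed earlier in the thread are what prevent it.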
