Thanks again for the answers so far!  I really appreciate it.  As for my
specific use-case, we're using Bigtable as the final sink, and I'd prefer
to keep our writes fully idempotent for other reasons (ie no
read-modify-write).  We actually do track tentative vs final values
already, but checking that at write-time would impose a pretty big overhead
in the write path.

After this I actually instrumented one of my running pipelines to detect
these "time traveling" panes, and did see it occurring pretty frequently,
particularly when dataflow decides to scale up/down the job, so that was
interesting.

>From all this, it seems like using a stateful DoFn to prevent time
traveling panes from overwriting newer ones is the best solution for now.

My last question / statement is just around general education and
documentation about this.  I think the fact that PCollection are unordered
makes sense and is pretty intuitive, but fired panes being delivered
out-of-order seems very surprising.  I'm curious how many other pipelines
exist that run into this (and produce incorrect results!) but people are
unaware of.  Is there a way we can call this behavior out?  For example,
many of the sample beam projects use early firings, but there's never any
mention that the output may be out-of-order.

On Wed, Feb 13, 2019 at 3:11 AM Robert Bradshaw <rober...@google.com> wrote:

> On Tue, Feb 12, 2019 at 7:38 PM Steve Niemitz <sniem...@twitter.com>
> wrote:
> >
> > wow, thats super unexpected and dangerous, thanks for clarifying!  Time
> to go re-think how we do some of our writes w/ early firings then.
> >
> > Are there any workarounds to make things happen in-order in dataflow?
>  eg if the sink gets fused to the output of the GBK operation, will it
> always happen effectively in order (per key) even though it's not a
> guarantee?
>
> If things get fused, yes. Note that sinks themselves sometimes have fusion
> barriers though.
>
> > I also guess I could keep track of the last pane index my sink has seen,
> and ignore earlier ones (using state to keep track)?
>
> Yes, that would work.
>
> What kind of sink are you using? If it supports read-modify-write or some
> kind of transaction you may be able to mark early results as tentative
> (which would be useful anyway) and only overwrite tentative ones.
>
>
> > On Tue, Feb 12, 2019 at 1:28 PM Robert Bradshaw <rober...@google.com>
> wrote:
> >>
> >> Correct, even within the same key there's no promise of event time
> ordering mapping of panes to real time ordering because the downstream
> operations may happen on a different machine. Multiply triggered windows
> add an element of non-determinism to the process.
> >>
> >> You're also correct that triggering with multiple panes requires lots
> of care, especially when it comes to operations with side effects (like
> sinks). Most safe is to only write the final pane to the sink, and handle
> early triggering in a different way.
> https://s.apache.org/beam-sink-triggers is a proposal to make this easier
> to reason about.
> >>
> >>
> >> On Tue, Feb 12, 2019 at 7:19 PM Steve Niemitz <sniem...@apache.org>
> wrote:
> >>>
> >>> Also to clarify here (I re-read this and realized it could be slightly
> unclear).  My question is only about in-order delivery of panes.  ie: will
> pane P always be delivered before P+1.
> >>>
> >>> I realize the use of "in-order" before could be confusing, I don't
> care about the ordering of the elements per-se, just the ordering of the
> pane delivery.
> >>>
> >>> I want to make sure that given a GBK that produces 3 panes (P0, P1,
> P2) for a key, a downstream PCollection could never see P0, P2, P1.  OR at
> least, the final firing is always guaranteed to be delivered after all
> early-firings (eg we could have P0, P2, P1, but then always PLast).
> >>>
> >>> On Tue, Feb 12, 2019 at 11:48 AM Steve Niemitz <sniem...@apache.org>
> wrote:
> >>>>
> >>>> Are you also saying also that even in the first example (Source ->
> CombineByKey (Sum) -> Sink) there's no guarantee that events would be
> delivered in-order from the Combine -> Sink transforms?  This seems like a
> pretty big "got-cha" for correctness if you ever use accumulating
> triggering.
> >>>>
> >>>> I'd also like to point out I'm not talking about a global ordering
> across the entire PCollection, I'm talking about within the same key after
> a GBK transform.
> >>>>
> >>>> On Tue, Feb 12, 2019 at 11:35 AM Robert Bradshaw <rober...@google.com>
> wrote:
> >>>>>
> >>>>> Due to the nature of distributed processing, order is not preserved.
> You can, however, inspect the PaneInfo to determine if an element was
> early, on-time, or late and act accordingly.
> >>>>>
> >>>>> On Tue, Feb 12, 2019 at 5:15 PM Juan Carlos Garcia <
> jcgarc...@gmail.com> wrote:
> >>>>>>
> >>>>>> In my experience ordering is not guaranteed, you may need apply a
> transformation that sort the elements and then dispatch them sorted out.
> >>>>>>
> >>>>>> Or uses the Sorter extension for this:
> >>>>>>
> >>>>>>
> https://github.com/apache/beam/tree/master/sdks/java/extensions/sorter
> >>>>>>
> >>>>>> Steve Niemitz <sniem...@apache.org> schrieb am Di., 12. Feb. 2019,
> 16:31:
> >>>>>>>
> >>>>>>> Hi everyone, I have some questions I want to ask about how
> windowing, triggering, and panes work together, and how to ensure
> correctness throughout a pipeline.
> >>>>>>>
> >>>>>>> Lets assume I have a very simple streaming pipeline that looks
> like:
> >>>>>>> Source -> CombineByKey (Sum) -> Sink
> >>>>>>>
> >>>>>>> Given fixed windows of 1 hour, early firings every minute, and
> accumulating panes, this is pretty straight forward.  However, this can get
> more complicated if we add steps after the CombineByKey, for instance
> (using the same windowing strategy):
> >>>>>>>
> >>>>>>> Say I want to buffer the results of the CombineByKey into batches
> of N elements.  I can do this with the built-in GroupIntoBatches [1]
> transform, now my pipeline looks like:
> >>>>>>>
> >>>>>>> Source -> CombineByKey (Sum) -> GroupIntoBatches -> Sink
> >>>>>>>
> >>>>>>> This leads to my main question:
> >>>>>>> Is ordering preserved somehow here?  ie: is it possible that the
> result from early firing F+1 now comes BEFORE the firing F (because it was
> re-ordered in the GroupIntoBatches).  This would mean that the sink then
> gets F+1 before F, which means my resulting store has incorrect data
> (possibly forever if F+1 was the final firing).
> >>>>>>>
> >>>>>>> If ordering is not preserved, it seems as if I'd need to introduce
> my own ordering back in after GroupIntoBatches.  GIB is an example here,
> but I imagine this could happen with any GBK type operation.
> >>>>>>>
> >>>>>>> Am I thinking about this the correct way?  Thanks!
> >>>>>>>
> >>>>>>> [1]
> https://beam.apache.org/releases/javadoc/2.10.0/org/apache/beam/sdk/transforms/GroupIntoBatches.html
>
>>

Reply via email to