wow, that's super unexpected and dangerous, thanks for clarifying!  Time to
go re-think how we do some of our writes with early firings then.

Are there any workarounds to make things happen in order in Dataflow?  E.g.,
if the sink gets fused to the output of the GBK operation, will it always
happen effectively in order (per key), even though that's not a guarantee?  I
also guess I could keep track of the last pane index my sink has seen, and
ignore earlier ones (using state to keep track)?
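
That last idea could look roughly like the sketch below. This is plain Java, not actual Beam code: in a real pipeline the per-key map would be a `ValueState<Long>` inside a stateful DoFn sitting just before the sink, and the pane index would come from `ProcessContext#pane()`. The class and method names here are made up for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the "ignore earlier panes" workaround: per key, remember the
// highest pane index already written and drop any pane at or below it.
// In a real pipeline this map would be per-key state in a stateful DoFn,
// not an in-memory HashMap.
public class PaneIndexFilter {
    private final Map<String, Long> lastSeen = new HashMap<>();

    /** Returns true if this pane should be written to the sink. */
    public boolean shouldWrite(String key, long paneIndex) {
        Long last = lastSeen.get(key);
        if (last != null && paneIndex <= last) {
            return false; // stale pane; an equal-or-later one was already written
        }
        lastSeen.put(key, paneIndex);
        return true;
    }

    public static void main(String[] args) {
        PaneIndexFilter filter = new PaneIndexFilter();
        // Panes P0, P2, P1 arrive out of order for the same key:
        System.out.println(filter.shouldWrite("k", 0)); // true
        System.out.println(filter.shouldWrite("k", 2)); // true
        System.out.println(filter.shouldWrite("k", 1)); // false -> dropped
    }
}
```

Note this makes the sink "last writer wins" by pane index rather than by arrival order, which is exactly the property a monotonically-updating store would want.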


On Tue, Feb 12, 2019 at 1:28 PM Robert Bradshaw <[email protected]> wrote:

> Correct, even within the same key there's no promise that the event-time
> ordering of panes maps to real-time delivery order, because the downstream
> operations *may* happen on a different machine. Multiply-triggered
> windows add an element of non-determinism to the process.
>
> You're also correct that triggering with multiple panes requires lots of
> care, especially when it comes to operations with side effects (like
> sinks). Most safe is to only write the final pane to the sink, and handle
> early triggering in a different way.
> https://s.apache.org/beam-sink-triggers is a proposal to make this easier
> to reason about.
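
As a concrete illustration of the "only write the final pane" advice, the check below uses a simplified stand-in for Beam's PaneInfo (the real class exposes `isLast()` and `getTiming()`, read via `ProcessContext#pane()`); in an actual pipeline you'd apply this filter in a DoFn just before the sink. The types here are invented for the sketch.

```java
// Simplified stand-in for Beam's PaneInfo, just enough to show the check.
// A real pipeline would read these fields from ProcessContext#pane().
public class FinalPaneOnly {
    public enum Timing { EARLY, ON_TIME, LATE }

    public static class Pane {
        final Timing timing;
        final boolean isLast;
        public Pane(Timing timing, boolean isLast) {
            this.timing = timing;
            this.isLast = isLast;
        }
    }

    /** Only the final pane for a window reaches the sink. */
    public static boolean shouldWrite(Pane pane) {
        return pane.isLast;
    }

    public static void main(String[] args) {
        System.out.println(shouldWrite(new Pane(Timing.EARLY, false)));  // false: early firing skipped
        System.out.println(shouldWrite(new Pane(Timing.ON_TIME, true))); // true: final pane written
    }
}
```

Since the final pane fires at most once per window, re-ordering among earlier panes can no longer corrupt the sink; early firings would then be surfaced some other way (e.g. a separate low-stakes output).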
>
>
> On Tue, Feb 12, 2019 at 7:19 PM Steve Niemitz <[email protected]> wrote:
>
>> Also to clarify here (I re-read this and realized it could be slightly
>> unclear): my question is only about in-order delivery of panes, i.e. will
>> pane P always be delivered before pane P+1?
>>
>> I realize the use of "in-order" before could be confusing; I don't care
>> about the ordering of the elements per se, just the ordering of the pane
>> delivery.
>>
>> I want to make sure that, given a GBK that produces 3 panes (P0, P1, P2)
>> for a key, a downstream PCollection could never see P0, P2, P1.  Or at
>> least, that the final firing is always guaranteed to be delivered after all
>> early firings (e.g. we could have P0, P2, P1, but then always PLast).
>>
>> On Tue, Feb 12, 2019 at 11:48 AM Steve Niemitz <[email protected]>
>> wrote:
>>
>>> Are you also saying that even in the first example (Source ->
>>> CombineByKey (Sum) -> Sink) there's no guarantee that events would be
>>> delivered in order from the Combine -> Sink transforms?  This seems like a
>>> pretty big gotcha for correctness if you ever use accumulating
>>> triggering.
>>>
>>> I'd also like to point out I'm not talking about a global ordering
>>> across the entire PCollection, I'm talking about within the same key after
>>> a GBK transform.
>>>
>>> On Tue, Feb 12, 2019 at 11:35 AM Robert Bradshaw <[email protected]>
>>> wrote:
>>>
>>>> Due to the nature of distributed processing, order is not preserved.
>>>> You can, however, inspect the PaneInfo to determine if an element was
>>>> early, on-time, or late and act accordingly.
>>>>
>>>> On Tue, Feb 12, 2019 at 5:15 PM Juan Carlos Garcia <[email protected]>
>>>> wrote:
>>>>
>>>>> In my experience ordering is not guaranteed; you may need to apply a
>>>>> transformation that sorts the elements and then dispatches them in order.
>>>>>
>>>>> Or use the Sorter extension for this:
>>>>>
>>>>> https://github.com/apache/beam/tree/master/sdks/java/extensions/sorter
>>>>>
>>>>> Steve Niemitz <[email protected]> schrieb am Di., 12. Feb. 2019,
>>>>> 16:31:
>>>>>
>>>>>> Hi everyone, I have some questions I want to ask about how windowing,
>>>>>> triggering, and panes work together, and how to ensure correctness
>>>>>> throughout a pipeline.
>>>>>>
>>>>>> Let's assume I have a very simple streaming pipeline that looks like:
>>>>>> Source -> CombineByKey (Sum) -> Sink
>>>>>>
>>>>>> Given fixed windows of 1 hour, early firings every minute, and
>>>>>> accumulating panes, this is pretty straightforward.  However, this can
>>>>>> get more complicated if we add steps after the CombineByKey, for
>>>>>> instance (using the same windowing strategy):
>>>>>>
>>>>>> Say I want to buffer the results of the CombineByKey into batches of
>>>>>> N elements.  I can do this with the built-in GroupIntoBatches [1]
>>>>>> transform, now my pipeline looks like:
>>>>>>
>>>>>> Source -> CombineByKey (Sum) -> GroupIntoBatches -> Sink
>>>>>>
>>>>>> *This leads to my main question:*
>>>>>> Is ordering preserved somehow here?  I.e., is it possible that the
>>>>>> result from early firing F+1 now comes BEFORE firing F (because it was
>>>>>> re-ordered in the GroupIntoBatches)?  This would mean that the sink then
>>>>>> gets F+1 before F, which means my resulting store has incorrect data
>>>>>> (possibly forever, if F+1 was the final firing).
>>>>>>
>>>>>> If ordering is not preserved, it seems as if I'd need to introduce my
>>>>>> own ordering back in after GroupIntoBatches.  GIB is an example here,
>>>>>> but I imagine this could happen with any GBK-type operation.
>>>>>>
>>>>>> Am I thinking about this the correct way?  Thanks!
>>>>>>
>>>>>> [1]
>>>>>> https://beam.apache.org/releases/javadoc/2.10.0/org/apache/beam/sdk/transforms/GroupIntoBatches.html
>>>>>>
>>>>>
