Are you also saying that even in the first example (Source ->
CombineByKey (Sum) -> Sink) there's no guarantee that events will be
delivered in order from the Combine -> Sink transforms?  This seems like a
pretty big gotcha for correctness if you ever use accumulating
triggering.

I'd also like to point out that I'm not talking about a global ordering
across the entire PCollection; I'm talking about ordering within the same
key after a GBK transform.

On Tue, Feb 12, 2019 at 11:35 AM Robert Bradshaw <[email protected]>
wrote:

> Due to the nature of distributed processing, order is not preserved. You
> can, however, inspect the PaneInfo to determine if an element was early,
> on-time, or late and act accordingly.
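[Editor's note: to make "act accordingly" concrete, here is a minimal plain-Java sketch, not Beam API. It assumes each element arrives at the sink tagged with the pane index that a real DoFn would read from PaneInfo.getIndex(); the class and method names are made up for illustration. The idea is to drop any firing whose pane index is not newer than what was already written for that key.]

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sink-side guard (not part of Beam): tracks the highest
// pane index written per key and rejects stale or duplicate firings.
public class StalePaneGuard {
    // Highest pane index written so far, per key.
    private final Map<String, Long> lastPaneIndex = new HashMap<>();

    /**
     * Returns true if this pane is newer than anything previously written
     * for the key and should be written; false if it is stale.
     */
    public boolean shouldWrite(String key, long paneIndex) {
        Long last = lastPaneIndex.get(key);
        if (last != null && paneIndex <= last) {
            return false; // an earlier (or duplicate) firing arrived late: skip it
        }
        lastPaneIndex.put(key, paneIndex);
        return true;
    }
}
```

With this guard, even if firing F arrives after F+1, the write for F is simply skipped and the store never regresses to an older value.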
>
> On Tue, Feb 12, 2019 at 5:15 PM Juan Carlos Garcia <[email protected]>
> wrote:
>
>> In my experience ordering is not guaranteed; you may need to apply a
>> transformation that sorts the elements and then dispatches them in order.
>>
>> Or use the Sorter extension for this:
>>
>> https://github.com/apache/beam/tree/master/sdks/java/extensions/sorter
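[Editor's note: as a rough illustration of what the Sorter extension's SortValues transform does, i.e. order the values under each key by a secondary key, here is a plain-Java in-memory sketch. The names here are illustrative only; the real extension can spill to disk rather than holding everything in memory.]

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Illustrative sketch only: sorts one key's values by a secondary key,
// analogous in spirit to SortValues over KV<SecondaryKeyT, ValueT> pairs.
public class InMemorySortValues {
    /** A (secondaryKey, value) pair, e.g. (pane index, combined sum). */
    public static final class Entry {
        public final long secondaryKey;
        public final String value;
        public Entry(long secondaryKey, String value) {
            this.secondaryKey = secondaryKey;
            this.value = value;
        }
    }

    /** Returns the values for one key, ordered by their secondary key. */
    public static List<String> sortedValues(List<Entry> entries) {
        List<Entry> copy = new ArrayList<>(entries);
        copy.sort(Comparator.comparingLong(e -> e.secondaryKey));
        List<String> out = new ArrayList<>();
        for (Entry e : copy) {
            out.add(e.value);
        }
        return out;
    }
}
```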
>>
>> Steve Niemitz <[email protected]> schrieb am Di., 12. Feb. 2019, 16:31:
>>
>>> Hi everyone, I have some questions I want to ask about how windowing,
>>> triggering, and panes work together, and how to ensure correctness
>>> throughout a pipeline.
>>>
>>> Let's assume I have a very simple streaming pipeline that looks like:
>>> Source -> CombineByKey (Sum) -> Sink
>>>
>>> Given fixed windows of 1 hour, early firings every minute, and
>>> accumulating panes, this is pretty straightforward.  However, this can get
>>> more complicated if we add steps after the CombineByKey, for instance
>>> (using the same windowing strategy):
>>>
>>> Say I want to buffer the results of the CombineByKey into batches of N
>>> elements.  I can do this with the built-in GroupIntoBatches [1] transform,
>>> now my pipeline looks like:
>>>
>>> Source -> CombineByKey (Sum) -> GroupIntoBatches -> Sink
>>>
>>> *This leads to my main question:*
>>> Is ordering preserved somehow here?  I.e., is it possible that the result
>>> from early firing F+1 now comes BEFORE firing F (because it was
>>> re-ordered in GroupIntoBatches)?  This would mean that the sink then
>>> gets F+1 before F, which means my resulting store has incorrect data
>>> (possibly forever, if F+1 was the final firing).
>>>
>>> If ordering is not preserved, it seems as if I'd need to reintroduce my
>>> own ordering after GroupIntoBatches.  GIB is just an example here; I
>>> imagine this could happen with any GBK-type operation.
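[Editor's note: one way to "reintroduce ordering" at the batch level, sketched in plain Java. Nothing here is Beam API; it assumes each element carries the pane index that produced it. Within each batch, keep only the highest pane index per key, so a re-ordered stale firing can never overwrite a newer one downstream.]

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch (not Beam API): collapse one batch so that, per key, only the
// newest pane survives. Assumes each element carries its pane index.
public class LatestPanePerKey {
    public static final class Element {
        public final String key;
        public final long paneIndex;
        public final long sum; // the CombineByKey result carried by this firing
        public Element(String key, long paneIndex, long sum) {
            this.key = key;
            this.paneIndex = paneIndex;
            this.sum = sum;
        }
    }

    /** For each key in the batch, return the sum from its highest pane index. */
    public static Map<String, Long> latest(List<Element> batch) {
        Map<String, Element> newest = new HashMap<>();
        for (Element e : batch) {
            Element seen = newest.get(e.key);
            if (seen == null || e.paneIndex > seen.paneIndex) {
                newest.put(e.key, e);
            }
        }
        Map<String, Long> out = new HashMap<>();
        for (Element e : newest.values()) {
            out.put(e.key, e.sum);
        }
        return out;
    }
}
```

Note this only restores consistency within one batch; ordering across batches would still need a sink-side check such as comparing pane indices before writing.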
>>>
>>> Am I thinking about this the correct way?  Thanks!
>>>
>>> [1]
>>> https://beam.apache.org/releases/javadoc/2.10.0/org/apache/beam/sdk/transforms/GroupIntoBatches.html
>>>
>>
