Splitting is part of the issue.

Other example issues are:
* "sources" that input data into the pipeline have no requirement to
produce records in a time ordered manner.
* timers can hold the output watermark and produce records out of order
with time.

All of this time ordering has a cost in performance and throughput, so being
explicit that something needs time-ordered input is useful.
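
For illustration, here is a rough sketch of what imposing event-time order
per key can look like with today's state and timer APIs: buffer elements in
a BagState and only emit them, sorted by timestamp, once an event-time timer
fires at the end of the window. The class name SortingDoFn and the
KV<String, Long> element type are made up for this example, and a real
version would also have to deal with late data; the point is just that the
buffering and sorting below is the cost referred to above.

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TimestampedValue;

// Illustrative sketch only, not code from Beam or from the pipeline below:
// buffers each key's elements and emits them in event-time order once the
// watermark reaches the end of the window.
class SortingDoFn extends DoFn<KV<String, Long>, KV<String, Long>> {

  @StateId("buffer")
  private final StateSpec<BagState<TimestampedValue<KV<String, Long>>>> bufferSpec =
      StateSpecs.bag(
          TimestampedValue.TimestampedValueCoder.of(
              KvCoder.of(StringUtf8Coder.of(), VarLongCoder.of())));

  @TimerId("flush")
  private final TimerSpec flushSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @ProcessElement
  public void process(
      ProcessContext c,
      @StateId("buffer") BagState<TimestampedValue<KV<String, Long>>> buffer,
      @TimerId("flush") Timer flush,
      BoundedWindow window) {
    // Buffer the element together with its timestamp; nothing is emitted yet.
    buffer.add(TimestampedValue.of(c.element(), c.timestamp()));
    // Fire once the watermark passes the end of the window, i.e. when no
    // earlier on-time elements can still arrive.
    flush.set(window.maxTimestamp());
  }

  @OnTimer("flush")
  public void flush(
      OnTimerContext c,
      @StateId("buffer") BagState<TimestampedValue<KV<String, Long>>> buffer) {
    // Sort the buffered elements by event time and emit them in that order.
    List<TimestampedValue<KV<String, Long>>> sorted = new ArrayList<>();
    buffer.read().forEach(sorted::add);
    sorted.sort(Comparator.comparing(TimestampedValue::getTimestamp));
    for (TimestampedValue<KV<String, Long>> tv : sorted) {
      c.output(tv.getValue());
    }
    buffer.clear();
  }
}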

On Mon, Aug 24, 2020 at 9:07 PM Dongwon Kim <[email protected]> wrote:

> Thanks Reuven for the input and Wang for CC'ing Reuven.
>
> Generally you should not rely on PCollection being ordered
>
> Is it because Beam splits a PCollection into multiple input splits and tries
> to process it as efficiently as possible without considering time?
> This is quite confusing to me, as I've been using Flink for a long time;
> AFAIK, the Flink DataStream API guarantees ordering for the same key between
> two different tasks.
>
> Best,
>
> Dongwon
>
> On Tue, Aug 25, 2020 at 12:56 AM Reuven Lax <[email protected]> wrote:
>
>> Generally you should not rely on PCollection being ordered, though there
>> have been discussions about adding some time-ordering semantics.
>>
>>
>>
>> On Sun, Aug 23, 2020 at 9:06 PM Rui Wang <[email protected]> wrote:
>>
>>> The current Beam model does not guarantee an ordering after a GBK (i.e.
>>> Combine.perKey() in your case), so you cannot expect that the (C) step
>>> sees elements in a specific order.
>>>
>>> As I recall on Dataflow runner, there is very limited ordering support.
>>> Hi +Reuven Lax <[email protected]>, can you share your insights about it?
>>>
>>>
>>> -Rui
>>>
>>>
>>>
>>> On Sun, Aug 23, 2020 at 8:32 PM Dongwon Kim <[email protected]>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> My Beam pipeline is designed to work with an unbounded source, KafkaIO.
>>>> It roughly looks like this:
>>>> p.apply(KafkaIO.read() ...)                                   // (A-1)
>>>>   .apply(WithKeys.of(...).withKeyType(...))
>>>>   .apply(Window.into(FixedWindows.of(...)))
>>>>   .apply(Combine.perKey(...))                              // (B)
>>>>   .apply(Window.into(new GlobalWindows()))   // to have per-key stats in (C)
>>>>   .apply(ParDo.of(new MyStatefulDoFn()))          // (C)
>>>> Note that (C) has its own state, which is expected to be fetched and
>>>> updated by the window results from (B) in event-time order.
>>>>
>>>> Now I'm writing an integration test where (A-1) is replaced by (A-2):
>>>>
>>>>> p.apply(TextIO.read().from("test.txt"))                  // (A-2)
>>>>
>>>> "text.txt" contains samples having a single key.
>>>>
>>>> I get a wrong result, and it turns out that the window results weren't
>>>> fed into (C) in order.
>>>> Is it because (A-2) makes the pipeline a bounded one?
>>>>
>>>> Q1. How can I prevent this from happening?
>>>> Q2. How do you guys usually write an integration test for an unbounded
>>>> pipeline with a stateful function?
>>>>
>>>> Best,
>>>>
>>>> Dongwon
>>>>
>>>
