Splitting is part of the issue. Other example issues are:

* "sources" that feed data into the pipeline have no requirement to produce records in a time-ordered manner.
* timers can hold the output watermark and produce records out of order with respect to event time.

All of this time ordering has a cost to performance and throughput, so being explicit that something needs time-ordered input is useful.
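That explicit marker exists in the Beam Java SDK as the @RequiresTimeSortedInput annotation, which can be placed on the @ProcessElement method of a stateful DoFn to ask the runner to deliver each key's elements in event-time order (support depends on the SDK version and the runner). The sketch below shows one hypothetical shape the MyStatefulDoFn from the question further down could take; the KV<String, Long> element type and the running-sum state are assumptions made for illustration, not code from the thread.

    import org.apache.beam.sdk.coders.VarLongCoder;
    import org.apache.beam.sdk.state.StateSpec;
    import org.apache.beam.sdk.state.StateSpecs;
    import org.apache.beam.sdk.state.ValueState;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.values.KV;

    // Hypothetical stateful DoFn: keeps a running sum per key and declares that it
    // needs its input sorted by event time. Element types and logic are illustrative.
    class MyStatefulDoFn extends DoFn<KV<String, Long>, KV<String, Long>> {

      @StateId("runningSum")
      private final StateSpec<ValueState<Long>> sumSpec = StateSpecs.value(VarLongCoder.of());

      @RequiresTimeSortedInput // ask the runner for per-key, event-time-sorted delivery
      @ProcessElement
      public void processElement(ProcessContext c, @StateId("runningSum") ValueState<Long> sum) {
        long current = sum.read() == null ? 0L : sum.read();
        long updated = current + c.element().getValue();
        sum.write(updated);
        c.output(KV.of(c.element().getKey(), updated));
      }
    }

Without that annotation (or on a runner that does not support it), the usual alternative is to buffer elements in state and flush them from an event-time timer once the watermark passes, which is essentially what the time-sorted input feature automates.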
On Mon, Aug 24, 2020 at 9:07 PM Dongwon Kim <[email protected]> wrote:

> Thanks Reuven for the input and Wang for CC'ing to Reuven.
>
> > Generally you should not rely on PCollection being ordered
>
> Is it because Beam splits PCollection into multiple input splits and tries
> to process it as efficiently as possible without considering times?
> This one is very confusing as I've been using Flink for a long time;
> AFAIK, the Flink DataStream API guarantees ordering for the same key between
> two different tasks.
>
> Best,
>
> Dongwon
>
> On Tue, Aug 25, 2020 at 12:56 AM Reuven Lax <[email protected]> wrote:
>
>> Generally you should not rely on PCollection being ordered, though there
>> have been discussions about adding some time-ordering semantics.
>>
>> On Sun, Aug 23, 2020 at 9:06 PM Rui Wang <[email protected]> wrote:
>>
>>> The current Beam model does not guarantee an ordering after a GBK (i.e.,
>>> Combine.perKey() in yours). So you cannot expect that the C step sees
>>> elements in a specific order.
>>>
>>> As I recall, on the Dataflow runner there is very limited ordering support.
>>> Hi +Reuven Lax <[email protected]>, can you share your insights about it?
>>>
>>> -Rui
>>>
>>> On Sun, Aug 23, 2020 at 8:32 PM Dongwon Kim <[email protected]> wrote:
>>>
>>>> Hi,
>>>>
>>>> My Beam pipeline is designed to work with an unbounded source KafkaIO.
>>>> It roughly looks like below:
>>>>
>>>> p.apply(KafkaIO.read() ...)                    // (A-1)
>>>>  .apply(WithKeys.of(...).withKeyType(...))
>>>>  .apply(Window.into(FixedWindows.of(...)))
>>>>  .apply(Combine.perKey(...))                   // (B)
>>>>  .apply(Window.into(new GlobalWindows()))      // to have per-key stats in (C)
>>>>  .apply(ParDo.of(new MyStatefulDoFn()))        // (C)
>>>>
>>>> Note that (C) has its own state which is expected to be fetched and
>>>> updated by the window results of (B) in order of event time.
>>>>
>>>> Now I'm writing an integration test where (A-1) is replaced by (A-2):
>>>>
>>>>   p.apply(TextIO.read().from("test.txt"))      // (A-2)
>>>>
>>>> "test.txt" contains samples having a single key.
>>>>
>>>> I get a wrong result, and it turns out that the window results didn't feed
>>>> into (C) in order.
>>>> Is it because (A-2) makes the pipeline a bounded one?
>>>>
>>>> Q1. How to prevent this from happening?
>>>> Q2. How do you guys usually write an integration test for an unbounded
>>>> one with a stateful function?
>>>>
>>>> Best,
>>>>
>>>> Dongwon
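On Q2, the thread does not settle on an answer, but one common approach with the Beam Java SDK is TestStream: instead of reading a bounded text file, the test replays keyed, timestamped elements together with explicit watermark advances, so the stateful step is exercised under a controlled event-time order. The sketch below is illustrative only: it assumes the KV<String, Long> element type from the sketch above, a runner that supports TestStream (e.g. the direct runner), and made-up keys, values, and timestamps, and it omits the windowing and Combine.perKey steps of the real pipeline.

    import org.apache.beam.sdk.coders.KvCoder;
    import org.apache.beam.sdk.coders.StringUtf8Coder;
    import org.apache.beam.sdk.coders.VarLongCoder;
    import org.apache.beam.sdk.testing.TestPipeline;
    import org.apache.beam.sdk.testing.TestStream;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.TimestampedValue;
    import org.joda.time.Instant;
    import org.junit.Rule;
    import org.junit.Test;

    public class MyStatefulDoFnTest {

      @Rule
      public final transient TestPipeline p = TestPipeline.create();

      @Test
      public void statefulStepSeesControlledEventTimeOrder() {
        // Replay two elements for one key with explicit watermark advances, so
        // event-time progression is under the test's control rather than the
        // read order of a file.
        TestStream<KV<String, Long>> source =
            TestStream.create(KvCoder.of(StringUtf8Coder.of(), VarLongCoder.of()))
                .addElements(TimestampedValue.of(KV.of("sample-key", 1L), new Instant(0L)))
                .advanceWatermarkTo(new Instant(60000L))
                .addElements(TimestampedValue.of(KV.of("sample-key", 2L), new Instant(120000L)))
                .advanceWatermarkToInfinity();

        p.apply(source)
            .apply(ParDo.of(new MyStatefulDoFn())); // step (C); upstream steps omitted in this sketch

        // Assertions (e.g. with PAssert) would go here; expected values depend on
        // what MyStatefulDoFn actually computes.
        p.run().waitUntilFinish();
      }
    }

This sidesteps the (A-2) concern: the test no longer depends on the read order of a bounded TextIO source, which, as noted above, carries no ordering guarantee.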
