Hi,
My Beam pipeline is designed to work with an unbounded source KafkaIO.
It roughly looks like below:
p.apply(KafkaIO.read() ...) // (A-1)
.apply(WithKeys.of(...).withKeyType(...))
.apply(Window.into(FixedWindows.of(...)))
.apply(Combine.perKey(...)) // (B)
.apply(Window.into(new GlobalWindows())) // to have per-key stats in (C)
.apply(ParDo.of(new MyStatefulDoFn())) // (C)
Note that (C) has its own state, which is expected to be fetched and updated
by the window results from (B) in event-time order.
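For reference, here is a minimal sketch of the shape of (C). The state-cell name, element types, and the "sum" logic are placeholders for illustration, not my actual code:

```java
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

// Sketch only: keeps one piece of per-key state and updates it with each
// combined window result from (B) as it arrives.
class MyStatefulDoFn extends DoFn<KV<String, Long>, KV<String, Long>> {

  @StateId("stats")
  private final StateSpec<ValueState<Long>> statsSpec = StateSpecs.value();

  @ProcessElement
  public void processElement(ProcessContext c,
                             @StateId("stats") ValueState<Long> stats) {
    Long current = stats.read();                 // fetch per-key state
    long updated = (current == null ? 0L : current) + c.element().getValue();
    stats.write(updated);                        // update per-key state
    c.output(KV.of(c.element().getKey(), updated));
  }
}
```

The correctness of this DoFn depends on the (B) results arriving in event-time order, which is exactly what breaks in my test.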
Now I'm writing an integration test where (A-1) is replaced by (A-2):
> p.apply(TextIO.read().from("test.txt")) // (A-2)
"test.txt" contains samples that all share a single key.
I get a wrong result, and it turns out that the window results from (B)
didn't feed into (C) in event-time order.
Is it because (A-2) makes the pipeline a bounded one?
Q1. How can I prevent this from happening?
Q2. How do you usually write an integration test for an unbounded pipeline
with a stateful function?
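For Q2, one direction I've been looking at is TestStream, which (unlike TextIO) lets the test control event-time ordering and watermark advancement explicitly. A rough sketch of what I mean; the element values, timestamps, and window size are invented:

```java
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Instant;

// Sketch only: replaces (A-2) with a test source whose watermark the test
// advances by hand, so window results reach (C) in a controlled order.
TestStream<String> source =
    TestStream.create(StringUtf8Coder.of())
        .addElements(TimestampedValue.of("sample-1", new Instant(0L)))
        .advanceWatermarkTo(new Instant(10_000L))   // close the first window
        .addElements(TimestampedValue.of("sample-2", new Instant(10_000L)))
        .advanceWatermarkToInfinity();              // flush remaining windows

p.apply(source) // (A-3)
 .apply(WithKeys.of(...).withKeyType(...))
 ...
```

I'm not sure whether this is the idiomatic approach for testing stateful DoFns, hence the question.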
Best,
Dongwon