Hi Fabian,
you can use the (experimental) @RequiresTimeSortedInput [1] annotation
for that. I believe it should be supported by Dataflow in batch mode
(because, as you noticed, Dataflow sorts inputs to a stateful DoFn in
batch by default; maybe someone can correct me if I'm wrong). It is
supported by the DirectRunner, so it should stabilize your tests.
I must emphasize that the annotation changes the way the stateful DoFn
handles late vs. droppable data, as described in the javadoc. This
should make no difference in batch mode (there is no watermark in
batch), but attention has to be paid to it when running the pipeline in
streaming mode. I'm not sure the annotation is supported by Dataflow in
streaming mode, though (the runner excludes tests for it, which
suggests it is not).
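As a minimal sketch (in Scala, to match your snippet below): the annotation
goes on the @ProcessElement method of the stateful DoFn. Model, Output and
OrderedStatefulDoFn here are placeholders from your example, not a tested
implementation:

import org.apache.beam.sdk.transforms.DoFn
import org.apache.beam.sdk.transforms.DoFn.{ProcessElement, RequiresTimeSortedInput}
import org.apache.beam.sdk.values.KV

// Sketch only: @RequiresTimeSortedInput asks the runner to deliver
// elements to this stateful DoFn sorted by their event timestamps.
class OrderedStatefulDoFn
    extends DoFn[KV[String, java.lang.Iterable[Model]], Output] {

  @ProcessElement
  @RequiresTimeSortedInput
  def process(ctx: ProcessContext): Unit = {
    // per key, elements arrive here in timestamp order
  }
}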
Jan
[1]
https://beam.apache.org/releases/javadoc/2.33.0/org/apache/beam/sdk/transforms/DoFn.RequiresTimeSortedInput.html
On 10/15/21 11:05, Fabian Loris wrote:
Hi folks,
In my scenario I’m aggregating data in session windows and then
windowing it back into the global window to process each key with a
stateful DoFn. I’m currently doing this in batch, where the order of
the data is important, so the session windows need to be processed in
order. I couldn’t find a standard way of doing this, but my tests show
that the Dataflow runner is correctly processing the data in order,
but the DirectRunner is not. So there seems to be some difference in
the behavior of these two runners. I know Apache Beam makes no
guarantee on order, but it seems the Dataflow runner does?!
Can someone provide more details here, as I couldn't find anything? Did
I discover some undocumented behavior of the Dataflow runner? Is there
an alternative to the approach I'm using right now (see below)?
Although Dataflow is behaving as I want, I'm still not sure if I
can/should rely on that. Furthermore, I can't test it in an automated
way, as the DirectRunner behaves differently.
Below you can find some code snippets. I use Scala with Scio, but I
believe it is still readable:
pipelineInput
  .timestampBy(m => m.timestamp, allowedTimestampSkew = Duration.standardMinutes(1))
  .withSessionWindows(Duration.minutes(10), ..)
  .applyKvTransform(WithKeys.of(new SerializableFunction[Model, String] {
    private[pipeline] override def apply(input: Model): String = input.key
  }))
  .applyKvTransform(GroupByKey.create[String, Model]())
  .applyKvTransform(Window.remerge[KV[String, java.lang.Iterable[Model]]]())
  .withGlobalWindow() // back to the global window
  .applyTransform(ParDo.of(statefulDoFn)) // <- here the session windows should be processed in order
Thanks a lot for your help :)
Fabian