Hi Fabian,

you can use the (experimental) @RequiresTimeSortedInput [1] annotation for that. I believe it should be supported by Dataflow in batch mode (because, as you noticed, Dataflow sorts inputs to stateful DoFns in batch by default; someone may correct me if I'm wrong). It is supported by the DirectRunner, so it should stabilize your tests.

I must emphasize that the annotation changes the way the stateful DoFn handles late vs. droppable data, as described in the javadoc. This should make no difference in batch mode (there is no watermark in batch), but attention has to be paid to it when running the pipeline in streaming mode. I'm also not sure whether Dataflow supports the annotation in streaming mode (the runner excludes the tests for it, so it seems to me that it does not).
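For illustration, here is a minimal (untested) sketch of how the annotation could be attached to a stateful DoFn in Scala. The element type matches the KV produced by the GroupByKey in your snippet; the state cell name and the `Model` type are placeholders standing in for your own code:

```scala
import org.apache.beam.sdk.state.{StateSpec, StateSpecs, ValueState}
import org.apache.beam.sdk.transforms.DoFn
import org.apache.beam.sdk.transforms.DoFn.{ProcessElement, RequiresTimeSortedInput, StateId}
import org.apache.beam.sdk.values.KV

class OrderedStatefulDoFn extends DoFn[KV[String, java.lang.Iterable[Model]], Model] {

  // Hypothetical state cell remembering the last processed event timestamp per key.
  @StateId("lastTimestamp")
  private val lastTimestampSpec: StateSpec[ValueState[java.lang.Long]] =
    StateSpecs.value()

  @ProcessElement
  @RequiresTimeSortedInput // elements for each key arrive sorted by event timestamp
  def processElement(
      ctx: ProcessContext,
      @StateId("lastTimestamp") lastTimestamp: ValueState[java.lang.Long]): Unit = {
    // With the annotation, ctx.timestamp() is non-decreasing per key here.
    lastTimestamp.write(ctx.timestamp().getMillis)
    // ... process the session in order ...
  }
}
```

Note that @RequiresTimeSortedInput goes on the @ProcessElement method itself, and the DoFn must consume KV elements (as any stateful DoFn does).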

Jan

[1] https://beam.apache.org/releases/javadoc/2.33.0/org/apache/beam/sdk/transforms/DoFn.RequiresTimeSortedInput.html

On 10/15/21 11:05, Fabian Loris wrote:

Hi folks,

In my scenario I'm aggregating data in session windows and then windowing it back into the global window to process each key with a stateful DoFn. I'm currently doing this in batch, where the order of the data is important, so the session windows need to be processed in order. I couldn't find a standard way of doing this, but my tests show that the Dataflow runner correctly processes the data in order while the DirectRunner does not. So there seems to be some difference in the behavior of these two runners. I know Apache Beam makes no guarantee on order, but it seems the Dataflow runner does?!

Can someone provide more details here, as I couldn't find anything? Did I discover some undocumented behavior of the Dataflow runner? Is there an alternative approach to what I'm doing right now (see below)?

Although Dataflow behaves as I want, I'm still not sure whether I can or should rely on that. Furthermore, I can't test it in an automated way since the DirectRunner behaves differently.

Below you can find some code snippets. I use Scala with Scio, but I believe it is still readable:

pipelineInput
  .timestampBy(m => m.timestamp, allowedTimestampSkew = Duration.standardMinutes(1))
  .withSessionWindows(Duration.minutes(10), ..)
  .applyKvTransform(WithKeys.of(new SerializableFunction[Model, String] {
    private[pipeline] override def apply(input: Model): String = input.key
  }))
  .applyKvTransform(GroupByKey.create[String, Model]())
  .applyKvTransform(Window.remerge[KV[String, java.lang.Iterable[Model]]]())
  .withGlobalWindow() // back to global window
  .applyTransform(ParDo.of(statefulDoFn)) // <- here the session windows should be processed in order

Thanks a lot for your help :)
Fabian
