First, thanks for your answers; they were really helpful. I was not aware of OrderedListState, but it is not very handy for my use case. While playing around with it I discovered the experimental @RequiresTimeSortedInput annotation, which Jan then also suggested. I can confirm that this solves my problem perfectly. Indeed, streaming would be another story; based on the current implementation, Dataflow doesn't support it in streaming mode [1].
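For context on the OrderedListState mentioned above: its contract is that values are added together with a timestamp and read back sorted by that timestamp, regardless of insertion order. A minimal self-contained sketch of that contract (plain Java, no Beam dependency; the class and method names here are illustrative stand-ins, not Beam's actual API):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustrative stand-in for the semantics of Beam's OrderedListState:
// values are added with a timestamp and read back in timestamp order.
class OrderedBuffer<T> {
  private final TreeMap<Long, List<T>> entries = new TreeMap<>();

  void add(long timestampMillis, T value) {
    entries.computeIfAbsent(timestampMillis, k -> new ArrayList<>()).add(value);
  }

  // Read all values whose timestamp is in [fromMillis, toMillis), sorted.
  List<T> readRange(long fromMillis, long toMillis) {
    List<T> out = new ArrayList<>();
    for (Map.Entry<Long, List<T>> e : entries.subMap(fromMillis, toMillis).entrySet()) {
      out.addAll(e.getValue());
    }
    return out;
  }
}

public class OrderedBufferDemo {
  public static void main(String[] args) {
    OrderedBuffer<String> buf = new OrderedBuffer<>();
    buf.add(30L, "c");
    buf.add(10L, "a");
    buf.add(20L, "b");
    // Reads come back in timestamp order even though inserts were unordered.
    System.out.println(buf.readRange(0L, 100L)); // prints [a, b, c]
  }
}
```

The downside for this use case, as noted above, is that the sorting happens on read ranges you manage yourself, whereas @RequiresTimeSortedInput hands the elements to the DoFn already ordered.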
Fabian

[1] https://github.com/apache/beam/blob/20108fd780a67771d1e188a23834e4b594c8f955/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java#L2299

On Mon, Oct 18, 2021 at 12:12 PM Jan Lukavský <[email protected]> wrote:

> Hi Fabian,
>
> you can use the (experimental) @RequiresTimeSortedInput [1] annotation for
> that. I believe it should be supported by Dataflow in batch mode (because,
> as you noticed, Dataflow sorts inputs to stateful DoFns in batch by
> default; maybe someone can correct me if I'm wrong). It is supported by
> the DirectRunner, so it should stabilize your tests.
>
> I must emphasize that the annotation changes the way the stateful DoFn
> handles late vs. droppable data, as described in the Javadoc. This should
> make no difference in batch mode (there is no watermark in batch), but
> attention has to be paid to this when running the pipeline in streaming
> mode. I'm not sure whether this is supported by Dataflow in streaming
> mode (the runner excludes tests for that, so it seems to me that it is
> not).
>
> Jan
>
> [1] https://beam.apache.org/releases/javadoc/2.33.0/org/apache/beam/sdk/transforms/DoFn.RequiresTimeSortedInput.html
>
> On 10/15/21 11:05, Fabian Loris wrote:
>
> Hi folks,
>
> In my scenario I'm aggregating data in session windows and then windowing
> it back into the global window to process each key with a stateful DoFn.
> I'm currently running this in batch, where the order of the data is
> important, so the session windows need to be processed in order. I
> couldn't find a standard way of doing this, but my tests show that the
> Dataflow runner processes the data in order, while the DirectRunner does
> not. So there seems to be some difference in the behavior of these two
> runners. I know Apache Beam makes no guarantee on order, but it seems the
> Dataflow runner does?!
>
> Can someone provide more details here, as I couldn't find anything?
> Did I discover some undocumented behavior of the Dataflow runner? Would
> there be some alternative approach to what I'm doing right now (see
> below)?
>
> Although Dataflow is behaving as I want, I'm still not sure whether I
> can/should rely on that. Furthermore, I can't test it in an automated
> way, as the DirectRunner behaves differently.
>
> Below you can find some code snippets. I use Scala with Scio, but I
> believe it is still readable:
>
> pipelineInput
>   .timestampBy(m => m.timestamp, allowedTimestampSkew = Duration.standardMinutes(1))
>   .withSessionWindows(Duration.minutes(10), ..)
>   .applyKvTransform(WithKeys.of(new SerializableFunction[Model, String] {
>     private[pipeline] override def apply(input: Model): String = input.key
>   }))
>   .applyKvTransform(GroupByKey.create[String, Model]())
>   .applyKvTransform(Window.remerge[KV[String, java.lang.Iterable[Model]]]())
>   .withGlobalWindow() // back to the global window
>   .applyTransform(ParDo.of(statefulDoFn)) // <- here the session windows should be processed in order
>
> Thanks a lot for your help :)
> Fabian
