First, thanks for your answers; they were really helpful.

I was not aware of OrderedListState, but it is not very handy for my
use case. While playing around with it I discovered the experimental
@RequiresTimeSortedInput annotation, which Jan then also suggested. I can
confirm that it solves my problem perfectly.
Indeed, streaming would be another story; based on the current
implementation, Dataflow doesn't support the annotation in streaming mode [1].
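
In case it helps anyone finding this thread later, here is a minimal,
simplified sketch of how the annotation can be attached to a stateful DoFn.
The state cell and the pass-through output are placeholders for illustration
only, not my actual logic; Model is the element type from the snippet further
down in the thread:

import org.apache.beam.sdk.coders.VarLongCoder
import org.apache.beam.sdk.state.{StateSpec, StateSpecs, ValueState}
import org.apache.beam.sdk.transforms.DoFn
import org.apache.beam.sdk.transforms.DoFn.{ProcessElement, RequiresTimeSortedInput, StateId}
import org.apache.beam.sdk.values.KV

class OrderedSessionsDoFn extends DoFn[KV[String, java.lang.Iterable[Model]], Model] {

  // Placeholder state cell; track whatever your DoFn actually needs.
  @StateId("lastTimestamp")
  private val lastTimestampSpec: StateSpec[ValueState[java.lang.Long]] =
    StateSpecs.value(VarLongCoder.of())

  @ProcessElement
  @RequiresTimeSortedInput // the runner must deliver elements per key sorted by event time
  def processElement(
    ctx: ProcessContext,
    @StateId("lastTimestamp") lastTimestamp: ValueState[java.lang.Long]
  ): Unit = {
    // With the annotation, ctx.timestamp() is non-decreasing within a key,
    // so the state logic can assume the sessions arrive in order.
    lastTimestamp.write(ctx.timestamp().getMillis)
    ctx.element().getValue.forEach(m => ctx.output(m)) // pass-through, for illustration
  }
}

The annotation goes on the @ProcessElement method, and the runner then
guarantees per-key timestamp order, which is what I was previously relying on
only implicitly via Dataflow's batch-mode sorting.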

Fabian

[1]
https://github.com/apache/beam/blob/20108fd780a67771d1e188a23834e4b594c8f955/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java#L2299

On Mon, Oct 18, 2021 at 12:12 PM Jan Lukavský <[email protected]> wrote:

> Hi Fabian,
>
> you can use the (experimental) @RequiresTimeSortedInput [1] annotation for
> that. I believe it should be supported by Dataflow in batch mode (because,
> as you noticed, Dataflow sorts inputs to stateful DoFns in batch by default;
> maybe someone can correct me if I'm wrong). It is supported by the
> DirectRunner, so it should stabilize your tests.
>
> I must emphasize that the annotation changes the way the stateful DoFn
> handles late vs. droppable data, as described in the javadoc. This should
> make no difference in batch mode (there is no watermark in batch), but
> attention has to be paid to this when running the pipeline in streaming
> mode. I'm also not sure whether this is supported by Dataflow in streaming
> mode (the runner excludes the tests for it, so it seems to me that it is not).
>
>  Jan
>
> [1]
> https://beam.apache.org/releases/javadoc/2.33.0/org/apache/beam/sdk/transforms/DoFn.RequiresTimeSortedInput.html
> On 10/15/21 11:05, Fabian Loris wrote:
>
> Hi folks,
>
> In my scenario I’m aggregating data in session windows and then windowing
> it back into the global window to process each key with a stateful DoFn.
> I’m currently doing this in batch, where the order of the data is
> important, so the session windows need to be processed in order. I couldn’t
> find a standard way of doing this, but my tests show that the Dataflow
> runner processes the data in order while the DirectRunner does not. So
> there seems to be some difference in the behavior of these two runners. I
> know Apache Beam makes no guarantee on ordering, but it seems the Dataflow
> runner does?!
>
> Can someone provide more details here, as I couldn’t find anything? Did I
> discover some undocumented behavior of the Dataflow runner? Is there an
> alternative approach to the way I'm doing it right now (see below)?
>
> Although Dataflow is behaving as I want, I'm still not sure whether I
> can/should rely on that. Furthermore, I can't test it in an automated way,
> as the DirectRunner behaves differently.
>
> Below you can find some code snippets. I use Scala with Scio, but I
> believe it is still readable:
>
> pipelineInput
>   .timestampBy(m => m.timestamp, allowedTimestampSkew = Duration.standardMinutes(1))
>   .withSessionWindows(Duration.minutes(10), ..)
>   .applyKvTransform(WithKeys.of(new SerializableFunction[Model, String] {
>     private[pipeline] override def apply(input: Model): String = input.key
>   }))
>   .applyKvTransform(GroupByKey.create[String, Model]())
>   .applyKvTransform(Window.remerge[KV[String, java.lang.Iterable[Model]]]())
>   .withGlobalWindow() // back to global window
>   .applyTransform(ParDo.of(statefulDoFn)) // <- here the session windows should be processed in order
> Thanks a lot for your help :)
> Fabian
>
>
