I'm struggling with the following issue in Spark >=3.4, related to multiple stateful operations.
When spark.sql.streaming.statefulOperator.allowMultiple is enabled, Spark keeps track of two types of watermarks: eventTimeWatermarkForEviction and eventTimeWatermarkForLateEvents. Introducing them made it possible to chain multiple stateful operations, but it also added extra delay before the streaming query produces output.

I'll illustrate this with an example. Assume we have a stream of click events that we aggregate first by a 1-min window and then by a 5-min window. With a trigger interval of 30s, in most cases we get the output 30s later than in a query with a single stateful operation. To see why, let's look at the following examples.

Example 1. Single stateful operation (aggregation by 5-min window, assume a watermark delay of 0 seconds)

Wall clock          Max event timestamp   Global      Output
(microbatch start)  (read from Kafka)     watermark
------------------  --------------------  ----------  ----------------------
14:10:00            14:09:56              0           -
14:10:30            14:10:26              14:09:56    -
14:11:00            14:10:56              14:10:26    window <14:05, 14:10)

Example 2. Multiple stateful operations (aggregation by 1-min window followed by aggregation by 5-min window, assume a watermark delay of 0 seconds)

Wall clock          Max event timestamp   Late events  Eviction    Output
(microbatch start)  (read from Kafka)     watermark    watermark
------------------  --------------------  -----------  ----------  ----------------------
14:10:00            14:09:56              0            0           -
14:10:30            14:10:26              0            14:09:56    -
14:11:00            14:10:56              14:09:56     14:10:26    -
14:11:30            14:11:26              14:10:26     14:10:56    window <14:05, 14:10)

In Example 2, we need to wait until both watermarks cross the end of the window before we get the output for that window, which happens one iteration later than in Example 1.

In use cases that require near-real-time processing, this one-iteration delay can make a significant difference.

Is there any way to make streaming queries with multiple stateful operations emit their output without waiting for this extra iteration?

One of my ideas was to force an empty microbatch to run, so that the late events watermark is propagated without any new data.
While this works conceptually, I haven't found a way to trigger an empty microbatch while connected to a Kafka source that constantly receives new data and while keeping a constant 30s trigger interval.

Thanks,
Andrzej
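
P.S. In case it helps the discussion, here is a minimal Python sketch of the watermark progression shown in the two examples above. It is only a toy model of those tables, not Spark's actual implementation: the function name, the seconds-since-midnight time representation, and the rule "emit once the relevant watermark reaches the window end" are my own simplifying assumptions, and the watermark delay is assumed to be 0.

```python
def secs(hms: str) -> int:
    """Convert 'HH:MM:SS' to seconds since midnight (illustrative helper)."""
    h, m, s = (int(part) for part in hms.split(":"))
    return h * 3600 + m * 60 + s


def batch_that_emits(max_event_times, window_end, chained):
    """Return the 1-based microbatch index that emits the window, or None.

    Toy model of the tables in this email (assumption, not Spark internals):
      - eviction watermark in batch n = max event time seen up to batch n-1
      - late events watermark in batch n = eviction watermark of batch n-1
      - single operator: emit when the (global) watermark reaches window_end
      - chained operators: emit only when BOTH watermarks reach window_end
    """
    eviction_wm = 0
    late_wm = 0
    max_seen = 0
    for i, max_event in enumerate(max_event_times, start=1):
        if i > 1:
            # Watermarks used by this batch were fixed at the end of the
            # previous one; the late events watermark lags one batch behind.
            late_wm = eviction_wm
            eviction_wm = max_seen
        gate = min(late_wm, eviction_wm) if chained else eviction_wm
        if gate >= window_end:
            return i
        max_seen = max(max_seen, max_event)
    return None


# Max event timestamps per microbatch, taken from the tables above.
events = [secs(t) for t in ("14:09:56", "14:10:26", "14:10:56", "14:11:26")]
window_end = secs("14:10:00")  # end of window <14:05, 14:10)

single_op_batch = batch_that_emits(events, window_end, chained=False)
chained_batch = batch_that_emits(events, window_end, chained=True)
print(single_op_batch)  # 3 (the 14:11:00 microbatch in Example 1)
print(chained_batch)    # 4 (the 14:11:30 microbatch in Example 2)
```

Under these assumptions the chained query emits exactly one microbatch later, which with a 30s trigger interval is the 30s delay described above.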