I'm struggling with the following issue in Spark >=3.4, related to multiple
stateful operations.

When spark.sql.streaming.statefulOperator.allowMultiple is enabled, Spark
keeps track of two types of watermarks: eventTimeWatermarkForEviction and
eventTimeWatermarkForLateEvents. Introducing them made it possible to chain
multiple stateful operations, but it also introduced an additional delay
before the output leaves the streaming query.
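
For context, this is the flag I mean (as far as I can tell it defaults to
true in 3.4+; the snippet below is just illustrative):

// Allow chaining multiple stateful operators in a single streaming query.
// My understanding is that this defaults to true since Spark 3.4.
spark.conf.set("spark.sql.streaming.statefulOperator.allowMultiple", "true")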

I'll illustrate this with an example. Assume we have a stream of click
events and we aggregate it first by 1-min window and then by 5-min window.
With a trigger interval of 30s, in most cases we'll get the output 30s
later than with a query that has a single stateful operation. To see why,
let's look at the following examples:

Example 1. Single stateful operation (aggregation by 5-min window; assume
a watermark delay of 0 seconds)

Wall clock (microbatch   Max event timestamp at the time   Global      Output
processing starts)       of getting data from Kafka        watermark
----------------------   -------------------------------   ---------   ---------------------
14:10:00                 14:09:56                          0           -
14:10:30                 14:10:26                          14:09:56    -
14:11:00                 14:10:56                          14:10:26    window <14:05, 14:10)
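
For concreteness, here is roughly what I mean by the Example 1 query. This
is only a sketch: the broker address, the "clicks" topic and using the
Kafka record timestamp as the event time are placeholders, not my actual
job.

import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger
import spark.implicits._

// Placeholder source: click events from Kafka, using the record timestamp
// as event time with a watermark delay of 0 seconds.
val clicks = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092") // placeholder
  .option("subscribe", "clicks")                        // placeholder topic
  .load()
  .select($"timestamp")
  .withWatermark("timestamp", "0 seconds")

// Single stateful operation: one 5-minute tumbling window aggregation.
val clicksPer5Min = clicks
  .groupBy(window($"timestamp", "5 minutes"))
  .count()

clicksPer5Min.writeStream
  .outputMode("append")
  .trigger(Trigger.ProcessingTime("30 seconds"))
  .format("console")
  .start()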

Example 2. Multiple stateful operations (aggregation by 1-min window
followed by aggregation by 5-min window; assume a watermark delay of 0
seconds)

Wall clock (microbatch   Max event timestamp at the time   Late events   Eviction    Output
processing starts)       of getting data from Kafka        watermark     watermark
----------------------   -------------------------------   -----------   ---------   ---------------------
14:10:00                 14:09:56                          0             0           -
14:10:30                 14:10:26                          0             14:09:56    -
14:11:00                 14:10:56                          14:09:56      14:10:26    -
14:11:30                 14:11:26                          14:10:26      14:10:56    window <14:05, 14:10)
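
And a sketch of the Example 2 query, reusing the clicks DataFrame from the
previous snippet. As far as I understand, since 3.4 the second aggregation
can be chained on top of the first one by applying window_time() to the
upstream window column:

// Multiple stateful operations: aggregate by 1-minute windows first...
val clicksPer1Min = clicks
  .groupBy(window($"timestamp", "1 minute"))
  .count()

// ...then roll the 1-minute windows up into 5-minute windows.
// window_time() turns the upstream window column back into an event time.
val clicksPer5MinChained = clicksPer1Min
  .groupBy(window(window_time($"window"), "5 minutes"))
  .agg(sum($"count").as("count"))

clicksPer5MinChained.writeStream
  .outputMode("append")
  .trigger(Trigger.ProcessingTime("30 seconds"))
  .format("console")
  .start()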

In Example 2, we need to wait until both watermarks cross the end of the
window to get the output for that window, which happens one iteration later
compared to Example 1.

Now, for use cases that require near-real-time processing, this
one-iteration delay can make quite a significant difference.

Is there any option to make streaming queries with multiple stateful
operations emit output without waiting this extra iteration? One of my
ideas was to force an empty microbatch to run and propagate the late
events watermark without any new data. While this conceptually works, I
didn't find a way to trigger an empty microbatch while being connected to
a Kafka topic that constantly receives new data and while keeping a
constant 30s trigger interval.

Thanks,
Andrzej
