Hi,

I'm sorry, I got confused about the inner workings of the late events
watermark. You're completely right. Thanks for clarifying.
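
For anyone finding this thread later, here is a toy Python sketch of how I now understand it. It is not Spark code: the names (ts, simulate, eviction_wm, late_wm) are made up for illustration, and it assumes a 0-second watermark delay and the batch timings from Example 2 quoted below. It only models the two watermark values per microbatch and reports the first batch in which the eviction watermark reaches the end of the window.

```python
from datetime import datetime, timedelta

def ts(hhmmss):
    # Parse a time from the examples; the date itself is arbitrary.
    return datetime.strptime("2024-01-11 " + hhmmss, "%Y-%m-%d %H:%M:%S")

def simulate(batches, window_end, delay=timedelta(0)):
    """Toy model of the two watermarks, one row per microbatch.

    batches: list of (wall_clock, max_event_time) pairs.
    Returns (rows, wall clock of the first batch whose eviction
    watermark has reached window_end, or None if there is none).
    """
    rows, emitted_at = [], None
    prev_max_event = None  # max event time seen by the previous batch
    eviction_wm = None     # None stands for the initial watermark "0"
    for wall_clock, max_event in batches:
        # Late events watermark: eviction watermark of the previous batch.
        late_wm = eviction_wm
        # Eviction watermark: previous batch's max event time minus delay.
        eviction_wm = None if prev_max_event is None else prev_max_event - delay
        # The window is closed once the eviction watermark reaches its end;
        # the late events watermark only filters late input rows.
        if emitted_at is None and eviction_wm is not None and eviction_wm >= window_end:
            emitted_at = wall_clock
        rows.append((wall_clock, late_wm, eviction_wm))
        prev_max_event = max_event
    return rows, emitted_at

batches = [
    (ts("14:10:00"), ts("14:09:56")),
    (ts("14:10:30"), ts("14:10:26")),
    (ts("14:11:00"), ts("14:10:56")),
    (ts("14:11:30"), ts("14:11:26")),
]
rows, emitted_at = simulate(batches, window_end=ts("14:10:00"))
print(emitted_at.time())  # prints 14:11:00
```

Under these assumptions the watermark values match the columns in Example 2, but the window <14:05, 14:10) is closed in the 14:11:00 batch, the same batch as in Example 1.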

Regards,
Andrzej

On Thu, 11 Jan 2024 at 13:02, Jungtaek Lim <kabhwan.opensou...@gmail.com>
wrote:

> Hi,
>
> The time window is closed and evicted as soon as the "eviction watermark"
> passes the end of the window. The late events watermark only deals with
> discarding late events from "inputs". We did not introduce additional delay
> on the work of multiple stateful operators. We just allowed more late
> events to be accepted.
>
> Hope this helps.
>
> Thanks,
> Jungtaek Lim (HeartSaVioR)
>
> On Thu, Jan 11, 2024 at 6:13 AM Andrzej Zera <andrzejz...@gmail.com>
> wrote:
>
>> I'm struggling with the following issue in Spark >=3.4, related to
>> multiple stateful operations.
>>
>> When spark.sql.streaming.statefulOperator.allowMultiple is enabled,
>> Spark keeps track of two types of watermarks:
>> eventTimeWatermarkForEviction and eventTimeWatermarkForLateEvents.
>> Introducing them allowed chaining multiple stateful operations but also
>> introduced an additional delay for getting the output out of the streaming
>> query.
>>
>> I'll illustrate this with an example. Assume we have a stream of click
>> events and we aggregate it first by 1-min window and then by 5-min window.
>> If we have a trigger interval of 30s, then in most cases we'll get output
>> 30s later compared to queries with a single stateful operation. To see
>> why, let's look at the following examples:
>>
>> Example 1. Single stateful operation (aggregation by 5-min window, assume
>> watermark is 0 seconds)
>>
>> Wall clock (microbatch | Max event timestamp when | Global    | Output
>> processing starts)     | data is read from Kafka  | watermark |
>> -----------------------+--------------------------+-----------+----------------------
>> 14:10:00               | 14:09:56                 | 0         | -
>> 14:10:30               | 14:10:26                 | 14:09:56  | -
>> 14:11:00               | 14:10:56                 | 14:10:26  | window <14:05, 14:10)
>>
>> Example 2. Multiple stateful operations (aggregation by 1-min window
>> followed by aggregation by 5-min window, assume watermark is 0 seconds)
>>
>> Wall clock (microbatch | Max event timestamp when | Late events | Eviction  | Output
>> processing starts)     | data is read from Kafka  | watermark   | watermark |
>> -----------------------+--------------------------+-------------+-----------+----------------------
>> 14:10:00               | 14:09:56                 | 0           | 0         | -
>> 14:10:30               | 14:10:26                 | 0           | 14:09:56  | -
>> 14:11:00               | 14:10:56                 | 14:09:56    | 14:10:26  | -
>> 14:11:30               | 14:11:26                 | 14:10:26    | 14:10:56  | window <14:05, 14:10)
>>
>> In Example 2, we need to wait until both watermarks cross the end of the
>> window to get the output for that window, which happens one iteration later
>> compared to Example 1.
>>
>> Now, in use cases that require near-real-time processing, this
>> one-iteration delay can make quite a significant difference.
>>
>> Do we have any option to make streaming queries with multiple stateful
>> operations output data without waiting this extra iteration? One of my
>> ideas was to force an empty microbatch to run and propagate late events
>> watermark without any new data. While this conceptually works, I didn't
>> find a way to trigger an empty microbatch while being connected to Kafka
>> that constantly receives new data and while having a constant 30s trigger
>> interval.
>>
>> Thanks,
>> Andrzej
>>
>
