eason-yuchen-liu opened a new pull request, #56043:
URL: https://github.com/apache/spark/pull/56043
### What changes were proposed in this pull request?
When a user configures `withWatermark(col, "0 seconds")` (or any other
zero-length interval), every record whose event time equals the current
watermark is dropped by the late-event filter. This change bumps a configured
delay of `0` to `1 ms` internally so that records sharing the latest event
time across batches are no longer dropped, and logs a one-line warning at the
call site so the user is aware. The bump is gated by a new default-on SQL conf,
`spark.sql.streaming.eventTimeWatermark.bumpZeroDelayToOneMs`; set it to
`false` to restore the pre-Spark 5.0 behavior. Non-zero delays are not affected.
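
The bump rule described above can be sketched as a small pure-Python function. This is only an illustration of the described behavior, not the code in this PR: `resolve_delay_ms` and its parameters are hypothetical names, and the warning text is invented for the example.

```python
import logging

logger = logging.getLogger("watermark_sketch")

# Conf key as named in this PR description.
BUMP_CONF = "spark.sql.streaming.eventTimeWatermark.bumpZeroDelayToOneMs"


def resolve_delay_ms(configured_delay_ms: int, bump_zero_delay: bool = True) -> int:
    """Return the effective watermark delay in milliseconds (hypothetical helper)."""
    if configured_delay_ms == 0 and bump_zero_delay:
        # Illustrative warning text; the real message may differ.
        logger.warning(
            "Watermark delay of 0 was bumped to 1 ms; set %s=false to disable.",
            BUMP_CONF,
        )
        return 1
    # Non-zero delays, and zero delays with the conf disabled, pass through unchanged.
    return configured_delay_ms
```

With these assumptions, `resolve_delay_ms(0)` yields `1`, `resolve_delay_ms(0, bump_zero_delay=False)` yields `0`, and any non-zero delay is returned as given.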
### Why are the changes needed?
In microbatch mode the watermark is computed at the end of each batch as
`max(event_time) - delay` and used to filter late events in the *next* batch
with the predicate `event_time_us <= watermark_ms * 1000`. With a delay of `0`
this means the predicate is satisfied (and the row is dropped) whenever an
incoming record's event time equals the previous batch's maximum event time,
even though that record is no more "late" than the one that produced the
maximum in the first place. The effect is that two records with identical
event times are treated differently solely based on which batch they happen to
land in: the record that produced the max is kept, while the record that
arrives in the next batch is dropped.
In other words, with a 0-second watermark, rows that share the exact same
event time are discriminated against based on batch arrival. This fix removes
that asymmetry by ensuring the effective delay is at least one millisecond,
which makes the late-event comparison strict in practice
(`event_time_us < watermark_ms * 1000`).
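
The asymmetry can be reproduced with a simplified simulation of the filter described above. This is a sketch under stated assumptions, not Spark's implementation: `run_batches` is a hypothetical helper, the watermark starts at 0, it is derived from the maximum event time of all rows in a batch, and it never moves backward.

```python
def run_batches(batches_us, delay_ms):
    """Simulate the late-event filter across microbatches.

    batches_us: list of batches, each a list of event times in microseconds.
    Returns, per batch, the event times that survive the late-event filter.
    """
    watermark_ms = 0  # assumed starting watermark for this sketch
    kept_per_batch = []
    for batch in batches_us:
        # A row is dropped as late when event_time_us <= watermark_ms * 1000.
        kept = [t for t in batch if t > watermark_ms * 1000]
        kept_per_batch.append(kept)
        if batch:
            # Watermark for the next batch: max(event_time) - delay, never decreasing.
            candidate_ms = max(batch) // 1000 - delay_ms
            watermark_ms = max(watermark_ms, candidate_ms)
    return kept_per_batch


# Two batches that both contain a record at event time 5 s.
batches = [[5_000_000], [5_000_000, 6_000_000]]

legacy = run_batches(batches, delay_ms=0)  # [[5000000], [6000000]]
bumped = run_batches(batches, delay_ms=1)  # [[5000000], [5000000, 6000000]]
```

In the legacy run, the second record at 5 s is dropped only because it arrived one batch later; with a 1 ms delay both records at 5 s are kept.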
### Does this PR introduce _any_ user-facing change?
Yes. When `withWatermark(col, "0 seconds")` is used (or any other zero-length
interval):
- A warning is logged the first time the delay is resolved.
- Records whose event time exactly matches the current watermark are no
longer dropped as late.
- State for groups/windows ending exactly at the watermark is retained for
one extra microbatch.
The change is gated by
`spark.sql.streaming.eventTimeWatermark.bumpZeroDelayToOneMs`, which defaults
to `true`. Set it to `false` to recover the prior behavior. Non-zero delays
are unaffected.
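
The state-retention effect listed above can be illustrated with a one-line predicate. This sketch assumes that state for a window is evicted once `window_end_ms <= watermark_ms`; that predicate and the helper name `evicted_after_batch` are assumptions made for illustration, not taken from this PR.

```python
def evicted_after_batch(window_end_ms: int, max_event_ms: int, delay_ms: int) -> bool:
    """True if the window's state would be evicted once the watermark advances.

    Assumes (for this sketch only) eviction when window_end_ms <= watermark_ms.
    """
    watermark_ms = max_event_ms - delay_ms
    return window_end_ms <= watermark_ms


# A window ending at 10 s, after a batch whose maximum event time is exactly 10 s.
legacy_evicted = evicted_after_batch(10_000, max_event_ms=10_000, delay_ms=0)
bumped_evicted = evicted_after_batch(10_000, max_event_ms=10_000, delay_ms=1)
# Once a later batch pushes the maximum event time past the window end by 1 ms:
bumped_evicted_later = evicted_after_batch(10_000, max_event_ms=10_001, delay_ms=1)
```

Under that assumption the legacy path evicts the window immediately, while the bumped path keeps it until a later batch advances the watermark to the window end.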
### How was this patch tested?
Added
`EventTimeWatermarkSuite."zero-delay watermark keeps records at max event time across batches"`,
which exercises both the new default (`bumpZeroDelayToOneMs=true`) and the
legacy path (`bumpZeroDelayToOneMs=false`).
A handful of existing `MultiStatefulOperatorsSuite` tests encode the legacy
0-delay boundary in their expected watermark/output and have been wrapped in
`withSQLConf(... -> "false")` so they continue to validate that path.
`EventTimeWatermarkSuite`, `MultiStatefulOperatorsSuite`,
`StreamingJoinSuite`, `FlatMapGroupsWithStateSuite`, and
`StreamingQueryOptimizationCorrectnessSuite` all pass locally.
### Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Code (Anthropic, claude-opus-4-7), with human review.