eason-yuchen-liu opened a new pull request, #56043:
URL: https://github.com/apache/spark/pull/56043

   ### What changes were proposed in this pull request?
   
   When a user configures `withWatermark(col, "0 seconds")` (or any other
   zero-length interval), every record whose event time equals the current
   watermark is dropped by the late-event filter. This change internally bumps a
   configured delay of `0` to `1 ms`, so that records sharing the latest event
   time across batches are no longer dropped, and logs a one-line warning at the
   call site so the user is aware. The bump is gated by a new SQL conf,
   `spark.sql.streaming.eventTimeWatermark.bumpZeroDelayToOneMs`, which is on by
   default; set it to `false` to restore the pre-Spark 5.0 behavior. Non-zero
   delays are not affected.
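The delay resolution described above can be sketched in a few lines of Python, assuming the configured delay has already been converted to whole milliseconds. This is not Spark code: `resolve_delay_ms` and its `bump_zero_delay_to_one_ms` parameter are hypothetical stand-ins for Spark's internal logic and the new SQL conf. The sketch also logs on every call rather than only the first time, and the warning text is illustrative, not Spark's actual message.

```python
import logging

log = logging.getLogger("watermark_delay_sketch")


def resolve_delay_ms(configured_delay_ms: int, bump_zero_delay_to_one_ms: bool = True) -> int:
    """Return the effective watermark delay in milliseconds.

    `bump_zero_delay_to_one_ms` is a hypothetical stand-in for the SQL conf
    spark.sql.streaming.eventTimeWatermark.bumpZeroDelayToOneMs (default true).
    """
    if configured_delay_ms == 0 and bump_zero_delay_to_one_ms:
        # Zero-length delay with the flag on: raise it to 1 ms and warn.
        # Illustrative message only, not Spark's actual log text.
        log.warning("Watermark delay of 0 was bumped to 1 ms; set "
                    "bumpZeroDelayToOneMs=false to keep the legacy behavior.")
        return 1
    # Non-zero delays, or a zero delay with the flag off, pass through unchanged.
    return configured_delay_ms


print(resolve_delay_ms(0))          # 1 (a warning is also logged)
print(resolve_delay_ms(0, False))   # 0
print(resolve_delay_ms(10_000))     # 10000
```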
   
   ### Why are the changes needed?
   
   In microbatch mode, the watermark is computed at the end of each batch as
   `max(event_time) - delay` and used to filter late events in the *next* batch
   with the predicate `event_time_us <= watermark_ms * 1000`. With a delay of
   `0`, the predicate is satisfied (and the row is dropped) whenever an incoming
   record's event time equals the previous batch's maximum event time, even
   though that record is no more "late" than the one that produced the maximum
   in the first place. As a result, two records with identical event times are
   treated differently solely based on which batch they land in: the record that
   produced the max is kept, while the record that arrives in the next batch is
   dropped.
   
   In other words, with a 0-second watermark, rows that share the exact same
   event time are treated differently depending on the batch they arrive in.
   This fix removes that asymmetry by ensuring that the effective delay is at
   least one millisecond. For millisecond-precision event times, this in
   practice makes the late-event comparison strict
   (`event_time_us < watermark_ms * 1000`, where `watermark_ms` is the
   watermark a zero delay would have produced).
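The asymmetry can be reproduced outside Spark with a small Python simulation. This is a minimal sketch, not Spark code: `next_watermark_ms`, `is_late`, and `run_two_batches` are hypothetical helpers, event times are plain integers in milliseconds, and the watermark is assumed to start at 0 and never move backwards. Only the watermark formula and the late-event predicate come from the description above.

```python
def next_watermark_ms(batch_event_times_ms, delay_ms, prev_watermark_ms=0):
    # Watermark computed at the end of a batch: max(event_time) - delay.
    # Assumption: the watermark never moves backwards, so keep the larger value.
    return max(prev_watermark_ms, max(batch_event_times_ms) - delay_ms)


def is_late(event_time_us, watermark_ms):
    # Late-event predicate from the PR description (microseconds vs. milliseconds).
    return event_time_us <= watermark_ms * 1000


def run_two_batches(delay_ms):
    batch1_ms = [1_000, 5_000]  # batch 1's max event time is 5000 ms
    batch2_ms = [5_000, 6_000]  # first record shares batch 1's max event time
    watermark_ms = next_watermark_ms(batch1_ms, delay_ms)
    # Filter batch 2 against the watermark produced by batch 1.
    kept = [t for t in batch2_ms if not is_late(t * 1000, watermark_ms)]
    return watermark_ms, kept


print(run_two_batches(0))  # (5000, [6000]): the 5000 ms record is dropped
print(run_two_batches(1))  # (4999, [5000, 6000]): both records are kept
```

With a 0 ms delay the second record at 5000 ms is dropped even though batch 1 kept its twin; with the bumped 1 ms delay both survive.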
   
   ### Does this PR introduce _any_ user-facing change?
   
   Yes. When `withWatermark(col, "0 seconds")` is used (or any other zero-length
   interval):
   - A warning is logged the first time the delay is resolved.
   - Records whose event time exactly matches the current watermark are no
     longer dropped as late.
   - State for groups/windows ending exactly at the watermark is retained for
     one extra microbatch.
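The third bullet can be illustrated with another short Python sketch. It assumes, without this PR stating it, that windowed state is evicted once the window end is at or before the watermark (the same `<=` boundary as the late-event filter). `evicted_window_ends` is a hypothetical helper, and all values are in milliseconds.

```python
def evicted_window_ends(window_ends_ms, watermark_ms):
    # Assumption: state for a window is evicted once end <= watermark
    # (both in milliseconds), mirroring the late-event boundary.
    return [end for end in window_ends_ms if end <= watermark_ms]


window_ends_ms = [5_000, 10_000]  # windows [0 s, 5 s) and [5 s, 10 s)

# Batch 1 has max event time 5000 ms.
print(evicted_window_ends(window_ends_ms, 5_000 - 0))  # delay 0    -> [5000]
print(evicted_window_ends(window_ends_ms, 5_000 - 1))  # delay 1 ms -> []

# Batch 2 raises the max event time to 6000 ms; the bumped watermark passes 5000.
print(evicted_window_ends(window_ends_ms, 6_000 - 1))  # -> [5000]
```

Under this assumption, the window ending at 5000 ms stays in state until a later batch advances the watermark past its end.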
   
   The change is gated by
   `spark.sql.streaming.eventTimeWatermark.bumpZeroDelayToOneMs`, which defaults
   to `true`. Set it to `false` to recover the prior behavior. Non-zero delays
   are unaffected.
   
   ### How was this patch tested?
   
   Added `EventTimeWatermarkSuite."zero-delay watermark keeps records at max
   event time across batches"`, which exercises both the new default
   (`bumpZeroDelayToOneMs=true`) and the legacy path
   (`bumpZeroDelayToOneMs=false`).
   
   A handful of existing `MultiStatefulOperatorsSuite` tests encode the legacy
   0-delay boundary in their expected watermark/output and have been wrapped in
   `withSQLConf(... -> "false")` so they continue to validate that path.
   
   `EventTimeWatermarkSuite`, `MultiStatefulOperatorsSuite`,
   `StreamingJoinSuite`, `FlatMapGroupsWithStateSuite`, and
   `StreamingQueryOptimizationCorrectnessSuite` all pass locally.
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   Generated-by: Claude Code (Anthropic, claude-opus-4-7), with human review.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.