cloud-fan opened a new pull request, #56692:
URL: https://github.com/apache/spark/pull/56692

   ### What changes were proposed in this pull request?
   
   Follow-up to #56020 (SPARK-56972), which added V3 commit-log persistence of 
sink metadata inside `MicroBatchExecution.markMicroBatchEnd`. This PR fixes two 
issues in that change:
   
   1. **Async progress tracking silently drops sink metadata.** 
`AsyncProgressTrackingMicroBatchExecution` overrides `markMicroBatchEnd` and 
writes only V1 commit metadata through its async path; it never goes through 
the V3 write the parent added. So when both 
`spark.sql.streaming.queryEvolution.enableSinkEvolution` and 
`asyncProgressTrackingEnabled` are on, the sink metadata is silently never 
persisted. This PR rejects the combination explicitly at query start (mirroring 
the existing async validations for `Once`/`AvailableNow` triggers and 
unsupported sinks), so the misconfiguration fails loudly instead of silently 
dropping sink metadata.
   
   2. **`sinkMetadataMap` doc comment was inaccurate.** It claimed insertion 
order is preserved "so that we can re-emit deactivated sinks in the same order 
they originally appeared", but the order is not upheld end-to-end: the 
commit-log write rebuilds the map via `.toMap` and the field round-trips 
through an unordered serialized `Map` on restart. The active sink is found by 
its `isActive` flag, not by position, so order is never consumed. The comment 
also referenced `runBatch` as the mutation site when the mutation actually 
lives in `markMicroBatchEnd`. This PR fixes the comment and switches the field 
to a plain `mutable.HashMap`.
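
   To illustrate why ordering does not matter, here is a minimal, self-contained Scala sketch. The `SinkMetadata` case class, its fields, and the helper names are hypothetical stand-ins, not the actual Spark types; the sketch only assumes, as described above, that the map is flattened with `.toMap` before the commit-log write and that the active sink carries an `isActive` flag.

```scala
import scala.collection.mutable

// Hypothetical stand-in for per-sink metadata; not the real Spark class.
final case class SinkMetadata(sinkName: String, isActive: Boolean)

object SinkOrderingSketch {
  // In-memory map kept as a plain mutable.HashMap (no ordering guarantee).
  def buildMap(entries: Seq[SinkMetadata]): mutable.HashMap[String, SinkMetadata] = {
    val m = mutable.HashMap.empty[String, SinkMetadata]
    entries.foreach(e => m(e.sinkName) = e)
    m
  }

  // Mirrors the `.toMap` step before the commit-log write: the result is an
  // immutable Map, whose general contract makes no ordering guarantee.
  def toCommitLogMap(m: mutable.HashMap[String, SinkMetadata]): Map[String, SinkMetadata] =
    m.toMap

  // The active sink is located by its flag, never by position, so the
  // iteration order of either map is irrelevant.
  def activeSink(m: scala.collection.Map[String, SinkMetadata]): Option[SinkMetadata] =
    m.valuesIterator.find(_.isActive)

  def main(args: Array[String]): Unit = {
    val entries = Seq(
      SinkMetadata("sink-a", isActive = false),
      SinkMetadata("sink-b", isActive = false),
      SinkMetadata("sink-c", isActive = true))
    val persisted = toCommitLogMap(buildMap(entries))
    println(activeSink(persisted).map(_.sinkName)) // prints Some(sink-c)
  }
}
```

Because the lookup is by flag, swapping a `LinkedHashMap` for a `HashMap` changes nothing observable in this sketch.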
   
   ### Why are the changes needed?
   
   The first change is a correctness fix: a user enabling sink evolution 
together with async progress tracking would get no error and no persisted sink 
metadata, defeating the feature's durability guarantee with no signal. The 
second keeps the in-code documentation honest so a future maintainer does not 
rely on an ordering guarantee that does not hold.
   
   ### Does this PR introduce _any_ user-facing change?
   
   Yes, but only for the unreleased sink-evolution feature (off by default). A 
streaming query that sets both 
`spark.sql.streaming.queryEvolution.enableSinkEvolution=true` and 
`asyncProgressTrackingEnabled=true` now fails at start with
`IllegalArgumentException("Async progress tracking cannot be used with streaming sink evolution (spark.sql.streaming.queryEvolution.enableSinkEvolution)")`
instead of running while silently not persisting the sink metadata.
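
   A minimal sketch of such a start-time check, for illustration only. The object and method names are hypothetical, and reading the SQL conf and the query's write options as plain `Map[String, String]` is an assumption; the actual check in Spark lives alongside the existing async validations and may read these settings differently.

```scala
// Hypothetical sketch, not the actual Spark code: rejects async progress
// tracking combined with streaming sink evolution at query start.
object AsyncSinkEvolutionValidation {
  val SinkEvolutionConfKey = "spark.sql.streaming.queryEvolution.enableSinkEvolution"
  val AsyncProgressOptionKey = "asyncProgressTrackingEnabled"

  private def isTrue(settings: Map[String, String], key: String): Boolean =
    settings.get(key).exists(_.trim.equalsIgnoreCase("true"))

  // sqlConf and queryOptions are assumed to be simple string maps here.
  def validate(sqlConf: Map[String, String], queryOptions: Map[String, String]): Unit = {
    if (isTrue(sqlConf, SinkEvolutionConfKey) && isTrue(queryOptions, AsyncProgressOptionKey)) {
      // Fail loudly instead of running without persisting sink metadata.
      throw new IllegalArgumentException(
        "Async progress tracking cannot be used with streaming sink evolution " +
          s"($SinkEvolutionConfKey)")
    }
  }
}
```

Enabling either setting alone passes; only the combination throws.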
   
   ### How was this patch tested?
   
   Added `AsyncProgressTrackingMicroBatchExecutionSuite."Fail with streaming sink evolution enabled"`,
which asserts the new validation error. Existing 
`AsyncProgressTrackingMicroBatchExecutionSuite` and 
`StreamingSinkEvolutionSuite` (12 tests) pass with the `HashMap` change.
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   Generated-by: Claude Code (claude-opus-4-8)
   
   This pull request and its description were written by Isaac.


-- 
This is an automated message from the Apache Git Service.