ericm-db opened a new pull request, #56020:
URL: https://github.com/apache/spark/pull/56020

   ### What changes were proposed in this pull request?
   
   Wire the sink name through `MicroBatchExecution` so that, when sink 
evolution is enabled, each committed batch writes a `CommitMetadataV3` whose 
`sinkMetadataMap` records the current sink as the active entry alongside any 
sinks that were active in earlier batches:
   
   - Add a per-execution `sinkMetadataMap` that is hydrated from the latest 
`CommitMetadataV3` in `populateStartOffsets`.
   - When `spark.sql.streaming.queryEvolution.enableSinkEvolution` is true, the commit-log write in `runBatch` produces a `CommitMetadataV3` in which every prior entry is marked `isActive = false` and the current `(sinkName, sink.getClass.getName)` pair is recorded with `isActive = true`.
   - When sink evolution is disabled, the existing V1/V2 commit-log path is unchanged.
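   The bookkeeping in the list above can be sketched in a few lines of Scala. This is a minimal, hypothetical model, not the code in this PR: it assumes the map is keyed by sink name and that each entry holds the provider class name plus an `isActive` flag. `SinkEntry`, `hydrate` and `nextSinkMetadataMap` are illustrative names, and falling back to an empty map when the latest commit is V1/V2 is an assumption. The real `CommitMetadataV3` serialization is not modeled.

   ```scala
   // Hypothetical sketch: SinkEntry, hydrate and nextSinkMetadataMap are
   // illustrative names, not the actual classes or methods in this PR.
   object SinkMetadataSketch {

     /** One entry of the sink metadata map (assumed to be keyed by sink name). */
     final case class SinkEntry(providerClassName: String, isActive: Boolean)

     type SinkMetadataMap = Map[String, SinkEntry]

     /**
      * Start/restart path: begin from the map stored in the latest V3 commit,
      * or from an empty map when there is none (e.g. the latest commit is V1/V2).
      */
     def hydrate(latestV3Map: Option[SinkMetadataMap]): SinkMetadataMap =
       latestV3Map.getOrElse(Map.empty)

     /**
      * Commit path: mark every previously recorded sink inactive, then record
      * the current sink as the single active entry (overwriting any entry
      * that already uses the same name).
      */
     def nextSinkMetadataMap(
         prior: SinkMetadataMap,
         sinkName: String,
         providerClassName: String): SinkMetadataMap = {
       val deactivated = prior.map { case (name, entry) =>
         name -> entry.copy(isActive = false)
       }
       deactivated + (sinkName -> SinkEntry(providerClassName, isActive = true))
     }
   }
   ```

   Under these assumptions, a query that commits to sink `a` and is restarted under the name `b` ends up with `a` recorded as inactive and `b` as active, which is the rename scenario exercised in the tests. Committing again under an existing name simply overwrites that entry as active, so the map never contains more than one active sink.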
   
   This PR provides only the minimal write-then-read round trip for the sink evolution feature added in SPARK-56719: the sink metadata is written on commit and read back on restart. Provider-mismatch and sink-reuse validation are intentionally deferred.
   
   This PR is stacked on #56019 (SPARK-56971), which is itself stacked on #56018 (SPARK-56970). Until those PRs merge, the diff here also includes their commits.
   
   ### Why are the changes needed?
   
   SPARK-56719 introduced the `DataStreamWriter.name()` API and the in-memory `sinkName` plumbing inside `MicroBatchExecution`, but the sink name was not yet persisted to the checkpoint. Without persistence, a restarted query cannot see which sinks earlier runs wrote to, so sink identity does not survive restarts and the feature is not durable.
   
   ### Does this PR introduce _any_ user-facing change?
   
   Behavior changes only when `spark.sql.streaming.queryEvolution.enableSinkEvolution` is true (it is off by default): new commits are then written as V3 commit log files instead of V1/V2 files. With the flag left off, the commit log format and its wire compatibility are unchanged.
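   The flag-gated choice of commit format can be modeled as a small pure function. This is a hypothetical Scala sketch, not the Spark implementation: `CommitRecord`, `LegacyCommit`, `V3Commit` and `buildCommit` are illustrative names, the V1/V2 version is passed in as an input because the existing selection logic is left unchanged, and the prior sink map is assumed to be empty when the latest commit on disk was V1/V2.

   ```scala
   // Hypothetical model of the flag-gated commit format choice. CommitRecord,
   // LegacyCommit, V3Commit and buildCommit are illustrative names only.
   object CommitFormatSketch {

     final case class SinkEntry(providerClassName: String, isActive: Boolean)

     sealed trait CommitRecord
     /** Existing V1/V2 path, written unchanged when sink evolution is off. */
     final case class LegacyCommit(version: Int) extends CommitRecord
     /** V3 path, carrying the sink metadata map. */
     final case class V3Commit(sinkMetadataMap: Map[String, SinkEntry]) extends CommitRecord

     def buildCommit(
         enableSinkEvolution: Boolean,
         legacyVersion: Int, // V1 or V2, chosen by the existing (unchanged) logic
         priorSinks: Map[String, SinkEntry], // assumed empty if the last commit was V1/V2
         sinkName: String,
         providerClassName: String): CommitRecord =
       if (!enableSinkEvolution) {
         // Flag off: keep writing the legacy format.
         LegacyCommit(legacyVersion)
       } else {
         // Flag on: deactivate earlier sinks and mark the current one active.
         val deactivated = priorSinks.map { case (name, entry) =>
           name -> entry.copy(isActive = false)
         }
         V3Commit(deactivated + (sinkName -> SinkEntry(providerClassName, isActive = true)))
       }
   }
   ```

   In this model, turning the flag on for a checkpoint whose latest commit was V1/V2 yields a V3 record that contains only the current sink, which corresponds to the transparent-upgrade test case; with the flag off, the legacy version passes through untouched.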
   
   ### How was this patch tested?
   
   Added four new tests in `StreamingSinkEvolutionSuite`:
   - V3 commit log records the active sink for a named query.
   - Renaming the sink across a restart retains the previous sink with `isActive = false` and marks the new one active.
   - With sink evolution disabled, the commit log remains V1/V2.
   - Enabling sink evolution on a checkpoint that previously used V1/V2 
transparently upgrades to V3 on the next commit.
   
   Existing `StreamingSinkEvolutionSuite`, `CommitLogSuite`, 
`MicroBatchExecutionSuite`, and `AsyncProgressTrackingMicroBatchExecutionSuite` 
all pass.
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   Generated-by: Claude Code (claude-opus-4-7)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.