ericm-db opened a new pull request, #56548:
URL: https://github.com/apache/spark/pull/56548

   ### What changes were proposed in this pull request?
   
   Backport of [SPARK-56971] 
([apache/spark#56019](https://github.com/apache/spark/pull/56019)) to 
`branch-4.2`.
   
   Add the commit log data structures for streaming sink evolution:
   
   - `CommitMetadataV3` (`VERSION_3` of the commit log wire format) carries a 
`sinkMetadataMap: Map[String, SinkMetadataInfo]` keyed by sink name, in 
addition to the V2 fields (`nextBatchWatermarkMs`, `stateUniqueIds`).
   - `SinkMetadataInfo` records per-sink metadata: `sinkName`, `commitOffset` 
(serialized via `OffsetV2.json()`), `providerName`, `apiVersion`, and an 
`isActive` flag used to distinguish the current sink from historical sinks that 
were used in earlier batches but are no longer in use.
   - `CommitMetadataV3.activeSinkMetadataInfo` returns the entry with `isActive 
= true`; `CommitMetadataV3` requires exactly one active sink.
   - `CommitLog.createMetadata` now produces a `CommitMetadataV3` when 
`commitLogFormatVersion = VERSION_3`, and in that case requires a non-empty `sinkMetadataMap`.
   - `CommitLog.readCommitMetadata` dispatches `v3` files to the new class.
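A minimal Scala sketch of the V3 structures and the invariants listed above. It assumes `providerName` and `apiVersion` are plain strings and models `stateUniqueIds` as `Option[Map[Long, Seq[String]]]`; the real field types in `CommitLog.scala` may differ, and JSON (de)serialization is omitted. `createMetadataV3` is a hypothetical stand-in for the V3 branch of `CommitLog.createMetadata`, and the sink names, provider, and offsets in the usage lines are made up.

```scala
// Hypothetical sketch, not Spark's code: field types for providerName,
// apiVersion, and stateUniqueIds are assumptions.

/** Per-sink metadata; commitOffset holds the JSON string from OffsetV2.json(). */
case class SinkMetadataInfo(
    sinkName: String,
    commitOffset: String,
    providerName: String,
    apiVersion: String,
    isActive: Boolean)

case class CommitMetadataV3(
    nextBatchWatermarkMs: Long,
    stateUniqueIds: Option[Map[Long, Seq[String]]],
    sinkMetadataMap: Map[String, SinkMetadataInfo]) {

  // Invariant from the PR description: exactly one sink is active.
  private val activeCount = sinkMetadataMap.values.count(_.isActive)
  require(activeCount == 1, s"expected exactly one active sink, found $activeCount")

  /** The sink the query currently writes to; every other entry is historical. */
  def activeSinkMetadataInfo: SinkMetadataInfo =
    sinkMetadataMap.values.find(_.isActive).get
}

/** Hypothetical stand-in for the VERSION_3 branch of CommitLog.createMetadata. */
def createMetadataV3(
    nextBatchWatermarkMs: Long,
    stateUniqueIds: Option[Map[Long, Seq[String]]],
    sinkMetadataMap: Map[String, SinkMetadataInfo]): CommitMetadataV3 = {
  require(sinkMetadataMap.nonEmpty, "VERSION_3 requires a non-empty sinkMetadataMap")
  CommitMetadataV3(nextBatchWatermarkMs, stateUniqueIds, sinkMetadataMap)
}

// Usage (made-up values): the query moved from sink_a to sink_b, and
// sink_a is retained as a historical, inactive entry.
val oldSink = SinkMetadataInfo("sink_a", """{"offset":41}""", "kafka", "V2", isActive = false)
val newSink = SinkMetadataInfo("sink_b", """{"offset":7}""", "kafka", "V2", isActive = true)
val meta = createMetadataV3(0L, None, Map(oldSink.sinkName -> oldSink, newSink.sinkName -> newSink))
println(meta.activeSinkMetadataInfo.sinkName) // sink_b
```

Keeping the inactive entry in the map is what lets a later batch still see sinks used earlier, while the single-active-sink check keeps `activeSinkMetadataInfo` unambiguous.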
   
   The V3 metadata is dormant in this PR: no caller produces it yet. Wiring 
it through `MicroBatchExecution` is left to the SPARK-56972 follow-up.
   
   **Prerequisite commit.** SPARK-56971 was built on top of [SPARK-56970] 
([apache/spark#56018](https://github.com/apache/spark/pull/56018)), which 
splits `CommitMetadata` into a `CommitMetadataBase` trait with concrete 
`CommitMetadata` (V1) and `CommitMetadataV2` case classes. `branch-4.2` does 
not yet have SPARK-56970, so this PR includes it as the first commit and adds 
SPARK-56971 on top. Both commits are cherry-picked from the `branch-4.x` 
backports (`5322ec30c02` and `706ce2f3743`). The only conflicts were 
import-line collisions in `CommitLogSuite.scala` (the suite extends 
`SparkFunSuite with SharedSparkSession` on `branch-4.2`); the resolved 
`CommitLog.scala` is identical to `branch-4.x`.
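For readers unfamiliar with the SPARK-56970 refactor, a hedged sketch of its general shape: a common trait with one case class per format version, so a V1 value carries no `stateUniqueIds` field at all rather than a null one. The `sealed` modifier, the `stateUniqueIds` type, and the `stateUniqueIdsOf` helper are assumptions for illustration, not Spark's actual code.

```scala
// Hypothetical sketch of the CommitMetadataBase split; `sealed`, the
// stateUniqueIds type, and stateUniqueIdsOf are illustrative assumptions.
sealed trait CommitMetadataBase {
  def nextBatchWatermarkMs: Long
}

/** V1: only the watermark; there is no stateUniqueIds field to serialize. */
case class CommitMetadata(nextBatchWatermarkMs: Long) extends CommitMetadataBase

/** V2: adds the state store checkpoint IDs. */
case class CommitMetadataV2(
    nextBatchWatermarkMs: Long,
    stateUniqueIds: Option[Map[Long, Seq[String]]]) extends CommitMetadataBase

/** Callers match on the concrete version instead of null-checking a shared field. */
def stateUniqueIdsOf(m: CommitMetadataBase): Option[Map[Long, Seq[String]]] = m match {
  case _: CommitMetadata    => None
  case v2: CommitMetadataV2 => v2.stateUniqueIds
}

val v1: CommitMetadataBase = CommitMetadata(1000L)
val v2: CommitMetadataBase = CommitMetadataV2(1000L, Some(Map(0L -> Seq("id-0"))))
```

With a sealed hierarchy the compiler can warn when a new version (such as V3) is added but a `match` like the one above is not updated.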
   
   ### Why are the changes needed?
   
   SPARK-56719 added `DataStreamWriter.name()` as the API surface for sink 
evolution. Without a place in the commit log to durably record the sink name 
and offset alongside the rest of a committed batch's metadata, sink names 
cannot be observed on restart and the evolution feature cannot be completed. 
This PR introduces that storage in the 4.2 release line.
   
   ### Does this PR introduce _any_ user-facing change?
   
   No. `CommitMetadataV3` is in the internal 
`org.apache.spark.sql.execution.streaming.checkpointing` package and is not 
produced by any code path yet. As part of the SPARK-56970 refactor, V1 commit 
log files no longer serialize `stateUniqueIds: null`; old V1 files continue to 
be read because the V1 deserializer ignores the (now-unknown) field.
   
   ### How was this patch tested?
   
   - Cherry-picked the two `branch-4.x` commits; resolved import conflicts in 
`CommitLogSuite.scala`.
   - Existing and new `CommitLogSuite` cases (V1/V2/V3 SerDe, historical-sink 
retention, `createMetadata` V3 empty-map failure, exactly-one-active-sink 
invariant).
   - `sql/core` main and test sources compile cleanly on `branch-4.2` 
(`build/sbt sql/Test/compile`).
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   Generated-by: Claude Code (claude-opus-4-8)
   