liviazhu commented on code in PR #56019:
URL: https://github.com/apache/spark/pull/56019#discussion_r3377204429


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/CommitLog.scala:
##########
@@ -205,3 +215,69 @@ object CommitMetadataV2 {
 
   def apply(json: String): CommitMetadataV2 = Serialization.read[CommitMetadataV2](json)
 }
+
+/**
+ * Commit log metadata for [[CommitLog.VERSION_3]]. Extends V2 with a map of per-sink metadata
+ * keyed by sink name. This enables streaming sink evolution: each batch records the active sink
+ * along with any historical sinks that were used in earlier batches but are no longer active.
+ *
+ * @param nextBatchWatermarkMs The watermark of the next batch.
+ * @param stateUniqueIds Per-operator state store unique ids (see [[CommitMetadataV2]]).
+ * @param sinkMetadataMap Map keyed by sink name. There is at most one active entry per
+ *                       commit; deactivated sinks are retained to detect reuse of a sink name.
+ */
+case class CommitMetadataV3(
+    nextBatchWatermarkMs: Long = 0,
+    stateUniqueIds: Option[Map[Long, Array[Array[String]]]] = None,
+    sinkMetadataMap: Map[String, SinkMetadataInfo] = Map.empty) extends CommitMetadataBase {
+  override def version: Int = CommitLog.VERSION_3
+
+  override def withStateUniqueIds(
+      stateUniqueIds: Option[Map[Long, Array[Array[String]]]]): CommitMetadataV3 =
+    copy(stateUniqueIds = stateUniqueIds)
+
+  /** Returns the currently active sink's metadata, if any. */
+  def activeSinkMetadataInfoOpt: Option[SinkMetadataInfo] = sinkMetadataMap.values.find(_.isActive)
+}
+
+object CommitMetadataV3 {
+  implicit val format: Formats = Serialization.formats(NoTypeHints)
+
+  def apply(json: String): CommitMetadataV3 = Serialization.read[CommitMetadataV3](json)
+}
+
+/**
+ * Per-sink metadata recorded in a [[CommitMetadataV3]] entry.
+ *
+ * @param sinkName Sink name as supplied via `DataStreamWriter.name()`, or
+ *                 `MicroBatchExecution.DEFAULT_SINK_NAME` when sink evolution is disabled.

Review Comment:
   I think this one is still not addressed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to