liviazhu commented on code in PR #56019:
URL: https://github.com/apache/spark/pull/56019#discussion_r3377204429
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/CommitLog.scala:
##########
@@ -205,3 +215,69 @@ object CommitMetadataV2 {
def apply(json: String): CommitMetadataV2 =
Serialization.read[CommitMetadataV2](json)
}
+
+/**
+ * Commit log metadata for [[CommitLog.VERSION_3]]. Extends V2 with a map of
per-sink metadata
+ * keyed by sink name. This enables streaming sink evolution: each batch
records the active sink
+ * along with any historical sinks that were used in earlier batches but are
no longer active.
+ *
+ * @param nextBatchWatermarkMs The watermark of the next batch.
+ * @param stateUniqueIds Per-operator state store unique ids (see
[[CommitMetadataV2]]).
+ * @param sinkMetadataMap Map keyed by sink name. There is at most one active
entry per
+ * commit; deactivated sinks are retained to detect
reuse of a sink name.
+ */
+case class CommitMetadataV3(
+ nextBatchWatermarkMs: Long = 0,
+ stateUniqueIds: Option[Map[Long, Array[Array[String]]]] = None,
+ sinkMetadataMap: Map[String, SinkMetadataInfo] = Map.empty) extends
CommitMetadataBase {
+ override def version: Int = CommitLog.VERSION_3
+
+ override def withStateUniqueIds(
+ stateUniqueIds: Option[Map[Long, Array[Array[String]]]]):
CommitMetadataV3 =
+ copy(stateUniqueIds = stateUniqueIds)
+
+ /** Returns the currently active sink's metadata, if any. */
+ def activeSinkMetadataInfoOpt: Option[SinkMetadataInfo] =
sinkMetadataMap.values.find(_.isActive)
+}
+
+object CommitMetadataV3 {
+ implicit val format: Formats = Serialization.formats(NoTypeHints)
+
+ def apply(json: String): CommitMetadataV3 =
Serialization.read[CommitMetadataV3](json)
+}
+
+/**
+ * Per-sink metadata recorded in a [[CommitMetadataV3]] entry.
+ *
+ * @param sinkName Sink name as supplied via `DataStreamWriter.name()`, or
+ * `MicroBatchExecution.DEFAULT_SINK_NAME` when sink evolution
is disabled.
Review Comment:
I think this one is still not addressed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]