ericm-db commented on code in PR #56018:
URL: https://github.com/apache/spark/pull/56018#discussion_r3326379599


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/CommitLog.scala:
##########
@@ -50,39 +51,99 @@ class CommitLog(
     sparkSession: SparkSession,
     path: String,
     readOnly: Boolean = false)
-  extends HDFSMetadataLog[CommitMetadata](sparkSession, path, readOnly) {
+  extends HDFSMetadataLog[CommitMetadataBase](sparkSession, path, readOnly) {
 
   import CommitLog._
 
-  private val VERSION: Int = sparkSession.conf.get(
+  // The configured commit log format version. Used as the default version when callers
+  // construct metadata through [[createMetadata]].
+  private[sql] val VERSION: Int = sparkSession.conf.get(
     SQLConf.STATE_STORE_CHECKPOINT_FORMAT_VERSION.key).toInt
 
-  override protected[sql] def deserialize(in: InputStream): CommitMetadata = {
-    // called inside a try-finally where the underlying stream is closed in the caller
-    val lines = IOSource.fromInputStream(in, UTF_8.name()).getLines()
-    if (!lines.hasNext) {
-      throw new IllegalStateException("Incomplete log file in the offset commit log")
-    }
-    // TODO [SPARK-49462] This validation should be relaxed for a stateless query.
-    // TODO [SPARK-50653] This validation should be relaxed to support reading
-    //  a V1 log file when VERSION is V2
-    validateVersionExactMatch(lines.next().trim, VERSION)
-    val metadataJson = if (lines.hasNext) lines.next() else EMPTY_JSON
-    CommitMetadata(metadataJson)
+  override protected[sql] def deserialize(in: InputStream): CommitMetadataBase = {
+    CommitLog.readCommitMetadata(in)
   }
 
-  override protected[sql] def serialize(metadata: CommitMetadata, out: OutputStream): Unit = {
+  override protected[sql] def serialize(metadata: CommitMetadataBase, out: OutputStream): Unit = {
     // called inside a try-finally where the underlying stream is closed in the caller
-    out.write(s"v${VERSION}".getBytes(UTF_8))
+    out.write(s"v${metadata.version}".getBytes(UTF_8))
     out.write('\n')
 
     // write metadata
     out.write(metadata.json.getBytes(UTF_8))
   }
+
+  /**
+   * Factory for creating a [[CommitMetadataBase]] for the requested wire format version.
+   * Defaults to the version configured via [[SQLConf.STATE_STORE_CHECKPOINT_FORMAT_VERSION]].
+   */
+  def createMetadata(
+      nextBatchWatermarkMs: Long = 0,
+      stateUniqueIds: Option[Map[Long, Array[Array[String]]]] = None,
+      commitLogFormatVersion: Int = VERSION): CommitMetadataBase = {
+    commitLogFormatVersion match {
+      case VERSION_2 =>
+        CommitMetadataV2(nextBatchWatermarkMs, stateUniqueIds)
+      case VERSION_1 =>
+        CommitMetadata(nextBatchWatermarkMs)

Review Comment:
   Yes, we now throw an error in that case.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to