dylanwong250 commented on code in PR #53311:
URL: https://github.com/apache/spark/pull/53311#discussion_r2590418087


##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala:
##########
@@ -237,4 +237,147 @@ class OffsetSeqLogSuite extends SharedSparkSession {
     assert(metadata.batchWatermarkMs === 0)
     assert(metadata.batchTimestampMs === 1758651405232L)
   }
+
+  test("STREAMING_OFFSET_LOG_FORMAT_VERSION config - new query with 
VERSION_2") {
+    import testImplicits._
+    withTempDir { checkpointDir =>
+      withSQLConf(SQLConf.STREAMING_OFFSET_LOG_FORMAT_VERSION.key -> "2") {
+        val inputData = MemoryStream[Int]
+        val query = inputData.toDF()
+          .writeStream
+          .format("memory")
+          .queryName("offsetlog_v2_test")
+          .option("checkpointLocation", checkpointDir.getAbsolutePath)
+          .start()
+
+        try {
+          // Add data and process batches
+          inputData.addData(1, 2, 3)
+          query.processAllAvailable()
+          inputData.addData(4, 5)
+          query.processAllAvailable()
+
+          // Read the offset log from checkpoint
+          val offsetLog = new OffsetSeqLog(spark, 
s"${checkpointDir.getAbsolutePath}/offsets")
+          val latestBatch = offsetLog.getLatest()
+          assert(latestBatch.isDefined, "Offset log should have at least one 
entry")
+
+          val (batchId, offsetSeq) = latestBatch.get
+
+          // Verify it's OffsetMap (VERSION_2)
+          assert(offsetSeq.isInstanceOf[OffsetMap],
+            s"Expected OffsetMap but got ${offsetSeq.getClass.getSimpleName}")
+
+          // Verify metadata version is 2
+          assert(offsetSeq.metadataOpt.isDefined)
+          val metadata = offsetSeq.metadataOpt.get
+          assert(metadata.version === 2, s"Expected version 2 but got 
${metadata.version}")
+
+          // Verify OffsetMap uses string keys ("0", "1", etc.)
+          val offsetMap = offsetSeq.asInstanceOf[OffsetMap]
+          assert(offsetMap.offsetsMap.keys.forall(_.forall(_.isDigit)),
+            "OffsetMap keys should be string representations of ordinals 
(e.g., '0', '1')")
+        } finally {
+          query.stop()
+        }
+      }
+    }
+  }
+
+  test("STREAMING_OFFSET_LOG_FORMAT_VERSION config - default VERSION_1") {
+    import testImplicits._
+    withTempDir { checkpointDir =>
+      // Don't set the config, should default to VERSION_1
+      val inputData = MemoryStream[Int]
+      val query = inputData.toDF()
+        .writeStream
+        .format("memory")
+        .queryName("offsetlog_v1_test")
+        .option("checkpointLocation", checkpointDir.getAbsolutePath)
+        .start()
+
+      try {
+        // Add data and process batches
+        inputData.addData(1, 2, 3)
+        query.processAllAvailable()
+
+        // Read the offset log from checkpoint
+        val offsetLog = new OffsetSeqLog(spark, 
s"${checkpointDir.getAbsolutePath}/offsets")
+        val latestBatch = offsetLog.getLatest()
+        assert(latestBatch.isDefined, "Offset log should have at least one 
entry")
+
+        val (batchId, offsetSeq) = latestBatch.get
+
+        // Verify it's OffsetSeq (VERSION_1)
+        assert(offsetSeq.isInstanceOf[OffsetSeq],
+          s"Expected OffsetSeq but got ${offsetSeq.getClass.getSimpleName}")
+
+        // Verify metadata version is 1
+        assert(offsetSeq.metadataOpt.isDefined)
+        val metadata = offsetSeq.metadataOpt.get
+        assert(metadata.version === 1, s"Expected version 1 but got 
${metadata.version}")
+      } finally {
+        query.stop()
+      }
+    }
+  }
+
+  test("STREAMING_OFFSET_LOG_FORMAT_VERSION config - checkpoint wins on 
restart") {

Review Comment:
   Should we also test the inverse? Start with v2 and ensure it remains v2 after a restart?



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala:
##########
@@ -237,4 +237,147 @@ class OffsetSeqLogSuite extends SharedSparkSession {
     assert(metadata.batchWatermarkMs === 0)
     assert(metadata.batchTimestampMs === 1758651405232L)
   }
+
+  test("STREAMING_OFFSET_LOG_FORMAT_VERSION config - new query with 
VERSION_2") {
+    import testImplicits._
+    withTempDir { checkpointDir =>
+      withSQLConf(SQLConf.STREAMING_OFFSET_LOG_FORMAT_VERSION.key -> "2") {
+        val inputData = MemoryStream[Int]
+        val query = inputData.toDF()
+          .writeStream
+          .format("memory")
+          .queryName("offsetlog_v2_test")
+          .option("checkpointLocation", checkpointDir.getAbsolutePath)
+          .start()
+
+        try {
+          // Add data and process batches
+          inputData.addData(1, 2, 3)
+          query.processAllAvailable()
+          inputData.addData(4, 5)
+          query.processAllAvailable()
+
+          // Read the offset log from checkpoint
+          val offsetLog = new OffsetSeqLog(spark, 
s"${checkpointDir.getAbsolutePath}/offsets")
+          val latestBatch = offsetLog.getLatest()
+          assert(latestBatch.isDefined, "Offset log should have at least one 
entry")
+
+          val (batchId, offsetSeq) = latestBatch.get
+
+          // Verify it's OffsetMap (VERSION_2)
+          assert(offsetSeq.isInstanceOf[OffsetMap],
+            s"Expected OffsetMap but got ${offsetSeq.getClass.getSimpleName}")
+
+          // Verify metadata version is 2
+          assert(offsetSeq.metadataOpt.isDefined)
+          val metadata = offsetSeq.metadataOpt.get
+          assert(metadata.version === 2, s"Expected version 2 but got 
${metadata.version}")
+
+          // Verify OffsetMap uses string keys ("0", "1", etc.)
+          val offsetMap = offsetSeq.asInstanceOf[OffsetMap]
+          assert(offsetMap.offsetsMap.keys.forall(_.forall(_.isDigit)),

Review Comment:
   Is it worth checking that the specific default keys are correct?
   
   I guess in this case it would just be "0".



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/OffsetSeq.scala:
##########
@@ -244,7 +244,8 @@ object OffsetSeqMetadata extends Logging {
       case (confInSession, confInOffsetLog) =>
         confInOffsetLog.key -> sessionConf.get(confInSession.key)
     }.toMap
-    OffsetSeqMetadata(batchWatermarkMs, batchTimestampMs, confs++ 
confsFromRebind)
+    val version = 
sessionConf.get(STREAMING_OFFSET_LOG_FORMAT_VERSION.key).toInt

Review Comment:
   Does `version` here refer to the OffsetSeqMetadata version or the OffsetSeqBase version?



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala:
##########
@@ -237,4 +237,147 @@ class OffsetSeqLogSuite extends SharedSparkSession {
     assert(metadata.batchWatermarkMs === 0)
     assert(metadata.batchTimestampMs === 1758651405232L)
   }
+
+  test("STREAMING_OFFSET_LOG_FORMAT_VERSION config - new query with 
VERSION_2") {

Review Comment:
   I think it would be good to add a test with multiple sources and verify the contents of the resulting OffsetMap.



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala:
##########
@@ -237,4 +237,147 @@ class OffsetSeqLogSuite extends SharedSparkSession {
     assert(metadata.batchWatermarkMs === 0)
     assert(metadata.batchTimestampMs === 1758651405232L)
   }
+
+  test("STREAMING_OFFSET_LOG_FORMAT_VERSION config - new query with 
VERSION_2") {
+    import testImplicits._
+    withTempDir { checkpointDir =>
+      withSQLConf(SQLConf.STREAMING_OFFSET_LOG_FORMAT_VERSION.key -> "2") {
+        val inputData = MemoryStream[Int]
+        val query = inputData.toDF()
+          .writeStream
+          .format("memory")
+          .queryName("offsetlog_v2_test")
+          .option("checkpointLocation", checkpointDir.getAbsolutePath)
+          .start()
+
+        try {
+          // Add data and process batches
+          inputData.addData(1, 2, 3)
+          query.processAllAvailable()
+          inputData.addData(4, 5)
+          query.processAllAvailable()
+
+          // Read the offset log from checkpoint
+          val offsetLog = new OffsetSeqLog(spark, 
s"${checkpointDir.getAbsolutePath}/offsets")
+          val latestBatch = offsetLog.getLatest()
+          assert(latestBatch.isDefined, "Offset log should have at least one 
entry")
+
+          val (batchId, offsetSeq) = latestBatch.get
+
+          // Verify it's OffsetMap (VERSION_2)
+          assert(offsetSeq.isInstanceOf[OffsetMap],
+            s"Expected OffsetMap but got ${offsetSeq.getClass.getSimpleName}")
+
+          // Verify metadata version is 2
+          assert(offsetSeq.metadataOpt.isDefined)
+          val metadata = offsetSeq.metadataOpt.get
+          assert(metadata.version === 2, s"Expected version 2 but got 
${metadata.version}")
+
+          // Verify OffsetMap uses string keys ("0", "1", etc.)
+          val offsetMap = offsetSeq.asInstanceOf[OffsetMap]
+          assert(offsetMap.offsetsMap.keys.forall(_.forall(_.isDigit)),
+            "OffsetMap keys should be string representations of ordinals 
(e.g., '0', '1')")
+        } finally {
+          query.stop()
+        }
+      }
+    }
+  }
+
+  test("STREAMING_OFFSET_LOG_FORMAT_VERSION config - default VERSION_1") {
+    import testImplicits._

Review Comment:
   Could the `import testImplicits._` be moved to the test class level so it does not need to be duplicated in each test?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to