dongjoon-hyun commented on code in PR #48460:
URL: https://github.com/apache/spark/pull/48460#discussion_r1917607771


##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreCheckpointFormatV2Suite.scala:
##########
@@ -305,6 +571,266 @@ class RocksDBStateStoreCheckpointFormatV2Suite extends 
StreamTest
     validateBaseCheckpointInfo()
   }
 
+  /**
+   * Verify lineage for each partition across batches. Below should satisfy 
because
+   * these ids are stored in the following manner:
+   * stateStoreCkptIds: id3, id2, id1
+   * baseStateStoreCkptIds:  id2, id1, None
+   * Below checks [id2, id1] are the same,
+   * which is the lineage for this partition across batches
+   */
+  private def checkpointInfoLineageVerification(
+      pickedCheckpointInfoList: Iterable[StateStoreCheckpointInfo]): Unit = {
+
+    Seq(0, 1).foreach {
+      partitionId =>
+        val stateStoreCkptIds = pickedCheckpointInfoList
+          .filter(_.partitionId == partitionId).map(_.stateStoreCkptId)
+        val baseStateStoreCkptIds = pickedCheckpointInfoList
+          .filter(_.partitionId == partitionId).map(_.baseStateStoreCkptId)
+
+        assert(stateStoreCkptIds.drop(1).iterator
+          .sameElements(baseStateStoreCkptIds.dropRight(1)))
+    }
+  }
+
+  /**
+   * Verify the version->UniqueId mapping from state store is the same as
+   * that from the commit log
+   * @param checkpointDir The checkpoint directory to read the commit log
+   * @param pickedCheckpointInfoList The checkpoint info list to verify
+   * @return true when the version->UniqueId mapping is the same as what from 
the commit log
+   */
+  private def verifyCheckpointInfoFromCommitLog(
+      checkpointDir: File,
+      pickedCheckpointInfoList: Iterable[StateStoreCheckpointInfo]): Boolean = 
{
+    var ret: Boolean = true
+
+    val versionToUniqueIdFromStateStore = Seq(1, 2).map {
+      batchVersion =>
+        val res = pickedCheckpointInfoList
+          .filter(_.batchVersion == batchVersion).map(_.stateStoreCkptId.get)
+
+        // batch Id is batchVersion - 1
+        batchVersion - 1 -> res.toArray
+    }.toMap
+
+    val commitLogPath = new Path(
+      new Path(checkpointDir.getAbsolutePath), "commits").toString
+
+    val commitLog = new CommitLog(spark, commitLogPath)
+    val metadata = commitLog.get(Some(0), Some(1)).map(_._2)
+
+    val versionToUniqueIdFromCommitLog = metadata.zipWithIndex.map { case 
(metadata, idx) =>
+      // Use stateUniqueIds(0) because there is only one state operator
+      val res2 = metadata.stateUniqueIds(0).map { uniqueIds =>

Review Comment:
   Hi, @WweiL and @HeartSaVioR.
   
   Although this PR passed CI, it seems that the `master` branch now fails to
   compile at this line in the PySpark test pipeline. Could you take a look at
   the failures below?
   - https://github.com/apache/spark/actions/runs/12801139497/job/35690169405
   - https://github.com/apache/spark/actions/runs/12801139497/job/35690170205
   - https://github.com/apache/spark/actions/runs/12801139497/job/35690170368
   ```
   [error] 
/__w/spark/spark/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreCheckpointFormatV2Suite.scala:626:41:
 Option[Map[Long,Array[Array[String]]]] does not take parameters
   [error]       val res2 = metadata.stateUniqueIds(0).map { uniqueIds =>
   [error]                                         ^
   [error] 
/__w/spark/spark/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreCheckpointFormatV2Suite.scala:634:85:
 value sorted is not a member of Nothing
   [error]         if 
(!versionToUniqueIdFromStateStore(version).sorted.sameElements(uniqueIds.sorted))
 {
   [error]                                                                
   ```
   
   ![Screenshot 2025-01-15 at 19 16 
41](https://github.com/user-attachments/assets/26c46f7a-f867-4611-a91d-4d29c80f454c)
   
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to