WweiL commented on code in PR #48355:
URL: https://github.com/apache/spark/pull/48355#discussion_r1836861509


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -249,24 +273,130 @@ class RocksDB(
     }
   }
 
+/**
+ * In the case of checkpointFormatVersion 2, if we find multiple latest 
snapshot files of
+ * the same version but they have different uniqueIds, we need to find the 
correct one based on
+ * the lineage we have.
+ *
+ * When this happens, the stored file on the DFS must be a changelog file, 
because if not, the
+ * latest snapshot is uniquely identified as the (version, stateStoreCkptId) 
pair.
+ *
+ * This method achieves traversing back the lineage and find the correct 
latest snapshot file
+ * by creating a changelog reader and compare with the lineages stored there.
+ */
+  private def getLatestSnapshotVersionAndUniqueIdFromLineage(
+      currLineage: Array[(Long, Option[String])],
+      latestSnapshotVersionsAndUniqueIds: Array[(Long, Option[String])]):
+      (Long, Option[String]) = {
+    currLineage.foreach {
+      case (version, uniqueId) =>
+        if (latestSnapshotVersionsAndUniqueIds.contains((version, uniqueId))) {
+          return (version, uniqueId)
+        }
+    }
+    throw 
QueryExecutionErrors.cannotGetLatestSnapshotVersionAndUniqueIdFromLineage(
+      printLineage(currLineage), 
printLineage(latestSnapshotVersionsAndUniqueIds)
+    )
+  }
+
+  private def getLineageFromChangelogFile(
+      version: Long,
+      useColumnFamilies: Boolean,
+      stateStoreCkptId: Option[String]): Option[Array[(Long, Option[String])]] 
= {
+
+    // It is possible that change log checkpointing is first enabled and then 
disabled.
+    // In this case, loading changelog reader will fail because there are only 
zip files.
+    // It is also possible that state store was previously ran under format 
version 1
+    // In that case, loading changelog reader with file format 
version_uniqueId.changelog
+    // will also fail.
+    // But either way, there is no lineage in either case so we can swallow 
the failure
+    // CANNOT_READ_STREAMING_STATE_FILE.
+    var changelogReader: StateStoreChangelogReader = null
+    var currLineage: Option[Array[(Long, Option[String])]] = None
+    try {
+      changelogReader = fileManager.getChangelogReader(
+        version, useColumnFamilies, stateStoreCkptId)
+      // currLineage contains the version -> uniqueId mapping from the 
previous snapshot file
+      // to current version's changelog file
+      versionToUniqueIdLineage = changelogReader.lineage.map {
+        case (version, uniqueId) => (version, Option(uniqueId))
+      }
+      logInfo(log"Loading versionToUniqueIdLineage: ${MDC(LogKeys.LINEAGE,
+        printLineage(versionToUniqueIdLineage))} from changelog version: ${MDC(
+        LogKeys.VERSION_NUM, version)} uniqueId: ${MDC(LogKeys.UUID,
+        stateStoreCkptId.getOrElse(""))}. " +
+        log"This would be an noop if changelog is not enabled, or the query 
was previously" +
+        log"ran under checkpoint format v1")
+      currLineage = Some(versionToUniqueIdLineage)
+    } catch {
+      // This can happen when you first load with changelog enabled and then 
disable it,
+      // or the state store was previously ran under format version 1.
+      case e: SparkException
+        if e.getErrorClass == 
"CANNOT_LOAD_STATE_STORE.CANNOT_READ_STREAMING_STATE_FILE" =>
+      // do nothing
+      case e: Throwable =>
+        throw e
+    } finally {
+      if (changelogReader != null) changelogReader.closeIfNeeded()
+      if (currLineage.isEmpty) currLineage = Some(Array((version, 
stateStoreCkptId)))
+    }
+    currLineage
+  }
+
+
   /**
    * Load the given version of data in a native RocksDB instance.
    * Note that this will copy all the necessary file from DFS to local disk as 
needed,
    * and possibly restart the native RocksDB instance.
    */
-  def load(version: Long, readOnly: Boolean = false): RocksDB = {
+  def load(
+      version: Long,
+      stateStoreCkptId: Option[String] = None,
+      readOnly: Boolean = false): RocksDB = {
     assert(version >= 0)
     acquire(LoadStore)
     recordedMetrics = None
-    logInfo(log"Loading ${MDC(LogKeys.VERSION_NUM, version)}")
+    logInfo(log"Loading ${MDC(LogKeys.VERSION_NUM, version)} with 
stateStoreCkptId: ${
+      MDC(LogKeys.UUID, stateStoreCkptId.getOrElse(""))}")
+    var currLineage: Option[Array[(Long, Option[String])]] = None
     try {
-      if (loadedVersion != version) {
+      if (loadedVersion != version ||
+        (enableStateStoreCheckpointIds && stateStoreCkptId.isDefined &&
+          (loadedStateStoreCkptId.isEmpty || stateStoreCkptId.get != 
loadedStateStoreCkptId.get))) {
         closeDB(ignoreException = false)
         // deep copy is needed to avoid race condition
         // between maintenance and task threads
         fileManager.copyFileMapping()
-        val latestSnapshotVersion = 
fileManager.getLatestSnapshotVersion(version)
-        val metadata = 
fileManager.loadCheckpointFromDfs(latestSnapshotVersion, workingDir)
+
+        val latestSnapshotVersionsAndUniqueIds =
+          fileManager.getLatestSnapshotVersionAndUniqueId(version, 
stateStoreCkptId)

Review Comment:
   Because 4_uuid4.changelog doesn't know which version was converted to a zip file.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to