zifeif2 commented on code in PR #53720:
URL: https://github.com/apache/spark/pull/53720#discussion_r2670551584


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -443,77 +443,82 @@ class RocksDB(
   private def loadWithCheckpointId(
       version: Long,
       stateStoreCkptId: Option[String],
-      readOnly: Boolean = false): RocksDB = {
+      readOnly: Boolean = false,
+      loadEmpty: Boolean = false): RocksDB = {
     // An array contains lineage information from [snapShotVersion, version]
     // (inclusive in both ends)
     var currVersionLineage: Array[LineageItem] = 
lineageManager.getLineageForCurrVersion()
     try {
-      if (loadedVersion != version || (loadedStateStoreCkptId.isEmpty ||
-          stateStoreCkptId.get != loadedStateStoreCkptId.get)) {
+      if (loadEmpty || loadedVersion != version || 
loadedStateStoreCkptId.isEmpty ||
+        stateStoreCkptId.get != loadedStateStoreCkptId.get) {
         closeDB(ignoreException = false)
-
-        val (latestSnapshotVersion, latestSnapshotUniqueId) = {
-          // Special handling when version is 0.
-          // When loading the very first version (0), stateStoreCkptId does 
not need to be defined
-          // because there won't be 0.changelog / 0.zip file created in 
RocksDB under v2.
-          if (version == 0) {
-            assert(stateStoreCkptId.isEmpty,
-              "stateStoreCkptId should be empty when version is zero")
-            (0L, None)
-          // When there is a snapshot file, it is the ground truth, we can skip
-          // reconstructing the lineage from changelog file.
-          } else if (fileManager.existsSnapshotFile(version, 
stateStoreCkptId)) {
-            currVersionLineage = Array(LineageItem(version, 
stateStoreCkptId.get))
-            (version, stateStoreCkptId)
-          } else {
-            currVersionLineage = getLineageFromChangelogFile(version, 
stateStoreCkptId) :+
-              LineageItem(version, stateStoreCkptId.get)
-            currVersionLineage = currVersionLineage.sortBy(_.version)
-
-            val latestSnapshotVersionsAndUniqueId =
-              
fileManager.getLatestSnapshotVersionAndUniqueIdFromLineage(currVersionLineage)
-            latestSnapshotVersionsAndUniqueId match {
-              case Some(pair) => (pair._1, Option(pair._2))
-              case None if currVersionLineage.head.version == 1L =>
-                logDebug(log"Cannot find latest snapshot based on lineage but 
first version " +
-                  log"is 1, use 0 as default. Lineage: ${MDC(LogKeys.LINEAGE, 
lineageManager)}")
-                (0L, None)
-              case _ =>
-                throw QueryExecutionErrors.cannotFindBaseSnapshotCheckpoint(
-                  printLineageItems(currVersionLineage))
+        if (loadEmpty) {
+          loadEmptyStore(version)
+          lineageManager.clear()
+        } else {
+          val (latestSnapshotVersion, latestSnapshotUniqueId) = {
+            // Special handling when version is 0.
+            // When loading the very first version (0), stateStoreCkptId does 
not need to be defined
+            // because there won't be 0.changelog / 0.zip file created in 
RocksDB under v2.
+            if (version == 0) {
+              assert(stateStoreCkptId.isEmpty,
+                "stateStoreCkptId should be empty when version is zero")
+              (0L, None)
+              // When there is a snapshot file, it is the ground truth, we can 
skip
+              // reconstructing the lineage from changelog file.
+            } else if (fileManager.existsSnapshotFile(version, 
stateStoreCkptId)) {
+              currVersionLineage = Array(LineageItem(version, 
stateStoreCkptId.get))
+              (version, stateStoreCkptId)
+            } else {
+              currVersionLineage = getLineageFromChangelogFile(version, 
stateStoreCkptId) :+
+                LineageItem(version, stateStoreCkptId.get)
+              currVersionLineage = currVersionLineage.sortBy(_.version)
+
+              val latestSnapshotVersionsAndUniqueId =
+                
fileManager.getLatestSnapshotVersionAndUniqueIdFromLineage(currVersionLineage)
+              latestSnapshotVersionsAndUniqueId match {
+                case Some(pair) => (pair._1, Option(pair._2))
+                case None if currVersionLineage.head.version == 1L =>
+                  logDebug(log"Cannot find latest snapshot based on lineage 
but first version " +
+                    log"is 1, use 0 as default. Lineage: 
${MDC(LogKeys.LINEAGE, lineageManager)}")
+                  (0L, None)
+                case _ =>
+                  throw QueryExecutionErrors.cannotFindBaseSnapshotCheckpoint(
+                    printLineageItems(currVersionLineage))
+              }
             }
           }
-        }
 
-        logInfo(log"Loaded latestSnapshotVersion: ${
-          MDC(LogKeys.SNAPSHOT_VERSION, latestSnapshotVersion)}, 
latestSnapshotUniqueId: ${
-          MDC(LogKeys.UUID, latestSnapshotUniqueId)}")
+          logInfo(log"Loaded latestSnapshotVersion: ${
+            MDC(LogKeys.SNAPSHOT_VERSION, latestSnapshotVersion)}, 
latestSnapshotUniqueId: ${
+            MDC(LogKeys.UUID, latestSnapshotUniqueId)}")
 
-        val metadata = fileManager.loadCheckpointFromDfs(latestSnapshotVersion,
-          workingDir, rocksDBFileMapping, latestSnapshotUniqueId)
+          val metadata = 
fileManager.loadCheckpointFromDfs(latestSnapshotVersion,
+            workingDir, rocksDBFileMapping, latestSnapshotUniqueId)
 
-        loadedVersion = latestSnapshotVersion
+          loadedVersion = latestSnapshotVersion
 
-        // reset the last snapshot version to the latest available snapshot 
version
-        lastSnapshotVersion = latestSnapshotVersion
-        lineageManager.resetLineage(currVersionLineage)
+          // reset the last snapshot version to the latest available snapshot 
version
+          lastSnapshotVersion = latestSnapshotVersion
+          lineageManager.resetLineage(currVersionLineage)
 
-        // Initialize maxVersion upon successful load from DFS
-        fileManager.setMaxSeenVersion(version)
+          // Initialize maxVersion upon successful load from DFS
+          fileManager.setMaxSeenVersion(version)
 
-        // Report this snapshot version to the coordinator
-        reportSnapshotUploadToCoordinator(latestSnapshotVersion)
+          // Report this snapshot version to the coordinator
+          reportSnapshotUploadToCoordinator(latestSnapshotVersion)
 
-        openLocalRocksDB(metadata)
+          openLocalRocksDB(metadata)
 
-        if (loadedVersion != version) {
-          val versionsAndUniqueIds = currVersionLineage.collect {
+          if (loadedVersion != version) {
+            val versionsAndUniqueIds = currVersionLineage.collect {
               case i if i.version > loadedVersion && i.version <= version =>
                 (i.version, Option(i.checkpointUniqueId))
             }
-          replayChangelog(versionsAndUniqueIds)
-          loadedVersion = version
-          lineageManager.resetLineage(currVersionLineage)
+            replayChangelog(versionsAndUniqueIds)
+            loadedVersion = version
+            lineageManager.resetLineage(currVersionLineage)
+          }

Review Comment:
   These lines are the same as in the old version, except for indentation.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to