siying commented on code in PR #48355:
URL: https://github.com/apache/spark/pull/48355#discussion_r1837426396


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala:
##########
@@ -204,6 +210,9 @@ class RocksDBFileManager(
       case _ =>
         throw 
QueryExecutionErrors.invalidChangeLogReaderVersion(changelogVersion)
     }
+    if (checkpointUniqueId.isDefined) {
+      changelogReader.lineage

Review Comment:
   The way it is written is confusing. Can you improve?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -1036,6 +1184,39 @@ class RocksDB(
     Option(acquiredThreadInfo).map(_.copy())
   }
 
+  /** Upload the snapshot to DFS and remove it from snapshots pending */
+  private def uploadSnapshot(
+    snapshot: RocksDB#RocksDBSnapshot,
+    fileManager: RocksDBFileManager,
+    snapshotsPendingUpload: Set[RocksDBVersionSnapshotInfo],
+    loggingId: String): RocksDBFileManagerMetrics = {

Review Comment:
   Can you explain why the function needs to be moved?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala:
##########
@@ -268,28 +280,30 @@ class RocksDBFileManager(
   def loadCheckpointFromDfs(
       version: Long,
       localDir: File,
-      rocksDBFileMapping: RocksDBFileMapping): RocksDBCheckpointMetadata = {
-    logInfo(log"Loading checkpoint files for version 
${MDC(LogKeys.VERSION_NUM, version)}")
+      rocksDBFileMapping: RocksDBFileMapping,
+      checkpointUniqueId: Option[String] = None): RocksDBCheckpointMetadata = {
+    logInfo(log"Loading checkpoint files for version 
${MDC(LogKeys.VERSION_NUM, version)} " +
+      log"checkpointUniqueId: ${MDC(LogKeys.UUID, 
checkpointUniqueId.getOrElse(""))}")
     // The unique ids of SST files are checked when opening a rocksdb 
instance. The SST files
     // in larger versions can't be reused even if they have the same size and 
name because
     // they belong to another rocksdb instance.
-    versionToRocksDBFiles.keySet().removeIf(_ >= version)
+    versionToRocksDBFiles.keySet().removeIf(_._1 >= version)
     val metadata = if (version == 0) {
       if (localDir.exists) Utils.deleteRecursively(localDir)
       localDir.mkdirs()
       RocksDBCheckpointMetadata(Seq.empty, 0)
     } else {
       // Delete all non-immutable files in local dir, and unzip new ones from 
DFS commit file
       listRocksDBFiles(localDir)._2.foreach(_.delete())
-      Utils.unzipFilesFromFile(fs, dfsBatchZipFile(version), localDir)
+      Utils.unzipFilesFromFile(fs, dfsBatchZipFile(version, 
checkpointUniqueId), localDir)
 
       // Copy the necessary immutable files
       val metadataFile = localMetadataFile(localDir)
       val metadata = RocksDBCheckpointMetadata.readFromFile(metadataFile)
       logInfo(log"Read metadata for version ${MDC(LogKeys.VERSION_NUM, 
version)}:\n" +
         log"${MDC(LogKeys.METADATA_JSON, metadata.prettyJson)}")
       loadImmutableFilesFromDfs(metadata.immutableFiles, localDir, 
rocksDBFileMapping, version)
-      versionToRocksDBFiles.put(version, metadata.immutableFiles)
+      versionToRocksDBFiles.put((version, checkpointUniqueId), 
metadata.immutableFiles)

Review Comment:
   Since you touched this `versionToRocksDBFiles`, it is tricky. Can you ask 
Micheal to take a look too?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala:
##########
@@ -322,14 +349,18 @@ class RocksDBFileManager(
     val path = new Path(dfsRootDir)
     if (fm.exists(path)) {
       val files = fm.list(path).map(_.getPath)
-      val changelogFileVersions = files
-        .filter(onlyChangelogFiles.accept)
-        .map(_.getName.stripSuffix(".changelog"))
-        .map(_.toLong)
-      val snapshotFileVersions = files
-        .filter(onlyZipFiles.accept)
-        .map(_.getName.stripSuffix(".zip"))
-        .map(_.toLong)
+      val changelogFileVersions = files.filter(onlyChangelogFiles.accept)
+        .map(_.getName.stripSuffix(".changelog").split("_"))
+        .map {
+          case Array(version, _) => version.toLong
+          case Array(version) => version.toLong
+        }
+      val snapshotFileVersions = files.filter(onlyZipFiles.accept)
+        .map(_.getName.stripSuffix(".zip").split("_"))
+        .map {
+          case Array(version, _) => version.toLong
+          case Array(version) => version.toLong
+        }

Review Comment:
   Btw, this is an expensive call and I wonder what we use it for. I hope it is 
only used in unit test or outside state store.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala:
##########
@@ -299,20 +313,33 @@ class RocksDBFileManager(
   }
 
   // Get latest snapshot version <= version
-  def getLatestSnapshotVersion(version: Long): Long = {
+  def getLatestSnapshotVersionAndUniqueId(
+      version: Long, checkpointUniqueId: Option[String] = None): Array[(Long, 
Option[String])] = {
     val path = new Path(dfsRootDir)
     if (fm.exists(path)) {
       // If the latest version snapshot exists, we avoid listing.
-      if (fm.exists(dfsBatchZipFile(version))) {
-        return version
+      if (fm.exists(dfsBatchZipFile(version, checkpointUniqueId))) {
+        return Array((version, checkpointUniqueId))
+      } else if (fm.exists(dfsBatchZipFile(version))) {
+        // This is possible when the state was previously ran under checkpoint 
format v1
+        // and restarted with v2. Then even if there is checkpointUniqueId 
passed in, the file

Review Comment:
   If previously we ran v1, there won't be `checkpointUniqueId` available. So 
if `checkpointUniqueId` is available, there should not be a v1 path.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala:
##########
@@ -299,20 +313,33 @@ class RocksDBFileManager(
   }
 
   // Get latest snapshot version <= version
-  def getLatestSnapshotVersion(version: Long): Long = {
+  def getLatestSnapshotVersionAndUniqueId(
+      version: Long, checkpointUniqueId: Option[String] = None): Array[(Long, 
Option[String])] = {
     val path = new Path(dfsRootDir)
     if (fm.exists(path)) {
       // If the latest version snapshot exists, we avoid listing.
-      if (fm.exists(dfsBatchZipFile(version))) {
-        return version
+      if (fm.exists(dfsBatchZipFile(version, checkpointUniqueId))) {
+        return Array((version, checkpointUniqueId))
+      } else if (fm.exists(dfsBatchZipFile(version))) {
+        // This is possible when the state was previously ran under checkpoint 
format v1
+        // and restarted with v2. Then even if there is checkpointUniqueId 
passed in, the file
+        // does not have that uniqueId in the filename.
+        return Array((version, None))
       }
-      fm.list(path, onlyZipFiles)
-        .map(_.getPath.getName.stripSuffix(".zip"))
-        .map(_.toLong)
-        .filter(_ <= version)
-        .foldLeft(0L)(math.max)
+      val versionAndUniqueIds = fm.list(path, onlyZipFiles)
+        .map(_.getPath.getName.stripSuffix(".zip").split("_"))
+        .filter {
+          case Array(ver, _) => ver.toLong <= version
+          case Array(ver) => ver.toLong <= version
+        }
+        .map {
+          case Array(version, uniqueId) => (version.toLong, Option(uniqueId))
+          case Array(version) => (version.toLong, None)
+        }
+      val maxVersion = versionAndUniqueIds.map(_._1).foldLeft(0L)(math.max)
+      versionAndUniqueIds.filter(_._1 == maxVersion)

Review Comment:
   As mentioned in another comment, if we can't find a snapshot with specific 
checkpointID, we should ignore snapshoting for this version.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to