xuanyuanking commented on a change in pull request #32933: URL: https://github.com/apache/spark/pull/32933#discussion_r661303956
########## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala ########## @@ -153,6 +156,139 @@ class RocksDBFileManager( logInfo(s"Saved checkpoint file for version $version") } + /** + * Load all necessary files for specific checkpoint version from DFS to given local directory. + * If version is 0, then it will delete all files in the directory. For other versions, it + * ensures that only the exact files generated during checkpointing will be present in the + * local directory. + */ + def loadCheckpointFromDfs(version: Long, localDir: File): RocksDBCheckpointMetadata = { + logInfo(s"Loading checkpoint files for version $version") + val metadata = if (version == 0) { + if (localDir.exists) Utils.deleteRecursively(localDir) + localDir.mkdirs() + RocksDBCheckpointMetadata(Seq.empty, 0) + } else { + // Delete all non-immutable files in local dir, and unzip new ones from DFS commit file + listRocksDBFiles(localDir)._2.foreach(_.delete()) + Utils.unzipFromFile(fs, dfsBatchZipFile(version), localDir) + + // Copy the necessary immutable files + val metadataFile = localMetadataFile(localDir) + val metadata = RocksDBCheckpointMetadata.readFromFile(metadataFile) + logInfo(s"Read metadata for version $version:\n${metadata.prettyJson}") + loadImmutableFilesFromDfs(metadata.immutableFiles, localDir) + versionToRocksDBFiles.put(version, metadata.immutableFiles) + metadataFile.delete() + metadata + } + logFilesInDir(localDir, s"Loaded checkpoint files for version $version") + metadata + } + + /** Get the latest version available in the DFS directory. If no data present, it returns 0. */ + def getLatestVersion(): Long = { + val path = new Path(dfsRootDir) + if (fm.exists(path)) { + fm.list(path, onlyZipFiles) + .map(_.getPath.getName.stripSuffix(".zip")) + .map(_.toLong) + .foldLeft(0L)(math.max) + } else { + 0 + } + } + + /** + * Delete old versions by deleting the associated version and SST files. 
+ * At a high-level, this method finds which versions to delete, and which SST files were + * last used in those versions. It's safe to delete these SST files because a SST file can + * be reused only in successive versions. Therefore, if a SST file F was last used in version + * V, then it won't be used in version V+1 or later, and if version V can be deleted, then Review comment: That's right. The file cannot be shared with skipping versions. If a file is used in V and not used in V+1, the checkpoint of V+1 should already create new files for all the KVs included in the original file. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org