viirya commented on a change in pull request #32767:
URL: https://github.com/apache/spark/pull/32767#discussion_r655927681
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
##########
@@ -153,6 +156,49 @@ class RocksDBFileManager(
logInfo(s"Saved checkpoint file for version $version")
}
+ /**
+ * Load all necessary files for specific checkpoint version from DFS to
given local directory.
+ * If version is 0, then it will delete all files in the directory. For
other versions, it
+ * ensures that only the exact files generated during checkpointing will be
present in the
+ * local directory.
+ */
+ def loadCheckpointFromDfs(version: Long, localDir: File):
RocksDBCheckpointMetadata = {
+ logInfo(s"Loading checkpoint files for version $version")
+ val metadata = if (version == 0) {
+ if (localDir.exists) Utils.deleteRecursively(localDir)
+ localDir.mkdirs()
+ RocksDBCheckpointMetadata(Seq.empty, 0)
+ } else {
+ // Delete all non-immutable files in local dir, and unzip new ones from
DFS commit file
+ listRocksDBFiles(localDir)._2.foreach(_.delete())
+ Utils.unzipFromFile(fs, dfsBatchZipFile(version), localDir)
+
+ // Copy the necessary immutable files
+ val metadataFile = localMetadataFile(localDir)
+ val metadata = RocksDBCheckpointMetadata.readFromFile(metadataFile)
+ logInfo(s"Read metadata for version $version:\n${metadata.prettyJson}")
+ loadImmutableFilesFromDfs(metadata.immutableFiles, localDir)
+ versionToRocksDBFiles.put(version, metadata.immutableFiles)
+ metadataFile.delete()
+ metadata
+ }
+ logFilesInDir(localDir, s"Loaded checkpoint files for version $version")
+ metadata
+ }
+
+ /** Get the latest version available in the DFS directory. If no data
present, it returns 0. */
+ def getLatestVersion(): Long = {
Review comment:
Is this only used by tests?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]