viirya commented on a change in pull request #32767:
URL: https://github.com/apache/spark/pull/32767#discussion_r655924518
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
##########
@@ -200,6 +246,55 @@ class RocksDBFileManager(
immutableFiles
}
+ /**
+ * Copy files from DFS directory to a local directory. It will figure out which
+ * existing files are needed, and accordingly, unnecessary SST files are deleted while
+ * necessary and non-existing files are copied from DFS.
+ */
+ private def loadImmutableFilesFromDfs(
+ immutableFiles: Seq[RocksDBImmutableFile], localDir: File): Unit = {
+ val requiredFileNameToFileDetails = immutableFiles.map(f => f.localFileName -> f).toMap
+ // Delete unnecessary local immutable files
+ listRocksDBFiles(localDir)._1
+ .foreach { existingFile =>
+ val isSameFile =
+ requiredFileNameToFileDetails.get(existingFile.getName).exists(_.isSameFile(existingFile))
+ if (!isSameFile) {
+ existingFile.delete()
+ logInfo(s"Deleted local file $existingFile")
+ }
+ }
+
+ var filesCopied = 0L
+ var bytesCopied = 0L
+ var filesReused = 0L
+ immutableFiles.foreach { file =>
+ val localFileName = file.localFileName
+ val localFile = localFilePath(localDir, localFileName)
+ if (!localFile.exists) {
+ val dfsFile = dfsFilePath(file.dfsFileName)
+ // Note: The implementation of copyToLocalFile() closes the output stream when there is
+ // any exception while copying. So this may generate partial files on DFS. But that is
+ // okay because until the main [version].zip file is written, those partial files are
+ // not going to be used at all. Eventually these files should get cleared.
+ fs.copyToLocalFile(dfsFile, new Path(localFile.getAbsoluteFile.toURI))
+ val localFileSize = localFile.length()
+ filesCopied += 1
+ bytesCopied += localFileSize
+ logInfo(s"Copied $dfsFile to $localFile - $localFileSize bytes")
+ val expectedSize = file.sizeBytes
+ if (localFileSize != expectedSize) {
+ throw new IllegalStateException(
+ s"Copied $dfsFile to $localFile," +
+ s" expected $expectedSize bytes, found $localFileSize bytes ")
+ }
+ } else {
+ filesReused += 1
+ }
+ }
+ logInfo(s"Copied $filesCopied files ($bytesCopied bytes) from DFS to
local.")
Review comment:
Add `filesReused` into this log message?
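For example, something along these lines (just an illustrative sketch, exact wording up to you):
```scala
logInfo(s"Copied $filesCopied files ($bytesCopied bytes) from DFS to local, " +
  s"reused $filesReused existing files.")
```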
##########
File path: core/src/main/scala/org/apache/spark/util/Utils.scala
##########
@@ -3084,6 +3085,35 @@ private[spark] object Utils extends Logging {
conf.get(config.STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT)
.getOrElse(Utils.timeStringAsMs(s"${conf.get(Network.NETWORK_TIMEOUT)}s"))
}
+
+ /** Decompress a zip file into a local dir. File names are read from the zip file. */
+ def unzipFromFile(fs: FileSystem, dfsZipFile: Path, localDir: File): Seq[File] = {
+ val files = new mutable.ArrayBuffer[File]()
+ val in = new ZipInputStream(fs.open(dfsZipFile))
+ var out: OutputStream = null
+ try {
+ var entry = in.getNextEntry()
+ while (entry != null) {
+ if (!entry.isDirectory) {
Review comment:
We don't process directories. Could you also mention that in the method doc?
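For example (the wording is only a suggestion):
```scala
/**
 * Decompress a zip file into a local dir. File names are read from the zip file.
 * Directory entries in the zip are skipped; only regular files are extracted.
 */
```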
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
##########
@@ -200,6 +246,55 @@ class RocksDBFileManager(
immutableFiles
}
+ /**
+ * Copy files from DFS directory to a local directory. It will figure out which
+ * existing files are needed, and accordingly, unnecessary SST files are deleted while
+ * necessary and non-existing files are copied from DFS.
+ */
+ private def loadImmutableFilesFromDfs(
+ immutableFiles: Seq[RocksDBImmutableFile], localDir: File): Unit = {
+ val requiredFileNameToFileDetails = immutableFiles.map(f => f.localFileName -> f).toMap
+ // Delete unnecessary local immutable files
+ listRocksDBFiles(localDir)._1
+ .foreach { existingFile =>
+ val isSameFile =
+ requiredFileNameToFileDetails.get(existingFile.getName).exists(_.isSameFile(existingFile))
+ if (!isSameFile) {
+ existingFile.delete()
+ logInfo(s"Deleted local file $existingFile")
+ }
Review comment:
Or is this just a safer guard?
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
##########
@@ -200,6 +246,55 @@ class RocksDBFileManager(
immutableFiles
}
+ /**
+ * Copy files from DFS directory to a local directory. It will figure out which
+ * existing files are needed, and accordingly, unnecessary SST files are deleted while
+ * necessary and non-existing files are copied from DFS.
+ */
+ private def loadImmutableFilesFromDfs(
+ immutableFiles: Seq[RocksDBImmutableFile], localDir: File): Unit = {
+ val requiredFileNameToFileDetails = immutableFiles.map(f => f.localFileName -> f).toMap
+ // Delete unnecessary local immutable files
+ listRocksDBFiles(localDir)._1
+ .foreach { existingFile =>
+ val isSameFile =
+ requiredFileNameToFileDetails.get(existingFile.getName).exists(_.isSameFile(existingFile))
+ if (!isSameFile) {
+ existingFile.delete()
+ logInfo(s"Deleted local file $existingFile")
+ }
Review comment:
When is it possible to have an existing file in the local dir that has the same file name but is not the same file as the one in DFS?
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
##########
@@ -200,6 +246,55 @@ class RocksDBFileManager(
immutableFiles
}
+ /**
+ * Copy files from DFS directory to a local directory. It will figure out which
+ * existing files are needed, and accordingly, unnecessary SST files are deleted while
+ * necessary and non-existing files are copied from DFS.
+ */
+ private def loadImmutableFilesFromDfs(
+ immutableFiles: Seq[RocksDBImmutableFile], localDir: File): Unit = {
+ val requiredFileNameToFileDetails = immutableFiles.map(f => f.localFileName -> f).toMap
+ // Delete unnecessary local immutable files
+ listRocksDBFiles(localDir)._1
+ .foreach { existingFile =>
+ val isSameFile =
+ requiredFileNameToFileDetails.get(existingFile.getName).exists(_.isSameFile(existingFile))
+ if (!isSameFile) {
+ existingFile.delete()
+ logInfo(s"Deleted local file $existingFile")
+ }
+ }
+
+ var filesCopied = 0L
+ var bytesCopied = 0L
+ var filesReused = 0L
+ immutableFiles.foreach { file =>
+ val localFileName = file.localFileName
+ val localFile = localFilePath(localDir, localFileName)
+ if (!localFile.exists) {
+ val dfsFile = dfsFilePath(file.dfsFileName)
+ // Note: The implementation of copyToLocalFile() closes the output stream when there is
+ // any exception while copying. So this may generate partial files on DFS. But that is
+ // okay because until the main [version].zip file is written, those partial files are
+ // not going to be used at all. Eventually these files should get cleared.
+ fs.copyToLocalFile(dfsFile, new Path(localFile.getAbsoluteFile.toURI))
+ val localFileSize = localFile.length()
+ filesCopied += 1
+ bytesCopied += localFileSize
+ logInfo(s"Copied $dfsFile to $localFile - $localFileSize bytes")
Review comment:
We can do this logInfo after the file size check.
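i.e., something like this, with only the order changed from the snippet above (sketch only):
```scala
fs.copyToLocalFile(dfsFile, new Path(localFile.getAbsoluteFile.toURI))
val localFileSize = localFile.length()
filesCopied += 1
bytesCopied += localFileSize
val expectedSize = file.sizeBytes
if (localFileSize != expectedSize) {
  throw new IllegalStateException(
    s"Copied $dfsFile to $localFile," +
      s" expected $expectedSize bytes, found $localFileSize bytes")
}
// Log only after we know the copy is complete and the size matches
logInfo(s"Copied $dfsFile to $localFile - $localFileSize bytes")
```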
##########
File path: core/src/main/scala/org/apache/spark/util/Utils.scala
##########
@@ -3084,6 +3085,35 @@ private[spark] object Utils extends Logging {
conf.get(config.STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT)
.getOrElse(Utils.timeStringAsMs(s"${conf.get(Network.NETWORK_TIMEOUT)}s"))
}
+
+ /** Decompress a zip file into a local dir. File names are read from the zip file. */
+ def unzipFromFile(fs: FileSystem, dfsZipFile: Path, localDir: File): Seq[File] = {
+ val files = new mutable.ArrayBuffer[File]()
+ val in = new ZipInputStream(fs.open(dfsZipFile))
+ var out: OutputStream = null
+ try {
+ var entry = in.getNextEntry()
+ while (entry != null) {
+ if (!entry.isDirectory) {
+ val fileName = localDir.toPath.resolve(entry.getName).getFileName.toString
+ val outFile = new File(localDir, fileName)
+ files += outFile
+ out = new FileOutputStream(outFile)
+ IOUtils.copy(in, out)
+ out.close()
+ in.closeEntry()
+ }
+ entry = in.getNextEntry()
+ }
+ in.close() // so that any error in closing does not get ignored
+ logInfo(s"Unzipped from $dfsZipFile\n\t${files.mkString("\n\t")}")
+ } finally {
+ // Close everything no matter what happened
+ IOUtils.closeQuietly(in)
+ IOUtils.closeQuietly(out)
+ }
Review comment:
Hmm, are we sure we don't need to handle any errors during unzipping?
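For instance, a catch that at least logs which zip file failed before rethrowing. A fragment sketch of the tail of the method, not a concrete ask:
```scala
    in.close() // so that any error in closing does not get ignored
    logInfo(s"Unzipped from $dfsZipFile\n\t${files.mkString("\n\t")}")
  } catch {
    case e: Exception =>
      // Surface the failing zip file and target dir instead of relying only on closeQuietly
      logError(s"Error unzipping $dfsZipFile into $localDir", e)
      throw e
  } finally {
    // Close everything no matter what happened
    IOUtils.closeQuietly(in)
    IOUtils.closeQuietly(out)
  }
```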
##########
File path: core/src/main/scala/org/apache/spark/util/Utils.scala
##########
@@ -3084,6 +3085,35 @@ private[spark] object Utils extends Logging {
conf.get(config.STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT)
.getOrElse(Utils.timeStringAsMs(s"${conf.get(Network.NETWORK_TIMEOUT)}s"))
}
+
+ /** Decompress a zip file into a local dir. File names are read from the zip file. */
Review comment:
Looks like this is not the Java doc style we follow.
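Presumably the Javadoc-style multi-line form (asterisks indented by one space), rather than the Scaladoc-style two-space alignment, i.e. something like:
```scala
// Javadoc style generally used in Spark:
/**
 * Decompress a zip file into a local dir. File names are read from the zip file.
 */

// rather than the Scaladoc style:
/** Decompress a zip file into a local dir.
  * File names are read from the zip file.
  */
```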
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
##########
@@ -153,6 +156,49 @@ class RocksDBFileManager(
logInfo(s"Saved checkpoint file for version $version")
}
+ /**
+ * Load all necessary files for specific checkpoint version from DFS to given local directory.
+ * If version is 0, then it will delete all files in the directory. For other versions, it
+ * ensures that only the exact files generated during checkpointing will be present in the
+ * local directory.
+ */
+ def loadCheckpointFromDfs(version: Long, localDir: File): RocksDBCheckpointMetadata = {
+ logInfo(s"Loading checkpoint files for version $version")
+ val metadata = if (version == 0) {
+ if (localDir.exists) Utils.deleteRecursively(localDir)
+ localDir.mkdirs()
+ RocksDBCheckpointMetadata(Seq.empty, 0)
+ } else {
+ // Delete all non-immutable files in local dir, and unzip new ones from DFS commit file
+ listRocksDBFiles(localDir)._2.foreach(_.delete())
+ Utils.unzipFromFile(fs, dfsBatchZipFile(version), localDir)
+
+ // Copy the necessary immutable files
+ val metadataFile = localMetadataFile(localDir)
+ val metadata = RocksDBCheckpointMetadata.readFromFile(metadataFile)
+ logInfo(s"Read metadata for version $version:\n${metadata.prettyJson}")
+ loadImmutableFilesFromDfs(metadata.immutableFiles, localDir)
+ versionToRocksDBFiles.put(version, metadata.immutableFiles)
+ metadataFile.delete()
+ metadata
+ }
+ logFilesInDir(localDir, s"Loaded checkpoint files for version $version")
+ metadata
+ }
+
+ /** Get the latest version available in the DFS directory. If no data present, it returns 0. */
+ def getLatestVersion(): Long = {
Review comment:
Is this only used by tests?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]