HeartSaVioR commented on a change in pull request #32767:
URL: https://github.com/apache/spark/pull/32767#discussion_r655951825
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
##########
@@ -153,6 +156,49 @@ class RocksDBFileManager(
logInfo(s"Saved checkpoint file for version $version")
}
+  /**
+   * Load all necessary files for specific checkpoint version from DFS to given local directory.
+   * If version is 0, then it will deleted all files in the directory. For other versions, it
+   * ensures that only the exact files generated during checkpointing will be present in the
+   * local directory.
+   */
+  def loadCheckpointFromDfs(version: Long, localDir: File): RocksDBCheckpointMetadata = {
+    logInfo(s"Loading checkpoint files for version $version")
+    val metadata = if (version == 0) {
+      if (localDir.exists) Utils.deleteRecursively(localDir)
+      localDir.mkdirs()
+      RocksDBCheckpointMetadata(Seq.empty, 0)
+    } else {
+      // Delete all non-immutable files in local dir, and unzip new ones from DFS commit file
Review comment:
What if we just remove all the files in localDir? I'd just like to know the
reason we don't clear the whole directory but only remove the specific files.
Would we need to leverage some of the remaining files?
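For illustration, this is roughly what I mean (just a sketch, reusing what the version-0 branch already does):

```scala
// "Always clear the directory" alternative: drop everything locally and re-fetch from DFS.
// Simpler, but loses the chance to reuse already-downloaded immutable SST files.
if (localDir.exists) Utils.deleteRecursively(localDir)
localDir.mkdirs()
// ... then unzip the commit file and copy every immutable file for `version` from DFS
```

If the answer is that we want to keep reusable local SST files to avoid re-downloading them, that's fine; a short comment explaining it would be helpful.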
##########
File path: core/src/main/scala/org/apache/spark/util/Utils.scala
##########
@@ -3084,6 +3085,35 @@ private[spark] object Utils extends Logging {
conf.get(config.STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT)
.getOrElse(Utils.timeStringAsMs(s"${conf.get(Network.NETWORK_TIMEOUT)}s"))
}
+
+  /** Decompress a zip file into a local dir. File names are read from the zip file. */
Review comment:
We have been using one-liner javadoc; e.g.
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L50
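(i.e. keep the whole description on a single `/** ... */` line; the snippet below is only an illustration of the format, not the actual line at that link:)

```scala
/** Base trait for a versioned key-value store used by stateful streaming operators. */
```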
##########
File path: core/src/main/scala/org/apache/spark/util/Utils.scala
##########
@@ -3084,6 +3085,35 @@ private[spark] object Utils extends Logging {
conf.get(config.STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT)
.getOrElse(Utils.timeStringAsMs(s"${conf.get(Network.NETWORK_TIMEOUT)}s"))
}
+
+  /** Decompress a zip file into a local dir. File names are read from the zip file. */
+  def unzipFromFile(fs: FileSystem, dfsZipFile: Path, localDir: File): Seq[File] = {
+    val files = new mutable.ArrayBuffer[File]()
+    val in = new ZipInputStream(fs.open(dfsZipFile))
+    var out: OutputStream = null
+    try {
+      var entry = in.getNextEntry()
+      while (entry != null) {
+        if (!entry.isDirectory) {
Review comment:
It'd be ideal if we made that clear in the method name, like
`unzipFilesFromFile`. (Ideally I'd also like to see this extract the
directories, but let's postpone that until necessary.)
In general we expect unzipping to extract the directories as well. That
said, we need to make the behavior very clear to the caller side. I agree this
should be mentioned in the javadoc, but the method name should also be intuitive
enough to convey the actual behavior.
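To illustrate, something along these lines (just a sketch; the name and the doc wording are only suggestions):

```scala
/**
 * Extract all regular files from a zip file on DFS into a local dir. Directory entries
 * in the zip are skipped; each extracted file is placed directly under `localDir`
 * using the entry's file name.
 */
def unzipFilesFromFile(fs: FileSystem, dfsZipFile: Path, localDir: File): Seq[File] = {
  ??? // body unchanged from the current unzipFromFile
}
```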
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
##########
@@ -200,6 +246,55 @@ class RocksDBFileManager(
immutableFiles
}
+  /**
+   * Copy files from DFS directory to a local directory. It will figure out which
+   * existing files are needed, and accordingly, unnecessary SST files are deleted while
+   * necessary and non-existing files are copied from DFS.
+   */
+  private def loadImmutableFilesFromDfs(
+      immutableFiles: Seq[RocksDBImmutableFile], localDir: File): Unit = {
+    val requiredFileNameToFileDetails = immutableFiles.map(f => f.localFileName -> f).toMap
+    // Delete unnecessary local immutable files
+    listRocksDBFiles(localDir)._1
+      .foreach { existingFile =>
+        val isSameFile =
+          requiredFileNameToFileDetails.get(existingFile.getName).exists(_.isSameFile(existingFile))
+        if (!isSameFile) {
+          existingFile.delete()
+          logInfo(s"Deleted local file $existingFile")
+        }
+      }
+
+    var filesCopied = 0L
+    var bytesCopied = 0L
+    var filesReused = 0L
+    immutableFiles.foreach { file =>
+      val localFileName = file.localFileName
+      val localFile = localFilePath(localDir, localFileName)
+      if (!localFile.exists) {
+        val dfsFile = dfsFilePath(file.dfsFileName)
+        // Note: The implementation of copyToLocalFile() closes the output stream when there is
+        // any exception while copying. So this may generate partial files on DFS. But that is
+        // okay because until the main [version].zip file is written, those partial files are
+        // not going to be used at all. Eventually these files should get cleared.
+        fs.copyToLocalFile(dfsFile, new Path(localFile.getAbsoluteFile.toURI))
+        val localFileSize = localFile.length()
+        filesCopied += 1
+        bytesCopied += localFileSize
+        logInfo(s"Copied $dfsFile to $localFile - $localFileSize bytes")
Review comment:
More specifically, we can do the file size check just after
copyToLocalFile, and the accumulations can be placed after it.
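Roughly like this (just a sketch; I'm assuming the expected size is available on the metadata as `file.sizeBytes`, so adjust to whatever field it actually is):

```scala
fs.copyToLocalFile(dfsFile, new Path(localFile.getAbsoluteFile.toURI))
// Check the size right after the copy so an incomplete copy fails fast
// (assumes the expected size is recorded in the immutable file metadata).
val localFileSize = localFile.length()
if (localFileSize != file.sizeBytes) {
  throw new IllegalStateException(
    s"Copied $dfsFile to $localFile but size $localFileSize does not match " +
      s"expected ${file.sizeBytes}")
}
// Only then update the counters and log.
filesCopied += 1
bytesCopied += localFileSize
logInfo(s"Copied $dfsFile to $localFile - $localFileSize bytes")
```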
##########
File path: core/src/main/scala/org/apache/spark/util/Utils.scala
##########
@@ -3084,6 +3085,35 @@ private[spark] object Utils extends Logging {
conf.get(config.STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT)
.getOrElse(Utils.timeStringAsMs(s"${conf.get(Network.NETWORK_TIMEOUT)}s"))
}
+
+  /** Decompress a zip file into a local dir. File names are read from the zip file. */
+  def unzipFromFile(fs: FileSystem, dfsZipFile: Path, localDir: File): Seq[File] = {
+    val files = new mutable.ArrayBuffer[File]()
+    val in = new ZipInputStream(fs.open(dfsZipFile))
+    var out: OutputStream = null
+    try {
+      var entry = in.getNextEntry()
+      while (entry != null) {
+        if (!entry.isDirectory) {
+          val fileName = localDir.toPath.resolve(entry.getName).getFileName.toString
+          val outFile = new File(localDir, fileName)
+          files += outFile
+          out = new FileOutputStream(outFile)
+          IOUtils.copy(in, out)
+          out.close()
+          in.closeEntry()
+        }
+        entry = in.getNextEntry()
+      }
+      in.close() // so that any error in closing does not get ignored
+      logInfo(s"Unzipped from $dfsZipFile\n\t${files.mkString("\n\t")}")
+    } finally {
+      // Close everything no matter what happened
+      IOUtils.closeQuietly(in)
+      IOUtils.closeQuietly(out)
+    }
Review comment:
This looks safe; if there's an exception we may see some files being
extracted and one of the output files may be broken, but callers will catch the
exception and treat the output directory as not healthy. If necessary, let's
document this in the javadoc as well.
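If we do, a short note along these lines should be enough (wording is just a suggestion):

```scala
/**
 * Decompress a zip file into a local dir. File names are read from the zip file.
 *
 * @note If this method throws, `localDir` may already contain some extracted files and
 *       the last one may be truncated; callers should treat the directory as unhealthy.
 */
```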
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
##########
@@ -153,6 +156,49 @@ class RocksDBFileManager(
logInfo(s"Saved checkpoint file for version $version")
}
+  /**
+   * Load all necessary files for specific checkpoint version from DFS to given local directory.
+   * If version is 0, then it will deleted all files in the directory. For other versions, it
Review comment:
nit: deleted -> delete
##########
File path: core/src/main/scala/org/apache/spark/util/Utils.scala
##########
@@ -3084,6 +3085,35 @@ private[spark] object Utils extends Logging {
conf.get(config.STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT)
.getOrElse(Utils.timeStringAsMs(s"${conf.get(Network.NETWORK_TIMEOUT)}s"))
}
+
+  /** Decompress a zip file into a local dir. File names are read from the zip file. */
Review comment:
But we'd like to see this as multiple lines, as suggested in the review comments below :)
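i.e. splitting the description across lines so the behavioral notes from the other comments fit in as well (wording is just a suggestion):

```scala
/**
 * Decompress a zip file from DFS into a local dir.
 *
 * File names are read from the zip entries; directory entries are skipped, so only
 * regular files are extracted, each placed directly under the local dir.
 */
```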