HeartSaVioR commented on a change in pull request #32767:
URL: https://github.com/apache/spark/pull/32767#discussion_r655951825



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
##########
@@ -153,6 +156,49 @@ class RocksDBFileManager(
     logInfo(s"Saved checkpoint file for version $version")
   }
 
+  /**
+   * Load all necessary files for specific checkpoint version from DFS to given local directory.
+   * If version is 0, then it will deleted all files in the directory. For other versions, it
+   * ensures that only the exact files generated during checkpointing will be present in the
+   * local directory.
+   */
+  def loadCheckpointFromDfs(version: Long, localDir: File): RocksDBCheckpointMetadata = {
+    logInfo(s"Loading checkpoint files for version $version")
+    val metadata = if (version == 0) {
+      if (localDir.exists) Utils.deleteRecursively(localDir)
+      localDir.mkdirs()
+      RocksDBCheckpointMetadata(Seq.empty, 0)
+    } else {
+      // Delete all non-immutable files in local dir, and unzip new ones from DFS commit file

Review comment:
       What if we just remove all the files in localDir? I'd just like to know why we don't clear the whole directory but only remove specific files. Would we need to leverage some remaining files?
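
   For illustration, a minimal sketch of the simpler alternative this question is about, i.e. wiping `localDir` unconditionally before loading (`resetLocalDir` is a hypothetical helper, not part of the PR):

```scala
import java.io.File

import org.apache.spark.util.Utils

// Hypothetical simplification: always start from an empty local directory
// instead of selectively deleting non-immutable files.
def resetLocalDir(localDir: File): Unit = {
  // Drop whatever is left over from a previous load of any version...
  if (localDir.exists) Utils.deleteRecursively(localDir)
  // ...and recreate the empty directory before unzipping the checkpoint files.
  localDir.mkdirs()
}
```

   The apparent trade-off, judging from `loadImmutableFilesFromDfs` later in this PR, is that clearing everything gives up reuse of already-downloaded immutable SST files.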

##########
File path: core/src/main/scala/org/apache/spark/util/Utils.scala
##########
@@ -3084,6 +3085,35 @@ private[spark] object Utils extends Logging {
     conf.get(config.STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT)
       .getOrElse(Utils.timeStringAsMs(s"${conf.get(Network.NETWORK_TIMEOUT)}s"))
   }
+
+  /** Decompress a zip file into a local dir. File names are read from the zip file. */

Review comment:
       We have been using one-liner javadoc; e.g.
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L50
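
   For reference, a hedged sketch of the one-liner style being referred to (illustrative content only, not copied from StateStore.scala):

```scala
/** Wraps the local directory used to stage checkpoint files (illustrative one-liner Scaladoc). */
class CheckpointStagingDir(val dir: java.io.File)
```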

##########
File path: core/src/main/scala/org/apache/spark/util/Utils.scala
##########
@@ -3084,6 +3085,35 @@ private[spark] object Utils extends Logging {
     conf.get(config.STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT)
       .getOrElse(Utils.timeStringAsMs(s"${conf.get(Network.NETWORK_TIMEOUT)}s"))
   }
+
+  /** Decompress a zip file into a local dir. File names are read from the zip file. */
+  def unzipFromFile(fs: FileSystem, dfsZipFile: Path, localDir: File): Seq[File] = {
+    val files = new mutable.ArrayBuffer[File]()
+    val in = new ZipInputStream(fs.open(dfsZipFile))
+    var out: OutputStream = null
+    try {
+      var entry = in.getNextEntry()
+      while (entry != null) {
+        if (!entry.isDirectory) {

Review comment:
       It'd be ideal to make this clear in the method name, like `unzipFilesFromFile`. (Ideally I'd also like to see this extract directories, but let's postpone that until it's necessary.)
   
   In general we expect unzipping to extract the directories as well. That said, we need to make the behavior very clear to the caller side. I agree this should be mentioned in the javadoc, but the method name should also be intuitive enough to convey the actual behavior.
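
   A hedged sketch of that direction, assuming a hypothetical `unzipFilesFromFile` that, unlike the current code, also materializes directory entries (illustrative only; the PR postpones directory handling):

```scala
import java.io.File
import java.nio.file.{Files, StandardCopyOption}
import java.util.zip.ZipInputStream

import org.apache.hadoop.fs.{FileSystem, Path}

// Hypothetical variant: the name spells out that only regular files are returned,
// while directory entries are created on disk instead of being silently skipped.
def unzipFilesFromFile(fs: FileSystem, dfsZipFile: Path, localDir: File): Seq[File] = {
  val files = Seq.newBuilder[File]
  val in = new ZipInputStream(fs.open(dfsZipFile))
  try {
    var entry = in.getNextEntry()
    while (entry != null) {
      val target = new File(localDir, entry.getName)
      if (entry.isDirectory) {
        target.mkdirs()                      // also extract directories
      } else {
        target.getParentFile.mkdirs()        // parent dirs for nested entries
        Files.copy(in, target.toPath, StandardCopyOption.REPLACE_EXISTING)
        files += target
      }
      in.closeEntry()
      entry = in.getNextEntry()
    }
  } finally {
    in.close()
  }
  files.result()
}
```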

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
##########
@@ -200,6 +246,55 @@ class RocksDBFileManager(
     immutableFiles
   }
 
+  /**
+   * Copy files from DFS directory to a local directory. It will figure out which
+   * existing files are needed, and accordingly, unnecessary SST files are deleted while
+   * necessary and non-existing files are copied from DFS.
+   */
+  private def loadImmutableFilesFromDfs(
+      immutableFiles: Seq[RocksDBImmutableFile], localDir: File): Unit = {
+    val requiredFileNameToFileDetails = immutableFiles.map(f => f.localFileName -> f).toMap
+    // Delete unnecessary local immutable files
+    listRocksDBFiles(localDir)._1
+      .foreach { existingFile =>
+        val isSameFile =
+          requiredFileNameToFileDetails.get(existingFile.getName).exists(_.isSameFile(existingFile))
+        if (!isSameFile) {
+          existingFile.delete()
+          logInfo(s"Deleted local file $existingFile")
+        }
+      }
+
+    var filesCopied = 0L
+    var bytesCopied = 0L
+    var filesReused = 0L
+    immutableFiles.foreach { file =>
+      val localFileName = file.localFileName
+      val localFile = localFilePath(localDir, localFileName)
+      if (!localFile.exists) {
+        val dfsFile = dfsFilePath(file.dfsFileName)
+        // Note: The implementation of copyToLocalFile() closes the output stream when there is
+        // any exception while copying. So this may generate partial files on DFS. But that is
+        // okay because until the main [version].zip file is written, those partial files are
+        // not going to be used at all. Eventually these files should get cleared.
+        fs.copyToLocalFile(dfsFile, new Path(localFile.getAbsoluteFile.toURI))
+        val localFileSize = localFile.length()
+        filesCopied += 1
+        bytesCopied += localFileSize
+        logInfo(s"Copied $dfsFile to $localFile - $localFileSize bytes")

Review comment:
       More specifically, we can do the file size check just after copyToLocalFile, and place the accumulations after it.
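
   A hedged sketch of that ordering, as a standalone helper (names like `copyAndVerify` and `expectedSizeBytes` are assumptions for illustration, not the actual metadata fields):

```scala
import java.io.File

import org.apache.hadoop.fs.{FileSystem, Path}

// Hypothetical helper: copy, verify the size immediately after copyToLocalFile,
// and leave the counter accumulation to the caller.
def copyAndVerify(fs: FileSystem, dfsFile: Path, localFile: File, expectedSizeBytes: Long): Long = {
  fs.copyToLocalFile(dfsFile, new Path(localFile.getAbsoluteFile.toURI))
  // File size check right after the copy, before any accumulation happens.
  val localFileSize = localFile.length()
  if (localFileSize != expectedSizeBytes) {
    throw new IllegalStateException(
      s"Copied $dfsFile to $localFile but size $localFileSize != expected $expectedSizeBytes")
  }
  localFileSize
}
```

   The copy loop would then do `bytesCopied += copyAndVerify(...)` and `filesCopied += 1` only after the copy has been verified.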

##########
File path: core/src/main/scala/org/apache/spark/util/Utils.scala
##########
@@ -3084,6 +3085,35 @@ private[spark] object Utils extends Logging {
     conf.get(config.STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT)
       .getOrElse(Utils.timeStringAsMs(s"${conf.get(Network.NETWORK_TIMEOUT)}s"))
   }
+
+  /** Decompress a zip file into a local dir. File names are read from the zip file. */
+  def unzipFromFile(fs: FileSystem, dfsZipFile: Path, localDir: File): Seq[File] = {
+    val files = new mutable.ArrayBuffer[File]()
+    val in = new ZipInputStream(fs.open(dfsZipFile))
+    var out: OutputStream = null
+    try {
+      var entry = in.getNextEntry()
+      while (entry != null) {
+        if (!entry.isDirectory) {
+          val fileName = localDir.toPath.resolve(entry.getName).getFileName.toString
+          val outFile = new File(localDir, fileName)
+          files += outFile
+          out = new FileOutputStream(outFile)
+          IOUtils.copy(in, out)
+          out.close()
+          in.closeEntry()
+        }
+        entry = in.getNextEntry()
+      }
+      in.close() // so that any error in closing does not get ignored
+      logInfo(s"Unzipped from $dfsZipFile\n\t${files.mkString("\n\t")}")
+    } finally {
+      // Close everything no matter what happened
+      IOUtils.closeQuietly(in)
+      IOUtils.closeQuietly(out)
+    }

Review comment:
       This looks safe; if there's an exception we may see some files already extracted and one of the output files may be broken, but callers will catch the exception and treat the output directory as unhealthy. If necessary, let's document this in the javadoc as well.
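
   A minimal sketch of that caller-side expectation (a hypothetical caller; the actual RocksDBFileManager code may differ): any exception during unzipping marks the local directory as unhealthy, so it is discarded rather than reused.

```scala
import java.io.File

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.util.Utils

// Hypothetical caller: if unzipping fails partway, some entries may already be
// extracted and the last one may be truncated, so wipe the directory and rethrow.
def loadIntoLocalDir(fs: FileSystem, dfsZipFile: Path, localDir: File): Unit = {
  try {
    Utils.unzipFromFile(fs, dfsZipFile, localDir)
  } catch {
    case e: Exception =>
      Utils.deleteRecursively(localDir)
      throw e
  }
}
```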

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
##########
@@ -153,6 +156,49 @@ class RocksDBFileManager(
     logInfo(s"Saved checkpoint file for version $version")
   }
 
+  /**
+   * Load all necessary files for specific checkpoint version from DFS to given local directory.
+   * If version is 0, then it will deleted all files in the directory. For other versions, it

Review comment:
       nit: deleted -> delete

##########
File path: core/src/main/scala/org/apache/spark/util/Utils.scala
##########
@@ -3084,6 +3085,35 @@ private[spark] object Utils extends Logging {
     conf.get(config.STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT)
       .getOrElse(Utils.timeStringAsMs(s"${conf.get(Network.NETWORK_TIMEOUT)}s"))
   }
+
+  /** Decompress a zip file into a local dir. File names are read from the zip file. */

Review comment:
       But here we'd like to see this span multiple lines, as per the review comments below :)
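
   For illustration, roughly what the same doc could look like once split into multiple lines (a sketch, not the final wording; the note on directory entries follows from the `!entry.isDirectory` check in the code above):

```scala
/**
 * Decompress a zip file into a local dir.
 *
 * File names are read from the zip file; directory entries are not extracted,
 * only regular files are written into the local dir.
 */
```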




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
