viirya commented on a change in pull request #32767:
URL: https://github.com/apache/spark/pull/32767#discussion_r655924518



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
##########
@@ -200,6 +246,55 @@ class RocksDBFileManager(
     immutableFiles
   }
 
+  /**
+   * Copy files from DFS directory to a local directory. It will figure out which
+   * existing files are needed, and accordingly, unnecessary SST files are deleted while
+   * necessary and non-existing files are copied from DFS.
+   */
+  private def loadImmutableFilesFromDfs(
+      immutableFiles: Seq[RocksDBImmutableFile], localDir: File): Unit = {
+    val requiredFileNameToFileDetails = immutableFiles.map(f => f.localFileName -> f).toMap
+    // Delete unnecessary local immutable files
+    listRocksDBFiles(localDir)._1
+      .foreach { existingFile =>
+        val isSameFile =
+          requiredFileNameToFileDetails.get(existingFile.getName).exists(_.isSameFile(existingFile))
+        if (!isSameFile) {
+          existingFile.delete()
+          logInfo(s"Deleted local file $existingFile")
+        }
+      }
+
+    var filesCopied = 0L
+    var bytesCopied = 0L
+    var filesReused = 0L
+    immutableFiles.foreach { file =>
+      val localFileName = file.localFileName
+      val localFile = localFilePath(localDir, localFileName)
+      if (!localFile.exists) {
+        val dfsFile = dfsFilePath(file.dfsFileName)
+        // Note: The implementation of copyToLocalFile() closes the output stream when there is
+        // any exception while copying. So this may generate partial files on DFS. But that is
+        // okay because until the main [version].zip file is written, those partial files are
+        // not going to be used at all. Eventually these files should get cleared.
+        fs.copyToLocalFile(dfsFile, new Path(localFile.getAbsoluteFile.toURI))
+        val localFileSize = localFile.length()
+        filesCopied += 1
+        bytesCopied += localFileSize
+        logInfo(s"Copied $dfsFile to $localFile - $localFileSize bytes")
+        val expectedSize = file.sizeBytes
+        if (localFileSize != expectedSize) {
+          throw new IllegalStateException(
+            s"Copied $dfsFile to $localFile," +
+              s" expected $expectedSize bytes, found $localFileSize bytes ")
+        }
+      } else {
+        filesReused += 1
+      }
+    }
+    logInfo(s"Copied $filesCopied files ($bytesCopied bytes) from DFS to 
local.")

Review comment:
       Add `filesReused` to this log message?
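       A minimal sketch of that suggestion, folding the reuse counter into the existing summary line (the exact wording is just one option):

```scala
// Hypothetical rewording of the logInfo above, adding the filesReused counter.
logInfo(s"Copied $filesCopied files ($bytesCopied bytes) from DFS to local, " +
  s"reused $filesReused existing files.")
```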

##########
File path: core/src/main/scala/org/apache/spark/util/Utils.scala
##########
@@ -3084,6 +3085,35 @@ private[spark] object Utils extends Logging {
     conf.get(config.STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT)
       .getOrElse(Utils.timeStringAsMs(s"${conf.get(Network.NETWORK_TIMEOUT)}s"))
   }
+
+  /** Decompress a zip file into a local dir. File names are read from the zip file. */
+  def unzipFromFile(fs: FileSystem, dfsZipFile: Path, localDir: File): Seq[File] = {
+    val files = new mutable.ArrayBuffer[File]()
+    val in = new ZipInputStream(fs.open(dfsZipFile))
+    var out: OutputStream = null
+    try {
+      var entry = in.getNextEntry()
+      while (entry != null) {
+        if (!entry.isDirectory) {

Review comment:
       We don't process directories. Could you also mention that in the method doc?
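       One possible wording for that doc update, sketched against the method above (the phrasing is a suggestion, not final text):

```scala
/**
 * Decompress a zip file into a local dir. File names are read from the zip file.
 * Directory entries in the archive are skipped; only regular file entries are extracted.
 */
def unzipFromFile(fs: FileSystem, dfsZipFile: Path, localDir: File): Seq[File] = {
  // ... body unchanged from the diff above ...
}
```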

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
##########
@@ -200,6 +246,55 @@ class RocksDBFileManager(
     immutableFiles
   }
 
+  /**
+   * Copy files from DFS directory to a local directory. It will figure out which
+   * existing files are needed, and accordingly, unnecessary SST files are deleted while
+   * necessary and non-existing files are copied from DFS.
+   */
+  private def loadImmutableFilesFromDfs(
+      immutableFiles: Seq[RocksDBImmutableFile], localDir: File): Unit = {
+    val requiredFileNameToFileDetails = immutableFiles.map(f => f.localFileName -> f).toMap
+    // Delete unnecessary local immutable files
+    listRocksDBFiles(localDir)._1
+      .foreach { existingFile =>
+        val isSameFile =
+          requiredFileNameToFileDetails.get(existingFile.getName).exists(_.isSameFile(existingFile))
+        if (!isSameFile) {
+          existingFile.delete()
+          logInfo(s"Deleted local file $existingFile")
+        }

Review comment:
       Or should we just add a safer guard here?
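       One possible reading of "safer guard", sketched against the loop above: check the return value of `delete()` and fail loudly, since a stale file that survives deletion would later be reused by the copy loop below (`java.io.IOException` assumed imported):

```scala
if (!isSameFile) {
  // Hypothetical guard: surface a failed deletion instead of silently
  // continuing with a mismatched local file on disk.
  if (!existingFile.delete()) {
    throw new IOException(s"Failed to delete stale local file $existingFile")
  }
  logInfo(s"Deleted local file $existingFile")
}
```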

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
##########
@@ -200,6 +246,55 @@ class RocksDBFileManager(
     immutableFiles
   }
 
+  /**
+   * Copy files from DFS directory to a local directory. It will figure out which
+   * existing files are needed, and accordingly, unnecessary SST files are deleted while
+   * necessary and non-existing files are copied from DFS.
+   */
+  private def loadImmutableFilesFromDfs(
+      immutableFiles: Seq[RocksDBImmutableFile], localDir: File): Unit = {
+    val requiredFileNameToFileDetails = immutableFiles.map(f => f.localFileName -> f).toMap
+    // Delete unnecessary local immutable files
+    listRocksDBFiles(localDir)._1
+      .foreach { existingFile =>
+        val isSameFile =
+          requiredFileNameToFileDetails.get(existingFile.getName).exists(_.isSameFile(existingFile))
+        if (!isSameFile) {
+          existingFile.delete()
+          logInfo(s"Deleted local file $existingFile")
+        }

Review comment:
       When is it possible to have an existing file in the local dir that has the same file name but is not the same file as the one on DFS?

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
##########
@@ -200,6 +246,55 @@ class RocksDBFileManager(
     immutableFiles
   }
 
+  /**
+   * Copy files from DFS directory to a local directory. It will figure out which
+   * existing files are needed, and accordingly, unnecessary SST files are deleted while
+   * necessary and non-existing files are copied from DFS.
+   */
+  private def loadImmutableFilesFromDfs(
+      immutableFiles: Seq[RocksDBImmutableFile], localDir: File): Unit = {
+    val requiredFileNameToFileDetails = immutableFiles.map(f => f.localFileName -> f).toMap
+    // Delete unnecessary local immutable files
+    listRocksDBFiles(localDir)._1
+      .foreach { existingFile =>
+        val isSameFile =
+          requiredFileNameToFileDetails.get(existingFile.getName).exists(_.isSameFile(existingFile))
+        if (!isSameFile) {
+          existingFile.delete()
+          logInfo(s"Deleted local file $existingFile")
+        }
+      }
+
+    var filesCopied = 0L
+    var bytesCopied = 0L
+    var filesReused = 0L
+    immutableFiles.foreach { file =>
+      val localFileName = file.localFileName
+      val localFile = localFilePath(localDir, localFileName)
+      if (!localFile.exists) {
+        val dfsFile = dfsFilePath(file.dfsFileName)
+        // Note: The implementation of copyToLocalFile() closes the output stream when there is
+        // any exception while copying. So this may generate partial files on DFS. But that is
+        // okay because until the main [version].zip file is written, those partial files are
+        // not going to be used at all. Eventually these files should get cleared.
+        fs.copyToLocalFile(dfsFile, new Path(localFile.getAbsoluteFile.toURI))
+        val localFileSize = localFile.length()
+        filesCopied += 1
+        bytesCopied += localFileSize
+        logInfo(s"Copied $dfsFile to $localFile - $localFileSize bytes")

Review comment:
       We can move this `logInfo` after the file size check.
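       A sketch of that reordering, so a size mismatch is raised before the success line is logged:

```scala
fs.copyToLocalFile(dfsFile, new Path(localFile.getAbsoluteFile.toURI))
val localFileSize = localFile.length()
val expectedSize = file.sizeBytes
// Validate the copy first ...
if (localFileSize != expectedSize) {
  throw new IllegalStateException(
    s"Copied $dfsFile to $localFile," +
      s" expected $expectedSize bytes, found $localFileSize bytes")
}
// ... and only then record and log the success.
filesCopied += 1
bytesCopied += localFileSize
logInfo(s"Copied $dfsFile to $localFile - $localFileSize bytes")
```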

##########
File path: core/src/main/scala/org/apache/spark/util/Utils.scala
##########
@@ -3084,6 +3085,35 @@ private[spark] object Utils extends Logging {
     conf.get(config.STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT)
       .getOrElse(Utils.timeStringAsMs(s"${conf.get(Network.NETWORK_TIMEOUT)}s"))
   }
+
+  /** Decompress a zip file into a local dir. File names are read from the zip file. */
+  def unzipFromFile(fs: FileSystem, dfsZipFile: Path, localDir: File): Seq[File] = {
+    val files = new mutable.ArrayBuffer[File]()
+    val in = new ZipInputStream(fs.open(dfsZipFile))
+    var out: OutputStream = null
+    try {
+      var entry = in.getNextEntry()
+      while (entry != null) {
+        if (!entry.isDirectory) {
+          val fileName = localDir.toPath.resolve(entry.getName).getFileName.toString
+          val outFile = new File(localDir, fileName)
+          files += outFile
+          out = new FileOutputStream(outFile)
+          IOUtils.copy(in, out)
+          out.close()
+          in.closeEntry()
+        }
+        entry = in.getNextEntry()
+      }
+      in.close() // so that any error in closing does not get ignored
+      logInfo(s"Unzipped from $dfsZipFile\n\t${files.mkString("\n\t")}")
+    } finally {
+      // Close everything no matter what happened
+      IOUtils.closeQuietly(in)
+      IOUtils.closeQuietly(out)
+    }

Review comment:
       Hmm, are we sure we don't need to handle any errors during unzipping?
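       A hedged sketch of what handling could look like here, assuming partially extracted local files should not be left behind on failure (that cleanup policy is an assumption, not something the PR specifies):

```scala
try {
  // ... unzip loop from the diff above ...
} catch {
  case e: Exception =>
    // Hypothetical handling: log, best-effort delete of partially extracted
    // files, then rethrow so the caller still sees the original error.
    logError(s"Error unzipping $dfsZipFile", e)
    files.foreach(_.delete())
    throw e
} finally {
  IOUtils.closeQuietly(in)
  IOUtils.closeQuietly(out)
}
```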

##########
File path: core/src/main/scala/org/apache/spark/util/Utils.scala
##########
@@ -3084,6 +3085,35 @@ private[spark] object Utils extends Logging {
     conf.get(config.STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT)
       
.getOrElse(Utils.timeStringAsMs(s"${conf.get(Network.NETWORK_TIMEOUT)}s"))
   }
+
+  /** Decompress a zip file into a local dir. File names are read from the zip file. */

Review comment:
       Looks like this is not the Javadoc style we follow.
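       Presumably this refers to the multi-line form used for method docs elsewhere in the codebase; a sketch of the same doc in that style:

```scala
/**
 * Decompress a zip file into a local dir. File names are read from the zip file.
 */
```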




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
