steveloughran commented on code in PR #36070:
URL: https://github.com/apache/spark/pull/36070#discussion_r1029595674


##########
core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala:
##########
@@ -194,19 +196,33 @@ class HadoopMapReduceCommitProtocol(
     if (hasValidPath) {
       val (allAbsPathFiles, allPartitionPaths) =
         taskCommits.map(_.obj.asInstanceOf[(Map[String, String], Set[String])]).unzip
-      val fs = stagingDir.getFileSystem(jobContext.getConfiguration)
+      val hadoopConf = jobContext.getConfiguration
+      val fs = stagingDir.getFileSystem(hadoopConf)
 
       val filesToMove = allAbsPathFiles.foldLeft(Map[String, String]())(_ ++ _)
       logDebug(s"Committing files staged for absolute locations $filesToMove")
       val absParentPaths = filesToMove.values.map(new Path(_).getParent).toSet
       if (dynamicPartitionOverwrite) {
         logDebug(s"Clean up absolute partition directories for overwriting: $absParentPaths")
-        absParentPaths.foreach(fs.delete(_, true))
+        absParentPaths.foreach(path => path.getFileSystem(hadoopConf)
+          .delete(path, true))
       }
       logDebug(s"Create absolute parent directories: $absParentPaths")
-      absParentPaths.foreach(fs.mkdirs)
+      absParentPaths.foreach(path => path.getFileSystem(hadoopConf).mkdirs(path))
       for ((src, dst) <- filesToMove) {
-        if (!fs.rename(new Path(src), new Path(dst))) {
+        val srcPath = new Path(src)
+        val dstPath = new Path(dst)
+        val srcFs = srcPath.getFileSystem(hadoopConf)
+        val dstFs = dstPath.getFileSystem(hadoopConf)
+        // Copying files across different file systems
+        if (needCopy(srcPath, dstPath, srcFs, dstFs)) {
+          if (!FileUtil.copy(srcFs, srcFs.listStatus(srcPath).map(_.getPath), dstFs, dstPath,

Review Comment:
   Can I highlight something I've noticed here: that copy() command stops as
   soon as the src read() returns -1, without doing any checks to validate the
   file length. Not great.
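
   To make the concern concrete, here is a minimal sketch of a copy loop that
   also checks the byte count. It uses plain JDK streams rather than the Hadoop
   code path, and `LengthCheckedCopy`, `copyVerified` and `expectedLen` are
   hypothetical names for illustration only. In the Hadoop case the expected
   length would presumably come from the source `FileStatus.getLen`; that
   wiring is an assumption and is not shown.

```scala
import java.io.{IOException, InputStream, OutputStream}

object LengthCheckedCopy {

  /**
   * Copies `in` to `out` until read() returns -1, then compares the number of
   * bytes copied with `expectedLen` (hypothetical parameter: the length the
   * caller believes the source has). Throws IOException on a mismatch and
   * returns the byte count otherwise.
   */
  def copyVerified(in: InputStream, out: OutputStream, expectedLen: Long): Long = {
    val buf = new Array[Byte](8192)
    var total = 0L
    var n = in.read(buf)
    while (n != -1) {
      out.write(buf, 0, n)
      total += n
      n = in.read(buf)
    }
    // A read() of -1 only says the stream ended, not that all data arrived.
    if (total != expectedLen) {
      throw new IOException(
        s"Copy length mismatch: expected $expectedLen bytes, copied $total")
    }
    total
  }
}
```

   A length check like this only catches truncated or over-long copies; it does
   not detect corrupted content, which would need a checksum comparison.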



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

