venkata91 commented on a change in pull request #26339:
URL: https://github.com/apache/spark/pull/26339#discussion_r415169284



##########
File path: 
core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
##########
@@ -236,13 +267,25 @@ class HadoopMapReduceCommitProtocol(
     committer.setupTask(taskContext)
     addedAbsPathFiles = mutable.Map[String, String]()
     partitionPaths = mutable.Set[String]()
+    dynamicStagingTaskFiles = mutable.Set[Path]()
   }
 
   override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage 
= {
     val attemptId = taskContext.getTaskAttemptID
     logTrace(s"Commit task ${attemptId}")
     SparkHadoopMapRedUtil.commitTask(
       committer, taskContext, attemptId.getJobID.getId, 
attemptId.getTaskID.getId)
+    if (dynamicPartitionOverwrite) {
+      val fs = stagingDir.getFileSystem(taskContext.getConfiguration)
+      dynamicStagingTaskFiles.foreach { stagingTaskFile =>
+        val fileName = stagingTaskFile.getName
+        val partitionPath = getDynamicPartitionPath(stagingTaskFile, 
taskContext)
+        val finalFile = new Path(partitionPath, fileName)
+        if (!fs.exists(finalFile) && !fs.rename(stagingTaskFile, finalFile)) {

Review comment:
       This will fail on HDFS: `fs.rename` won't work unless the parent 
directory already exists. So the partitionPath directory has to be created 
before renaming the file to its final location. Please refer to SPARK-23815.

##########
File path: 
core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
##########
@@ -236,13 +267,25 @@ class HadoopMapReduceCommitProtocol(
     committer.setupTask(taskContext)
     addedAbsPathFiles = mutable.Map[String, String]()
     partitionPaths = mutable.Set[String]()
+    dynamicStagingTaskFiles = mutable.Set[Path]()
   }
 
   override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage 
= {
     val attemptId = taskContext.getTaskAttemptID
     logTrace(s"Commit task ${attemptId}")
     SparkHadoopMapRedUtil.commitTask(
       committer, taskContext, attemptId.getJobID.getId, 
attemptId.getTaskID.getId)
+    if (dynamicPartitionOverwrite) {
+      val fs = stagingDir.getFileSystem(taskContext.getConfiguration)
+      dynamicStagingTaskFiles.foreach { stagingTaskFile =>
+        val fileName = stagingTaskFile.getName
+        val partitionPath = getDynamicPartitionPath(stagingTaskFile, 
taskContext)
+        val finalFile = new Path(partitionPath, fileName)
+        if (!fs.exists(finalFile) && !fs.rename(stagingTaskFile, finalFile)) {
+         throw new IOException(s"Failed to rename $stagingTaskFile to 
$finalFile")

Review comment:
       In the case of speculative attempts, execution is bound to reach this 
branch, since some other task attempt may have already renamed its output to 
finalFile. Instead of throwing an exception, maybe we should log a warning 
with a clearer message.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to