venkata91 commented on a change in pull request #26339: URL: https://github.com/apache/spark/pull/26339#discussion_r415169284
########## File path: core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala ########## @@ -236,13 +267,25 @@ class HadoopMapReduceCommitProtocol( committer.setupTask(taskContext) addedAbsPathFiles = mutable.Map[String, String]() partitionPaths = mutable.Set[String]() + dynamicStagingTaskFiles = mutable.Set[Path]() } override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = { val attemptId = taskContext.getTaskAttemptID logTrace(s"Commit task ${attemptId}") SparkHadoopMapRedUtil.commitTask( committer, taskContext, attemptId.getJobID.getId, attemptId.getTaskID.getId) + if (dynamicPartitionOverwrite) { + val fs = stagingDir.getFileSystem(taskContext.getConfiguration) + dynamicStagingTaskFiles.foreach { stagingTaskFile => + val fileName = stagingTaskFile.getName + val partitionPath = getDynamicPartitionPath(stagingTaskFile, taskContext) + val finalFile = new Path(partitionPath, fileName) + if (!fs.exists(finalFile) && !fs.rename(stagingTaskFile, finalFile)) { Review comment: This will fail on HDFS: fs.rename won't work unless the parent directory already exists. So the partitionPath directory has to be created before renaming the file to its final location. 
Please refer to SPARK-23815 ########## File path: core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala ########## @@ -236,13 +267,25 @@ class HadoopMapReduceCommitProtocol( committer.setupTask(taskContext) addedAbsPathFiles = mutable.Map[String, String]() partitionPaths = mutable.Set[String]() + dynamicStagingTaskFiles = mutable.Set[Path]() } override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = { val attemptId = taskContext.getTaskAttemptID logTrace(s"Commit task ${attemptId}") SparkHadoopMapRedUtil.commitTask( committer, taskContext, attemptId.getJobID.getId, attemptId.getTaskID.getId) + if (dynamicPartitionOverwrite) { + val fs = stagingDir.getFileSystem(taskContext.getConfiguration) + dynamicStagingTaskFiles.foreach { stagingTaskFile => + val fileName = stagingTaskFile.getName + val partitionPath = getDynamicPartitionPath(stagingTaskFile, taskContext) + val finalFile = new Path(partitionPath, fileName) + if (!fs.exists(finalFile) && !fs.rename(stagingTaskFile, finalFile)) { + throw new IOException(s"Failed to rename $stagingTaskFile to $finalFile") Review comment: In the case of speculative attempts, execution is bound to reach this branch, as some other task attempt might already have renamed its output to the finalFile; instead of throwing an exception, maybe we should log a warning with a clearer message. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org