venkata91 commented on a change in pull request #26339: URL: https://github.com/apache/spark/pull/26339#discussion_r415224131
########## File path: core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala ########## @@ -236,13 +271,35 @@ class HadoopMapReduceCommitProtocol( committer.setupTask(taskContext) addedAbsPathFiles = mutable.Map[String, String]() partitionPaths = mutable.Set[String]() + dynamicStagingTaskFiles = mutable.Set[Path]() } override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = { val attemptId = taskContext.getTaskAttemptID logTrace(s"Commit task ${attemptId}") SparkHadoopMapRedUtil.commitTask( committer, taskContext, attemptId.getJobID.getId, attemptId.getTaskID.getId) + if (dynamicPartitionOverwrite && isSpeculationEnabled) { + val fs = stagingDir.getFileSystem(taskContext.getConfiguration) + dynamicStagingTaskFiles.foreach { stagingTaskFile => + val fileName = stagingTaskFile.getName + val partitionPath = getDynamicPartitionPath(stagingTaskFile, taskContext) + fs.mkdirs(partitionPath) + val finalFile = new Path(partitionPath, fileName) + if (!fs.exists(finalFile) && !fs.rename(stagingTaskFile, finalFile)) { + if (fs.exists(finalFile)) { + logWarning( + s""" + | For dynamic partition overwrite operation with speculation enabled, failed to Review comment: What happens if YARN preemption is enabled? Can the rename of the file succeed but the executor then get preempted? In that case, this warning might not make sense. I guess we don't need to say anything specific to speculation. 
########## File path: core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala ########## @@ -236,13 +271,35 @@ class HadoopMapReduceCommitProtocol( committer.setupTask(taskContext) addedAbsPathFiles = mutable.Map[String, String]() partitionPaths = mutable.Set[String]() + dynamicStagingTaskFiles = mutable.Set[Path]() } override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = { val attemptId = taskContext.getTaskAttemptID logTrace(s"Commit task ${attemptId}") SparkHadoopMapRedUtil.commitTask( committer, taskContext, attemptId.getJobID.getId, attemptId.getTaskID.getId) + if (dynamicPartitionOverwrite && isSpeculationEnabled) { + val fs = stagingDir.getFileSystem(taskContext.getConfiguration) + dynamicStagingTaskFiles.foreach { stagingTaskFile => + val fileName = stagingTaskFile.getName + val partitionPath = getDynamicPartitionPath(stagingTaskFile, taskContext) + fs.mkdirs(partitionPath) Review comment: Check whether the directory already exists; if it does not, call `fs.mkdirs(partitionPath)`. ########## File path: core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala ########## @@ -236,13 +271,35 @@ class HadoopMapReduceCommitProtocol( committer.setupTask(taskContext) addedAbsPathFiles = mutable.Map[String, String]() partitionPaths = mutable.Set[String]() + dynamicStagingTaskFiles = mutable.Set[Path]() } override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = { val attemptId = taskContext.getTaskAttemptID logTrace(s"Commit task ${attemptId}") SparkHadoopMapRedUtil.commitTask( committer, taskContext, attemptId.getJobID.getId, attemptId.getTaskID.getId) + if (dynamicPartitionOverwrite && isSpeculationEnabled) { + val fs = stagingDir.getFileSystem(taskContext.getConfiguration) + dynamicStagingTaskFiles.foreach { stagingTaskFile => + val fileName = stagingTaskFile.getName + val partitionPath = getDynamicPartitionPath(stagingTaskFile, taskContext) + fs.mkdirs(partitionPath) + val 
finalFile = new Path(partitionPath, fileName) + if (!fs.exists(finalFile) && !fs.rename(stagingTaskFile, finalFile)) { + if (fs.exists(finalFile)) { + logWarning( + s""" + | For dynamic partition overwrite operation with speculation enabled, failed to + | rename the staging dynamic file:$stagingTaskFile to $finalFile. Some other task + | has renamed a staging dynamic file to $finalFile. See details in SPARK-29302. + """.stripMargin) + } else { + throw new IOException(s"Failed to rename $stagingTaskFile to $finalFile") Review comment: If the rename failed, shouldn't we wrap and propagate the underlying exception (e.g., permission denied)? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org