turboFei commented on a change in pull request #26339:
[SPARK-27194][SPARK-29302][SQL] Fix the issue that for dynamic partition
overwrite a task would conflict with its speculative task
URL: https://github.com/apache/spark/pull/26339#discussion_r405241541
##########
File path:
core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
##########
@@ -236,13 +265,25 @@ class HadoopMapReduceCommitProtocol(
committer.setupTask(taskContext)
addedAbsPathFiles = mutable.Map[String, String]()
partitionPaths = mutable.Set[String]()
+ dynamicStagingTaskFiles = mutable.Set[Path]()
}
override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage
= {
val attemptId = taskContext.getTaskAttemptID
logTrace(s"Commit task ${attemptId}")
SparkHadoopMapRedUtil.commitTask(
committer, taskContext, attemptId.getJobID.getId,
attemptId.getTaskID.getId)
+ if (dynamicPartitionOverwrite) {
+ val fs = stagingDir.getFileSystem(taskContext.getConfiguration)
+ dynamicStagingTaskFiles.foreach { stagingTaskFile =>
+ val fileName = stagingTaskFile.getName
+ val taskPartitionPath = getDynamicPartitionPath(stagingTaskFile,
taskContext)
+ val destFile = new Path(taskPartitionPath, fileName)
+ if (!fs.rename(stagingTaskFile, destFile)) {
Review comment:
We can add a check here, and if destFile existed, I think we can remove it
directly, for that this operation is after SparkHadoopMapRedUtil.commitTask.
If outputCommitCoordination is enabled, only one task can commit its output.
Otherwise, it does not matter to remove it directly.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]