venkata91 commented on a change in pull request #26339:
[SPARK-27194][SPARK-29302][SQL] Fix the issue that for dynamic partition
overwrite a task would conflict with its speculative task
URL: https://github.com/apache/spark/pull/26339#discussion_r405023852
##########
File path:
core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
##########
@@ -236,13 +265,25 @@ class HadoopMapReduceCommitProtocol(
committer.setupTask(taskContext)
addedAbsPathFiles = mutable.Map[String, String]()
partitionPaths = mutable.Set[String]()
+ dynamicStagingTaskFiles = mutable.Set[Path]()
}
override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage
= {
val attemptId = taskContext.getTaskAttemptID
logTrace(s"Commit task ${attemptId}")
SparkHadoopMapRedUtil.commitTask(
committer, taskContext, attemptId.getJobID.getId,
attemptId.getTaskID.getId)
+ if (dynamicPartitionOverwrite) {
Review comment:
so where does SparkHadoopMapRedUtil.commitTask moves the data from/to and
how does dynamicPartitionOverwrite deal with that output? Can you please give
an example? It seems there are 2 renames happening one due to
SparkHadoopMapRedUtil.commitTask and then with the code block inside
dynamicPartitionOverwrite.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]