advancedxy commented on a change in pull request #25739: 
[WIP][SPARK-28945][CORE][SQL] Support concurrent dynamic partition writes to 
different partitions in the same table
URL: https://github.com/apache/spark/pull/25739#discussion_r324955069
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
 ##########
 @@ -91,7 +91,31 @@ class HadoopMapReduceCommitProtocol(
    */
   private def stagingDir = new Path(path, ".spark-staging-" + jobId)
 
+  /**
+   * Get the desired output path for the job. The output will be [[path]] when
+   * dynamicPartitionOverwrite is disabled, otherwise, it will be [[stagingDir]]. We choose
+   * [[stagingDir]] over [[path]] to avoid potential collision of concurrent write jobs as the same
+   * output will be specified when writing to the same table dynamically.
+   *
+   * @return Path the desired output path.
+   */
+  protected def getOutputPath(context: TaskAttemptContext): Path = {
+    if (dynamicPartitionOverwrite) {
+      val conf = context.getConfiguration
+      val outputPath = stagingDir.getFileSystem(conf).makeQualified(stagingDir)
+      outputPath
+    } else {
+      new Path(path)
+    }
+  }
+
   protected def setupCommitter(context: TaskAttemptContext): OutputCommitter = {
+    // set output path to stagingDir to avoid potential collision of multiple concurrent write tasks
 
 Review comment:
   > In fact, I don't see how the committer is related to the staging dir. If 
you look at commitTask and commitJob, we kind of manually commit the files in 
the staging dir, by moving it to the table dir.
   
   Yes, we manually commit files in the staging dir. The problem is that 
`HadoopMapReduceCommitProtocol`'s `commitJob` first calls 
`committer.commitJob(jobContext)`, which depends on the output path passed to 
the JobContext.
   
https://github.com/apache/spark/blob/1de7d307fe7f6bde8753e8b348d38575e2516e4a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala#L190-L198
   
   The OutputCommitter cannot work correctly if multiple OutputCommitters are 
working on the same output path (for example, concurrent writes to different 
partitions of the same table, where the output path would be the same for every 
job: the table's output location). After changing the output path to the staging 
dir, whose name includes the jobId, concurrent jobs have different output dirs.
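
To illustrate the collision argument, here is a minimal sketch. It models the path selection with plain strings instead of Hadoop's `Path`, and the table location and job IDs are made-up values; only the `".spark-staging-" + jobId` naming and the `dynamicPartitionOverwrite` branch are taken from the diff above.

```scala
// Sketch only: models output-path selection with plain strings, not Hadoop's Path.
object OutputPathSketch {
  // Mirrors `stagingDir` in the diff: <table path>/.spark-staging-<jobId>
  def stagingDir(path: String, jobId: String): String =
    s"$path/.spark-staging-$jobId"

  // Mirrors the branch in `getOutputPath`: staging dir when overwriting
  // partitions dynamically, the table path otherwise.
  def outputPath(path: String, jobId: String, dynamicPartitionOverwrite: Boolean): String =
    if (dynamicPartitionOverwrite) stagingDir(path, jobId) else path

  def main(args: Array[String]): Unit = {
    val table = "/warehouse/t"        // hypothetical table location
    val jobs = Seq("job-a", "job-b")  // hypothetical IDs of two concurrent jobs

    // Without the change: every committer is handed the table location.
    val before = jobs.map(_ => table)
    // With the change: each committer is handed its own staging dir.
    val after = jobs.map(id => outputPath(table, id, dynamicPartitionOverwrite = true))

    println(s"distinct output paths before: ${before.distinct.size}")
    println(s"distinct output paths after:  ${after.distinct.size}")
    after.foreach(println)
  }
}
```

Because the job ID is part of the directory name, two jobs writing to the same table only share an output path when they share a job ID.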
