advancedxy commented on a change in pull request #25739:
[WIP][SPARK-28945][CORE][SQL] Support concurrent dynamic partition writes to
different partitions in the same table
URL: https://github.com/apache/spark/pull/25739#discussion_r324955069
##########
File path:
core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
##########
@@ -91,7 +91,31 @@ class HadoopMapReduceCommitProtocol(
*/
private def stagingDir = new Path(path, ".spark-staging-" + jobId)
+ /**
+ * Get the desired output path for the job. The output will be [[path]] when
+ * dynamicPartitionOverwrite is disabled, otherwise, it will be [[stagingDir]]. We choose
+ * [[stagingDir]] over [[path]] to avoid potential collision of concurrent write jobs as the same
+ * output will be specified when writing to the same table dynamically.
+ *
+ * @return Path the desired output path.
+ */
+ protected def getOutputPath(context: TaskAttemptContext): Path = {
+ if (dynamicPartitionOverwrite) {
+ val conf = context.getConfiguration
+ val outputPath = stagingDir.getFileSystem(conf).makeQualified(stagingDir)
+ outputPath
+ } else {
+ new Path(path)
+ }
+ }
+
protected def setupCommitter(context: TaskAttemptContext): OutputCommitter = {
+ // set output path to stagingDir to avoid potential collision of multiple concurrent write tasks
Review comment:
> In fact, I don't see how the committer is related to the staging dir. If
> you look at commitTask and commitJob, we kind of manually commit the files in
> the staging dir, by moving it to the table dir.

Yes, we manually commit the files in the staging dir. The problem is that
`HadoopMapReduceCommitProtocol.commitJob` first calls
`committer.commitJob(jobContext)`, which depends on the output path passed to
the JobContext:
https://github.com/apache/spark/blob/1de7d307fe7f6bde8753e8b348d38575e2516e4a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala#L190-L198

An OutputCommitter cannot work correctly when multiple OutputCommitters
operate on the same output path. That is what happens with concurrent writes
to different partitions of the same table, because every job's output path is
the same: the table's output location. Once the output path is changed to the
staging dir, concurrent jobs have different output dirs.
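
To illustrate the point, here is a minimal sketch in plain Scala. It does not use Spark's or Hadoop's classes: `OutputPathSketch`, `outputPath`, the table path `/warehouse/t`, and the job ids are hypothetical names chosen for the example. Paths are modeled as strings, and only the `.spark-staging-<jobId>` naming is taken from the diff above.

```scala
// Hypothetical model, not Spark's actual code: it only shows why a per-job
// staging dir gives each concurrent job its own committer output path.
object OutputPathSketch {
  // Mirrors the naming in the diff: <table path>/.spark-staging-<jobId>
  def stagingDir(tablePath: String, jobId: String): String =
    s"$tablePath/.spark-staging-$jobId"

  // Assumed simplification of getOutputPath: staging dir when dynamic
  // partition overwrite is enabled, the table path otherwise.
  def outputPath(
      tablePath: String,
      jobId: String,
      dynamicPartitionOverwrite: Boolean): String =
    if (dynamicPartitionOverwrite) stagingDir(tablePath, jobId) else tablePath

  def main(args: Array[String]): Unit = {
    val table = "/warehouse/t"       // hypothetical table location
    val jobs = Seq("job-a", "job-b") // two concurrent write jobs

    val shared = jobs.map(outputPath(table, _, dynamicPartitionOverwrite = false))
    val staged = jobs.map(outputPath(table, _, dynamicPartitionOverwrite = true))

    // Both jobs hand the same path to their committers: collision risk.
    println(shared.distinct.size) // prints 1
    // Each job gets its own path, so the committers do not interfere.
    println(staged.distinct.size) // prints 2
  }
}
```

With the table location as the output path, both jobs' committers would work under the same directory. With the staging dir, each job's committer works under a directory that is unique to its job id.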