peter-toth commented on code in PR #55622:
URL: https://github.com/apache/spark/pull/55622#discussion_r3167915718
##########
hadoop-cloud/src/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala:
##########
@@ -115,6 +115,17 @@ class PathOutputCommitProtocol(
// failures. Warn
logTrace(s"Committer $committer may not be tolerant of task commit failures")
}
+
+ if (dynamicPartitionOverwrite) {
+   // FileOutputCommitter must be initialized with the staging directory so that task output
+   // lands under stagingDir/_temporary/... and commitJob can later delete the old partition
+   // directories and move staged files to final dest. Without this, the committer writes
+   // directly to the final path and the dynamic-overwrite cleanup in commitJob never sees any
+   // partitionPaths.
+   val ctor =
+     committer.getClass.getDeclaredConstructor(classOf[Path], classOf[TaskAttemptContext])
+   committer = ctor.newInstance(stagingDir, context)
Review Comment:
How is that possible? As far as I can see, the commit protocol is instantiated
via `FileCommitProtocol.instantiate()` and called from either
`InsertIntoHadoopFsRelationCommand` (DSv1) or `FileWrite` (DSv2). In both cases
the `jobId` is `UUID.randomUUID()`, so staging directory collisions should not
be possible for this code path.
But if there is an issue, then it should be a separate, pre-existing one, as this
fix copies `SQLHadoopMapReduceCommitProtocol`.
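
To make the collision argument concrete, here is a minimal sketch. It assumes
the staging directory is derived from the output path plus the `jobId`, in the
form `.spark-staging-<jobId>`. The `StagingDirSketch` object, the `stagingDir`
helper, and the `/warehouse/tbl` path are hypothetical names for illustration
only, not the actual Spark code.

```scala
import java.util.UUID

object StagingDirSketch {
  // Assumption: the staging directory sits under the output path and embeds
  // the jobId. This is a hypothetical stand-in, not the real Spark helper.
  def stagingDir(outputPath: String, jobId: String): String =
    s"$outputPath/.spark-staging-$jobId"

  def main(args: Array[String]): Unit = {
    val output = "/warehouse/tbl" // hypothetical output path

    // Each write job gets its own random jobId, as in the DSv1/DSv2 callers.
    val first = stagingDir(output, UUID.randomUUID().toString)
    val second = stagingDir(output, UUID.randomUUID().toString)

    println(first)
    println(second)

    // Two concurrent jobs writing to the same table get distinct staging dirs.
    assert(first != second)
  }
}
```

Under that assumption, two jobs could only share a staging directory if
`UUID.randomUUID()` returned the same value twice, which is not a realistic
failure mode for this code path.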
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]