peter-toth commented on code in PR #55622:
URL: https://github.com/apache/spark/pull/55622#discussion_r3167915718


##########
hadoop-cloud/src/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala:
##########
@@ -115,6 +115,17 @@ class PathOutputCommitProtocol(
         // failures. Warn
         logTrace(s"Committer $committer may not be tolerant of task commit failures")
       }
+
+      if (dynamicPartitionOverwrite) {
+        // FileOutputCommitter must be initialized with the staging directory so that task output
+        // lands under stagingDir/_temporary/... and commitJob can later delete the old partition
+        // directories and move staged files to final dest. Without this, the committer writes
+        // directly to the final path and the dynamic-overwrite cleanup in commitJob never sees any
+        // partitionPaths.
+        val ctor =
+          committer.getClass.getDeclaredConstructor(classOf[Path], classOf[TaskAttemptContext])
+        committer = ctor.newInstance(stagingDir, context)

Review Comment:
   How is that possible? As far as I can see, the commit protocol is
instantiated via `FileCommitProtocol.instantiate()` and called from
`InsertIntoHadoopFsRelationCommand` (DSv1) or `FileWrite` (DSv2). In both cases
the `jobId` is `UUID.randomUUID()`, so staging directory collisions should not
be possible for this code path.
   But if there is an issue, then it should be a separate, pre-existing one, as
this fix copies `SQLHadoopMapReduceCommitProtocol`.
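
   A minimal sketch of the reasoning above, not the actual Spark code: it
assumes the staging directory is named `.spark-staging-<jobId>` under the
output path, and uses plain strings in place of Hadoop `Path`. The object
`StagingDirSketch` and the output location are hypothetical.

```scala
import java.util.UUID

object StagingDirSketch {
  // Assumption: mirrors a ".spark-staging-<jobId>" naming scheme;
  // plain strings stand in for org.apache.hadoop.fs.Path.
  def stagingDir(outputPath: String, jobId: String): String =
    s"$outputPath/.spark-staging-$jobId"

  def main(args: Array[String]): Unit = {
    val output = "/warehouse/tbl" // hypothetical output location

    // Each write job gets its own random jobId, as in the DSv1/DSv2 paths.
    val first = stagingDir(output, UUID.randomUUID().toString)
    val second = stagingDir(output, UUID.randomUUID().toString)

    println(first)
    println(second)

    // Two jobs writing to the same table should get distinct staging dirs.
    assert(first != second)
  }
}
```

   With a random UUID per job, two concurrent writes to the same output path
would only share a staging directory if their UUIDs collided.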



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

