dongjoon-hyun commented on code in PR #55622:
URL: https://github.com/apache/spark/pull/55622#discussion_r3169126066
##########
hadoop-cloud/src/test/scala/org/apache/spark/internal/io/cloud/CommitterBindingSuite.scala:
##########
@@ -264,5 +277,107 @@ class CommitterBindingSuite extends SparkFunSuite {
"org.apache.hadoop.mapreduce.lib.output.FileOutputCommitterFactory")
}
-}
+ /*
+ * With dynamicPartitionOverwrite=true and a FileOutputCommitter, newTaskTempFile must route
+ * output through the staging directory (not the final output path) and must record the partition
+ * in partitionPaths so that commitJob can delete the old partition directory and rename the
+ * staged one into place.
+ */
+ test("SPARK-56588: FileOutputCommitter dynamic partition overwrite stages
output and tracks " +
+ "partitions") {
+ val jobCommitDir = File.createTempFile("dyn-part-overwrite-staging", "")
+ try {
+ jobCommitDir.delete()
+ val jobUri = jobCommitDir.toURI
+ val path = new Path(jobUri)
+ val job = newJob(path)
+ val conf = job.getConfiguration
+ conf.set(MRJobConfig.TASK_ATTEMPT_ID, taskAttempt0)
+ conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1)
+ bindToFileOutputCommitterFactory(conf, "file")
+ val tContext = new TaskAttemptContextImpl(conf, taskAttemptId0)
+ val committer = new PathOutputCommitProtocolForTest(jobId,
jobUri.toString, true)
+ committer.setupJob(tContext)
+ committer.setupTask(tContext)
+
+ val spec = FileNameSpec("", ".parquet")
+ val partition = "a=1/b=2"
+ val tempPath = committer.newTaskTempFile(tContext, Some(partition), spec)
+
+ // The temp file must be under the staging directory, not the final output path.
+ assert(tempPath.contains(".spark-staging-"),
+ s"Expected temp path under staging dir, got: $tempPath")
+ assert(!tempPath.startsWith(path.toUri.toString.stripSuffix("/") + "/" +
partition),
+ s"Temp path must not point directly to the final output location:
$tempPath")
+
+ // The partition must have been recorded so commitJob can overwrite it.
+ assert(committer.capturedPartitionPaths === Set(partition),
+ s"Expected partitionPaths = {$partition}, got:
${committer.capturedPartitionPaths}")
+ } finally {
+ jobCommitDir.delete()
+ }
+ }
+
+ /*
+ * A cloud committer that handles dynamic partitioning natively (via StreamCapabilities) must NOT
+ * have its partitions tracked in Spark's partitionPaths set: the committer takes care of
+ * overwriting itself, and the commitJob rename loop must not interfere.
+ */
+ test("SPARK-56588: Cloud committer with dynamic partition support does not
track partitions in " +
+ "partitionPaths") {
Review Comment:
Thank you for adding these test cases.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]