peter-toth opened a new pull request, #55622:
URL: https://github.com/apache/spark/pull/55622

   ### What changes were proposed in this pull request?
   
   Fix `PathOutputCommitProtocol` to correctly handle 
`dynamicPartitionOverwrite=true` when the underlying committer is a 
`FileOutputCommitter`. Two bugs were introduced by #37468 
([SPARK-40034](https://issues.apache.org/jira/browse/SPARK-40034)):
   
   1. `FileOutputCommitter` was not redirected to the staging directory 
(`stagingDir`) in `setupCommitter`. As a result, the committer wrote task 
output directly to the final destination path rather than staging it, so the 
`commitJob` rename-and-overwrite logic never ran and `INSERT OVERWRITE` 
appended to existing partition directories instead of replacing them.
   
   2. `newTaskTempFile` did not populate `partitionPaths`. With no recorded 
partitions, `commitJob` skipped the delete-before-rename step entirely, again 
leaving old partition data in place.
   
   The fix mirrors the approach already used in 
`SQLHadoopMapReduceCommitProtocol`:
   - In `setupCommitter`, reinitialize `FileOutputCommitter` with `stagingDir` 
when `dynamicPartitionOverwrite=true` (matching 
`SQLHadoopMapReduceCommitProtocol`).
   - In `newTaskTempFile`, record the partition (`partitionPaths += dir.get`) 
when the committer is a `FileOutputCommitter` (matching the parent class 
`HadoopMapReduceCommitProtocol`). Tracking is intentionally skipped for cloud 
committers that declare native dynamic partition overwrite support via 
`StreamCapabilities`.
   - Widen `partitionPaths` in `HadoopMapReduceCommitProtocol` from `private` 
to `protected` so the subclass can write to it.
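   
   How the two fixes depend on each other can be illustrated with a minimal 
local-filesystem sketch. This is not Spark's implementation: 
`StagedOverwriteSketch`, `writeStaged`, and `commitJob` are hypothetical names, 
and `java.nio.file` stands in for the Hadoop `FileSystem` API. It only shows 
that staged output replaces a partition if, and only if, that partition was 
recorded in `partitionPaths`.
   
   ```scala
   import java.io.File
   import java.nio.file.{Files, Path}
   import scala.collection.mutable

   // Hypothetical model of staged dynamic partition overwrite; not Spark code.
   object StagedOverwriteSketch {

     // Delete a file or directory tree; a missing path is a no-op.
     def deleteRecursively(f: File): Unit = {
       Option(f.listFiles).foreach(_.foreach(deleteRecursively))
       f.delete()
     }

     // Task side: write under the staging directory and record the partition.
     def writeStaged(
         staging: Path,
         partition: String,
         fileName: String,
         content: String,
         partitionPaths: mutable.Set[String]): Unit = {
       val dir = staging.resolve(partition)
       Files.createDirectories(dir)
       Files.write(dir.resolve(fileName), content.getBytes("UTF-8"))
       partitionPaths += partition
     }

     // Job side: for each recorded partition, delete the old directory and
     // then move the staged directory into its place.
     def commitJob(
         staging: Path,
         dest: Path,
         partitionPaths: collection.Set[String]): Unit = {
       partitionPaths.foreach { partition =>
         val target = dest.resolve(partition)
         deleteRecursively(target.toFile)
         Files.createDirectories(target.getParent)
         Files.move(staging.resolve(partition), target)
       }
       // Clean up whatever is left of the staging directory.
       deleteRecursively(staging.toFile)
     }
   }
   ```
   
   If `writeStaged` did not add to `partitionPaths`, `commitJob` would have 
nothing to iterate over and the old partition files would survive, which is the 
second bug in miniature.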
   
   ### Why are the changes needed?
   
   `INSERT OVERWRITE` on a partitioned table with 
`partitionOverwriteMode=dynamic` silently appends data instead of replacing the 
written partitions when `PathOutputCommitProtocol` is in use.
   
   The bug was introduced by #37468 
([SPARK-40034](https://issues.apache.org/jira/browse/SPARK-40034)), which 
enabled `FileOutputCommitter`-backed dynamic partition overwrite in 
`PathOutputCommitProtocol` without wiring up the staging mechanism that 
`HadoopMapReduceCommitProtocol` and `SQLHadoopMapReduceCommitProtocol` rely on 
to delete old partition directories and atomically move staged output into 
place.
   
   The problem became more visible after #32518 
([SPARK-35383](https://issues.apache.org/jira/browse/SPARK-35383)) changed 
`SparkContext` to activate `PathOutputCommitProtocol` whenever hadoop-cloud is 
on the classpath (via `fillMissingMagicCommitterConfsIfNeeded()`), not only 
when a magic-committer bucket is explicitly configured.
   
   ### Does this PR introduce _any_ user-facing change?
   
   Yes. With `spark.sql.sources.partitionOverwriteMode=dynamic` and 
`PathOutputCommitProtocol` active (the default when hadoop-cloud is available), 
`INSERT OVERWRITE` now correctly replaces the written partition directories 
instead of appending to them.
   
   ### How was this patch tested?
   
   Three unit tests added to `CommitterBindingSuite`:
   - `SPARK-56588: FileOutputCommitter dynamic partition overwrite stages 
output and tracks partitions` — verifies the temp file path goes through 
`.spark-staging-` and that `partitionPaths` is populated for 
`FileOutputCommitter`.
   - `SPARK-56588: Cloud committer with dynamic partition support does not 
track partitions in partitionPaths` — verifies that cloud committers 
implementing `CAPABILITY_DYNAMIC_PARTITIONING` via `StreamCapabilities` do not 
have their partitions tracked in Spark's `partitionPaths` (they manage 
overwrite themselves).
   - `SPARK-56588: FileOutputCommitter without dynamicPartitionOverwrite does 
not track partitions` — baseline regression guard.
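   
   The behaviour these three tests pin down can be summarised as a small 
decision function. The sketch below is a hypothetical model only: 
`CommitterKind`, its two cases, and `tracksPartitions` are illustrative names, 
while the real check in `PathOutputCommitProtocol` inspects actual Hadoop 
committer instances.
   
   ```scala
   // Hypothetical stand-ins for the committer types discussed in this PR.
   sealed trait CommitterKind
   // Stands in for Hadoop's FileOutputCommitter.
   case object FileOutput extends CommitterKind
   // Stands in for a cloud committer that declares dynamic partitioning support.
   case object CloudWithDynamicPartitioning extends CommitterKind

   object TrackingSketch {
     // Spark records partitions itself only when it must perform the
     // delete-then-rename step on behalf of the committer.
     def tracksPartitions(
         kind: CommitterKind,
         dynamicPartitionOverwrite: Boolean): Boolean =
       dynamicPartitionOverwrite && kind == FileOutput
   }
   ```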
   
   Manually verified with a Spark shell session (hadoop-cloud on the classpath):
   ```
   ➜ bin/spark-shell
   
   scala> // Write initial data to two partitions
   scala> spark.range(6).selectExpr("id % 2 as p", "id as v").write.partitionBy("p").mode("overwrite").parquet("/tmp/repro")
   scala> // p=0: rows 0,2,4   p=1: rows 1,3,5
   
   scala> spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
   
   scala> // Overwrite only p=0
   scala> spark.createDataFrame(Seq((0L, 99L))).toDF("p", "v").write.partitionBy("p").mode("overwrite").parquet("/tmp/repro")
   
   scala> spark.read.parquet("/tmp/repro").orderBy("p", "v").show()
   
   // Before fix: p=0 still shows rows 0,2,4,99  (appended, not replaced)
   +---+---+
   |  v|  p|
   +---+---+
   |  0|  0|
   |  2|  0|
   |  4|  0|
   | 99|  0|
   |  1|  1|
   |  3|  1|
   |  5|  1|
   +---+---+
   
   // After fix:
   +---+---+
   |  v|  p|
   +---+---+
   | 99|  0|   <- p=0 correctly replaced
   |  1|  1|   <- p=1 untouched
   |  3|  1|
   |  5|  1|
   +---+---+
   ```
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   Generated-by: Claude Sonnet 4.6
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

