anikakelhanka opened a new pull request, #47439: URL: https://github.com/apache/spark/pull/47439
…partitionOveriteMode=dynamic

### What changes were proposed in this pull request?

**Issue:** In Spark versions after v3.0.2, the _SUCCESS marker file is missing from the root path after a successful Spark write in dynamic partition overwrite mode.

* The issue occurs specifically when partitionOverwriteMode = dynamic (Insert Overwrite - [SPARK-20236](https://issues.apache.org/jira/browse/SPARK-20236)).
* The "_SUCCESS" file is created for Spark versions <= 3.0.2 when "spark.hadoop.mapreduce.fileoutputcommitter.marksuccessfuljobs"="true".
* The "_SUCCESS" file is not created for Spark versions > 3.0.2, even when "spark.hadoop.mapreduce.fileoutputcommitter.marksuccessfuljobs"="true".

**Analysis (RCA):**

* What changed: after [SPARK-29302](https://issues.apache.org/jira/browse/SPARK-29302) (dynamic partition overwrite with speculation enabled) was merged, the _SUCCESS marker file stopped being created at the root location when a Spark job writes in dynamic partition overwrite mode.
* [SPARK-29302](https://issues.apache.org/jira/browse/SPARK-29302) sets committerOutputPath = ${stagingDir}, whereas it previously held the root directory path, in [this code block](https://github.com/apache/spark/pull/29000/files#diff-15b529afe19e971b138fc604909bcab2e42484babdcea937f41d18cb22d9401dR167-R175).
* committerOutputPath is passed to the Hadoop committer, which creates the _SUCCESS marker file at that path. The _SUCCESS marker is therefore now created inside the stagingDir.
* Once the Hadoop committer has finished writing, the Spark commit protocol copies all the data files (but NOT the _SUCCESS marker) to the root path before deleting the ${stagingDir}.
* The stagingDir is then deleted along with the _SUCCESS marker file.

**Proposed Fix:** Add a step in [HadoopMapReduceCommitProtocol](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala#L239) that copies the _SUCCESS file to the final location before the stagingDir is deleted:

```scala
// create the _SUCCESS file at the final location if it was created in the staging dir
val stagingSuccessPath = new Path(stagingDir, "_SUCCESS")
if (fs.exists(stagingSuccessPath)) {
  val finalSuccessPath = new Path(path, "_SUCCESS")
  // create the _SUCCESS file at the final location, overwrite = true
  fs.create(finalSuccessPath, true).close()
}
```

### Why are the changes needed?

The missing _SUCCESS marker file is problematic for big data pipelines that depend on the marker file in their workflows. Such pipelines use the _SUCCESS marker as a token that Spark processing has completed and trigger downstream flows from it.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Unit tests are added in the PR (an illustrative sketch of the scenario they exercise appears at the end of this description). The fix has also been running in our production images for 6+ months without any issues or regressions.

### Was this patch authored or co-authored using generative AI tooling?

No
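Below is a minimal, illustrative sketch (not the PR's actual test code) of the scenario described above: a dynamic-partition-overwrite write with marksuccessfuljobs enabled, followed by a check for the `_SUCCESS` marker at the root output path. The `SuccessMarkerRepro` object name, output path, and schema are made up for the example, and it assumes a local SparkSession.

```scala
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession

object SuccessMarkerRepro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("success-marker-repro")
      // dynamic partition overwrite mode (SPARK-20236)
      .config("spark.sql.sources.partitionOverwriteMode", "dynamic")
      // ask the Hadoop output committer to create a _SUCCESS marker on success
      .config("spark.hadoop.mapreduce.fileoutputcommitter.marksuccessfuljobs", "true")
      .getOrCreate()
    import spark.implicits._

    val outputDir = "/tmp/success-marker-demo" // illustrative output path

    // Partitioned overwrite write: this goes through the commit protocol path
    // affected by the SPARK-29302 staging-dir handling.
    Seq((1, "a"), (2, "b")).toDF("id", "part")
      .write
      .mode("overwrite")
      .partitionBy("part")
      .parquet(outputDir)

    // Before this fix, Spark > 3.0.2 leaves no _SUCCESS at the root path in this
    // scenario (it is created inside the staging dir and deleted with it);
    // with the fix, this check should report true.
    val fs = new Path(outputDir).getFileSystem(spark.sparkContext.hadoopConfiguration)
    println(s"_SUCCESS present: ${fs.exists(new Path(outputDir, "_SUCCESS"))}")

    spark.stop()
  }
}
```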
