anikakelhanka opened a new pull request, #47439:
URL: https://github.com/apache/spark/pull/47439

   …partitionOverwriteMode=dynamic
   
   
   ### What changes were proposed in this pull request?
   
   **Issue:**
   In Spark versions after v3.0.2, the _SUCCESS marker file is missing from the 
root output path after a successful Spark write in dynamic partition overwrite mode. 
   
   * This issue happens specifically when partitionOverwriteMode = dynamic 
(Insert Overwrite - 
[SPARK-20236](https://issues.apache.org/jira/browse/SPARK-20236)).
   
   * The "_SUCCESS" file is created for Spark versions <= 3.0.2, given 
"spark.hadoop.mapreduce.fileoutputcommitter.marksuccessfuljobs"="true".
   
   * The "_SUCCESS" file is not created for Spark versions > 3.0.2, even with 
"spark.hadoop.mapreduce.fileoutputcommitter.marksuccessfuljobs"="true".
   
   **Analysis (RCA):**
   
   * What changed: after 
[SPARK-29302](https://issues.apache.org/jira/browse/SPARK-29302) (dynamic 
partition overwrite with speculation enabled) was merged, the _SUCCESS marker 
file stopped being created at the root location when a Spark job writes in 
dynamic partition overwrite mode.
   * The [SPARK-29302](https://issues.apache.org/jira/browse/SPARK-29302) 
change sets committerOutputPath = ${stagingDir}, where it previously held the 
root directory path, in 
[this 
codeblock](https://github.com/apache/spark/pull/29000/files#diff-15b529afe19e971b138fc604909bcab2e42484babdcea937f41d18cb22d9401dR167-R175).
   * The committerOutputPath parameter is passed on to the Hadoop committer, 
which creates the _SUCCESS marker file at the path it specifies. The _SUCCESS 
marker is therefore now created inside the stagingDir.
   * Once the Hadoop committer has finished writing, the Spark commit protocol 
copies all the data files (but NOT the _SUCCESS marker) to the root path 
before deleting the ${stagingDir}.

   * The stagingDir is then deleted along with the _SUCCESS marker file.
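   The sequence above can be simulated with plain file operations. This is a 
minimal Python sketch of the buggy flow, not the actual 
HadoopMapReduceCommitProtocol code; the function name and directory layout are 
made up for illustration:

   ```python
   import shutil
   import tempfile
   from pathlib import Path

   def commit_dynamic_overwrite(staging_dir: Path, final_dir: Path) -> None:
       """Simulate the buggy commit: copy data files (but NOT _SUCCESS)
       from the staging dir to the final path, then delete the staging dir."""
       final_dir.mkdir(parents=True, exist_ok=True)
       for f in staging_dir.iterdir():
           if f.name != "_SUCCESS":          # data files only
               shutil.copy2(f, final_dir / f.name)
       shutil.rmtree(staging_dir)            # _SUCCESS is deleted along with it

   staging = Path(tempfile.mkdtemp(prefix="staging"))
   out = Path(tempfile.mkdtemp(prefix="out")) / "table"
   (staging / "part-00000").write_text("data")
   (staging / "_SUCCESS").touch()            # marker written by the Hadoop committer
   commit_dynamic_overwrite(staging, out)
   print((out / "part-00000").exists())      # True: data files survive
   print((out / "_SUCCESS").exists())        # False: marker is lost
   ```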
   
   **Proposed Fix:**
   
   Add a step to copy the _SUCCESS file to the final location as well, before 
deleting the stagingDir, in 
[HadoopMapReduceCommitProtocol](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala#L239):
   
   ```scala
   // create the _SUCCESS file at the final location if it was created in staging
   val stagingSuccessPath = new Path(stagingDir, "_SUCCESS")
   if (fs.exists(stagingSuccessPath)) {
     val finalSuccessPath = new Path(path, "_SUCCESS")
     // create the _SUCCESS file at the final location, overwrite = true
     fs.create(finalSuccessPath, true).close()
   }
   ```
   
   
   ### Why are the changes needed?
   The absence of this success marker file can be problematic for big data 
pipelines that depend on the marker file in their workflow. Such pipelines are 
configured to use the success marker as a token that Spark processing has 
completed, and to trigger downstream flows on it. 
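   For illustration, such a downstream gate typically amounts to a simple 
existence check on the marker. This is a hypothetical helper sketched in 
Python, not any specific scheduler's API:

   ```python
   import tempfile
   from pathlib import Path

   def ready_for_downstream(output_dir: Path) -> bool:
       """Hypothetical gate: start downstream flows only once _SUCCESS exists."""
       return (output_dir / "_SUCCESS").exists()

   out = Path(tempfile.mkdtemp(prefix="table"))
   print(ready_for_downstream(out))      # False: write not (visibly) complete
   (out / "_SUCCESS").touch()
   print(ready_for_downstream(out))      # True: safe to trigger downstream flows
   ```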
   
   
   ### Does this PR introduce _any_ user-facing change?
   No
   
   
   ### How was this patch tested?
   Unit tests are added in the PR. This change has also been running in our 
production images for over six months without any issues or regressions.
   
   
   ### Was this patch authored or co-authored using generative AI tooling?
   No
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

