steveloughran commented on code in PR #29000:
URL: https://github.com/apache/spark/pull/29000#discussion_r1053641115


##########
core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala:
##########
@@ -41,13 +41,28 @@ import org.apache.spark.mapred.SparkHadoopMapRedUtil
  * @param jobId the job's or stage's id
  * @param path the job's output path, or null if committer acts as a noop
  * @param dynamicPartitionOverwrite If true, Spark will overwrite partition directories at runtime
- *                                  dynamically, i.e., we first write files under a staging
- *                                  directory with partition path, e.g.
- *                                  /path/to/staging/a=1/b=1/xxx.parquet. When committing the job,
- *                                  we first clean up the corresponding partition directories at
- *                                  destination path, e.g. /path/to/destination/a=1/b=1, and move
- *                                  files from staging directory to the corresponding partition
- *                                  directories under destination path.
+ *                                  dynamically. Suppose final path is /path/to/outputPath, output
+ *                                  path of [[FileOutputCommitter]] is an intermediate path, e.g.
+ *                                  /path/to/outputPath/.spark-staging-{jobId}, which is a staging
+ *                                  directory. Task attempts firstly write files under the
+ *                                  intermediate path, e.g.
+ *                                  /path/to/outputPath/.spark-staging-{jobId}/_temporary/
+ *                                  {appAttemptId}/_temporary/{taskAttemptId}/a=1/b=1/xxx.parquet.
+ *
+ *                                  1. When [[FileOutputCommitter]] algorithm version set to 1,
+ *                                  we firstly move task attempt output files to
+ *                                  /path/to/outputPath/.spark-staging-{jobId}/_temporary/
+ *                                  {appAttemptId}/{taskId}/a=1/b=1,
+ *                                  then move them to
+ *                                  /path/to/outputPath/.spark-staging-{jobId}/a=1/b=1.
+ *                                  2. When [[FileOutputCommitter]] algorithm version set to 2,
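
For reference (not part of the PR), a minimal sketch of a write that exercises this dynamicPartitionOverwrite path; the output path and the partition columns a/b are placeholders matching the scaladoc example, not values taken from the PR:

```scala
import org.apache.spark.sql.SparkSession

// Minimal sketch, assuming a local SparkSession; the path and column
// names are illustrative placeholders only.
val spark = SparkSession.builder()
  .appName("dynamic-partition-overwrite-sketch")
  .config("spark.sql.sources.partitionOverwriteMode", "dynamic")
  .getOrCreate()

val df = spark.range(10).selectExpr("id", "id % 2 AS a", "id % 5 AS b")

// Only the a=.../b=... partitions present in df are replaced; task attempts
// first write under the .spark-staging-{jobId} intermediate path described
// above before files are moved into the final partition directories.
df.write
  .mode("overwrite")
  .partitionBy("a", "b")
  .parquet("/path/to/outputPath")
```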

Review Comment:
   v2 isn't safe in the presence of failures during task commit; at least here, if the entire job fails then, provided job IDs are unique, the output doesn't become visible. It is essentially a second attempt at the v1 rename algorithm, with (hopefully) smaller output datasets.
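
For anyone who wants to stay on the safer algorithm, a hedged sketch of pinning the committer to v1; mapreduce.fileoutputcommitter.algorithm.version is the standard Hadoop property, and the spark.hadoop. prefix copies it into the Hadoop Configuration:

```scala
import org.apache.spark.sql.SparkSession

// Sketch: force the v1 commit algorithm, which defers the final rename to
// job commit instead of task commit and so avoids the task-commit-failure
// window discussed above. The app name is a placeholder.
val spark = SparkSession.builder()
  .appName("committer-v1-sketch")
  .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "1")
  .getOrCreate()
```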




