steveloughran commented on issue #25795: [SPARK-29037][Core] Spark gives 
duplicate result when an application was killed
URL: https://github.com/apache/spark/pull/25795#issuecomment-532155845
 
 
   The standard "commit by rename" FileOutputCommitters assume exclusive access 
to the output directory tree; if they find existing attempt data, it's assumed 
to be from a previous attempt of the same query, or from a different query 
entirely. MapReduce will try to use that previous attempt data, and otherwise 
ignore it. At the end of the query, everything under _temporary will probably 
be deleted. That's an implicit concurrency filter. Spark has historically 
complicated life by always using an attempt ID of 0.
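
   To make that concrete, here's a toy sketch (plain Java, not the real Hadoop 
code, which lives in org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter) 
of the directory layout; the class and method names here are mine:

```java
// Toy sketch of the FileOutputCommitter path layout. Each application
// attempt gets its own subtree under _temporary; tasks write under a
// nested _temporary of that job attempt.
public class CommitPaths {
    static final String PENDING = "_temporary";

    // $dest/_temporary/$appAttemptId -- one subtree per application attempt
    static String jobAttemptPath(String dest, int appAttemptId) {
        return dest + "/" + PENDING + "/" + appAttemptId;
    }

    // $jobAttemptPath/_temporary/$taskAttemptId -- where task attempts write
    static String taskAttemptPath(String dest, int appAttemptId, String taskAttemptId) {
        return jobAttemptPath(dest, appAttemptId) + "/" + PENDING + "/" + taskAttemptId;
    }

    public static void main(String[] args) {
        // Spark always passes an attempt ID of 0, so two concurrent jobs
        // writing to the same destination collide on the same subtree.
        // prints s3a://bucket/out/_temporary/0/_temporary/attempt_001_m_000000_0
        System.out.println(taskAttemptPath("s3a://bucket/out", 0, "attempt_001_m_000000_0"));
    }
}
```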
   
   I'm not actually sure how the different commit algorithms would cope with 
concurrent output. It's implicit in the v2 algorithm that different tasks can 
commit simultaneously, but since that algorithm lacks the failure semantics 
Spark expects (task commits are *not* atomic), I'm not going to recommend it. 
And the v1 algorithm probably assumes it has exclusive access... You'd have to 
review its job commit algorithm in more detail to be confident that things will 
not go horribly wrong (see 
https://github.com/steveloughran/zero-rename-committer/releases/download/tag_draft_005/a_zero_rename_committer.pdf).
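
   To illustrate the difference between the two, here's a toy model using local 
renames to stand in for filesystem operations — the shape of the two protocols 
only, not the real implementation:

```java
import java.io.IOException;
import java.nio.file.*;

// Toy model of the two FileOutputCommitter algorithms.
public class CommitAlgorithms {

    // v1: task commit renames the whole task-attempt directory under the
    // job-attempt directory -- a single atomic rename. Only the one job
    // commit later merges results into the destination.
    static void v1TaskCommit(Path taskAttemptDir, Path jobAttemptDir) throws IOException {
        Files.createDirectories(jobAttemptDir);
        Files.move(taskAttemptDir, jobAttemptDir.resolve(taskAttemptDir.getFileName()));
    }

    static void v1JobCommit(Path jobAttemptDir, Path dest) throws IOException {
        Files.createDirectories(dest);
        try (DirectoryStream<Path> tasks = Files.newDirectoryStream(jobAttemptDir)) {
            for (Path task : tasks) {
                try (DirectoryStream<Path> files = Files.newDirectoryStream(task)) {
                    for (Path f : files) {
                        Files.move(f, dest.resolve(f.getFileName()));
                    }
                }
            }
        }
    }

    // v2: each task commit renames its files straight into the destination.
    // Output becomes visible file-by-file, so a failure mid-commit leaves a
    // partial result: task commit is *not* atomic.
    static void v2TaskCommit(Path taskAttemptDir, Path dest) throws IOException {
        Files.createDirectories(dest);
        try (DirectoryStream<Path> files = Files.newDirectoryStream(taskAttemptDir)) {
            for (Path f : files) {
                Files.move(f, dest.resolve(f.getFileName()));
            }
        }
    }
}
```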
   
   The S3A committers probably could support concurrent access if every 
application attempt had a unique ID, and they opted not to attempt cleanup by 
deleting the attempt directories and all in-progress multipart uploads under a 
path. You'd better make sure that your bucket has an object lifecycle rule to 
delete outstanding uploads, otherwise you get an extra-large bill at the end of 
the month. But people should be doing that anyway.
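
   For reference, that lifecycle rule is a standard S3 feature 
(AbortIncompleteMultipartUpload); the rule ID and day count below are just 
examples:

```json
{
  "Rules": [
    {
      "ID": "abort-pending-uploads",
      "Status": "Enabled",
      "Filter": {"Prefix": ""},
      "AbortIncompleteMultipartUpload": {"DaysAfterInitiation": 7}
    }
  ]
}
```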
   
   What we do have there is Ryan's Partitioned Committer, which is designed to 
perform in-situ updates of an existing table by only worrying about conflicts 
in the final partition of the directory tree. You can choose a policy of what 
to do if that partition exists: fail, append or replace: 
https://github.com/apache/hadoop/blob/trunk/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/PartitionedStagingCommitter.java#L122
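
   That committer and its conflict policy are selected through Hadoop options, 
e.g. in spark-defaults (option names from the Hadoop S3A committer docs; the 
values here are illustrative):

```
spark.hadoop.fs.s3a.committer.name                   partitioned
spark.hadoop.fs.s3a.committer.staging.conflict-mode  replace    # or: fail, append
```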
   
   Spark's dynamic partition code seems to depend on rename being fast, but as 
it's done on a per-file basis, it doesn't rely on atomic directory commit (I 
think; I've not looked at it in enough detail). Which means it's not that good 
for S3 at all. And as the output committer doesn't know the final destination 
of the output, there's no obvious way to move to an object-store-specific 
commit mechanism. I just steered clear of it. Now, could more be done 
Spark-side if filesystems actually made their store-specific upload operations 
visible? Maybe, and the not-yet-ready multipart upload API would be that 
mechanism. But I am going to be ruthless here: is effort best spent on trying 
to tweak how we write to object stores, or on object-store-first data 
structures such as Iceberg et al.?
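
   As a toy illustration of why the per-file pattern hurts (my own sketch, not 
Spark's code): every staged file gets renamed individually into its partition 
directory. On HDFS each rename is a cheap metadata operation; on S3 it's a 
copy-then-delete of the whole object, so the cost grows with the bytes written, 
not the file count.

```java
import java.io.IOException;
import java.nio.file.*;

// Toy model of a per-file "dynamic partition" move into a table tree.
public class DynamicPartitionMove {
    static void moveIntoTable(Path staging, Path table) throws IOException {
        try (DirectoryStream<Path> partitions = Files.newDirectoryStream(staging)) {
            for (Path partition : partitions) {  // e.g. staging/date=2019-09-17
                Path target = table.resolve(partition.getFileName());
                Files.createDirectories(target);
                try (DirectoryStream<Path> files = Files.newDirectoryStream(partition)) {
                    for (Path f : files) {
                        // One rename per file: O(1) on a real filesystem,
                        // O(bytes) copy+delete on an object store.
                        Files.move(f, target.resolve(f.getFileName()),
                                StandardCopyOption.REPLACE_EXISTING);
                    }
                }
            }
        }
    }
}
```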
