steveloughran commented on issue #25795: [SPARK-29037][Core] Spark gives duplicate result when an application was killed
URL: https://github.com/apache/spark/pull/25795#issuecomment-532155845

The standard "commit by rename" FileOutputCommitters assume exclusive access to the output directory tree; if they find existing attempt data, it's assumed to be from a previous attempt of the same query, or from a different query. MapReduce will try to use that previous attempt data, otherwise ignore it. At the end of the query everything under _temporary will probably be deleted. That's an implicit concurrency filter. Spark has historically complicated life with an attempt ID of 0.

I'm not actually sure how the different commit algorithms would cope with concurrent output. It's implicit in the v2 algorithm that different tasks can commit simultaneously, but since that algorithm lacks the failure semantics Spark expects (task commits are *not* atomic) I'm not going to recommend it. And the v1 algorithm probably assumes it has exclusive access... You'd have to review its job commit algorithm in more detail to be confident that things will not go horribly wrong (see https://github.com/steveloughran/zero-rename-committer/releases/download/tag_draft_005/a_zero_rename_committer.pdf).

The S3A committers probably could support concurrent access if every application attempt had a unique ID, and they opted not to attempt cleanup by deleting the attempt directories and all in-progress multipart uploads under a path. You'd better make sure that your bucket has an object lifecycle rule to delete outstanding uploads, otherwise you get an extra large bill at the end of the month. But people should be doing that anyway.

What we do have there is Ryan's Partitioned Committer, which is designed to perform in situ updates of an existing table by only worrying about conflicts in the final partition of the directory tree.
You can choose a policy of what to do if that partition exists: fail, append or replace: https://github.com/apache/hadoop/blob/trunk/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/PartitionedStagingCommitter.java#L122

Spark's dynamic partition code seems to depend on rename being fast but, as it's done on a per-file basis, doesn't rely on atomic directory commit (I think; I've not looked at it in enough detail). Which means it's not that good for S3 at all. And as the output committer doesn't know the final destination of the output, there's no obvious way to move to an object-store-specific commit mechanism. I just steered clear of it.

Now, could more be done Spark-side if filesystems actually made their store-specific upload operations visible? Maybe, and the not-yet-ready multipart uploads API would be that mechanism. But I am going to be ruthless here: is effort best spent on trying to tweak how we write to object stores, or on object-store-first data structures such as Iceberg et al.?
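
To make the fail/append/replace policy mentioned above concrete, here is a minimal Python sketch of how such a conflict check might behave. It is not the S3A committer's code: the local filesystem stands in for the object store, and `commit_partition` and `demo` are hypothetical names invented for this illustration. It assumes that "fail" rejects a commit into a partition that already holds data, "append" adds files alongside the existing ones, and "replace" deletes the existing contents first.

```python
import os
import shutil
import tempfile


def commit_partition(staged_files, dest_partition, conflict_mode):
    """Move staged files into dest_partition under a conflict policy.

    Hypothetical helper: conflict_mode is "fail", "append" or "replace".
    Returns the sorted file names in the partition after the commit.
    """
    if conflict_mode not in ("fail", "append", "replace"):
        raise ValueError("unknown conflict mode: %s" % conflict_mode)

    # A partition only conflicts if it already exists and holds data.
    has_data = os.path.isdir(dest_partition) and bool(os.listdir(dest_partition))
    if has_data:
        if conflict_mode == "fail":
            raise FileExistsError(dest_partition)
        if conflict_mode == "replace":
            # Drop the old contents before adding the new files.
            shutil.rmtree(dest_partition)
        # "append" leaves existing files alone; this sketch assumes new
        # file names are unique so nothing is overwritten.

    os.makedirs(dest_partition, exist_ok=True)
    for src in staged_files:
        shutil.move(src, os.path.join(dest_partition, os.path.basename(src)))
    return sorted(os.listdir(dest_partition))


def demo():
    """Run each policy against one partition and collect the outcomes."""
    root = tempfile.mkdtemp()
    try:
        partition = os.path.join(root, "table", "year=2019")
        staging = os.path.join(root, "staging")
        os.makedirs(staging)

        def stage(name):
            # Write a small file into the staging directory.
            path = os.path.join(staging, name)
            with open(path, "w") as f:
                f.write("data")
            return path

        results = {}
        results["first"] = commit_partition([stage("part-0001")], partition, "fail")
        results["append"] = commit_partition([stage("part-0002")], partition, "append")
        results["replace"] = commit_partition([stage("part-0003")], partition, "replace")
        try:
            commit_partition([stage("part-0004")], partition, "fail")
            results["fail"] = "committed"
        except FileExistsError:
            results["fail"] = "rejected"
        return results
    finally:
        shutil.rmtree(root)


if __name__ == "__main__":
    for step, outcome in demo().items():
        print(step, outcome)
```

The point of the sketch is that the check only looks at the final partition directory, so two jobs writing to different partitions of the same table never see each other as conflicts.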
