Github user steveloughran commented on the issue:
https://github.com/apache/spark/pull/21286
> cc @steveloughran who I believe is the expert in this area.
I suppose "Stepped through the FileOutputCommit operations with a debugger
and a pen and paper" counts, given the complexity there. There's still a lot of
corner cases which I'm not 100% sure on (or confident that the expectations of
the job coordinators are met). Otherwise its more folklore "we did this because
of job A failed with error...", plus some experiments with fault injection. I'd
point to @rdblue as having put in this work too.
* Hadoop MR uses the jobID for unique temp paths; it comes from YARN and is
guaranteed to be unique within the cluster, at least until everything is
restarted. See [Hadoop committer
architecture](http://hadoop.apache.org/docs/r3.1.0/hadoop-aws/tools/hadoop-aws/committer_architecture.html).
* And to handle job restart, it has a temp ID too.
Using a temp dir and then renaming in is ~what the FileOutputCommitter v1
algorithm does (a rough sketch in code follows this list):
1. task commit:
`_temporary/$jobAttemptId/_temporary/$taskID_$taskAttemptID` ->
`_temporary/$jobAttemptId/$taskID`
2. Job commit: list `_temporary/$jobAttemptId`, move everything over. This is
sequential renaming: slow on very large jobs on HDFS &c, where it's O(files),
and a performance killer on any object store, where it's O(data).
3. The "v2" algorithm avoids this job commit overhead by incrementally
committing tasks as they complete, so breaking fundamental assumptions about
observability of output and the ability to recover from failure of tasks and
jobs.
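For concreteness, here's a minimal sketch of those two rename steps against the
Hadoop FileSystem API. The path names and the flat, single-level merge are
illustrative only; the real FileOutputCommitter uses its own constants and does
a recursive merge.

```scala
import org.apache.hadoop.fs.{FileSystem, Path}

// Rough sketch of the v1 rename sequence; illustrative paths, not the exact
// constants or the recursive merge the real FileOutputCommitter performs.
def v1TaskCommit(fs: FileSystem, dest: Path, jobAttemptId: String,
                 taskId: String, taskAttemptId: String): Unit = {
  val taskAttemptDir =
    new Path(dest, s"_temporary/$jobAttemptId/_temporary/${taskId}_$taskAttemptId")
  val committedTaskDir = new Path(dest, s"_temporary/$jobAttemptId/$taskId")
  // One rename per task attempt: cheap on HDFS, an O(data) copy on an object store.
  fs.rename(taskAttemptDir, committedTaskDir)
}

def v1JobCommit(fs: FileSystem, dest: Path, jobAttemptId: String): Unit = {
  val jobAttemptDir = new Path(dest, s"_temporary/$jobAttemptId")
  // Move every committed task's files up into the destination, one rename per
  // file: O(files) on HDFS, O(data) on an object store.
  for (taskDir <- fs.listStatus(jobAttemptDir);
       file <- fs.listStatus(taskDir.getPath)) {
    fs.rename(file.getPath, new Path(dest, file.getPath.getName))
  }
  fs.delete(new Path(dest, "_temporary"), true)
  fs.create(new Path(dest, "_SUCCESS")).close()
}
```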
Adding an extra directory with another rename has some serious issues:
* Completely breaks all the work Ryan and I have done with committers which
PUT directly into place in S3, where "place" can include specific partitions
with specific conflict resolution.
* Adds *another* O(files) or O(data) rename process. So it doubles the commit
time of v1, and for v2 it restores the v1 commit overhead, while at least fixing
the task commit semantics. Essentially: it reinstates v1, just less efficiently.
* Still has the problem of how to handle failure in object stores (S3,
GCS) which don't do atomic directory rename.
Which is why I think it's the wrong solution.
Normally Spark rejects a write to the destination if it's already there, so
only one job will have a temp dir. This conflict will only be an issue if
overwrite is allowed, which is going to have other adverse consequences if
files with the same name are ever created. If two jobs commit
simultaneously, you'll get a mixture of results. This is partly why the S3A
committers insert UUIDs into their filenames by default, the other reason being
S3's lack of update consistency.
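As an illustration only (the real S3A committer filename scheme differs), the
idea is just a per-job UUID embedded in every part file name:

```scala
import java.util.UUID

// Hypothetical naming helper: embed a per-job UUID in every part filename so
// two jobs writing into the same directory can never clobber each other's files.
val jobUUID: String = UUID.randomUUID().toString

def partFileName(partitionId: Int): String =
  f"part-$partitionId%05d-$jobUUID.parquet"

// e.g. partFileName(3) -> "part-00003-8f14e45f-....parquet"
```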
Ignoring that little issue, @cloud-fan is right: giving jobs a unique ID
should be enough to ensure that FileOutputCommitter does all its work in
isolation.
Any ID known to be unique across all work that could actively write to
the same dest dir will do. Hadoop MR has a strict ordering requirement so that it can
attempt to recover from job attempt failures (it looks for
`_temporary/$jobId_$jobAttemptId/` to find committed work from the previous
attempt). Spark should be able to just create a UUID.
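Something like the following would be enough (a sketch only, with hypothetical
names; not how Spark or FileOutputCommitter actually wire it up):

```scala
import java.util.UUID
import org.apache.hadoop.fs.Path

// Sketch: give each job a random UUID and derive its staging dir from it, so
// concurrent jobs writing to the same destination never share temp paths.
// (Hypothetical layout; the real committer plumbing names things differently.)
val destDir = new Path("s3a://bucket/table")   // hypothetical destination
val jobUUID = UUID.randomUUID().toString
val stagingDir = new Path(destDir, s"_temporary/$jobUUID")
```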
@zheh12 : welcome to the world of distributed commit protocols. My writeup
is
[here](https://github.com/steveloughran/zero-rename-committer/releases/download/tag_draft_003/a_zero_rename_committer.pdf).
Also check out Gil Vernik's [Stocator
paper](https://arxiv.org/abs/1709.01812). Start with those and the source, and
assume we've all made mistakes...
Finally, regarding MAPREDUCE cleanup JIRAs, the most recent is
[MAPREDUCE-7029](https://issues.apache.org/jira/browse/MAPREDUCE-7029). That
includes comments from the Google team on their store's behaviour.