Github user steveloughran commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19623#discussion_r148595625
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceV2Writer.java ---
    @@ -50,28 +53,34 @@
     
       /**
        * Creates a writer factory which will be serialized and sent to executors.
    +   *
    +   * If this method fails (by throwing an exception), the action would fail and no Spark job was
    +   * submitted.
        */
       DataWriterFactory<Row> createWriterFactory();
     
       /**
        * Commits this writing job with a list of commit messages. The commit messages are collected from
    -   * successful data writers and are produced by {@link DataWriter#commit()}. If this method
    -   * fails(throw exception), this writing job is considered to be failed, and
    -   * {@link #abort(WriterCommitMessage[])} will be called. The written data should only be visible
    -   * to data source readers if this method succeeds.
    +   * successful data writers and are produced by {@link DataWriter#commit()}.
    +   *
    +   * If this method fails (by throwing an exception), this writing job is considered to to have been
    +   * failed, and {@link #abort(WriterCommitMessage[])} would be called. The state of the destination
    +   * is undefined and @{@link #abort(WriterCommitMessage[])} may not be able to deal with it.
        *
        * Note that, one partition may have multiple committed data writers because of speculative tasks.
        * Spark will pick the first successful one and get its commit message. Implementations should be
    --- End diff --
    
    > The only way to guarantee no more than one task can commit is if the 
underlying storage system guarantees that. There is no way to design something 
generic. It is simply not possible in a distributed system, when network 
partitioning or message lost.
    
    I agree that you cannot guarantee that no more than one task has executed.
    
    I believe that you can stop more than one task committing by having the scheduler decide the exact task attempt to commit, and fail the job if that attempt doesn't return within a bounded time saying "done". At that point the final state of the destination is unknown and you are into whatever driver-side algorithm is chosen to implement there, possibly based on information provided by the committer about its ability to recover/retry. That's where it gets complicated, and where I'd need to do a lot more delving into the Spark source with a debugger and a fault-injecting committer underneath to really stress things.
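
    To make the arbitration idea concrete, here's a minimal sketch of what I mean (this is not Spark's actual OutputCommitCoordinator; the class name, method names and timeout handling are all mine, scoped to one job):

    ```java
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Optional;

    // Minimal sketch of driver-side commit arbitration: at most one task attempt
    // per partition is authorised to commit, and an authorised attempt that stays
    // silent past a bounded timeout leaves the destination in an unknown state.
    public class CommitArbiter {

      private static final class Grant {
        final int attempt;
        final long grantedAtMillis;
        boolean acknowledged;               // set once the executor reports "done"
        Grant(int attempt, long grantedAtMillis) {
          this.attempt = attempt;
          this.grantedAtMillis = grantedAtMillis;
        }
      }

      private final long commitTimeoutMillis;
      private final Map<Integer, Grant> grants = new HashMap<>();

      public CommitArbiter(long commitTimeoutMillis) {
        this.commitTimeoutMillis = commitTimeoutMillis;
      }

      // Executor asks: may this attempt commit this partition?
      public synchronized boolean canCommit(int partition, int attempt) {
        Grant existing = grants.get(partition);
        if (existing == null) {
          grants.put(partition, new Grant(attempt, System.currentTimeMillis()));
          return true;                      // first requester wins the right to commit
        }
        return existing.attempt == attempt; // idempotent for the holder, "no" to rivals
      }

      // Executor reports that its commit completed ("done").
      public synchronized void commitSucceeded(int partition, int attempt) {
        Grant g = grants.get(partition);
        if (g != null && g.attempt == attempt) {
          g.acknowledged = true;
        }
      }

      // Driver-side sweep: any authorised attempt that has not acknowledged within
      // the timeout leaves the destination state unknown; the safe default is to fail the job.
      public synchronized Optional<Integer> firstPartitionInUnknownState() {
        long now = System.currentTimeMillis();
        for (Map.Entry<Integer, Grant> e : grants.entrySet()) {
          Grant g = e.getValue();
          if (!g.acknowledged && now - g.grantedAtMillis > commitTimeoutMillis) {
            return Optional.of(e.getKey());
          }
        }
        return Optional.empty();
      }
    }
    ```

    Anything flagged by firstPartitionInUnknownState() is exactly the ambiguous case the list below walks through.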
    
    Network partitioning
    
    1. Executor split from driver during execution & does not rejoin: it can't request a commit, or requests a commit & doesn't get a response. Timeout -> failure on the executor. Task abort? The driver must eventually consider the task failed too, and presumably has to treat the executor as timed out & retry. If speculation is enabled & the first committer requests first, either let it through or tell it to abort. If speculation is disabled & the first committer has requested: must abort.
    1. Executor split from driver after asking for commit & before getting the response: same.
    1. Executor requests commit -> driver, rejection sent & not received. Timeout -> failure. The task should abort; should the driver just assume it aborted?
    1. Executor requests commit -> driver, acceptance sent & not received; the task times out & doesn't commit, aborts. But what does the driver do?
    1. Executor requests commit -> driver, acceptance sent, received, the commit processed & the final response lost. The executor knows the difference: it's finished. The driver cannot differentiate this from the previous case.
    1. Executor requests commit -> driver, acceptance sent, executor fails before executing the commit. No task abort is executed on the executor, and there is no direct way for the driver to tell. How to interpret it? Retry vs. job fail?
    1. Executor requests commit -> driver, acceptance sent, driver fails during the commit. Destination state unknown. Unless the protocol says it can handle recovery here, -> abort.
    1. Driver itself fails; the job is restarted. Maybe: recovery attempted. The Hadoop MR FileOutputCommitter v1 algorithm can do this, as it can infer which tasks committed from the state of the destination FS, relying on all state being in a consistent FS and task commit being atomic. It does make for a slower job commit though... the main benefit is that for jobs big & long enough that the probability of AM failure is tangible, it doesn't require a replay of all committed tasks.
    
    For the cases where the driver tells the executor to commit & the executor never returns, the driver has a problem. It can't directly differentiate between: (not committed, committed, non-atomic commit failed partway).
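
    Put another way (the enum and its names are purely illustrative, not anything in Spark), the silent attempt is in one of three states, and the driver on its own cannot tell which:

    ```java
    // Illustrative only: the possible states of an authorised task attempt that never reported back.
    public enum UnreportedCommitOutcome {
      NOT_COMMITTED,         // the attempt died before it touched the destination
      COMMITTED,             // the commit completed; only the response was lost
      PARTIALLY_COMMITTED    // a non-atomic commit failed partway through
      // Without help from the committer (e.g. an atomic marker it can probe for),
      // the driver has to assume the worst of these.
    }
    ```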
    
    Looking into the Hadoop code, `OutputCommitter.recoverTask()` comes into play here; it is run from the AM on a rebuilt task context. If it can recover: fine. If not, the task is considered dead and needs cleanup (abort?).
    
    To recover:
    * dest exists => old state == committed => no-op + success
    * dest doesn't exist => repeat the commit through rename(). If that rename [on the AM] fails: retry a few more times, then abort the task & fail the job. The lack of meaningful error text from rename() hurts here. Because this recovery is done on the AM, there's no partitioning to worry about between the scheduler & the commitTask() call, just with the remote FS/store/DB.
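
    Very roughly, and only as a sketch of the idea rather than the real FileOutputCommitter code (the class, paths and signature are mine), that recovery check is:

    ```java
    import java.io.IOException;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    // Sketch of the v1-style recoverTask() idea: decide whether a task from the
    // previous (failed) AM attempt already committed, and if not, repeat the
    // commit by renaming its attempt directory into the job output.
    public final class TaskRecoverySketch {

      public static boolean recoverTask(FileSystem fs,
                                        Path committedTaskDir,
                                        Path previousAttemptDir) throws IOException {
        if (fs.exists(committedTaskDir)) {
          // Destination exists => the old task commit already happened: no-op, success.
          return true;
        }
        if (fs.exists(previousAttemptDir)) {
          // Destination missing => repeat the commit by renaming the attempt output.
          // rename() only returns false on failure, with no meaningful error text.
          return fs.rename(previousAttemptDir, committedTaskDir);
        }
        // Nothing to recover from: the task must be rerun.
        return false;
      }
    }
    ```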
    
    Spark does not use the recovery methods of OutputCommitter, AFAIK.
    
    The other way to handle a failed/timed-out task commit is: give up, abort the entire job, and say "the destination is in an unknown state now". My tests imply that a task commit failure in Spark fails the job; I don't know what happens if the task commit outcome isn't received by the driver, & I will have to think of a test there. Maybe I'll add another failure mode to the S3A fault injector, "block", & see what happens.
    
    Returning to the point about speculation: it's the same as rerunning a task which failed to report in. They both generate output, exactly one must be committed, and the generated output should not (must not?) be manifest at the destination paths until task commit. That was the issue with the DirectOutputCommitter, wasn't it: you couldn't repeat a task whose executor had failed, as it had already modified the output directory.
    
    For the spec, I think you should mandate that output MUST NOT be visible until taskCommit; taskAbort MUST be repeatable & best effort; same for jobAbort. I don't know about saying "can recreate the state of a task attempt and be able to call taskAbort() again". I know the Hadoop output committer requires it, but I already know of one committer which will leak file:/tmp data if rerun on a different host, because it can't get at the remote FS.
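
    Purely to illustrate how that wording could look, here's a hypothetical interface with the contract stated as Javadoc; it is not the actual DataSourceV2 API, and the method names are mine:

    ```java
    /** Hypothetical interface, only to illustrate how the contract could be worded. */
    public interface TaskCommitContract {

      /**
       * Makes this task attempt's output visible (or registers it for job commit).
       * Output MUST NOT be visible at the destination before this call succeeds.
       */
      void taskCommit() throws java.io.IOException;

      /**
       * Discards this task attempt's output. MUST be repeatable and best-effort:
       * it may be invoked more than once, possibly on a different host, and after
       * a partial or failed taskCommit().
       */
      void taskAbort() throws java.io.IOException;

      /**
       * Discards all output of the job. Same requirements as taskAbort():
       * repeatable and best-effort.
       */
      void jobAbort() throws java.io.IOException;
    }
    ```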
    
    Now, should output be visible on taskCommit? It is in the Hadoop v2 commit protocol, which moves all output into outputPath on task commit, downgrading jobCommit to `touchz("$dest/_SUCCESS")`. It's faster; it just means that if the job fails before job commit, you need to clean up by deleting dest/*.
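
    A simplified sketch of that v2-style split, using the Hadoop FileSystem API (not the real FileOutputCommitter; it assumes a flat file layout and skips error handling):

    ```java
    import java.io.IOException;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    // Simplified illustration of the v2 pattern: task commit publishes, job commit just marks.
    public final class V2StyleCommitSketch {

      /** Task commit: move each file of the attempt directly into the destination. */
      public static void commitTask(FileSystem fs, Path taskAttemptDir, Path dest)
          throws IOException {
        for (FileStatus stat : fs.listStatus(taskAttemptDir)) {
          // Output becomes visible at the destination here, per task, not per job.
          fs.rename(stat.getPath(), new Path(dest, stat.getPath().getName()));
        }
      }

      /** Job commit: reduced to creating the zero-byte _SUCCESS marker. */
      public static void commitJob(FileSystem fs, Path dest) throws IOException {
        fs.create(new Path(dest, "_SUCCESS"), true).close();
      }
    }
    ```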
    
    The S3A committer delays all manifestation of MPUs until job commit, because we don't want anything to be observable, and the cost of commit is a small POST of an XML doc per file, which can be done in parallel. taskCommits are not repeatable, and at least currently, any partition/timeout of taskCommit() is going to mean the entire job needs rerunning. If Spark does have the ability to rerun tasks which didn't return from taskCommit, we could do something here, relying on the fact that we PUT the .pendingset of a task's commit metadata atomically: cleanup would just mean doing a HEAD of it. Exists -> the task was committed. Doesn't exist -> retry (& rely on the job cleanup called in jobCommit() to cancel all pending MPUs under the dest path).
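
    Something like this, expressed against the Hadoop FileSystem API (a sketch, not the actual S3A committer code; the class and parameters are mine):

    ```java
    import java.io.IOException;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    // Sketch only: if a task never reported back from taskCommit(), probe for the
    // .pendingset file which the committer PUTs atomically at task commit.
    public final class PendingsetProbe {

      /**
       * @return true if the task committed (its .pendingset exists, so its uploads
       *         will be materialised in jobCommit()); false if it is safe to rerun
       *         the task, relying on job cleanup in jobCommit() to cancel any
       *         multipart uploads still pending under the destination path.
       */
      public static boolean taskWasCommitted(FileSystem fs, Path pendingsetFile)
          throws IOException {
        // exists() maps to a HEAD request against the object store.
        return fs.exists(pendingsetFile);
      }
    }
    ```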
    
    


