Github user steveloughran commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19623#discussion_r148643598
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceV2Writer.java ---
    @@ -50,28 +53,34 @@
     
       /**
        * Creates a writer factory which will be serialized and sent to executors.
    +   *
    +   * If this method fails (by throwing an exception), the action would fail and no Spark job was
    +   * submitted.
        */
       DataWriterFactory<Row> createWriterFactory();
     
       /**
        * Commits this writing job with a list of commit messages. The commit messages are collected from
    -   * successful data writers and are produced by {@link DataWriter#commit()}. If this method
    -   * fails(throw exception), this writing job is considered to be failed, and
    -   * {@link #abort(WriterCommitMessage[])} will be called. The written data should only be visible
    -   * to data source readers if this method succeeds.
    +   * successful data writers and are produced by {@link DataWriter#commit()}.
    +   *
    +   * If this method fails (by throwing an exception), this writing job is considered to have
    +   * failed, and {@link #abort(WriterCommitMessage[])} would be called. The state of the destination
    +   * is undefined and {@link #abort(WriterCommitMessage[])} may not be able to deal with it.
        *
        * Note that, one partition may have multiple committed data writers because of speculative tasks.
        * Spark will pick the first successful one and get its commit message. Implementations should be
    --- End diff --
    
    > Your algorithm doesn't work
    
    To be clear: I know you can't reliably guarantee exactly one commit of a task in all conditions, but as long as speculative tasks don't manifest their work in the dest directory until commitTask is called, then a stateful driver can grant taskCommit exactly once per task attempt. It just can't be confident that anything actually received that grant.
    The driver does need to react to a failure to return success by failing the job, saying "something went wrong", and letting the layers above decide how to react. The output should then contain the results of zero or one commits of that task, plus whatever the other tasks had already committed.
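    A minimal sketch of that stateful driver-side arbitration (hypothetical names; in Spark this role is played by the OutputCommitCoordinator):
    
    ```scala
    import scala.collection.mutable
    
    // Grant commit permission for each partition at most once; the first
    // attempt to ask wins, and repeated asks from the winner stay true.
    class CommitArbiter {
      private val authorized = mutable.Map[Int, Int]()  // partition -> winning attempt
    
      def canCommit(partition: Int, attempt: Int): Boolean = synchronized {
        authorized.get(partition) match {
          case Some(winner) => winner == attempt
          case None =>
            authorized(partition) = attempt
            true
        }
      }
    }
    ```
    
    The driver knows it granted exactly one attempt per partition; what it cannot know is whether that executor received the grant and acted on it.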
    
    Of course, the layers above have to somehow be connected to the driver, otherwise they won't be able to distinguish "job failed" from "job completed"; hence the `_SUCCESS` manifest is considered the marker of a successful job. (Which of course means that you need a consistent store, so that a HEAD of `_SUCCESS` doesn't return an old copy; a simple probe isn't actually sufficient for S3.)
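    That probe is one line against the Hadoop FileSystem API, which is exactly why it is tempting and exactly why the consistency caveat matters:
    
    ```scala
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path
    
    // The conventional success probe. On an eventually consistent store
    // (classic S3), exists() may serve a stale answer, so this alone cannot
    // distinguish "job completed" from "old marker still visible".
    def jobSucceeded(dest: Path): Boolean = {
      val fs = dest.getFileSystem(new Configuration())
      fs.exists(new Path(dest, "_SUCCESS"))
    }
    ```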
    
    Oh, and there are a couple of failure modes I forgot to mention: executors doing their work after you think they've stopped. For example:
    
    * Task attempt 1's commitTask is called -> timeout; task attempt 2's commitTask is called -> success; then task attempt 1 actually performs its commit.
    * Task attempt 1 invokes commitTask -> timeout; the job reacts to the failure by aborting, something retries, etc., *and then task attempt 1 actually performs its commit*. That is: even after the entire job has been aborted and a new job executed, it is possible for a partitioned task from the first attempt to commit its work.
    
    That's an interesting one to defend against. 
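    One illustrative defence (not what Spark does; just the classic fencing-token pattern) is to give each job attempt a monotonically increasing epoch and have the commit path refuse stale epochs:
    
    ```scala
    import java.util.concurrent.atomic.AtomicLong
    
    object Fencing {
      private val currentEpoch = new AtomicLong(0)
    
      // Each new job attempt takes a newer epoch.
      def startJobAttempt(): Long = currentEpoch.incrementAndGet()
    
      // A zombie attempt waking up after a newer attempt has started finds
      // its epoch stale and must leave the destination alone.
      def commitIfCurrent(epoch: Long)(commit: => Unit): Boolean =
        if (epoch == currentEpoch.get()) { commit; true } else false
    }
    ```
    
    Note that the check-then-act gap in commitIfCurrent is exactly the window being discussed here; real fencing pushes the token into the store operation itself, so a stale writer is rejected atomically.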
    
    I do know that the MapReduce AM for retryable jobs tries to minimise this risk by only attempting to commit work if the last time it interacted with YARN was within `yarn.app.mapreduce.am.job.committer.commit-window` millis, so a partitioned AM is guaranteed not to attempt to commit its work after that window. Task commit could do something similar, using the timeout of the communications as the window:
    
    ```scala
    // Hypothetical commit-window check before committing a task attempt:
    requestTaskCommit()
    val canCommit = awaitTaskCommit(timeout = jobCommitWindow)
    if (canCommit) doCommit() else doAbort()
    ```
    Instead, they just rely on the assumption that once the worker/manager protocol says "commit", you can go ahead; that is, the interval between receiving success from the manager and executing taskCommit() is negligible.
    
    Of course, timeout logic assumes time goes forward at the same rate everywhere, which in a world of VMs isn't something you can rely on: either the JVM or the entire VM can pause between awaitTaskCommit() returning true and doCommit() being called.
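    Measured with a monotonic clock, the window check might look like this (hypothetical names again); it narrows the gap but, per the above, cannot close it:
    
    ```scala
    import java.util.concurrent.TimeUnit
    
    // System.nanoTime is immune to wall-clock adjustments, but a suspended
    // JVM/VM can still stall between the check and the commit.
    def commitWithinWindow(lastContactNanos: Long, windowMillis: Long)
                          (doCommit: => Unit, doAbort: => Unit): Unit = {
      val elapsed = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - lastContactNanos)
      if (elapsed <= windowMillis) doCommit   // recently in touch with the manager
      else doAbort                            // possibly partitioned: don't risk a late commit
    }
    ```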
    
    


