Github user steveloughran commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19623#discussion_r148542687
  
    --- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceV2Writer.java
 ---
    @@ -50,28 +53,34 @@
     
       /**
        * Creates a writer factory which will be serialized and sent to 
executors.
    +   *
    +   * If this method fails (by throwing an exception), the action would 
fail and no Spark job was
    +   * submitted.
        */
       DataWriterFactory<Row> createWriterFactory();
     
       /**
        * Commits this writing job with a list of commit messages. The commit 
messages are collected from
    -   * successful data writers and are produced by {@link 
DataWriter#commit()}. If this method
    -   * fails(throw exception), this writing job is considered to be failed, 
and
    -   * {@link #abort(WriterCommitMessage[])} will be called. The written 
data should only be visible
    -   * to data source readers if this method succeeds.
    +   * successful data writers and are produced by {@link 
DataWriter#commit()}.
    +   *
    +   * If this method fails (by throwing an exception), this writing job is 
considered to to have been
    +   * failed, and {@link #abort(WriterCommitMessage[])} would be called. 
The state of the destination
    +   * is undefined and @{@link #abort(WriterCommitMessage[])} may not be 
able to deal with it.
        *
        * Note that, one partition may have multiple committed data writers 
because of speculative tasks.
        * Spark will pick the first successful one and get its commit message. 
Implementations should be
    --- End diff --
    
    OK, now I'm confused to the extent that I've been using the IDE to trace method calls, and ideally would like to see some TLA+/PlusCal specs of what's going on. Failing that, some .py pseudocode.
    
    Otherwise, there's setting breakpoints and stepping through the debugger, which is what I ended up doing to work out WTF MapReduce did. [Committer Architecture](https://github.com/steveloughran/hadoop/blob/s3guard/HADOOP-13786-committer/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/committer_architecture.md) is essentially the documentation of that debugger-derived analysis.
    
    ### sequence of events
    1. 1+ tasks created, scheduled
    1. setupTask(taskContext) called in workers for each task before execution
    1. task does its work; the worker knows the task is now ready to be committed
    1. 1+ tasks declare themselves ready to commit to the driver
    1. driver selects which of these tasks to move to commit
    1. driver sends an ok-to-commit(task) message request to the executor with that task
    1. task committed via some `commitTask()` operation
    1. speculative tasks told to abort, or left in a pending state to see what happens
    1. task commit succeeds: job continues
    1. task commit fails: best-effort attempt to abort the failed task (Hadoop MR may do this in the AM by recreating the taskContext of the failed task & calling `taskAbort(tCtx)` on a committer created for the job; it means that if cleanup wants to delete local file:// data, it may not get deleted)
    1. if the engine supports recovery from failed commits and the commit algorithm does too: select another of the pending speculative tasks, or retry the entire task and move to step (2), unless the # of attempts is exceeded
    1. else: abort the entire job
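Here's the .py pseudocode I'd like to see, as a toy simulation of the sequence above. Everything in it (`Driver`, `run_attempt`, ...) is an illustrative name of mine, not a Spark or MapReduce API:

```python
# Toy model of steps 4-8: several speculative attempts of the same
# (stage, partition) ask the driver for permission; only the first wins.

class Driver:
    """Tracks which attempt of each (stage, partition) may commit."""

    def __init__(self):
        self.authorized = {}  # (stage, partition) -> winning attempt id

    def can_commit(self, stage, partition, attempt):
        # Steps 5/6: grant ok-to-commit to the first attempt that asks;
        # every later (speculative) attempt is refused.
        key = (stage, partition)
        self.authorized.setdefault(key, attempt)
        return self.authorized[key] == attempt


def run_attempt(driver, stage, partition, attempt):
    # Steps 2-4 (setupTask, work, declare ready) elided; ask to commit.
    if driver.can_commit(stage, partition, attempt):
        return f"attempt {attempt}: committed"  # step 7: commitTask()
    return f"attempt {attempt}: aborted"        # step 8: speculative loser


driver = Driver()
results = [run_attempt(driver, 0, 3, a) for a in (0, 1)]
print(results)  # only the first attempt of partition 3 commits
```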
    
    A requirement of speculation is "speculative work which is only observable after commit". It's why the direct committers had to run with speculation off, and even there failure handling could only be strictly handled by failing the entire job.
    
    This is not a full 2PC protocol where the entire job is committed iff all workers are ready to commit. Instead you are allowing >1 task to run in parallel & choosing which to commit.
    
    ### if >1 speculative tasks can commit work without coordination, then unless all tasks generate output with the same filenames/paths *and the commit of an individual task is atomic*, you are in danger
    
    1. Task A and Task B both commit different files: as a result, the destination contains >1 copy of each file. Task abort() isn't going to help, as it's an abort of pre-commit state, not a rollback of output.
    1. Task A and Task B commit the same files, but in parallel and with a nonatomic commit of the files: you will end up with output from both.
    
    To avoid that you're going to need some form of coordination between tasks s.t. only one actually commits. This can be done implicitly via a filesystem with some atomic operations (rename(src, dest), create(file, overwrite=false)...) or some other mechanism.
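For the filesystem case, `create(file, overwrite=false)` alone is enough to get mutual exclusion. A sketch, using POSIX `O_CREAT | O_EXCL` as the local-FS analogue of that primitive (this is not Spark code; `try_commit` and the `_COMMITTED` marker name are mine):

```python
import os
import tempfile


def try_commit(marker_path):
    """Atomically create a commit marker; exactly one caller can succeed."""
    try:
        # O_CREAT | O_EXCL fails if the file already exists -- this is the
        # atomic create(file, overwrite=false) operation mentioned above.
        fd = os.open(marker_path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
        os.close(fd)
        return True   # this task won the commit
    except FileExistsError:
        return False  # some other task committed first


marker = os.path.join(tempfile.mkdtemp(), "_COMMITTED")
print(try_commit(marker))  # True: the first committer wins
print(try_commit(marker))  # False: the speculative duplicate is refused
```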
    
    Except, what about [SPARK-4879](https://issues.apache.org/jira/browse/SPARK-4879) and `OutputCommitCoordinator`? That is asking for permission to commit, and it is, AFAIK, turned on by default via the `spark.hadoop.outputCommitCoordination.enabled` property.
    
    Looking at 
`org.apache.spark.sql.execution.datasources.FileFormatWriter.write()`
    
    ```scala
    /**
     * Basic work flow of this command is:
     * 1. Driver side setup, including output committer initialization and data 
source specific
     *    preparation work for the write job to be issued.
     * 2. Issues a write job consists of one or more executor side tasks, each 
of which writes all
     *    rows within an RDD partition.
     * 3. If no exception is thrown in a task, commits that task, otherwise 
aborts that task;  If any
     *    exception is thrown during task commitment, also aborts that task.
     * 4. If all tasks are committed, commit the job, otherwise aborts the job; 
 If any exception is
     *    thrown during job commitment, also aborts the job.
     * 5. If the job is successfully committed, perform post-commit operations 
such as
     *    processing statistics.
     * @return The set of all partition paths that were updated during this 
write job.
     */
    ```
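The steps in that docstring can be rendered as plain Python; a sketch only, where `Committer` is a stand-in of my own, not the real `FileCommitProtocol` interface:

```python
class Committer:
    """Stand-in committer that just records the calls made on it."""

    def __init__(self):
        self.log = []

    def setup_job(self):        self.log.append("setupJob")        # step 1
    def setup_task(self, p):    self.log.append(f"setupTask:{p}")
    def commit_task(self, p):   self.log.append(f"commitTask:{p}"); return p
    def abort_task(self, p):    self.log.append(f"abortTask:{p}")
    def commit_job(self, msgs): self.log.append("commitJob")       # step 4
    def abort_job(self):        self.log.append("abortJob")


def run_task(committer, partition, write_fn):
    committer.setup_task(partition)
    try:
        write_fn(partition)                       # step 2: write the rows
        return committer.commit_task(partition)   # step 3: commit on success
    except Exception:
        committer.abort_task(partition)           # step 3: abort on failure
        raise


def write_job(committer, partitions, write_fn):
    committer.setup_job()
    try:
        msgs = [run_task(committer, p, write_fn) for p in partitions]
        committer.commit_job(msgs)                # step 4: all tasks committed
    except Exception:
        committer.abort_job()                     # step 4: otherwise abort job
        raise


c = Committer()
write_job(c, [0, 1], lambda p: None)
print(c.log[-1])  # "commitJob" on the happy path
```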
    
    It's `FileFormatWriter.executeTask` which:
    1. does the setupTask/exec/commitTask sequence,
    1. calls `FileCommitProtocol.commitTask`
    1. `HadoopMapReduceCommitProtocol.commitTask` calls 
`SparkHadoopMapRedUtil.commitTask`
    1. which, if there's data (`OutputCommitter.needsTaskCommit`) and 
co-ordination enabled `"spark.hadoop.outputCommitCoordination.enabled" == 
true`, calls `OutputCommitCoordinator.canCommit(stage, part, attempt)`, which 
then talks to the driver to get permission to commit. The driver-side 
`OutputCommitCoordinator` instance gets to manage the state machine of task 
attempts, which is done in `handleAskPermissionToCommit`
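Roughly, the decision in `handleAskPermissionToCommit` looks like the Python below. This is my paraphrase from reading the code, not the actual implementation; the names and the failed-committer bookkeeping are illustrative:

```python
NO_AUTHORIZED = -1  # sentinel: nobody holds the commit right yet


class CommitCoordinator:
    """Rough paraphrase of the driver-side state machine; not Spark code."""

    def __init__(self):
        self.authorized = {}  # (stage, partition) -> attempt holding the lock
        self.failed = set()   # (stage, partition, attempt) known to have failed

    def handle_ask_permission(self, stage, partition, attempt):
        if (stage, partition, attempt) in self.failed:
            return False  # a known-failed attempt may never commit
        key = (stage, partition)
        holder = self.authorized.get(key, NO_AUTHORIZED)
        if holder == NO_AUTHORIZED:
            self.authorized[key] = attempt  # first asker wins the lock
            return True
        return holder == attempt            # re-ask by the winner is fine

    def task_failed(self, stage, partition, attempt):
        self.failed.add((stage, partition, attempt))
        # If the authorized committer itself failed, release the lock so a
        # speculative sibling can be granted permission instead.
        if self.authorized.get((stage, partition)) == attempt:
            del self.authorized[(stage, partition)]


coord = CommitCoordinator()
print(coord.handle_ask_permission(1, 0, attempt=0))  # True: first asker
print(coord.handle_ask_permission(1, 0, attempt=1))  # False: refused
coord.task_failed(1, 0, attempt=0)
print(coord.handle_ask_permission(1, 0, attempt=1))  # True: lock released
```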
    
    So, unless I've misunderstood the flow of things: Spark guarantees at most one speculative task attempt for a (stage, part) succeeds, and it's done through coordination with the driver inside `FileFormatWriter.executeTask`.
    
    If I have understood correctly, the key thing is: co-ordination is done in Spark's `OutputCommitCoordinator`, with that coordinator being called from `FileFormatWriter`, hence the routines to insert into HadoopFS or Hive.
    
    For this API here, where is that coordination going to take place? 


---
