Github user steveloughran commented on a diff in the pull request:
https://github.com/apache/spark/pull/19623#discussion_r148542687
--- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceV2Writer.java ---
@@ -50,28 +53,34 @@
   /**
    * Creates a writer factory which will be serialized and sent to executors.
+   *
+   * If this method fails (by throwing an exception), the action will fail and no Spark job will be
+   * submitted.
    */
   DataWriterFactory<Row> createWriterFactory();

   /**
    * Commits this writing job with a list of commit messages. The commit messages are collected from
-   * successful data writers and are produced by {@link DataWriter#commit()}. If this method
-   * fails(throw exception), this writing job is considered to be failed, and
-   * {@link #abort(WriterCommitMessage[])} will be called. The written data should only be visible
-   * to data source readers if this method succeeds.
+   * successful data writers and are produced by {@link DataWriter#commit()}.
+   *
+   * If this method fails (by throwing an exception), this writing job is considered to have
+   * failed, and {@link #abort(WriterCommitMessage[])} will be called. The state of the destination
+   * is undefined and {@link #abort(WriterCommitMessage[])} may not be able to deal with it.
    *
    * Note that, one partition may have multiple committed data writers because of speculative tasks.
    * Spark will pick the first successful one and get its commit message. Implementations should be
--- End diff --
OK, now I'm confused to the extent that I've been using the IDE to trace method calls, and ideally would like to see any TLA+/PlusCal specs of what's going on. Failing that, some .py pseudocode.
Otherwise, there's setting breakpoints and stepping through the debugger, which is what I ended up doing to work out WTF MapReduce did. [Committer Architecture](https://github.com/steveloughran/hadoop/blob/s3guard/HADOOP-13786-committer/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/committer_architecture.md) is essentially the documentation of that debugger-derived analysis.
### Sequence of operations
1. 1+ tasks created, scheduled
1. setupTask(taskContext) called in workers for each task before execution
1. task does its work; worker knows the task is now ready to be committed
1. 1+ tasks declare themselves ready to commit to the driver
1. driver selects which of these tasks to move to commit
1. driver sends an ok-to-commit(task) request to the executor with that task
1. task committed via some `commitTask()` operation
1. speculative tasks told to abort, or left in a pending state to see what happens
1. task commit succeeds: job continues
1. task commit fails: best-effort attempt to abort the failed task (Hadoop MR may do this in the AM by recreating the taskContext of the failed task & calling `taskAbort(tCtx)` on a committer created for the job; this means that if cleanup wants to delete local file:// data, it may not get deleted)
1. if the engine supports recovery from failed commits and the commit algorithm does too: select another of the pending speculative tasks, or retry the entire task, and move to step (2), unless the number of attempts is exceeded
1. else: abort the entire job
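The sequence above can be sketched as .py pseudocode, as requested. All names here (`Driver`, `ok_to_commit`, `run_task`, the committer methods) are illustrative, not Spark's or Hadoop's actual API:

```python
# Hypothetical pseudocode of the task-commit protocol sketched above.
# The driver arbitrates which of >1 speculative attempts may commit.

class Driver:
    def __init__(self):
        self.committed = {}  # (stage, part) -> winning attempt id

    def ok_to_commit(self, stage, part, attempt):
        # first attempt to ask wins; later speculative duplicates are refused
        key = (stage, part)
        if key not in self.committed:
            self.committed[key] = attempt
        return self.committed[key] == attempt


def run_task(driver, stage, part, attempt, committer):
    committer.setup_task(attempt)           # step 2: setupTask(taskContext)
    committer.do_work(attempt)              # step 3: task does its work
    # steps 4-6: declare ready; driver picks a winner
    if driver.ok_to_commit(stage, part, attempt):
        try:
            committer.commit_task(attempt)  # step 7: commitTask()
            return "committed"
        except Exception:
            committer.abort_task(attempt)   # step 10: best-effort abort
            raise
    else:
        committer.abort_task(attempt)       # step 8: losing speculative attempt
        return "aborted"
```
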
A requirement of speculation is "speculative work which is only observable after commit". It's why the direct committers had to run with speculation off, and even there failure handling could only be strictly handled by failing the entire job.
This is not a full 2PC protocol where the entire job is committed iff all workers are ready to commit. Instead you are allowing >1 task to run in parallel & choosing which to commit.
### If >1 speculative tasks can commit work without coordination, then unless all tasks generate output with the same filenames/paths *and the commit of an individual task is atomic*, you are in danger
1. Task A and Task B both commit different files: result, the dest contains >1 copy of each file. Task abort() isn't going to help, as it's an abort of pre-commit state, not a rollback of output.
1. Task A and Task B commit files to the same paths, but in parallel and with a non-atomic commit of the files: you will end up with a mix of the output of both.
To avoid that you're going to need some form of coordination between tasks s.t. only one actually commits. This can be done implicitly via a filesystem with some atomic operations (rename(src, dest), create(file, overwrite=false)...) or some other mechanism.
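As an illustration of the filesystem route: `create(file, overwrite=false)` can act as the implicit lock, since whichever attempt creates the file first wins. A minimal sketch, assuming a POSIX filesystem where exclusive create is atomic (the `_COMMIT_<part>` marker-file convention here is mine, not any committer's):

```python
import os

def try_claim_commit(dest_dir, part):
    """Claim the right to commit a partition by atomically creating a
    marker file. O_CREAT|O_EXCL fails if the file already exists, so
    exactly one caller wins, with no driver round-trip needed."""
    marker = os.path.join(dest_dir, "_COMMIT_%s" % part)
    try:
        fd = os.open(marker, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
        os.close(fd)
        return True   # this attempt may commit
    except FileExistsError:
        return False  # another attempt already claimed the commit
```

Note this only works where exclusive create is actually atomic; object stores without such semantics (the S3 consistency problem this whole committer work is about) can't be relied on for it.
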
Except, what about [SPARK-4879](https://issues.apache.org/jira/browse/SPARK-4879) and `OutputCommitCoordinator`? That is asking for permission to commit, and it is, AFAIK, turned on by default via the property `spark.hadoop.outputCommitCoordination.enabled`.
Looking at `org.apache.spark.sql.execution.datasources.FileFormatWriter.write()`:
```scala
/**
 * Basic work flow of this command is:
 * 1. Driver side setup, including output committer initialization and data source specific
 *    preparation work for the write job to be issued.
 * 2. Issues a write job consists of one or more executor side tasks, each of which writes all
 *    rows within an RDD partition.
 * 3. If no exception is thrown in a task, commits that task, otherwise aborts that task; If any
 *    exception is thrown during task commitment, also aborts that task.
 * 4. If all tasks are committed, commit the job, otherwise aborts the job; If any exception is
 *    thrown during job commitment, also aborts the job.
 * 5. If the job is successfully committed, perform post-commit operations such as
 *    processing statistics.
 * @return The set of all partition paths that were updated during this write job.
 */
```
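That five-step comment can be condensed into pseudocode (names here are illustrative, not Spark's actual API):

```python
def write_job(committer, tasks, run_task):
    """Toy model of the write-job flow described in the comment above:
    driver setup, per-task execution, then job commit or job abort."""
    committer.setup_job()                      # 1. driver-side setup
    try:
        msgs = [run_task(t) for t in tasks]    # 2-3. tasks write, commit or abort
        committer.commit_job(msgs)             # 4. all tasks committed -> commit job
        return msgs                            # 5. post-commit work (stats) goes here
    except Exception:
        committer.abort_job()                  # 4. any failure -> abort the job
        raise
```
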
It's `FileFormatWriter.executeTask` which
1. does the setupTask/exec/commitTask in sequence,
1. calls `FileCommitProtocol.commitTask`,
1. `HadoopMapReduceCommitProtocol.commitTask` calls `SparkHadoopMapRedUtil.commitTask`,
1. which, if there's data (`OutputCommitter.needsTaskCommit`) and co-ordination is enabled (`"spark.hadoop.outputCommitCoordination.enabled" == true`), calls `OutputCommitCoordinator.canCommit(stage, part, attempt)`, which then talks to the driver to get permission to commit. The driver-side `OutputCommitCoordinator` instance gets to manage the state machine of task attempts, which is done in `handleAskPermissionToCommit`.
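A toy model of what that driver-side state machine appears to do, from my reading of the code: first asker per (stage, partition) is authorized, repeat asks by the winner stay authorized, and a failure of the authorized attempt frees the slot for a speculative attempt. This is a simplification, not the real class:

```python
class ToyCommitCoordinator:
    """Simplified sketch of the driver-side commit coordinator's
    handleAskPermissionToCommit state machine (illustrative only)."""

    def __init__(self):
        self.authorized = {}  # (stage, partition) -> authorized attempt

    def can_commit(self, stage, partition, attempt):
        key = (stage, partition)
        if key not in self.authorized:
            self.authorized[key] = attempt       # first asker wins
            return True
        return self.authorized[key] == attempt   # idempotent for the winner

    def task_failed(self, stage, partition, attempt):
        # if the authorized attempt fails before committing, release the
        # slot so another (speculative) attempt can be authorized instead
        key = (stage, partition)
        if self.authorized.get(key) == attempt:
            del self.authorized[key]
```
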
So, unless I've misunderstood the flow of things: Spark guarantees that at most one speculative task attempt for a (stage, part) succeeds, and it's done through coordination with the driver inside `FileFormatWriter.executeTask`.
If I have understood correctly, the key thing is: co-ordination is done in Spark's `OutputCommitCoordinator`, with that coordinator being called from `FileFormatWriter`, hence the routines to insert into HadoopFS or Hive.
For this API here, where is that coordination going to take place?
---