Github user steveloughran commented on a diff in the pull request:
https://github.com/apache/spark/pull/19623#discussion_r148595625
--- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceV2Writer.java ---
@@ -50,28 +53,34 @@
    /**
     * Creates a writer factory which will be serialized and sent to executors.
+    *
+    * If this method fails (by throwing an exception), the action would fail and no Spark job was
+    * submitted.
     */
    DataWriterFactory<Row> createWriterFactory();

    /**
     * Commits this writing job with a list of commit messages. The commit messages are collected from
-    * successful data writers and are produced by {@link DataWriter#commit()}. If this method
-    * fails(throw exception), this writing job is considered to be failed, and
-    * {@link #abort(WriterCommitMessage[])} will be called. The written data should only be visible
-    * to data source readers if this method succeeds.
+    * successful data writers and are produced by {@link DataWriter#commit()}.
+    *
+    * If this method fails (by throwing an exception), this writing job is considered to to have been
+    * failed, and {@link #abort(WriterCommitMessage[])} would be called. The state of the destination
+    * is undefined and @{@link #abort(WriterCommitMessage[])} may not be able to deal with it.
     *
     * Note that, one partition may have multiple committed data writers because of speculative tasks.
     * Spark will pick the first successful one and get its commit message. Implementations should be
--- End diff ---
> The only way to guarantee no more than one task can commit is if the underlying storage system guarantees that. There is no way to design something generic. It is simply not possible in a distributed system, when network partitioning or message lost.
I agree that you cannot guarantee that no more than one task has executed.
I believe that you can stop >1 task committing by having the scheduler decide the exact task to commit & fail the job if it doesn't return within a bounded time saying "done". At which point the final state of the destination is unknown & you are into whatever driver-side algorithm chooses to implement there, possibly based on information provided by the committer about its ability to recover/retry. That's where it gets complicated, and where I'd need to do a lot more delving into the Spark source with a debugger and some fault-injecting committer underneath to really stress things.
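To make "the scheduler decides the exact task to commit, within a bounded time" concrete, here is a minimal driver-side sketch. All the names (the class, the timeout policy) are invented for illustration; this is not Spark's OutputCommitCoordinator, just the shape of the arbitration I have in mind:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Minimal sketch: at most one attempt per partition is authorized to commit,
// and if the authorized attempt does not report success within a bound, the
// driver treats the destination state as unknown. Illustrative names only.
public class CommitArbiterSketch {

  private static final class Grant {
    final int attemptId;
    final long grantedAtMillis;
    volatile boolean completed;

    Grant(int attemptId, long grantedAtMillis) {
      this.attemptId = attemptId;
      this.grantedAtMillis = grantedAtMillis;
    }
  }

  private final Map<Integer, Grant> grants = new ConcurrentHashMap<>();
  private final long commitTimeoutMillis;

  public CommitArbiterSketch(long commitTimeoutMillis) {
    this.commitTimeoutMillis = commitTimeoutMillis;
  }

  /** Executor asks (over RPC): "may attempt A commit partition P?" First requester wins. */
  public synchronized boolean canCommit(int partition, int attemptId) {
    Grant existing = grants.get(partition);
    if (existing == null) {
      grants.put(partition, new Grant(attemptId, System.currentTimeMillis()));
      return true;
    }
    return existing.attemptId == attemptId;   // idempotent "yes" for the winner, "no" otherwise
  }

  /** The authorized attempt reports that its commit finished. */
  public synchronized void commitSucceeded(int partition, int attemptId) {
    Grant g = grants.get(partition);
    if (g != null && g.attemptId == attemptId) {
      g.completed = true;
    }
  }

  /**
   * Periodic driver-side check: an authorized attempt that has gone silent past
   * the bound leaves the destination in an unknown state, so the only safe
   * generic answer is to fail the job (unless the committer can recover/probe).
   */
  public synchronized boolean anyCommitOverdue() {
    long now = System.currentTimeMillis();
    return grants.values().stream()
        .anyMatch(g -> !g.completed && now - g.grantedAtMillis > commitTimeoutMillis);
  }
}
```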
Network partitioning scenarios (a rough driver-side classification is sketched after this list):
1. Executor split from driver during execution & does not rejoin: can't request commit, or requests commit & doesn't get a response. Timeout -> failure on the executor. Task abort? Driver must eventually consider the task failed too. Driver presumably has to consider executor timeout & retry. If speculation is enabled & the first committer requests first, either let it through or tell it to abort. If !speculation & the first committer requests: must abort.
2. Executor split from driver after asking for commit & before getting a response: same.
3. Executor request to commit -> driver, rejection sent & not received. Timeout -> failure. Task should abort; driver: should it just assume aborted?
4. Executor request to commit -> driver, acceptance sent & not received; task times out & doesn't commit, aborts. But what does the driver do?
5. Executor request to commit -> driver, acceptance sent, received, processed & the response lost. The executor knows the difference: it's finished. The driver cannot differentiate this from the previous case.
6. Executor request to commit -> driver, acceptance sent, executor fails before executing the commit. No task abort executed on the executor. No direct way for the driver to tell. How to interpret? Retry vs. job fail?
7. Executor request to commit -> driver, acceptance sent, driver fails during commit. Dest state unknown. Unless the protocol says it can handle recovery here, -> abort.
8. Driver itself fails; job restarted. Maybe: recovery attempted. The Hadoop MR FileOutputCommitter v1 algorithm can do this as it can infer which tasks committed from the state of the dest FS, relying on all state being in a consistent FS and task commit being atomic. Does make for a slower job commit though... the main benefit is that for jobs big & long enough that the probability of AM failure is tangible, it doesn't require a replay of all committed tasks.
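From the driver's side, those cases collapse into a small set of observable outcomes. Here is a rough classification, with made-up names and a policy that errs towards failing the job whenever the destination state is unknown; none of this is Spark's scheduler code:

```java
// Sketch of how the driver might classify the task-commit exchanges above.
// Enum constants and policy are illustrative only.
public class CommitOutcomeSketch {

  /** What the driver can actually observe about a task-commit exchange. */
  enum TaskCommitOutcome {
    NO_COMMIT_REQUESTED,       // executor lost before asking to commit (case 1)
    REJECTED,                  // driver said "no"; another attempt holds the right to commit (case 3)
    AUTHORIZED_NO_RESPONSE,    // driver said "yes" and never heard back (cases 4-7 look identical)
    AUTHORIZED_AND_COMMITTED   // driver said "yes" and the commit was acknowledged
  }

  /** What the driver does next. */
  enum DriverAction {
    RESCHEDULE_ATTEMPT,           // nothing can have reached the destination: run the task again
    ABORT_ATTEMPT,                // this attempt must abort; another attempt may commit
    PARTITION_DONE,               // output committed; nothing more to do
    FAIL_JOB_UNLESS_RECOVERABLE   // destination state unknown: only a committer-specific
                                  // recovery/probe can avoid failing the whole job
  }

  static DriverAction actionFor(TaskCommitOutcome outcome) {
    switch (outcome) {
      case NO_COMMIT_REQUESTED:
        return DriverAction.RESCHEDULE_ATTEMPT;
      case REJECTED:
        return DriverAction.ABORT_ATTEMPT;
      case AUTHORIZED_AND_COMMITTED:
        return DriverAction.PARTITION_DONE;
      case AUTHORIZED_NO_RESPONSE:
      default:
        return DriverAction.FAIL_JOB_UNLESS_RECOVERABLE;
    }
  }
}
```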
For the cases where the driver tells the executor to commit & never hears back, the driver has a problem. It can't directly differentiate between: (not committed, committed, non-atomic commit failed partway).
Looking into the Hadoop code, `OutputCommitter.recoverTask()` comes out to play here, which is run from the AM on a rebuilt task context. If it can recover: fine. If not, the task is considered dead and needs cleanup (abort?).
To recover (a sketch of this decision logic follows the list):
* dest exists ==> old state == committed => no-op + success
* dest !exists: repeat the commit through rename(). If that rename [on the AM] fails: retry some more times, then abort the task & fail the job. The lack of meaningful error text in rename() hurts here. Because this recovery is done on the AM, there's no partitioning going to happen between the scheduler & the commitTask() call, just with the remote FS/store/DB.
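Here is that decision logic paraphrased against the usual Hadoop FileSystem calls; my restatement, not the actual FileOutputCommitter source:

```java
import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Paraphrase of the v1 recovery decision: if the committed destination already
// exists, the earlier attempt got through; otherwise retry the rename a few
// times, and if it still fails, the caller aborts the task & fails the job.
public final class RecoverTaskSketch {

  static boolean recoverTask(FileSystem fs, Path taskAttemptOutput, Path committedDest,
      int maxRenameAttempts) throws IOException {
    if (fs.exists(committedDest)) {
      return true;                         // old state == committed: no-op + success
    }
    for (int i = 0; i < maxRenameAttempts; i++) {
      // rename() only returns a boolean, so there's no meaningful error text
      // to surface when it fails -- the pain point mentioned above.
      if (fs.rename(taskAttemptOutput, committedDest)) {
        return true;
      }
    }
    return false;                          // caller should abort the task and fail the job
  }

  private RecoverTaskSketch() { }
}
```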
Spark does not use the recovery methods of OutputCommitter, AFAIK.
The other way to handle a failed/timed-out task commit is: give up, abort the entire job, say "dest is in unknown state now". My tests imply that a task commit failure in Spark fails the job; I don't know what happens if the task commit outcome isn't received by the driver & will have to think of a test there. Maybe I'll add another failure mode to the S3A fault injector, "block", & see what happens.
Returning to the point about speculation: it's the same as rerunning a task which failed to report in. They both generate output, exactly one must be committed, and the generated output should not (must not?) be manifest at the dest paths until task commit. That was the issue with the DirectOutputCommitter, wasn't it: you couldn't repeat a task whose executor failed, as it had already modified the output directory.
For the spec, I think you should mandate that output MUST NOT be visible until taskCommit; taskAbort MUST be repeatable & best effort; same for jobAbort. I don't know about saying "can recreate the state of a task attempt and be able to call taskAbort() again". I know the Hadoop output committer requires it, but I already know of one committer which will leak file:/tmp data if rerun on a different host, because it can't get at the remote FS.
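Restated as javadoc on a hypothetical interface; the method names echo the vocabulary above (taskCommit/taskAbort/jobAbort) rather than the actual DataSourceV2 writer API:

```java
// Hypothetical contract, capturing the requirements argued for above.
public interface TaskCommitContractSketch {

  /**
   * Output MUST NOT be visible at the destination paths before this call
   * succeeds; speculative and retried attempts depend on that.
   */
  void taskCommit() throws Exception;

  /**
   * MUST be repeatable & best effort: it may be called more than once, after a
   * partial or failed taskCommit(), and possibly on a different host, where
   * local state (e.g. file:/tmp data) may no longer be reachable.
   */
  void taskAbort() throws Exception;

  /** Same repeatable, best-effort requirement, applied job-wide. */
  void jobAbort() throws Exception;
}
```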
Now, should output be visible on taskCommit? It is in the Hadoop v2 commit protocol, which moves all output to outputPath on task commit, downgrading jobCommit to `touchz("$dest/_SUCCESS")`. It's faster; it just means that if the job fails before job commit, you need to clean up by deleting dest/*.
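Roughly, that v2-style behaviour looks like the following; a deliberately simplified sketch (flat task attempt directory, no merging of directory trees), not the real FileOutputCommitter v2 algorithm:

```java
import java.io.IOException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Simplified v2-style shape: output promoted at task commit, job commit reduced
// to a success marker. The real v2 algorithm merges whole directory trees; this
// only handles a flat task attempt directory.
public final class V2StyleCommitSketch {

  /** Task commit: move this attempt's files straight into the job's output path. */
  static void commitTask(FileSystem fs, Path taskAttemptDir, Path outputPath) throws IOException {
    for (FileStatus st : fs.listStatus(taskAttemptDir)) {
      fs.rename(st.getPath(), new Path(outputPath, st.getPath().getName()));
    }
  }

  /** Job commit is now just the marker; if the job fails before this point,
   *  cleanup means deleting whatever already landed under outputPath. */
  static void commitJob(FileSystem fs, Path outputPath) throws IOException {
    fs.create(new Path(outputPath, "_SUCCESS")).close();
  }

  private V2StyleCommitSketch() { }
}
```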
The S3A committer delays all manifestation of MPUs until job commit because we don't want anything to be observable, and the cost of commit is a small POST of an XML doc per file, which can be done in parallel. taskCommits are not repeatable, and at least currently, any partition/timeout of taskCommit() is going to mean the entire job needs rerunning. If Spark does have the ability to rerun tasks which didn't return from taskCommit, we could do something here, relying on the fact that we PUT the .pendingset of a task's commit metadata atomically: cleanup would just mean doing a HEAD of it. Exists -> task was committed. !exists: retry (& rely on the job cleanup() called in jobCommit() to cancel all pending MPUs under the dest path).
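The probe itself would be cheap; a sketch of "cleanup = HEAD the .pendingset" through the s3a FileSystem, with the path layout and names invented for illustration:

```java
import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Sketch: the .pendingset is PUT atomically, so one existence check tells the
// driver whether the task commit landed. Names and paths here are illustrative.
public final class PendingsetProbeSketch {

  /** true => the task committed its .pendingset; false => safe to rerun the task
   *  and rely on job cleanup in jobCommit() to cancel any pending MPUs. */
  static boolean taskCommitted(FileSystem s3a, Path pendingsetPath) throws IOException {
    return s3a.exists(pendingsetPath);     // an object metadata lookup (HEAD) against S3
  }

  private PendingsetProbeSketch() { }
}
```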