Github user steveloughran commented on a diff in the pull request:
https://github.com/apache/spark/pull/19623#discussion_r148643598
--- Diff:
sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceV2Writer.java
---
@@ -50,28 +53,34 @@
   /**
    * Creates a writer factory which will be serialized and sent to executors.
+   *
+   * If this method fails (by throwing an exception), the action would fail and no Spark job was
+   * submitted.
    */
   DataWriterFactory<Row> createWriterFactory();

   /**
    * Commits this writing job with a list of commit messages. The commit messages are collected from
-   * successful data writers and are produced by {@link DataWriter#commit()}. If this method
-   * fails (throws an exception), this writing job is considered to have failed, and
-   * {@link #abort(WriterCommitMessage[])} will be called. The written data should only be visible
-   * to data source readers if this method succeeds.
+   * successful data writers and are produced by {@link DataWriter#commit()}.
+   *
+   * If this method fails (by throwing an exception), this writing job is considered to have
+   * failed, and {@link #abort(WriterCommitMessage[])} would be called. The state of the destination
+   * is undefined and {@link #abort(WriterCommitMessage[])} may not be able to deal with it.
    *
    * Note that one partition may have multiple committed data writers because of speculative tasks.
    * Spark will pick the first successful one and get its commit message. Implementations should be
--- End diff --
> Your algorithm doesn't work
To be clear: I know you can't reliably guarantee exactly one commit of a task under all conditions, but as long as speculative tasks don't manifest their work in the destination directory until commitTask is called, a stateful driver can grant taskCommit exactly once per task attempt. It just can't be confident that anything actually received that request.
That works as long as the driver reacts to a failure to return success by failing the job, saying "something went wrong", and letting the layers above decide how to react. The output should then contain the results of zero or one commits of that task, plus whatever other tasks had already committed.
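That stateful driver can be sketched as a small coordinator; this mirrors the idea rather than Spark's actual OutputCommitCoordinator API, and all names here are illustrative. The first attempt to ask for a given partition wins and keeps the grant (so a retried request from the same attempt stays authorized); every other attempt is told to abort.

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical driver-side commit coordinator (illustrative, not Spark's API).
// Grants commit permission to at most one task attempt per partition.
class CommitCoordinator {
    private final ConcurrentHashMap<Integer, Integer> authorized =
            new ConcurrentHashMap<>();

    // True only for the winning attempt of a partition; idempotent for the
    // winner, so a repeated RPC from the same attempt still gets "yes".
    boolean canCommit(int partition, int attempt) {
        Integer winner = authorized.putIfAbsent(partition, attempt);
        return winner == null || winner == attempt;
    }
}
```

Note that this only guarantees at most one *grant*; as the text above says, the driver still can't know whether the granted attempt actually went on to commit.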
Of course, the layers above have to somehow be connected to the driver, otherwise they won't be able to distinguish "job failed" from "job completed"; hence the fact that the `_SUCCESS` manifest is considered the marker of a successful job. (Which of course means you need a consistent store, so that a HEAD of `_SUCCESS` doesn't return an old copy; a simple existence probe isn't actually sufficient for S3.)
Oh, and there are a couple of failure modes I forgot to mention: executors doing their work after you think they've stopped. e.g.
* task attempt 1 commitTask called -> timeout; task attempt 2 commitTask called -> success; then task attempt 1 actually does its commit.
* task 1 invokes commitTask -> timeout; the job reacts to the failure by aborting the job, something retries, etc., *then task 1 actually does the commit*. That is: even after the entire job has aborted and a new job has executed, it is possible for a partitioned task from the first attempt to commit its work.
That's an interesting one to defend against.
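One hedged sketch of a defence, assuming every commit has to be authorized through a single fence (class and method names here are hypothetical): tag each job attempt with an epoch, hand that epoch to its tasks, and refuse any commit carrying a stale epoch. A partitioned task from the first job attempt that wakes up late is then rejected, provided it cannot touch the destination without asking the fence first.

```java
import java.util.concurrent.atomic.AtomicLong;

// Illustrative epoch-fencing sketch (not an existing Spark or MapReduce API).
// A zombie task from an earlier job attempt is rejected even if it wakes up
// and tries to commit after the job has been aborted and retried.
class EpochFence {
    private final AtomicLong currentEpoch = new AtomicLong(0);

    // Each (re)start of the job bumps the epoch; tasks carry the epoch
    // that was current when they were launched.
    long startNewJobAttempt() {
        return currentEpoch.incrementAndGet();
    }

    // A commit is only allowed if the task's epoch is still the live one.
    boolean tryCommit(long taskEpoch) {
        return taskEpoch == currentEpoch.get();
    }
}
```

The caveat is in the proviso: on a plain filesystem or object store the zombie can still write directly, so this only closes the hole if the commit path genuinely cannot bypass the fence.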
I do know that the MapReduce AM for retryable jobs tries to minimise this risk by only attempting to commit work if the last time it interacted with YARN was within `yarn.app.mapreduce.am.job.committer.commit-window` millis. So a partitioned AM is guaranteed not to attempt to commit its work after that window. Task commit is similar, using the timeout for the communications as the window:
```scala
requestTaskCommit()
val canCommit = awaitTaskCommit(timeout = jobCommitWindow)
if (canCommit) doCommit() else doAbort()
```
Instead they just rely on the assumption that once the worker/manager protocol says "commit" you can go ahead; that is, the interval between receiving a success from the manager and executing taskCommit() is negligible.
Of course, timeout logic assumes time goes forward at the same rate everywhere, which in a world of VMs isn't something you can consistently rely on: either the JVM or the entire VM can block between awaitTaskCommit() returning true and doCommit() being called.
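The commit-window idea described above can be sketched like this (names are illustrative, not an actual Hadoop or Spark API): immediately before committing, re-check that less than the window has elapsed since the grant was received, using a monotonic clock. A long GC or hypervisor pause makes the grant go stale rather than letting a frozen task commit minutes later.

```java
// Illustrative commit-window freshness check (hypothetical names).
// A task records System.nanoTime() when the manager grants the commit,
// then verifies the grant is still within the window just before acting.
class CommitWindow {
    private final long windowNanos;

    CommitWindow(long windowMillis) {
        this.windowNanos = windowMillis * 1_000_000L;
    }

    // True only if less than the window has elapsed since the grant;
    // nanoTime() is monotonic, so wall-clock jumps don't confuse this.
    boolean grantStillFresh(long grantedAtNanos) {
        return System.nanoTime() - grantedAtNanos <= windowNanos;
    }
}
```

Even this only narrows the race rather than closing it: the process can still pause between the freshness check passing and the commit itself executing, which is exactly the VM-pause problem above.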
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]