Github user steveloughran commented on a diff in the pull request:
https://github.com/apache/spark/pull/19623#discussion_r148888237
--- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceV2Writer.java ---
@@ -50,28 +53,34 @@
   /**
    * Creates a writer factory which will be serialized and sent to executors.
+   *
+   * If this method fails (by throwing an exception), the action would fail and no Spark job was
+   * submitted.
    */
   DataWriterFactory<Row> createWriterFactory();

  /**
   * Commits this writing job with a list of commit messages. The commit messages are collected from
-   * successful data writers and are produced by {@link DataWriter#commit()}. If this method
-   * fails(throw exception), this writing job is considered to be failed, and
-   * {@link #abort(WriterCommitMessage[])} will be called. The written data should only be visible
-   * to data source readers if this method succeeds.
+   * successful data writers and are produced by {@link DataWriter#commit()}.
+   *
+   * If this method fails (by throwing an exception), this writing job is considered to to have been
+   * failed, and {@link #abort(WriterCommitMessage[])} would be called. The state of the destination
+   * is undefined and @{@link #abort(WriterCommitMessage[])} may not be able to deal with it.
    *
    * Note that, one partition may have multiple committed data writers because of speculative tasks.
    * Spark will pick the first successful one and get its commit message. Implementations should be
--- End diff --
thinking about this some more (and still reserving the right to be provably wrong), I could imagine a job commit protocol which allowed more than one attempt of a task to commit, making the decision about which completed task attempts to accept into the final results at job commit.
That is:
1. every task attempt could (atomically) promote its output to the job commit dir;
1. the job commit would enumerate all promoted task attempts and choose which ones to accept;
1. the ones it didn't want would be aborted by the job committer.
Example: task attempts rename their output to a dir `dest/_temp/jobId/completed/$taskId_$taskAttemptId`; the job committer would enumerate all directories in the completed dir and, for all dirs with the same task ID, pick one to commit and abort the others.
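Roughly like this for the job-commit side (a sketch only: the class, method names and path layout are mine, not anything in the DataSourceV2 API; it just uses the plain Hadoop FileSystem calls):

```java
// Hypothetical sketch: job commit resolves duplicate task attempts by listing
// the "completed" directory and keeping one attempt per task ID.
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ListingJobCommitter {

  /**
   * List all promoted task attempt dirs under dest/_temp/<jobId>/completed/,
   * keep one attempt per task ID, delete the rest, then promote the survivors.
   */
  public void commitJob(FileSystem fs, Path dest, String jobId) throws IOException {
    Path completed = new Path(dest, "_temp/" + jobId + "/completed");
    Map<String, Path> chosen = new HashMap<>();

    for (FileStatus st : fs.listStatus(completed)) {
      // Assumed directory name convention: <taskId>_<taskAttemptId>
      String taskId = st.getPath().getName().split("_")[0];
      if (chosen.containsKey(taskId)) {
        // Duplicate (speculative) attempt: abort it by deleting its output.
        fs.delete(st.getPath(), true);
      } else {
        chosen.put(taskId, st.getPath());
      }
    }

    // Promote each chosen attempt into the final destination. Simplified: a
    // real committer would merge the attempt's files into dest rather than
    // rename the whole attempt directory under the task ID.
    for (Map.Entry<String, Path> e : chosen.entrySet()) {
      fs.rename(e.getValue(), new Path(dest, e.getKey()));
    }
    // Clean up the temporary job directory.
    fs.delete(new Path(dest, "_temp/" + jobId), true);
  }
}
```

The selection policy there is just "first one listed wins"; a real committer could prefer the attempt whose commit message reported the most complete output, or apply whatever resolution rule it likes.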
With that variant, you don't need to coordinate task commit across workers, even with speculation enabled. You'd just need to be confident that the promotion of a task attempt's output was atomic, that the view of the output was consistent, and that job commit is not initiated until at least one attempt per task has succeeded. Job commit is potentially slower, though.
Because you can turn off use of the output co-ordinator when writing to hive/hadoop tables, a commit protocol like this could work today. Interestingly, it's not that far off the FileOutputCommitter v1 protocol: you'd just rename the task attempt output dir to the promoted dir and let the job commit filter out duplicates from its directory listing, as sketched below. It would be a bit more expensive in terms of storage used between task commit and job commit, though.
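For completeness, the task-commit side of that v1-ish variant is nothing more than a single rename into the completed dir. Again, a sketch with made-up paths and names, and the atomicity assumption only holds on stores with atomic directory rename (HDFS yes, S3 no):

```java
// Hypothetical sketch: task commit promotes the attempt's working directory
// into the "completed" directory with one rename, keeping the attempt ID in
// the name so the job committer can filter out duplicates later.
import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class RenamingTaskCommitter {

  public void commitTask(FileSystem fs, Path dest, String jobId,
      String taskId, String taskAttemptId) throws IOException {
    Path attemptDir = new Path(dest, "_temp/" + jobId + "/working/" + taskAttemptId);
    Path promoted = new Path(dest,
        "_temp/" + jobId + "/completed/" + taskId + "_" + taskAttemptId);
    // On HDFS this rename is atomic, so the job committer never sees a
    // half-promoted attempt; object stores without atomic rename lose that.
    if (!fs.rename(attemptDir, promoted)) {
      throw new IOException("Failed to promote task attempt " + taskAttemptId);
    }
  }
}
```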
Maybe a good policy here is "a job must not commit more than one attempt of a task", but give committers the option to bypass the output coordinator if their job commit phase can handle the conflict resolution and so uphold that guarantee itself.