Github user steveloughran commented on a diff in the pull request:
https://github.com/apache/spark/pull/20490#discussion_r166447570
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala
---
@@ -117,20 +118,43 @@ object DataWritingSparkTask extends Logging {
writeTask: DataWriterFactory[InternalRow],
context: TaskContext,
iter: Iterator[InternalRow]): WriterCommitMessage = {
- val dataWriter = writeTask.createDataWriter(context.partitionId(), context.attemptNumber())
+ val stageId = context.stageId()
+ val partId = context.partitionId()
+ val attemptId = context.attemptNumber()
+ val dataWriter = writeTask.createDataWriter(partId, attemptId)
// write the data and commit this writer.
Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
iter.foreach(dataWriter.write)
- logInfo(s"Writer for partition ${context.partitionId()} is
committing.")
- val msg = dataWriter.commit()
- logInfo(s"Writer for partition ${context.partitionId()} committed.")
+
+ val msg = if (writeTask.useCommitCoordinator) {
+ val coordinator = SparkEnv.get.outputCommitCoordinator
--- End diff --
bq. We never guarantee that for an RDD partition, only one task can commit successfully
There's at-least-once though, right? And then job commit (which is
implicitly at-most-once) is expected to handle the situation where one or more
task attempts may have committed, and resolve it so that the output of only one
attempt is included.
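For reference, the gated commit this patch adds looks roughly like the sketch
below. It mirrors the diff above, but the exact `canCommit` signature has
varied across Spark versions, so treat the argument list (and the use of
`CommitDeniedException`) as assumptions rather than the patch's literal code:

```scala
import org.apache.spark.{SparkEnv, TaskContext}
import org.apache.spark.executor.CommitDeniedException
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.v2.writer.{DataWriter, WriterCommitMessage}

// Hedged sketch of a coordinator-gated task commit, not the literal patch.
def gatedCommit(
    context: TaskContext,
    dataWriter: DataWriter[InternalRow]): WriterCommitMessage = {
  val stageId = context.stageId()
  val partId = context.partitionId()
  val attemptId = context.attemptNumber()
  val coordinator = SparkEnv.get.outputCommitCoordinator
  if (coordinator.canCommit(stageId, partId, attemptId)) {
    // Task commit is at-least-once across attempts; job commit must
    // resolve any duplicates so only one attempt's output survives.
    dataWriter.commit()
  } else {
    // "Lost the race" is distinct from "task failed": a denied commit
    // should be treated as benign by the scheduler, not as an error.
    val msg = s"Partition $partId attempt $attemptId denied commit by coordinator"
    throw new CommitDeniedException(msg, stageId, partId, attemptId)
  }
}
```

The interesting property is the pairing: the coordinator gives at-most-once
*authorization*, while the commit protocol still has to tolerate at-least-once
*execution*.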
One thing which I think would be good is for the Spark docs to precisely write
down somewhere (scaladoc? markdown) what Spark requires of a committer (a
sketch of what that could look like follows the list below).
For the WiP paper on the new S3A committers, [I've tried to do this across MR &
Spark](https://github.com/steveloughran/zero-rename-committer/blob/master/tex/a_zero_rename_committer.tex#L1993)
1. Complete: you get the output of all committed tasks
2. Exclusive: you only get the output of committed tasks
3. (Consistent: produces right output even if store is inconsistent)
4. Concurrent: >1 task may commit simultaneously
5. Abortable: if you abort a task, no output is visible
6. Continuity of correctness: after a job is committed, no partitioned task
(e.g. one orphaned by a network partition) may suddenly add its work to the output.
Not required: if there's a network partition and a second task attempt is
committed, the output of one of those attempts must end up committed, but which
one is left open.
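To make the scaladoc suggestion concrete, here's a hypothetical trait (nothing
like this exists in Spark today; the names are mine) that encodes those
requirements as a documented contract:

```scala
/**
 * Hypothetical committer contract; encodes the six requirements above.
 *  1. Complete: job commit publishes the output of all committed tasks.
 *  2. Exclusive: only the output of committed tasks is published.
 *  3. Consistent: correct even against an inconsistent store.
 *  4. Concurrent: more than one task may commit simultaneously.
 *  5. Abortable: an aborted task leaves no visible output.
 *  6. Continuity: after job commit, no straggler task can add output.
 */
trait CommitterContract {
  /** May be invoked by more than one attempt of the same partition. */
  def commitTask(stageId: Int, partitionId: Int, attemptNumber: Int): Unit

  /** At-most-once; must pick exactly one committed attempt per partition. */
  def commitJob(jobId: Int): Unit

  /** Must leave no visible output from this attempt. */
  def abortTask(stageId: Int, partitionId: Int, attemptNumber: Int): Unit
}
```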
* Hadoop MR v1 meets 1-6 on HDFS, fails on 3 against raw S3
* The Direct Parquet committer fails to meet requirements (2, 5 & probably
6)
* The Hadoop MR v2 committer fails on 2, because if a task attempt commit
fails partway through, some of its output may already be in the dest dir. Both
Spark and MR assume that this situation never occurs. Really, committers should
be able to say "doesn't support retry on task commit failure", or better (one
possible shape for that is sketched below).
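One hypothetical way a committer could declare that (these names are made up,
not an existing Hadoop or Spark API):

```scala
// Hypothetical capability probe; not an existing API.
trait CommitterCapabilities {
  /**
   * False if a failed task commit may leave partial output in the
   * destination (as with the MR v2 algorithm), in which case the
   * safe response to a task commit failure is to fail the job.
   */
  def supportsTaskCommitRetry: Boolean
}
```

A job driver could then check the flag before retrying a failed task commit,
instead of assuming retry is always safe.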
Regarding this patch:
1. How often do you actually expect people to be doing their own commit
co-ordinator?
2. What's the likelihood that they will get it right?
As we can see, the number of people who can correctly implement a committer
is << the number who have shipped one; I don't see a commit coordinator being
any different. It's good to offer the flexibility, but important for the
default to be the one which everyone else uses and which is generally trusted.
---