Github user steveloughran commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20490#discussion_r166447570
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala ---
    @@ -117,20 +118,43 @@ object DataWritingSparkTask extends Logging {
           writeTask: DataWriterFactory[InternalRow],
           context: TaskContext,
           iter: Iterator[InternalRow]): WriterCommitMessage = {
    -    val dataWriter = writeTask.createDataWriter(context.partitionId(), context.attemptNumber())
    +    val stageId = context.stageId()
    +    val partId = context.partitionId()
    +    val attemptId = context.attemptNumber()
    +    val dataWriter = writeTask.createDataWriter(partId, attemptId)
     
         // write the data and commit this writer.
         Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
           iter.foreach(dataWriter.write)
    -      logInfo(s"Writer for partition ${context.partitionId()} is 
committing.")
    -      val msg = dataWriter.commit()
    -      logInfo(s"Writer for partition ${context.partitionId()} committed.")
    +
    +      val msg = if (writeTask.useCommitCoordinator) {
    +        val coordinator = SparkEnv.get.outputCommitCoordinator
    --- End diff --
    
    bq. We never guarantee that for an RDD partition, only one task can commit successfully
    
    There's at-least-once though, right? And then the Job Commit (which is implicitly at-most-once) is expected to handle the situation in which more than one task attempt may have committed, and resolve it so that the output of only one attempt is added.
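    To make that concrete, here's a rough sketch (not the patch itself) of the driver-arbitrated task-commit pattern, assuming the `canCommit(stage, partition, attempt)` signature current in this Spark line; the coordinator class is `private[spark]`, so code like this only compiles inside Spark's own packages:
    
    ```scala
    import org.apache.spark.{SparkEnv, SparkException, TaskContext}
    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.sources.v2.writer.{DataWriter, WriterCommitMessage}
    
    // Sketch only: the task asks the driver-side OutputCommitCoordinator for
    // permission before publishing, so at most one attempt per partition is
    // authorized to commit and every other attempt aborts its output instead.
    def commitOrAbort(
        context: TaskContext,
        dataWriter: DataWriter[InternalRow]): WriterCommitMessage = {
      val stageId = context.stageId()
      val partId = context.partitionId()
      val attemptId = context.attemptNumber()
      val coordinator = SparkEnv.get.outputCommitCoordinator
      if (coordinator.canCommit(stageId, partId, attemptId)) {
        dataWriter.commit()   // only the authorized attempt's output is published
      } else {
        dataWriter.abort()    // speculative/zombie attempts clean up their output
        throw new SparkException(
          s"Commit denied for stage=$stageId partition=$partId attempt=$attemptId")
      }
    }
    ```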
    
    One thing which I think would be good is for the Spark docs to somewhere (scaladoc? markdown) write down precisely what Spark requires of a committer. For the WiP paper on the new S3A committers, [I've tried to do this across MR & Spark](https://github.com/steveloughran/zero-rename-committer/blob/master/tex/a_zero_rename_committer.tex#L1993)
    
    1. Complete: you get the output of all committed tasks
    2. Exclusive: you only get the output of committed tasks
    3. (Consistent: produces right output even if store is inconsistent)
    4. Concurrent: >1 task may commit simultaneously
    5. Abortable: if you abort a task, no output is visible
    6. Continuity of correctness: after a job is committed, no partitioned task (one the driver has lost contact with) may suddenly add its work to the output.
    
    Not required: if there's a partition and a second task attempt is committed, the output of one of those two attempts must be what gets committed, but which one is deliberately left open.
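    As an illustration of what the first two requirements mean in practice, here's a hypothetical post-job-commit check (all names invented for this sketch) that a test harness could run against the destination:
    
    ```scala
    // Hypothetical assertion, not from Spark or Hadoop: after job commit,
    // "Complete" means every file from a committed task attempt is present in
    // the destination, and "Exclusive" means nothing from an uncommitted,
    // aborted or zombie attempt has leaked in.
    def checkCompleteAndExclusive(
        committedTaskFiles: Set[String],    // output of attempts whose commit succeeded
        uncommittedTaskFiles: Set[String],  // output of failed/aborted/zombie attempts
        destinationListing: Set[String]): Unit = {
      require(committedTaskFiles.subsetOf(destinationListing),
        s"Complete violated, missing: ${committedTaskFiles -- destinationListing}")
      require((destinationListing intersect uncommittedTaskFiles).isEmpty,
        s"Exclusive violated, leaked: ${destinationListing intersect uncommittedTaskFiles}")
    }
    ```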
    
    * Hadoop MR v1 meets 1-6 on HDFS, fails on 3 against raw S3
    * The Direct Parquet committer fails to meet requirements (2, 5 & probably 6)
    * The Hadoop MR v2 committer fails on 2, because if a task attempt commit fails partway through, some of its output may already be in the destination directory. Both Spark and MR assume that this situation never occurs. Really, committers should be able to declare "doesn't support retry on task commit failure", or something stronger; see the sketch after this list.
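    To show why the v2 task commit is the weak point here, this is a simplified sketch (not the real FileOutputCommitter code) of a v2-style task commit that promotes each file of a task attempt directly into the destination:
    
    ```scala
    import java.io.IOException
    import org.apache.hadoop.fs.{FileSystem, Path}
    
    // Simplified sketch only: files are renamed into the destination one by one,
    // so if the process dies between renames, everything already moved stays
    // visible in destDir. A retried commit of the same task attempt then risks
    // violating requirement 2 (Exclusive), because partial output from the
    // failed commit is already mixed into the final output.
    def v2StyleTaskCommit(fs: FileSystem, taskAttemptDir: Path, destDir: Path): Unit = {
      fs.listStatus(taskAttemptDir).foreach { status =>
        val dest = new Path(destDir, status.getPath.getName)
        if (!fs.rename(status.getPath, dest)) {
          throw new IOException(s"Failed to rename ${status.getPath} to $dest")
        }
      }
    }
    ```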
    
    Regarding this patch,
    
    1. How often do you actually expect people to be doing their own commit co-ordinator?
    2. What's the likelihood that they will get it right?
    
    As we can see, the number of people who can correctly implement a committer is << the number who have shipped one; I don't see a commit coordinator being any different. It's good to offer the flexibility, but it's important for the default to be the one which everyone else uses and which is generally trusted.


