Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4155#discussion_r23575011
  
    --- Diff: core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala ---
    @@ -106,18 +107,25 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
         val taCtxt = getTaskContext()
         val cmtr = getOutputCommitter()
         if (cmtr.needsTaskCommit(taCtxt)) {
    -      try {
    -        cmtr.commitTask(taCtxt)
    -        logInfo (taID + ": Committed")
    -      } catch {
    -        case e: IOException => {
    -          logError("Error committing the output of task: " + taID.value, e)
    -          cmtr.abortTask(taCtxt)
    -          throw e
    +      val outputCommitCoordinator = SparkEnv.get.outputCommitCoordinator
    +      val conf = SparkEnv.get.conf
    +      val canCommit: Boolean = outputCommitCoordinator.canCommit(jobID, splitID, attemptID)
    +      if (canCommit) {
    --- End diff --
    
    Ah, I missed this discussion since it got collapsed under a changed diff.
    
    I'll have to revisit the docs / usage to confirm this, but I think that 
Hadoop's `needsTaskCommit` is a necessary but not sufficient condition for 
committing: it should always return `true` for a task whose output has not 
been committed (in fact, I think it would be valid for it to always return 
`true`, even for already-committed output).
    
    I agree that task 2 should throw an error, since it seems like doing 
otherwise would also lead to missing output: as soon as the scheduler sees one 
successful completion for a task, it won't re-run that task, so we need it to 
be the case that "successful task completion" implies "output committed."
    
    To run with your example, I think that task 2 would see 
`needsTaskCommit = true` because the output hasn't been committed yet, but 
there's still a problem that can lead to missing output.  Imagine that we had 
this interleaving:
    
    1. Tasks 1 and 2 start
    2. Task 1 asks outputCommitCoordinator.canCommit(), which returns true
    3. Task 2 asks outputCommitCoordinator.canCommit(), which returns false.  It 
exits without throwing an exception and reports success.
    4. Task 1 fails to commit
    
    In this case, one copy of the task has reported success even though output 
was not committed, so that partition will be missing because we won't 
re-schedule an attempt to commit.
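
    The interleaving above can be replayed with a toy model, reading steps 2 
and 3 as the commit-authorization check from the diff (`canCommit`). This is 
only a sketch: `ToyCoordinator`, `Outcome`, `Result`, and `replay` are 
hypothetical names, not Spark classes, and the "first attempt to ask wins" 
policy is an assumption made for illustration.

```scala
object InterleavingSketch {
  // What an attempt reports back to the scheduler.
  sealed trait Outcome
  case object ReportedSuccess extends Outcome
  case object ReportedFailure extends Outcome

  // Hypothetical stand-in for the commit coordinator: the first attempt
  // that asks is authorized, every other attempt is denied.
  final class ToyCoordinator {
    private var winner: Option[Int] = None
    def canCommit(attempt: Int): Boolean = synchronized {
      if (winner.isEmpty) winner = Some(attempt)
      winner.contains(attempt)
    }
  }

  final case class Result(outcomes: Map[Int, Outcome], outputCommitted: Boolean) {
    // The scheduler stops re-running a task once any attempt reports success,
    // so a reported success without committed output loses the partition.
    def partitionLost: Boolean =
      outcomes.values.exists(_ == ReportedSuccess) && !outputCommitted
  }

  // Replays steps 1-4 of the interleaving for the chosen denial policy.
  def replay(throwOnDenial: Boolean): Result = {
    val coordinator = new ToyCoordinator
    val attempt1MayCommit = coordinator.canCommit(1) // step 2: authorized
    val attempt2MayCommit = coordinator.canCommit(2) // step 3: denied

    // A denied attempt either exits quietly (reporting success) or throws.
    val outcome2: Outcome =
      if (!attempt2MayCommit && throwOnDenial) ReportedFailure else ReportedSuccess

    // Step 4: attempt 1 was authorized, but its commit fails.
    val commitSucceeded = false
    val committed = attempt1MayCommit && commitSucceeded
    val outcome1: Outcome = if (committed) ReportedSuccess else ReportedFailure

    Result(Map(1 -> outcome1, 2 -> outcome2), outputCommitted = committed)
  }
}
```

    With `replay(throwOnDenial = false)` one attempt reports success while 
nothing is committed, so `partitionLost` is true; with `throwOnDenial = true` 
both attempts report failure and the scheduler is free to retry.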
    
    So, I agree with @vanzin: if the DAGScheduler did not authorize the commit, 
then we should throw an exception.  I expect this exception to be rare in 
practice because `needsTaskCommit` should ideally return `false` for tasks 
whose output is definitely committed.
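
    A minimal sketch of the commit path this implies, under stated 
assumptions: `Committer` is a hypothetical stand-in for the three Hadoop 
`OutputCommitter` calls seen in the diff, `CommitDeniedException` is a name 
made up here for illustration, and aborting the task before throwing on denial 
is my assumption rather than something settled in this thread.

```scala
import java.io.IOException

object CommitPathSketch {
  // Hypothetical stand-in for the committer calls used in the diff.
  trait Committer {
    def needsTaskCommit(): Boolean
    def commitTask(): Unit
    def abortTask(): Unit
  }

  // Hypothetical exception type signalling that the commit was not authorized.
  final class CommitDeniedException(msg: String) extends RuntimeException(msg)

  // `canCommit` models asking the driver-side coordinator for permission.
  def commit(committer: Committer, canCommit: () => Boolean, taskId: String): Unit = {
    if (committer.needsTaskCommit()) {
      if (canCommit()) {
        try committer.commitTask()
        catch {
          case e: IOException =>
            // Same handling as the code removed by the diff: abort and rethrow.
            committer.abortTask()
            throw e
        }
      } else {
        // Denied: fail the attempt so "success" always implies "committed".
        committer.abortTask()
        throw new CommitDeniedException(s"$taskId: commit not authorized")
      }
    }
    // needsTaskCommit() == false: there is nothing to commit.
  }
}
```

    The point of the `else` branch is that a denied attempt never returns 
normally, so the scheduler cannot mistake it for a completed task.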

