GitHub user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16959#discussion_r103741578
  
    --- Diff: core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala ---
    @@ -195,6 +195,17 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
          sc.runJob(rdd, OutputCommitFunctions(tempDir.getAbsolutePath).callCanCommitMultipleTimes _,
             0 until rdd.partitions.size)
        }
    +
    +  test("SPARK-19631: Do not allow failed attempts to be authorized for committing") {
    +    val stage: Int = 1
    +    val partition: Int = 1
    +    val failedAttempt: Int = 0
    +    outputCommitCoordinator.stageStart(stage, maxPartitionId = 1)
    +    outputCommitCoordinator.taskCompleted(stage, partition, attemptNumber = failedAttempt,
    +      reason = ExecutorLostFailure("0", exitCausedByApp = true, None))
    +    assert(!outputCommitCoordinator.canCommit(stage, partition, failedAttempt))
    --- End diff --
    
    ok, I finally get it.  I was thinking this change was doing something different.  Sorry it took me a while.
    
    That said, I realize there *is* another issue here.  I tried to update the test to confirm that no other task would commit once there was an executor failure, like so:
    
    ```scala
      test("SPARK-19631: Do not allow failed attempts to be authorized for committing") {
        val stage: Int = 1
        val partition: Int = 1
        val failedAttempt: Int = 0
        outputCommitCoordinator.stageStart(stage, maxPartitionId = 1)
        outputCommitCoordinator.taskCompleted(stage, partition, attemptNumber = failedAttempt,
          reason = ExecutorLostFailure("0", exitCausedByApp = true, None))
        // if we get a request to commit after we learn the executor failed, we don't
        // authorize the failed task to commit, so another attempt can commit.
        assert(!outputCommitCoordinator.canCommit(stage, partition, failedAttempt))
        assert(outputCommitCoordinator.canCommit(stage, partition, failedAttempt + 1))
        // but if we get an executor failure *after* we authorize a task to commit, we
        // should never let another task commit.  Unfortunately, we just don't know what
        // the status of the first task is, so we can't safely let any other task proceed.
        outputCommitCoordinator.taskCompleted(stage, partition, attemptNumber = failedAttempt + 1,
          reason = ExecutorLostFailure("0", exitCausedByApp = true, None))
        assert(!outputCommitCoordinator.canCommit(stage, partition, failedAttempt + 2))
      }
    ```
    
    This test fails at the final assert, because the executor failure *does* [clear the authorized committer](https://github.com/apache/spark/blob/8aa560b75e6b083b2a890c52301414285ba35c3d/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala#L143).  But this PR doesn't change that at all -- an equivalent check in master would also fail.
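    
    To make the failure mode concrete, here's a minimal, self-contained sketch of the lock-clearing behavior (illustrative only; `CommitLockSketch`, `canCommit`, and `taskFailed` are hypothetical stand-ins, not the real `OutputCommitCoordinator` API):
    
    ```scala
    import scala.collection.mutable
    
    // Hypothetical model of the per-partition commit lock, sketching why the
    // final assert above fails.
    object CommitLockSketch {
      private val NoCommitter = -1
      // partition -> attempt number currently authorized to commit
      private val authorizedCommitter =
        mutable.Map[Int, Int]().withDefaultValue(NoCommitter)
    
      def canCommit(partition: Int, attempt: Int): Boolean = synchronized {
        if (authorizedCommitter(partition) == NoCommitter) {
          authorizedCommitter(partition) = attempt  // first asker wins the lock
          true
        } else {
          authorizedCommitter(partition) == attempt
        }
      }
    
      // On a failure (e.g. executor lost), clear the lock if the failed attempt
      // held it -- which is what lets a later attempt get authorized, even though
      // the failed attempt may already have committed its output.
      def taskFailed(partition: Int, attempt: Int): Unit = synchronized {
        if (authorizedCommitter(partition) == attempt) {
          authorizedCommitter(partition) = NoCommitter
        }
      }
    
      def main(args: Array[String]): Unit = {
        assert(canCommit(partition = 1, attempt = 1))  // attempt 1 takes the lock
        taskFailed(partition = 1, attempt = 1)         // executor lost *after* authorization
        // prints true: the lock was cleared, so attempt 2 is authorized --
        // the same behavior the final assert in the test above trips over
        println(canCommit(partition = 1, attempt = 2))
      }
    }
    ```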


