GitHub user kayousterhout commented on a diff in the pull request:

    https://github.com/apache/spark/pull/6750#discussion_r34921128
  
    --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
    @@ -547,6 +554,139 @@ class DAGSchedulerSuite
         assert(sparkListener.failedStages.size == 1)
       }
     
    +  /**
    +   * This tests the case where another FetchFailed comes in while the map stage is getting
    +   * re-run.
    +   */
    +  test("late fetch failures don't cause multiple concurrent attempts for 
the same map stage") {
    +    val shuffleMapRdd = new MyRDD(sc, 2, Nil)
    +    val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
    +    val shuffleId = shuffleDep.shuffleId
    +    val reduceRdd = new MyRDD(sc, 2, List(shuffleDep))
    +    submit(reduceRdd, Array(0, 1))
    +
    +    val mapStageId = 0
    +    def countSubmittedMapStageAttempts(): Int = {
    +      sparkListener.submittedStageInfos.count(_.stageId == mapStageId)
    +    }
    +
    +    // The map stage should have been submitted.
    +    assert(countSubmittedMapStageAttempts() === 1)
    +
    +    complete(taskSets(0), Seq(
    +      (Success, makeMapStatus("hostA", 2)),
    +      (Success, makeMapStatus("hostB", 2))))
    +    // The MapOutputTracker should know about both map output locations.
    +    assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1.host) ===
    +      Array("hostA", "hostB"))
    +    assert(mapOutputTracker.getServerStatuses(shuffleId, 1).map(_._1.host) ===
    +      Array("hostA", "hostB"))
    +
    +    // The first result task fails, with a fetch failure for the output from the first mapper.
    +    runEvent(CompletionEvent(
    +      taskSets(1).tasks(0),
    +      FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0, "ignored"),
    +      null,
    +      Map[Long, Any](),
    +      createFakeTaskInfo(),
    +      null))
    +    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
    +    assert(sparkListener.failedStages.contains(1))
    +
    +    // Trigger resubmission of the failed map stage.
    +    runEvent(ResubmitFailedStages)
    +    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
    +
    +    // Another attempt for the map stage should have been submitted, resulting in 2 total attempts.
    +    assert(countSubmittedMapStageAttempts() === 2)
    +
    +    // The second ResultTask fails, with a fetch failure for the output from the second mapper.
    +    runEvent(CompletionEvent(
    +      taskSets(1).tasks(1),
    +      FetchFailed(makeBlockManagerId("hostB"), shuffleId, 1, 1, "ignored"),
    +      null,
    +      Map[Long, Any](),
    +      createFakeTaskInfo(),
    +      null))
    +
    +    // Another ResubmitFailedStages event should not result in another attempt for the map
    +    // stage being run concurrently.
    +    // NOTE: the actual ResubmitFailedStages may get called at any time during this, shouldn't
    +    // affect anything -- our calling it just makes *SURE* it gets called between the desired event
    +    // and our check.
    +    runEvent(ResubmitFailedStages)
    +    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
    +    assert(countSubmittedMapStageAttempts() === 2)
    +
    +  }
    +
    +  /**
    +    * This tests the case where a late FetchFailed comes in after the map stage has finished getting
    +    * retried and a new reduce stage starts running.
    +    */
    +  test("extremely late fetch failures don't cause multiple concurrent 
attempts for " +
    +      "the same stage") {
    +    val shuffleMapRdd = new MyRDD(sc, 2, Nil)
    +    val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
    +    val shuffleId = shuffleDep.shuffleId
    +    val reduceRdd = new MyRDD(sc, 2, List(shuffleDep))
    +    submit(reduceRdd, Array(0, 1))
    +
    +    def countSubmittedReduceStageAttempts(): Int = {
    +      sparkListener.submittedStageInfos.count(_.stageId == 1)
    +    }
    +    def countSubmittedMapStageAttempts(): Int = {
    +      sparkListener.submittedStageInfos.count(_.stageId == 0)
    +    }
    +
    +    // The map stage should have been submitted.
    +    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
    +    assert(countSubmittedMapStageAttempts() === 1)
    +
    +    // Complete the map stage.
    +    complete(taskSets(0), Seq(
    +      (Success, makeMapStatus("hostA", 2)),
    +      (Success, makeMapStatus("hostB", 2))))
    +
    +    // The reduce stage should have been submitted.
    +    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
    +    assert(countSubmittedReduceStageAttempts() === 1)
    +
    +    // The first result task fails, with a fetch failure for the output from the first mapper.
    +    runEvent(CompletionEvent(
    +      taskSets(1).tasks(0),
    +      FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0, "ignored"),
    +      null,
    +      Map[Long, Any](),
    +      createFakeTaskInfo(),
    +      null))
    +
    +    // Trigger resubmission of the failed map stage and finish the re-started map task.
    +    runEvent(ResubmitFailedStages)
    +    complete(taskSets(2), Seq((Success, makeMapStatus("hostA", 1))))
    +
    +    // Because the map stage finished, another attempt for the reduce stage should have been
    +    // submitted, resulting in 2 total attempts for each of the map and reduce stages.
    +    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
    +    assert(countSubmittedMapStageAttempts() === 2)
    +    assert(countSubmittedReduceStageAttempts() === 2)
    +
    +    // A late FetchFailed arrives from the second task in the original reduce stage.
    +    runEvent(CompletionEvent(
    +      taskSets(1).tasks(1),
    +      FetchFailed(makeBlockManagerId("hostB"), shuffleId, 1, 1, "ignored"),
    +      null,
    +      Map[Long, Any](),
    +      createFakeTaskInfo(),
    +      null))
    +
    +    // Trigger resubmission of the failed map stage and finish the re-started map task.
    --- End diff --
    
    Oh weird... yeah, your understanding is correct!

