Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/6750#discussion_r33707866
  
    --- Diff: 
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
    @@ -547,6 +554,136 @@ class DAGSchedulerSuite
         assert(sparkListener.failedStages.size == 1)
       }
     
    +  /** This tests the case where another FetchFailed comes in while the map 
stage is getting
    +    * re-run. */
    +  test("late fetch failures don't cause multiple concurrent attempts for 
the same map stage") {
    +    val shuffleMapRdd = new MyRDD(sc, 2, Nil)
    +    val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
    +    val shuffleId = shuffleDep.shuffleId
    +    val reduceRdd = new MyRDD(sc, 2, List(shuffleDep))
    +    submit(reduceRdd, Array(0, 1))
    +
    +    val mapStageId = 0
    +    def countSubmittedMapStageAttempts(): Int = {
    +      sparkListener.submittedStageInfos.count(_.stageId == mapStageId)
    +    }
    +
    +    // The map stage should have been submitted.
    +    assert(countSubmittedMapStageAttempts() === 1)
    +
    +    complete(taskSets(0), Seq(
    +      (Success, makeMapStatus("hostA", 2)),
    +      (Success, makeMapStatus("hostB", 2))))
    +    // The MapOutputTracker should know about both map output locations.
    +    assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1.host) 
===
    +      Array("hostA", "hostB"))
    +    assert(mapOutputTracker.getServerStatuses(shuffleId, 1).map(_._1.host) 
===
    +      Array("hostA", "hostB"))
    +
    +    // The first result task fails, with a fetch failure for the output 
from the first mapper.
    +    runEvent(CompletionEvent(
    +      taskSets(1).tasks(0),
    +      FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0, "ignored"),
    +      null,
    +      Map[Long, Any](),
    +      createFakeTaskInfo(),
    +      null))
    +    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
    +    assert(sparkListener.failedStages.contains(1))
    +
    +    // Trigger resubmission of the failed map stage.
    +    runEvent(ResubmitFailedStages)
    +    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
    +
    +    // Another attempt for the map stage should have been submitted, 
resulting in 2 total attempts.
    +    assert(countSubmittedMapStageAttempts() === 2)
    +
    +    // The second ResultTask fails, with a fetch failure for the output 
from the second mapper.
    +    runEvent(CompletionEvent(
    +      taskSets(1).tasks(1),
    +      FetchFailed(makeBlockManagerId("hostB"), shuffleId, 1, 1, "ignored"),
    +      null,
    +      Map[Long, Any](),
    +      createFakeTaskInfo(),
    +      null))
    +
    +    // Another ResubmitFailedStages event should not result result in 
another attempt for the map
    --- End diff --
    
    nit: result result


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and you wish to enable it, or if the feature is enabled but not
working, please contact infrastructure at [email protected] or file
a JIRA ticket with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to