GitHub user kayousterhout commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16892#discussion_r102821009
  
    --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
    @@ -1569,24 +1569,44 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
         assertDataStructuresEmpty()
       }
     
    -  test("run trivial shuffle with out-of-band failure and retry") {
    +  /**
    +   * In this test, we run a map stage where one of the executors fails but we still receive a
    +   * "zombie" complete message from a task that ran on that executor. We want to make sure the
    +   * stage is resubmitted so that the task that ran on the failed executor is re-executed, and
    +   * that the stage is only marked as finished once that task completes.
    +   */
    +  test("run trivial shuffle with out-of-band executor failure and retry") {
         val shuffleMapRdd = new MyRDD(sc, 2, Nil)
         val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2))
         val shuffleId = shuffleDep.shuffleId
         val reduceRdd = new MyRDD(sc, 1, List(shuffleDep), tracker = mapOutputTracker)
         submit(reduceRdd, Array(0))
    -    // blockManagerMaster.removeExecutor("exec-hostA")
    -    // pretend we were told hostA went away
    +    // Tell the DAGScheduler that hostA was lost.
         runEvent(ExecutorLost("exec-hostA", ExecutorKilled))
    -    // DAGScheduler will immediately resubmit the stage after it appears to have no pending tasks
    -    // rather than marking it is as failed and waiting.
         complete(taskSets(0), Seq(
           (Success, makeMapStatus("hostA", 1)),
           (Success, makeMapStatus("hostB", 1))))
    +
    +    // At this point, no more tasks are running for the stage (and the TaskSetManager considers the
    +    // stage complete), but the tasks that ran on HostA need to be re-run, so the DAGScheduler
    +    // should re-submit the stage.
    +    assert(taskSets.size === 2)
    +
    +    // Make sure that the stage that was re-submitted was the ShuffleMapStage (not the reduce
    +    // stage, which shouldn't be run until all of the tasks in the ShuffleMapStage complete on
    +    // alive executors).
    +    assert(taskSets(1).tasks(0).isInstanceOf[ShuffleMapTask])
    --- End diff --
    
    Good idea, done.
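For readers following the scheduling logic, here is a hypothetical, heavily simplified model of the behavior this test checks. It is not Spark's DAGScheduler code. ToyMapStage, StageResubmitSketch, and all their members are invented for illustration. The sketch assumes a map stage counts as finished only when every partition has map output on an executor that is not known to be lost. It replays the test's sequence: the executor on hostA is lost, a zombie success arrives from it, the stage is resubmitted for the missing partition only, and the stage becomes available only after that partition reruns on a live executor.

```scala
import scala.collection.mutable

// Hypothetical, simplified model of a shuffle map stage (NOT Spark's DAGScheduler).
final class ToyMapStage(val numPartitions: Int) {
  // Partition id -> executor holding its map output (only outputs believed to be live).
  val outputLocs = mutable.Map.empty[Int, String]
  // Partitions whose tasks have been submitted but have not yet reported back.
  val pendingPartitions = mutable.Set.empty[Int]
  // Executors the scheduler has been told are gone.
  val lostExecutors = mutable.Set.empty[String]
  // Number of task sets submitted so far (plays the role of taskSets.size in the test).
  var attempts = 0

  // The stage is finished only when every partition has live map output.
  def isAvailable: Boolean = outputLocs.size == numPartitions

  def missingPartitions: Seq[Int] =
    (0 until numPartitions).filterNot(outputLocs.contains)

  // Submit a new "task set" covering only the partitions that still lack output.
  def submitMissingTasks(): Unit = {
    attempts += 1
    pendingPartitions ++= missingPartitions
  }

  // Record the executor as lost and forget any map output stored on it.
  def executorLost(execId: String): Unit = {
    lostExecutors += execId
    val onLostExec = outputLocs.collect { case (p, e) if e == execId => p }.toList
    outputLocs --= onLostExec
  }

  def taskSucceeded(partition: Int, execId: String): Unit = {
    pendingPartitions -= partition
    // A "zombie" completion from a lost executor: its output is gone, so don't record it.
    if (!lostExecutors.contains(execId)) outputLocs(partition) = execId
    // Nothing left running: resubmit the missing partitions instead of finishing.
    if (pendingPartitions.isEmpty && !isAvailable) submitMissingTasks()
  }
}

object StageResubmitSketch {
  // Returns (attempts after the first task set drains, partitions in the retry, finished at end).
  def simulate(): (Int, Seq[Int], Boolean) = {
    val stage = new ToyMapStage(2)
    stage.submitMissingTasks()           // first attempt covers partitions 0 and 1
    stage.executorLost("exec-hostA")     // hostA is lost before its task reports back
    stage.taskSucceeded(0, "exec-hostA") // zombie success: ignored, partition 0 still missing
    stage.taskSucceeded(1, "exec-hostB") // real success; no tasks pending, so resubmit
    val attemptsAfterFirst = stage.attempts
    val retried = stage.pendingPartitions.toSeq.sorted
    stage.taskSucceeded(0, "exec-hostB") // rerun of partition 0 on a live executor
    (attemptsAfterFirst, retried, stage.isAvailable)
  }
}
```

Under these assumptions, simulate() reports two submitted attempts, a retry that contains only partition 0, and a stage that is available only after that retry finishes. This mirrors the two assertions in the test: taskSets.size is 2, and the resubmitted task set belongs to the map stage rather than the reduce stage.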

