GitHub user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16892#discussion_r102281985
  
    --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
    @@ -1569,24 +1569,44 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
         assertDataStructuresEmpty()
       }
     
    -  test("run trivial shuffle with out-of-band failure and retry") {
    +  /**
    +   * In this test, we run a map stage where one of the executors fails but we still receive a
    +   * "zombie" complete message from a task that ran on that executor. We want to make sure the
    +   * stage is resubmitted so that the task that ran on the failed executor is re-executed, and
    +   * that the stage is only marked as finished once that task completes.
    +   */
    +  test("run trivial shuffle with out-of-band executor failure and retry") {
         val shuffleMapRdd = new MyRDD(sc, 2, Nil)
         val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2))
         val shuffleId = shuffleDep.shuffleId
         val reduceRdd = new MyRDD(sc, 1, List(shuffleDep), tracker = mapOutputTracker)
         submit(reduceRdd, Array(0))
    -    // blockManagerMaster.removeExecutor("exec-hostA")
    -    // pretend we were told hostA went away
    +    // Tell the DAGScheduler that hostA was lost.
         runEvent(ExecutorLost("exec-hostA", ExecutorKilled))
    -    // DAGScheduler will immediately resubmit the stage after it appears to have no pending tasks
    -    // rather than marking it is as failed and waiting.
         complete(taskSets(0), Seq(
           (Success, makeMapStatus("hostA", 1)),
           (Success, makeMapStatus("hostB", 1))))
    +
    +    // At this point, no more tasks are running for the stage (and the TaskSetManager considers the
    +    // stage complete), but the tasks that ran on HostA need to be re-run, so the DAGScheduler
    +    // should re-submit the stage.
    +    assert(taskSets.size === 2)
    +
    +    // Make sure that the stage that was re-submitted was the ShuffleMapStage (not the reduce
    +    // stage, which shouldn't be run until all of the tasks in the ShuffleMapStage complete on
    +    // alive executors).
    +    assert(taskSets(1).tasks(0).isInstanceOf[ShuffleMapTask])
    --- End diff --
    
    Do you think it's worth adding
    
    ```scala
    assert(taskSets(1).tasks.size === 1)
    ```
    here, to make sure that only the one task is resubmitted, not both? If that weren't true, the test would fail later on anyway, but an earlier check would give a more meaningful error message. Not necessary; up to you whether it's worth adding.
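The following is a small standalone Scala sketch of the scenario and of the suggested early check. It is a hypothetical model, not Spark's DAGScheduler. `ResubmitSketch`, `MapOutput`, `partitionsToRerun`, and `checkSingleRetry` are illustrative names. The model assumes that a map output reported from a lost host (the "zombie" completion from hostA) is unusable, so only that partition has to re-run. The second helper applies the same fail-fast idea as `assert(taskSets(1).tasks.size === 1)` and attaches a message that lists what was actually resubmitted.

```scala
// Hypothetical, simplified model of the test scenario; NOT Spark's DAGScheduler API.
object ResubmitSketch {
  // Where a map task's output was reported to live (illustrative stand-in).
  final case class MapOutput(host: String)

  /** Partitions with no output, or whose output lives on a lost host, must re-run. */
  def partitionsToRerun(
      numPartitions: Int,
      outputs: Map[Int, MapOutput],
      lostHosts: Set[String]): Seq[Int] =
    (0 until numPartitions).filter { p =>
      // None (no output) => re-run; Some(out) => re-run only if its host was lost.
      outputs.get(p).forall(out => lostHosts.contains(out.host))
    }

  /** Fail fast, with a readable message, if the retry is not exactly one partition. */
  def checkSingleRetry(rerun: Seq[Int]): Unit =
    assert(rerun.size == 1,
      s"expected exactly 1 resubmitted task, got ${rerun.size}: ${rerun.mkString(", ")}")
}
```

Suppose partition 0 reported its output on hostA, partition 1 reported its output on hostB, and hostA is lost. In that case `partitionsToRerun(2, outputs, Set("hostA"))` yields just partition 0, and `checkSingleRetry` passes. If both partitions were resubmitted, it would throw an `AssertionError` that names them. That is the earlier, more specific failure the comment is asking for.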
