GitHub user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16620#discussion_r97417399
  
    --- Diff: core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala ---
    @@ -648,4 +648,69 @@ class BasicSchedulerIntegrationSuite extends SchedulerIntegrationSuite[SingleCor
         }
         assertDataStructuresEmpty(noFailure = false)
       }
    +
    +  testScheduler("[SPARK-19263] DAGScheduler shouldn't resubmit active taskSet.") {
    +    val a = new MockRDD(sc, 2, Nil)
    +    val b = shuffle(2, a)
    +    val shuffleId = b.shuffleDeps.head.shuffleId
    +
    +    def runBackend(): Unit = {
    +      val (taskDescription, task) = backend.beginTask()
    +      task.stageId match {
    +        // ShuffleMapTask
    +        case 0 =>
    +          val stageAttempt = task.stageAttemptId
    +          val partitionId = task.partitionId
    +          (stageAttempt, partitionId) match {
    +            case (0, 0) =>
    +              val fetchFailed = FetchFailed(
    +                DAGSchedulerSuite.makeBlockManagerId("hostA"), shuffleId, 0, 0, "ignored")
    +              backend.taskFailed(taskDescription, fetchFailed)
    +            case (0, 1) =>
    +              // Wait until stage resubmission caused by FetchFailed is finished.
    +              waitUntilConditionBecomeTrue(taskScheduler.runningTaskSets.size==2, 5000,
    +                "Wait until stage is resubmitted caused by fetch failed")
    +
    +              // Task(stageAttempt=0, partition=1) will be bogus, because both two
    +              // tasks(stageAttempt=0, partition=0, 1) run on hostA.
    +              // Pending partitions are (0, 1) after stage resubmission,
    +              // then change to be 0 after this bogus task.
    +              backend.taskSuccess(taskDescription, DAGSchedulerSuite.makeMapStatus("hostA", 2))
    +            case (1, 1) =>
    +              // Wait long enough until Success of task(stageAttempt=1 and partition=0)
    +              // is handled by DAGScheduler.
    +              Thread.sleep(5000)
    --- End diff --
    
    hmm, this is a nuisance.  I don't see any good way to get rid of this sleep
    ... but now that I think about it, why can't you do this in
    `DAGSchedulerSuite`?  It seems like this can be entirely contained to the
    `DAGScheduler` and doesn't require tricky interactions with other parts of the
    scheduler.  (I'm sorry I pointed you in the wrong direction earlier -- I
    thought perhaps you had tried to copy the examples of `DAGSchedulerSuite` but
    there was some reason you couldn't.)
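    Why a `DAGSchedulerSuite`-style test can drop the sleep: if events are fed to
    the scheduler synchronously, each one is fully handled before the test's next
    line runs, so there is nothing to wait for. `DAGSchedulerSuite` appears to rely
    on this idea (a test subclass of the scheduler's event loop whose `post` handles
    events inline). The sketch below only models that pattern under stated
    assumptions: `ToyEventLoop`, `SynchronousToyEventLoop`, `ToyEventLoopDemo` and
    the `Toy*` event classes are hypothetical stand-ins, not Spark classes.

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable.ArrayBuffer

// Hypothetical events; these are NOT Spark's DAGSchedulerEvent classes.
sealed trait ToyEvent
final case class ToyFetchFailed(stageAttempt: Int, partition: Int) extends ToyEvent
final case class ToyTaskSuccess(stageAttempt: Int, partition: Int) extends ToyEvent

// Hypothetical production-style loop: post() only enqueues, and a daemon thread
// drains the queue and calls onReceive(), so a caller cannot tell when an event
// has actually been handled (hence sleeps or polling in integration tests).
abstract class ToyEventLoop {
  private val queue = new LinkedBlockingQueue[ToyEvent]()
  private val worker = new Thread(new Runnable {
    override def run(): Unit = while (true) onReceive(queue.take())
  })
  worker.setDaemon(true)

  def start(): Unit = worker.start()
  def post(event: ToyEvent): Unit = queue.put(event)
  protected def onReceive(event: ToyEvent): Unit
}

// Hypothetical test variant: post() runs onReceive() on the caller's thread, so
// the event is fully handled when post() returns. start() is never needed.
class SynchronousToyEventLoop(handled: ArrayBuffer[ToyEvent]) extends ToyEventLoop {
  override def post(event: ToyEvent): Unit = onReceive(event)
  override protected def onReceive(event: ToyEvent): Unit = handled += event
}

object ToyEventLoopDemo {
  // Replays, in a fixed order, the kind of sequence the integration test
  // tries to force with waitUntilConditionBecomeTrue and Thread.sleep.
  def run(): List[ToyEvent] = {
    val handled = ArrayBuffer.empty[ToyEvent]
    val loop = new SynchronousToyEventLoop(handled)
    loop.post(ToyFetchFailed(stageAttempt = 0, partition = 0))
    loop.post(ToyTaskSuccess(stageAttempt = 0, partition = 1))
    loop.post(ToyTaskSuccess(stageAttempt = 1, partition = 0))
    // No sleep between posts: each event was handled before the next post.
    handled.toList
  }
}
```

    The trade-off is that such a test no longer exercises the real threading
    between the task scheduler and the `DAGScheduler`, which is the interaction
    the comment above suggests this bug does not depend on.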

