Github user xuanyuanking commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19287#discussion_r141784747
  
    --- Diff: 
core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala ---
    @@ -744,6 +744,100 @@ class TaskSetManagerSuite extends SparkFunSuite with 
LocalSparkContext with Logg
         assert(resubmittedTasks === 0)
       }
     
    +
    +  test("[SPARK-22074] Task killed by other attempt task should not be 
resubmitted") {
    +    val conf = new SparkConf().set("spark.speculation", "true")
    +    sc = new SparkContext("local", "test", conf)
    +    // Set the speculation multiplier to be 0 so speculative tasks are 
launched immediately
    +    sc.conf.set("spark.speculation.multiplier", "0.0")
    +    sc.conf.set("spark.speculation.quantile", "0.5")
    +    sc.conf.set("spark.speculation", "true")
    +
    +    val sched = new FakeTaskScheduler(sc, ("exec1", "host1"),
    +      ("exec2", "host2"), ("exec3", "host3"))
    +    sched.initialize(new FakeSchedulerBackend() {
    +      override def killTask(
    +       taskId: Long,
    +       executorId: String,
    +       interruptThread: Boolean,
    +       reason: String): Unit = {}
    +    })
    +
    +    // Keep track of the number of tasks that are resubmitted,
    +    // so that the test can check that no tasks were resubmitted.
    +    var resubmittedTasks = 0
    +    val dagScheduler = new FakeDAGScheduler(sc, sched) {
    +      override def taskEnded(
    +          task: Task[_],
    +          reason: TaskEndReason,
    +          result: Any,
    +          accumUpdates: Seq[AccumulatorV2[_, _]],
    +          taskInfo: TaskInfo): Unit = {
    +        super.taskEnded(task, reason, result, accumUpdates, taskInfo)
    +        reason match {
    +          case Resubmitted => resubmittedTasks += 1
    +          case _ =>
    +        }
    +      }
    +    }
    +    sched.setDAGScheduler(dagScheduler)
    +
    +    val taskSet = FakeTask.createShuffleMapTaskSet(4, 0, 0,
    +      Seq(TaskLocation("host1", "exec1")),
    +      Seq(TaskLocation("host1", "exec1")),
    +      Seq(TaskLocation("host3", "exec3")),
    +      Seq(TaskLocation("host2", "exec2")))
    +
    +    val clock = new ManualClock()
    +    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, 
clock = clock)
    +    val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = 
taskSet.tasks.map { task =>
    +      task.metrics.internalAccums
    +    }
    +    // Offer resources for 4 tasks to start
    +    for ((k, v) <- List(
    +      "exec1" -> "host1",
    +      "exec1" -> "host1",
    +      "exec3" -> "host3",
    +      "exec2" -> "host2")) {
    +      val taskOption = manager.resourceOffer(k, v, NO_PREF)
    +      assert(taskOption.isDefined)
    +      val task = taskOption.get
    +      assert(task.executorId === k)
    +    }
    +    assert(sched.startedTasks.toSet === Set(0, 1, 2, 3))
    +    clock.advance(1)
    +    // Complete the 2 tasks and leave 2 task in running
    +    for (id <- Set(0, 1)) {
    +      manager.handleSuccessfulTask(id, createTaskResult(id, 
accumUpdatesByTask(id)))
    +      assert(sched.endedTasks(id) === Success)
    +    }
    +
    +    // checkSpeculatableTasks checks that the task runtime is greater than 
the threshold for
    +    // speculating. Since we use a threshold of 0 for speculation, tasks 
need to be running for
    +    // > 0ms, so advance the clock by 1ms here.
    +    clock.advance(1)
    +    assert(manager.checkSpeculatableTasks(0))
    +    assert(sched.speculativeTasks.toSet === Set(2, 3))
    +
    +    // Offer resource to start the speculative attempt for the running 
task 2.0
    +    val taskOption = manager.resourceOffer("exec2", "host2", ANY)
    +    assert(taskOption.isDefined)
    +    val task4 = taskOption.get
    +    assert(task4.index === 2)
    +    assert(task4.taskId === 4)
    +    assert(task4.executorId === "exec2")
    +    assert(task4.attemptNumber === 1)
    +    sched.backend = mock(classOf[SchedulerBackend])
    --- End diff --
    
    Yep, in this case there's only one `killTask` check. Fixed in the next 
patch.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to