Ngone51 commented on a change in pull request #28619:
URL: https://github.com/apache/spark/pull/28619#discussion_r433964453



##########
File path: 
core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
##########
@@ -1892,6 +1892,112 @@ class TaskSetManagerSuite
     testSpeculationDurationThreshold(true, 2, 1)
   }
 
+  test("SPARK-21040: Check speculative tasks are launched when an executor is 
decommissioned" +
+    " and the tasks running on it cannot finish within 
EXECUTOR_DECOMMISSION_KILL_INTERVAL") {
+    sc = new SparkContext("local", "test")
+    sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"), 
("exec3", "host3"))
+    val taskSet = FakeTask.createTaskSet(4)
+    sc.conf.set(config.SPECULATION_ENABLED, true)
+    sc.conf.set(config.SPECULATION_MULTIPLIER, 1.5)
+    sc.conf.set(config.SPECULATION_QUANTILE, 0.5)
+    sc.conf.set(config.EXECUTOR_DECOMMISSION_KILL_INTERVAL.key, "5s")
+    val clock = new ManualClock()
+    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock 
= clock)
+    val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = 
taskSet.tasks.map { task =>
+      task.metrics.internalAccums
+    }
+
+    // Start TASK 0,1 on exec1, Task 2 on exec2
+    (0 until 2).foreach { _ =>
+      val taskOption = manager.resourceOffer("exec1", "host1", NO_PREF)._1
+      assert(taskOption.isDefined)
+      assert(taskOption.get.executorId === "exec1")
+    }
+    val taskOption2 = manager.resourceOffer("exec2", "host2", NO_PREF)._1
+    assert(taskOption2.isDefined)
+    assert(taskOption2.get.executorId === "exec2")
+
+    clock.advance(6*1000) // time = 6s
+    // Start TASK 3 on exec2 after some delay
+    val taskOption3 = manager.resourceOffer("exec2", "host2", NO_PREF)._1
+    assert(taskOption3.isDefined)
+    assert(taskOption3.get.executorId === "exec2")
+
+    assert(sched.startedTasks.toSet === Set(0, 1, 2, 3))
+
+    clock.advance(4*1000) // time = 10s
+    // Complete the first 2 tasks and leave the other 2 tasks in running
+    for (id <- Set(0, 1)) {
+      manager.handleSuccessfulTask(id, createTaskResult(id, 
accumUpdatesByTask(id)))
+      assert(sched.endedTasks(id) === Success)
+    }
+
+    // checkSpeculatableTasks checks that the task runtime is greater than the 
threshold for
+    // speculating. Since we use a SPECULATION_MULTIPLIER of 1.5, So tasks 
need to be running for
+    // > 15s for speculation
+    assert(!manager.checkSpeculatableTasks(0))
+    assert(sched.speculativeTasks.toSet === Set())
+
+    // decommission exec-2. All tasks running on exec-2 (i.e. TASK 2,3)  will 
be added to
+    //   executorDecommissionSpeculationTriggerTimeoutOpt

Review comment:
       nit: redundant whitespace

##########
File path: 
core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
##########
@@ -1892,6 +1892,112 @@ class TaskSetManagerSuite
     testSpeculationDurationThreshold(true, 2, 1)
   }
 
+  test("SPARK-21040: Check speculative tasks are launched when an executor is 
decommissioned" +
+    " and the tasks running on it cannot finish within 
EXECUTOR_DECOMMISSION_KILL_INTERVAL") {
+    sc = new SparkContext("local", "test")
+    sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"), 
("exec3", "host3"))
+    val taskSet = FakeTask.createTaskSet(4)
+    sc.conf.set(config.SPECULATION_ENABLED, true)
+    sc.conf.set(config.SPECULATION_MULTIPLIER, 1.5)
+    sc.conf.set(config.SPECULATION_QUANTILE, 0.5)
+    sc.conf.set(config.EXECUTOR_DECOMMISSION_KILL_INTERVAL.key, "5s")
+    val clock = new ManualClock()
+    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock 
= clock)
+    val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = 
taskSet.tasks.map { task =>
+      task.metrics.internalAccums
+    }
+
+    // Start TASK 0,1 on exec1, Task 2 on exec2
+    (0 until 2).foreach { _ =>
+      val taskOption = manager.resourceOffer("exec1", "host1", NO_PREF)._1
+      assert(taskOption.isDefined)
+      assert(taskOption.get.executorId === "exec1")
+    }
+    val taskOption2 = manager.resourceOffer("exec2", "host2", NO_PREF)._1
+    assert(taskOption2.isDefined)
+    assert(taskOption2.get.executorId === "exec2")
+
+    clock.advance(6*1000) // time = 6s
+    // Start TASK 3 on exec2 after some delay
+    val taskOption3 = manager.resourceOffer("exec2", "host2", NO_PREF)._1
+    assert(taskOption3.isDefined)
+    assert(taskOption3.get.executorId === "exec2")
+
+    assert(sched.startedTasks.toSet === Set(0, 1, 2, 3))
+
+    clock.advance(4*1000) // time = 10s
+    // Complete the first 2 tasks and leave the other 2 tasks in running
+    for (id <- Set(0, 1)) {
+      manager.handleSuccessfulTask(id, createTaskResult(id, 
accumUpdatesByTask(id)))
+      assert(sched.endedTasks(id) === Success)
+    }
+
+    // checkSpeculatableTasks checks that the task runtime is greater than the 
threshold for
+    // speculating. Since we use a SPECULATION_MULTIPLIER of 1.5, So tasks 
need to be running for
+    // > 15s for speculation
+    assert(!manager.checkSpeculatableTasks(0))
+    assert(sched.speculativeTasks.toSet === Set())
+
+    // decommission exec-2. All tasks running on exec-2 (i.e. TASK 2,3)  will 
be added to
+    //   executorDecommissionSpeculationTriggerTimeoutOpt
+    // (TASK2 -> 15, TASK3 -> 15)
+    manager.executorDecommission("exec2")
+    assert(manager.tidToExecutorKillTimeMapping.keySet === Set(2, 3))
+    assert(manager.tidToExecutorKillTimeMapping(2) === 15*1000)
+    assert(manager.tidToExecutorKillTimeMapping(3) === 15*1000)
+
+    assert(manager.checkSpeculatableTasks(0))
+    // Task2 started at t=0s, so it can still finish before t=15s (Median task 
runtime = 10s)
+    // Task3 started at t=6s, so it might not finish before t=15s. So Task 3 
should be part
+    // of speculativeTasks
+    assert(sched.speculativeTasks.toSet === Set(3))
+    assert(manager.copiesRunning(3) === 1)
+
+    // Offer resource to start the speculative attempt for the running task
+    val taskOption3New = manager.resourceOffer("exec3", "host3", NO_PREF)._1
+    // Offer more resources. Nothing should get scheduled now.
+    assert(manager.resourceOffer("exec3", "host3", NO_PREF)._1.isEmpty)
+    assert(taskOption3New.isDefined)
+
+    // Assert info about the newly launched speculative task
+    val speculativeTask3 = taskOption3New.get
+    assert(speculativeTask3.index === 3)
+    assert(speculativeTask3.taskId === 4)
+    assert(speculativeTask3.executorId === "exec3")
+    assert(speculativeTask3.attemptNumber === 1)
+
+    clock.advance(1*1000) // time = 11s
+    // Running checkSpeculatableTasks again should return false
+    assert(!manager.checkSpeculatableTasks(0))
+    assert(manager.copiesRunning(2) === 1)
+    assert(manager.copiesRunning(3) === 2)
+
+    clock.advance(5*1000) // time = 16s
+    // At t=16s, Task 2 has been running for 16s. It is more than the
+    // SPECULATION_MULTIPLIER * medianRuntime = 1.5 * 10 = 15s. So now Task 2 
will
+    // be selected for speculation. Here we are verifying that regular 
speculation configs
+    // should still take effect even when a 
EXECUTOR_DECOMMISSION_KILL_INTERVAL is provided and
+    // corresponding executor is decommissioned
+    assert(manager.checkSpeculatableTasks(0))
+    assert(sched.speculativeTasks.toSet === Set(2, 3))
+    assert(manager.copiesRunning(2) === 1)
+    assert(manager.copiesRunning(3) === 2)
+    val taskOption2New = manager.resourceOffer("exec3", "host3", NO_PREF)._1
+    assert(taskOption2New.isDefined)
+    val speculativeTask2 = taskOption2New.get
+    // Ensure that task index 2 is launched on exec3, host3

Review comment:
       Can we unify the description for the task? As far as I see, we have 
`TASK 2`, `TASK2`, `Task2`, `task index 2` in the test.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to