Ngone51 commented on a change in pull request #28619:
URL: https://github.com/apache/spark/pull/28619#discussion_r430165260



##########
File path: core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
##########
@@ -1892,6 +1892,156 @@ class TaskSetManagerSuite
     testSpeculationDurationThreshold(true, 2, 1)
   }
 
+  test("SPARK-21040: Check speculative tasks are launched when an executor is 
decommissioned" +
+    " and the tasks running on it cannot finish within 
EXECUTOR_DECOMMISSION_KILL_INTERVAL") {
+    sc = new SparkContext("local", "test")
+    sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"), ("exec3", "host3"))
+    val taskSet = FakeTask.createTaskSet(4)
+    sc.conf.set(config.SPECULATION_ENABLED, true)
+    sc.conf.set(config.SPECULATION_MULTIPLIER, 1.5)
+    sc.conf.set(config.SPECULATION_QUANTILE, 0.5)
+    sc.conf.set(config.EXECUTOR_DECOMMISSION_KILL_INTERVAL.key, "5ms")
+    val clock = new ManualClock()
+    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
+    val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = taskSet.tasks.map { task =>
+      task.metrics.internalAccums
+    }
+
+    // Start TASK 0,1 on exec1, Task 2 on exec2
+    (0 until 2).foreach { _ =>
+      val taskOption = manager.resourceOffer("exec1", "host1", NO_PREF)._1
+      assert(taskOption.isDefined)
+      assert(taskOption.get.executorId === "exec1")
+    }
+    val taskOption2 = manager.resourceOffer("exec2", "host2", NO_PREF)._1
+    assert(taskOption2.isDefined)
+    assert(taskOption2.get.executorId === "exec2")
+
+    clock.advance(6) // time = 6ms
+    // Start TASK 3 on exec2 after some delay
+    val taskOption3 = manager.resourceOffer("exec2", "host2", NO_PREF)._1
+    assert(taskOption3.isDefined)
+    assert(taskOption3.get.executorId === "exec2")
+
+    assert(sched.startedTasks.toSet === Set(0, 1, 2, 3))
+
+    clock.advance(4) // time = 10ms
+    // Complete the first 2 tasks and leave the other 2 tasks in running
+    for (id <- Set(0, 1)) {
+      manager.handleSuccessfulTask(id, createTaskResult(id, accumUpdatesByTask(id)))
+      assert(sched.endedTasks(id) === Success)
+    }
+
+    // checkSpeculatableTasks checks that the task runtime is greater than the threshold for
+    // speculating. Since we use a SPECULATION_MULTIPLIER of 1.5, So tasks need to be running for
+    // > 15ms for speculation
+    assert(!manager.checkSpeculatableTasks(0))
+    assert(sched.speculativeTasks.toSet === Set())
+
+    // decommission exec-2. All tasks running on exec-2 (i.e. TASK 2,3)  will be added to
+    //   executorDecommissionSpeculationTriggerTimeoutOpt
+    // (TASK2 -> 15, TASK3 -> 15)
+    manager.executorDecommission("exec2")
+
+    assert(manager.checkSpeculatableTasks(0))
+    // Task2 started at t=0, so it can still finish before t=15 (Median task runtime = 10)
+    // Task3 started at t=6, so it might not finish before t=15. So Task 3 should be part
+    // of speculativeTasks
+    assert(sched.speculativeTasks.toSet === Set(3))
+    assert(manager.copiesRunning(3) === 1)
+
+    // Offer resource to start the speculative attempt for the running task
+    val taskOption3New = manager.resourceOffer("exec3", "host3", NO_PREF)._1
+    // Offer more resources. Noting should get scheduled now.

Review comment:
       nit: `Noting` -> `Nothing`

##########
File path: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
##########
@@ -1100,8 +1115,12 @@ private[spark] class TaskSetManager(
 
   def executorDecommission(execId: String): Unit = {
     recomputeLocality()
-    // Future consideration: if an executor is decommissioned it may make sense to add the current
-    // tasks to the spec exec queue.
+    if (executorDecommissionKillInterval.nonEmpty) {
+      val executorKillTime = clock.getTimeMillis() + executorDecommissionKillInterval.get
+      runningTasksSet.filter(taskInfos(_).executorId == execId).foreach { tid =>
+        tidToExecutorKillTimeMapping(tid) = executorKillTime
+      }
+    }

Review comment:
       nit:
   
   ```suggestion
       executorDecommissionKillInterval.foreach { interval =>
         val executorKillTime = clock.getTimeMillis() + interval
          taskInfos.values.filter(info => info.executorId == execId && info.running).foreach { info =>
            tidToExecutorKillTimeMapping(info.taskId) = executorKillTime
         }
       }
   ```
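   
    (As a side note on the suggestion's shape: iterating `taskInfos.values` with a `running` filter also avoids the extra `taskInfos(_)` lookup per task that the `runningTasksSet.filter` version does.)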

##########
File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
##########
@@ -1842,6 +1842,17 @@ package object config {
       .timeConf(TimeUnit.MILLISECONDS)
       .createOptional
 
+  private[spark] val EXECUTOR_DECOMMISSION_KILL_INTERVAL =
+    ConfigBuilder("spark.executor.decommission.killInterval")
+      .doc("Duration after which a decommissioned executor will be killed 
forcefully." +
+        "This config is useful for cloud environments where we know in advance 
when " +
+        "an executor is going to go down after decommissioning signal Ex- 
around 2 mins " +
+        "in aws spot nodes, 1/2 hrs in spot block nodes etc. This config is 
currently " +
+        "used to decide what tasks running on decommission executors to 
speculate")
+      .version("3.1.0")
+      .timeConf(TimeUnit.MILLISECONDS)

Review comment:
       Is milliseconds too small a unit here, since the examples you give are at least in minutes?
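   
    For context, a small hypothetical sketch (values made up, not from this PR): `timeConf(TimeUnit.MILLISECONDS)` only fixes the unit assumed for suffix-less values, so a minutes-scale interval is still easy to express:
   
    ```scala
    import org.apache.spark.SparkConf
   
    val conf = new SparkConf()
    // Unit-suffixed strings are accepted by time configs, so a minutes-scale
    // kill interval works even with a millisecond base unit.
    conf.set("spark.executor.decommission.killInterval", "2m")
    // A bare number, on the other hand, is read as milliseconds.
    conf.set("spark.executor.decommission.killInterval", "120000")
    ```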

##########
File path: core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
##########
@@ -1892,6 +1892,156 @@ class TaskSetManagerSuite
     testSpeculationDurationThreshold(true, 2, 1)
   }
 
+  test("SPARK-21040: Check speculative tasks are launched when an executor is 
decommissioned" +
+    " and the tasks running on it cannot finish within 
EXECUTOR_DECOMMISSION_KILL_INTERVAL") {
+    sc = new SparkContext("local", "test")
+    sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"), ("exec3", "host3"))
+    val taskSet = FakeTask.createTaskSet(4)
+    sc.conf.set(config.SPECULATION_ENABLED, true)
+    sc.conf.set(config.SPECULATION_MULTIPLIER, 1.5)
+    sc.conf.set(config.SPECULATION_QUANTILE, 0.5)
+    sc.conf.set(config.EXECUTOR_DECOMMISSION_KILL_INTERVAL.key, "5ms")
+    val clock = new ManualClock()
+    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
+    val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = taskSet.tasks.map { task =>
+      task.metrics.internalAccums
+    }
+
+    // Start TASK 0,1 on exec1, Task 2 on exec2
+    (0 until 2).foreach { _ =>
+      val taskOption = manager.resourceOffer("exec1", "host1", NO_PREF)._1
+      assert(taskOption.isDefined)
+      assert(taskOption.get.executorId === "exec1")
+    }
+    val taskOption2 = manager.resourceOffer("exec2", "host2", NO_PREF)._1
+    assert(taskOption2.isDefined)
+    assert(taskOption2.get.executorId === "exec2")
+
+    clock.advance(6) // time = 6ms
+    // Start TASK 3 on exec2 after some delay
+    val taskOption3 = manager.resourceOffer("exec2", "host2", NO_PREF)._1
+    assert(taskOption3.isDefined)
+    assert(taskOption3.get.executorId === "exec2")
+
+    assert(sched.startedTasks.toSet === Set(0, 1, 2, 3))
+
+    clock.advance(4) // time = 10ms
+    // Complete the first 2 tasks and leave the other 2 tasks in running
+    for (id <- Set(0, 1)) {
+      manager.handleSuccessfulTask(id, createTaskResult(id, accumUpdatesByTask(id)))
+      assert(sched.endedTasks(id) === Success)
+    }
+
+    // checkSpeculatableTasks checks that the task runtime is greater than the threshold for
+    // speculating. Since we use a SPECULATION_MULTIPLIER of 1.5, So tasks need to be running for
+    // > 15ms for speculation
+    assert(!manager.checkSpeculatableTasks(0))
+    assert(sched.speculativeTasks.toSet === Set())
+
+    // decommission exec-2. All tasks running on exec-2 (i.e. TASK 2,3)  will be added to
+    //   executorDecommissionSpeculationTriggerTimeoutOpt
+    // (TASK2 -> 15, TASK3 -> 15)
+    manager.executorDecommission("exec2")
+
+    assert(manager.checkSpeculatableTasks(0))
+    // Task2 started at t=0, so it can still finish before t=15 (Median task runtime = 10)
+    // Task3 started at t=6, so it might not finish before t=15. So Task 3 should be part
+    // of speculativeTasks
+    assert(sched.speculativeTasks.toSet === Set(3))
+    assert(manager.copiesRunning(3) === 1)
+
+    // Offer resource to start the speculative attempt for the running task
+    val taskOption3New = manager.resourceOffer("exec3", "host3", NO_PREF)._1
+    // Offer more resources. Noting should get scheduled now.
+    assert(manager.resourceOffer("exec3", "host3", NO_PREF)._1.isEmpty)
+    assert(taskOption3New.isDefined)
+
+    // Assert info about the newly launched speculative task
+    val speculativeTask3 = taskOption3New.get
+    assert(speculativeTask3.index === 3)
+    assert(speculativeTask3.taskId === 4)
+    assert(speculativeTask3.executorId === "exec3")
+    assert(speculativeTask3.attemptNumber === 1)
+
+    clock.advance(1) // time = 11 ms
+    // Running checkSpeculatableTasks again should return false
+    assert(!manager.checkSpeculatableTasks(0))
+    assert(manager.copiesRunning(2) === 1)
+    assert(manager.copiesRunning(3) === 2)
+
+    clock.advance(5) // time = 16 ms
+    // At t=16 ms, Task 4 has completed 16 ms. It is more than the
+    // SPECULATION_MULTIPLIER * medianRuntime = 1.5 * 10 = 15 ms. So now Task 4 will
+    // be selected for speculation. Here we are verifying that regular speculation configs
+    // should still take effect even when a EXECUTOR_DECOMMISSION_KILL_INTERVAL is provided and
+    // corresponding executor is decommissioned
+    assert(manager.checkSpeculatableTasks(0))
+    assert(sched.speculativeTasks.toSet === Set(2, 3))
+    assert(manager.copiesRunning(2) === 1)
+    assert(manager.copiesRunning(3) === 2)
+    val taskOption2New = manager.resourceOffer("exec3", "host3", NO_PREF)._1
+    assert(taskOption2New.isDefined)
+    val speculativeTask2 = taskOption2New.get
+    // Ensure that task index 2 is launched on exec3, host3
+    assert(speculativeTask2.index === 2)
+    assert(speculativeTask2.taskId === 5)
+    assert(speculativeTask2.executorId === "exec3")
+    assert(speculativeTask2.attemptNumber === 1)
+
+    assert(manager.copiesRunning(2) === 2)
+    assert(manager.copiesRunning(3) === 2)
+
+    // Offering additional resources should not lead to any speculative tasks being respawned
+    assert(manager.resourceOffer("exec1", "host1", ANY)._1.isEmpty)
+    assert(manager.resourceOffer("exec2", "host2", ANY)._1.isEmpty)

Review comment:
       Do we need to apply resourceOffer for exec2, since it's being decommissioned?

##########
File path: core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
##########
@@ -1892,6 +1892,156 @@ class TaskSetManagerSuite
     testSpeculationDurationThreshold(true, 2, 1)
   }
 
+  test("SPARK-21040: Check speculative tasks are launched when an executor is 
decommissioned" +
+    " and the tasks running on it cannot finish within 
EXECUTOR_DECOMMISSION_KILL_INTERVAL") {
+    sc = new SparkContext("local", "test")
+    sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"), ("exec3", "host3"))
+    val taskSet = FakeTask.createTaskSet(4)
+    sc.conf.set(config.SPECULATION_ENABLED, true)
+    sc.conf.set(config.SPECULATION_MULTIPLIER, 1.5)
+    sc.conf.set(config.SPECULATION_QUANTILE, 0.5)
+    sc.conf.set(config.EXECUTOR_DECOMMISSION_KILL_INTERVAL.key, "5ms")
+    val clock = new ManualClock()
+    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
+    val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = taskSet.tasks.map { task =>
+      task.metrics.internalAccums
+    }
+
+    // Start TASK 0,1 on exec1, Task 2 on exec2
+    (0 until 2).foreach { _ =>
+      val taskOption = manager.resourceOffer("exec1", "host1", NO_PREF)._1
+      assert(taskOption.isDefined)
+      assert(taskOption.get.executorId === "exec1")
+    }
+    val taskOption2 = manager.resourceOffer("exec2", "host2", NO_PREF)._1
+    assert(taskOption2.isDefined)
+    assert(taskOption2.get.executorId === "exec2")
+
+    clock.advance(6) // time = 6ms
+    // Start TASK 3 on exec2 after some delay
+    val taskOption3 = manager.resourceOffer("exec2", "host2", NO_PREF)._1
+    assert(taskOption3.isDefined)
+    assert(taskOption3.get.executorId === "exec2")
+
+    assert(sched.startedTasks.toSet === Set(0, 1, 2, 3))
+
+    clock.advance(4) // time = 10ms
+    // Complete the first 2 tasks and leave the other 2 tasks in running
+    for (id <- Set(0, 1)) {
+      manager.handleSuccessfulTask(id, createTaskResult(id, accumUpdatesByTask(id)))
+      assert(sched.endedTasks(id) === Success)
+    }
+
+    // checkSpeculatableTasks checks that the task runtime is greater than the threshold for
+    // speculating. Since we use a SPECULATION_MULTIPLIER of 1.5, So tasks need to be running for
+    // > 15ms for speculation
+    assert(!manager.checkSpeculatableTasks(0))
+    assert(sched.speculativeTasks.toSet === Set())
+
+    // decommission exec-2. All tasks running on exec-2 (i.e. TASK 2,3)  will be added to
+    //   executorDecommissionSpeculationTriggerTimeoutOpt
+    // (TASK2 -> 15, TASK3 -> 15)
+    manager.executorDecommission("exec2")
+
+    assert(manager.checkSpeculatableTasks(0))
+    // Task2 started at t=0, so it can still finish before t=15 (Median task runtime = 10)
+    // Task3 started at t=6, so it might not finish before t=15. So Task 3 should be part
+    // of speculativeTasks
+    assert(sched.speculativeTasks.toSet === Set(3))
+    assert(manager.copiesRunning(3) === 1)
+
+    // Offer resource to start the speculative attempt for the running task
+    val taskOption3New = manager.resourceOffer("exec3", "host3", NO_PREF)._1
+    // Offer more resources. Noting should get scheduled now.
+    assert(manager.resourceOffer("exec3", "host3", NO_PREF)._1.isEmpty)
+    assert(taskOption3New.isDefined)
+
+    // Assert info about the newly launched speculative task
+    val speculativeTask3 = taskOption3New.get
+    assert(speculativeTask3.index === 3)
+    assert(speculativeTask3.taskId === 4)
+    assert(speculativeTask3.executorId === "exec3")
+    assert(speculativeTask3.attemptNumber === 1)
+
+    clock.advance(1) // time = 11 ms
+    // Running checkSpeculatableTasks again should return false
+    assert(!manager.checkSpeculatableTasks(0))
+    assert(manager.copiesRunning(2) === 1)
+    assert(manager.copiesRunning(3) === 2)
+
+    clock.advance(5) // time = 16 ms
+    // At t=16 ms, Task 4 has completed 16 ms. It is more than the

Review comment:
       BTW, should it be TASK 2 instead of TASK 4?

##########
File path: core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
##########
@@ -1892,6 +1892,156 @@ class TaskSetManagerSuite
     testSpeculationDurationThreshold(true, 2, 1)
   }
 
+  test("SPARK-21040: Check speculative tasks are launched when an executor is 
decommissioned" +
+    " and the tasks running on it cannot finish within 
EXECUTOR_DECOMMISSION_KILL_INTERVAL") {
+    sc = new SparkContext("local", "test")
+    sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"), ("exec3", "host3"))
+    val taskSet = FakeTask.createTaskSet(4)
+    sc.conf.set(config.SPECULATION_ENABLED, true)
+    sc.conf.set(config.SPECULATION_MULTIPLIER, 1.5)
+    sc.conf.set(config.SPECULATION_QUANTILE, 0.5)
+    sc.conf.set(config.EXECUTOR_DECOMMISSION_KILL_INTERVAL.key, "5ms")
+    val clock = new ManualClock()
+    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
+    val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = taskSet.tasks.map { task =>
+      task.metrics.internalAccums
+    }
+
+    // Start TASK 0,1 on exec1, Task 2 on exec2
+    (0 until 2).foreach { _ =>
+      val taskOption = manager.resourceOffer("exec1", "host1", NO_PREF)._1
+      assert(taskOption.isDefined)
+      assert(taskOption.get.executorId === "exec1")
+    }
+    val taskOption2 = manager.resourceOffer("exec2", "host2", NO_PREF)._1
+    assert(taskOption2.isDefined)
+    assert(taskOption2.get.executorId === "exec2")
+
+    clock.advance(6) // time = 6ms
+    // Start TASK 3 on exec2 after some delay
+    val taskOption3 = manager.resourceOffer("exec2", "host2", NO_PREF)._1
+    assert(taskOption3.isDefined)
+    assert(taskOption3.get.executorId === "exec2")
+
+    assert(sched.startedTasks.toSet === Set(0, 1, 2, 3))
+
+    clock.advance(4) // time = 10ms
+    // Complete the first 2 tasks and leave the other 2 tasks in running
+    for (id <- Set(0, 1)) {
+      manager.handleSuccessfulTask(id, createTaskResult(id, accumUpdatesByTask(id)))
+      assert(sched.endedTasks(id) === Success)
+    }
+
+    // checkSpeculatableTasks checks that the task runtime is greater than the threshold for
+    // speculating. Since we use a SPECULATION_MULTIPLIER of 1.5, So tasks need to be running for
+    // > 15ms for speculation
+    assert(!manager.checkSpeculatableTasks(0))
+    assert(sched.speculativeTasks.toSet === Set())
+
+    // decommission exec-2. All tasks running on exec-2 (i.e. TASK 2,3)  will be added to
+    //   executorDecommissionSpeculationTriggerTimeoutOpt
+    // (TASK2 -> 15, TASK3 -> 15)
+    manager.executorDecommission("exec2")
+
+    assert(manager.checkSpeculatableTasks(0))
+    // Task2 started at t=0, so it can still finish before t=15 (Median task runtime = 10)
+    // Task3 started at t=6, so it might not finish before t=15. So Task 3 should be part
+    // of speculativeTasks
+    assert(sched.speculativeTasks.toSet === Set(3))
+    assert(manager.copiesRunning(3) === 1)
+
+    // Offer resource to start the speculative attempt for the running task
+    val taskOption3New = manager.resourceOffer("exec3", "host3", NO_PREF)._1
+    // Offer more resources. Noting should get scheduled now.
+    assert(manager.resourceOffer("exec3", "host3", NO_PREF)._1.isEmpty)
+    assert(taskOption3New.isDefined)
+
+    // Assert info about the newly launched speculative task
+    val speculativeTask3 = taskOption3New.get
+    assert(speculativeTask3.index === 3)
+    assert(speculativeTask3.taskId === 4)
+    assert(speculativeTask3.executorId === "exec3")
+    assert(speculativeTask3.attemptNumber === 1)
+
+    clock.advance(1) // time = 11 ms
+    // Running checkSpeculatableTasks again should return false
+    assert(!manager.checkSpeculatableTasks(0))
+    assert(manager.copiesRunning(2) === 1)
+    assert(manager.copiesRunning(3) === 2)
+
+    clock.advance(5) // time = 16 ms
+    // At t=16 ms, Task 4 has completed 16 ms. It is more than the

Review comment:
       > Task 4 has completed 16 ms
   
   Do you mean task 4 has been running for 16ms?

##########
File path: core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
##########
@@ -1892,6 +1892,156 @@ class TaskSetManagerSuite
     testSpeculationDurationThreshold(true, 2, 1)
   }
 
+  test("SPARK-21040: Check speculative tasks are launched when an executor is 
decommissioned" +
+    " and the tasks running on it cannot finish within 
EXECUTOR_DECOMMISSION_KILL_INTERVAL") {
+    sc = new SparkContext("local", "test")
+    sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"), ("exec3", "host3"))
+    val taskSet = FakeTask.createTaskSet(4)
+    sc.conf.set(config.SPECULATION_ENABLED, true)
+    sc.conf.set(config.SPECULATION_MULTIPLIER, 1.5)
+    sc.conf.set(config.SPECULATION_QUANTILE, 0.5)
+    sc.conf.set(config.EXECUTOR_DECOMMISSION_KILL_INTERVAL.key, "5ms")
+    val clock = new ManualClock()
+    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
+    val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = taskSet.tasks.map { task =>
+      task.metrics.internalAccums
+    }
+
+    // Start TASK 0,1 on exec1, Task 2 on exec2
+    (0 until 2).foreach { _ =>
+      val taskOption = manager.resourceOffer("exec1", "host1", NO_PREF)._1
+      assert(taskOption.isDefined)
+      assert(taskOption.get.executorId === "exec1")
+    }
+    val taskOption2 = manager.resourceOffer("exec2", "host2", NO_PREF)._1
+    assert(taskOption2.isDefined)
+    assert(taskOption2.get.executorId === "exec2")
+
+    clock.advance(6) // time = 6ms
+    // Start TASK 3 on exec2 after some delay
+    val taskOption3 = manager.resourceOffer("exec2", "host2", NO_PREF)._1
+    assert(taskOption3.isDefined)
+    assert(taskOption3.get.executorId === "exec2")
+
+    assert(sched.startedTasks.toSet === Set(0, 1, 2, 3))
+
+    clock.advance(4) // time = 10ms
+    // Complete the first 2 tasks and leave the other 2 tasks in running
+    for (id <- Set(0, 1)) {
+      manager.handleSuccessfulTask(id, createTaskResult(id, accumUpdatesByTask(id)))
+      assert(sched.endedTasks(id) === Success)
+    }
+
+    // checkSpeculatableTasks checks that the task runtime is greater than the threshold for
+    // speculating. Since we use a SPECULATION_MULTIPLIER of 1.5, So tasks need to be running for
+    // > 15ms for speculation
+    assert(!manager.checkSpeculatableTasks(0))
+    assert(sched.speculativeTasks.toSet === Set())
+
+    // decommission exec-2. All tasks running on exec-2 (i.e. TASK 2,3)  will be added to
+    //   executorDecommissionSpeculationTriggerTimeoutOpt
+    // (TASK2 -> 15, TASK3 -> 15)
+    manager.executorDecommission("exec2")
+
+    assert(manager.checkSpeculatableTasks(0))
+    // Task2 started at t=0, so it can still finish before t=15 (Median task runtime = 10)
+    // Task3 started at t=6, so it might not finish before t=15. So Task 3 should be part
+    // of speculativeTasks
+    assert(sched.speculativeTasks.toSet === Set(3))
+    assert(manager.copiesRunning(3) === 1)
+
+    // Offer resource to start the speculative attempt for the running task
+    val taskOption3New = manager.resourceOffer("exec3", "host3", NO_PREF)._1
+    // Offer more resources. Noting should get scheduled now.
+    assert(manager.resourceOffer("exec3", "host3", NO_PREF)._1.isEmpty)
+    assert(taskOption3New.isDefined)
+
+    // Assert info about the newly launched speculative task
+    val speculativeTask3 = taskOption3New.get
+    assert(speculativeTask3.index === 3)
+    assert(speculativeTask3.taskId === 4)
+    assert(speculativeTask3.executorId === "exec3")
+    assert(speculativeTask3.attemptNumber === 1)
+
+    clock.advance(1) // time = 11 ms
+    // Running checkSpeculatableTasks again should return false
+    assert(!manager.checkSpeculatableTasks(0))
+    assert(manager.copiesRunning(2) === 1)
+    assert(manager.copiesRunning(3) === 2)
+
+    clock.advance(5) // time = 16 ms
+    // At t=16 ms, Task 4 has completed 16 ms. It is more than the
+    // SPECULATION_MULTIPLIER * medianRuntime = 1.5 * 10 = 15 ms. So now Task 4 will
+    // be selected for speculation. Here we are verifying that regular speculation configs
+    // should still take effect even when a EXECUTOR_DECOMMISSION_KILL_INTERVAL is provided and
+    // corresponding executor is decommissioned
+    assert(manager.checkSpeculatableTasks(0))
+    assert(sched.speculativeTasks.toSet === Set(2, 3))
+    assert(manager.copiesRunning(2) === 1)
+    assert(manager.copiesRunning(3) === 2)
+    val taskOption2New = manager.resourceOffer("exec3", "host3", NO_PREF)._1
+    assert(taskOption2New.isDefined)
+    val speculativeTask2 = taskOption2New.get
+    // Ensure that task index 2 is launched on exec3, host3
+    assert(speculativeTask2.index === 2)
+    assert(speculativeTask2.taskId === 5)
+    assert(speculativeTask2.executorId === "exec3")
+    assert(speculativeTask2.attemptNumber === 1)
+
+    assert(manager.copiesRunning(2) === 2)
+    assert(manager.copiesRunning(3) === 2)
+
+    // Offering additional resources should not lead to any speculative tasks being respawned
+    assert(manager.resourceOffer("exec1", "host1", ANY)._1.isEmpty)
+    assert(manager.resourceOffer("exec2", "host2", ANY)._1.isEmpty)
+    assert(manager.resourceOffer("exec3", "host3", ANY)._1.isEmpty)
+  }
+
+  test("SPARK-21040: Check speculative tasks are not launched when an 
executor" +

Review comment:
       I think this test may not be necessary, because the above test has already covered it:
   
    ```
        // Task2 started at t=0, so it can still finish before t=15 (Median task runtime = 10)
        // Task3 started at t=6, so it might not finish before t=15. So Task 3 should be part
       // of speculativeTasks
   ```
   
   ?

##########
File path: core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
##########
@@ -1892,6 +1892,156 @@ class TaskSetManagerSuite
     testSpeculationDurationThreshold(true, 2, 1)
   }
 
+  test("SPARK-21040: Check speculative tasks are launched when an executor is 
decommissioned" +
+    " and the tasks running on it cannot finish within 
EXECUTOR_DECOMMISSION_KILL_INTERVAL") {
+    sc = new SparkContext("local", "test")
+    sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"), ("exec3", "host3"))
+    val taskSet = FakeTask.createTaskSet(4)
+    sc.conf.set(config.SPECULATION_ENABLED, true)
+    sc.conf.set(config.SPECULATION_MULTIPLIER, 1.5)
+    sc.conf.set(config.SPECULATION_QUANTILE, 0.5)
+    sc.conf.set(config.EXECUTOR_DECOMMISSION_KILL_INTERVAL.key, "5ms")
+    val clock = new ManualClock()
+    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
+    val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = taskSet.tasks.map { task =>
+      task.metrics.internalAccums
+    }
+
+    // Start TASK 0,1 on exec1, Task 2 on exec2
+    (0 until 2).foreach { _ =>
+      val taskOption = manager.resourceOffer("exec1", "host1", NO_PREF)._1
+      assert(taskOption.isDefined)
+      assert(taskOption.get.executorId === "exec1")
+    }
+    val taskOption2 = manager.resourceOffer("exec2", "host2", NO_PREF)._1
+    assert(taskOption2.isDefined)
+    assert(taskOption2.get.executorId === "exec2")
+
+    clock.advance(6) // time = 6ms
+    // Start TASK 3 on exec2 after some delay
+    val taskOption3 = manager.resourceOffer("exec2", "host2", NO_PREF)._1
+    assert(taskOption3.isDefined)
+    assert(taskOption3.get.executorId === "exec2")
+
+    assert(sched.startedTasks.toSet === Set(0, 1, 2, 3))

Review comment:
       It seems `startedTasks` is updated asynchronously by `FakeDAGScheduler`. Maybe we could use `eventually` to prevent the potential flaky failure.
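   
    A minimal sketch of that, assuming the suite has ScalaTest's `Eventually` and `SpanSugar` in scope (the timeout/interval values below are made up):
   
    ```scala
    import org.scalatest.concurrent.Eventually._
    import org.scalatest.time.SpanSugar._
   
    // Poll until FakeDAGScheduler has recorded all four task starts, instead of
    // asserting on the asynchronously updated set right away.
    eventually(timeout(3.seconds), interval(10.milliseconds)) {
      assert(sched.startedTasks.toSet === Set(0, 1, 2, 3))
    }
    ```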

##########
File path: core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
##########
@@ -1892,6 +1892,156 @@ class TaskSetManagerSuite
     testSpeculationDurationThreshold(true, 2, 1)
   }
 
+  test("SPARK-21040: Check speculative tasks are launched when an executor is 
decommissioned" +
+    " and the tasks running on it cannot finish within 
EXECUTOR_DECOMMISSION_KILL_INTERVAL") {
+    sc = new SparkContext("local", "test")
+    sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"), ("exec3", "host3"))
+    val taskSet = FakeTask.createTaskSet(4)
+    sc.conf.set(config.SPECULATION_ENABLED, true)
+    sc.conf.set(config.SPECULATION_MULTIPLIER, 1.5)
+    sc.conf.set(config.SPECULATION_QUANTILE, 0.5)
+    sc.conf.set(config.EXECUTOR_DECOMMISSION_KILL_INTERVAL.key, "5ms")
+    val clock = new ManualClock()
+    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
+    val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = taskSet.tasks.map { task =>
+      task.metrics.internalAccums
+    }
+
+    // Start TASK 0,1 on exec1, Task 2 on exec2
+    (0 until 2).foreach { _ =>
+      val taskOption = manager.resourceOffer("exec1", "host1", NO_PREF)._1
+      assert(taskOption.isDefined)
+      assert(taskOption.get.executorId === "exec1")
+    }
+    val taskOption2 = manager.resourceOffer("exec2", "host2", NO_PREF)._1
+    assert(taskOption2.isDefined)
+    assert(taskOption2.get.executorId === "exec2")
+
+    clock.advance(6) // time = 6ms
+    // Start TASK 3 on exec2 after some delay
+    val taskOption3 = manager.resourceOffer("exec2", "host2", NO_PREF)._1
+    assert(taskOption3.isDefined)
+    assert(taskOption3.get.executorId === "exec2")
+
+    assert(sched.startedTasks.toSet === Set(0, 1, 2, 3))
+
+    clock.advance(4) // time = 10ms
+    // Complete the first 2 tasks and leave the other 2 tasks in running
+    for (id <- Set(0, 1)) {
+      manager.handleSuccessfulTask(id, createTaskResult(id, accumUpdatesByTask(id)))
+      assert(sched.endedTasks(id) === Success)
+    }
+
+    // checkSpeculatableTasks checks that the task runtime is greater than the threshold for
+    // speculating. Since we use a SPECULATION_MULTIPLIER of 1.5, So tasks need to be running for
+    // > 15ms for speculation
+    assert(!manager.checkSpeculatableTasks(0))
+    assert(sched.speculativeTasks.toSet === Set())
+
+    // decommission exec-2. All tasks running on exec-2 (i.e. TASK 2,3)  will be added to
+    //   executorDecommissionSpeculationTriggerTimeoutOpt
+    // (TASK2 -> 15, TASK3 -> 15)
+    manager.executorDecommission("exec2")

Review comment:
       Could you assert `tidToExecutorKillTimeMapping` here?
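   
    For example, something along these lines (assuming `tidToExecutorKillTimeMapping` is reachable from the test, e.g. `private[scheduler]`; exec2 is decommissioned at t=10ms with a 5ms kill interval, so both entries should be 15):
   
    ```scala
    // Only TASK 2 and TASK 3 are still running on exec2, so only they get a kill time:
    // killTime = decommission time (10 ms) + EXECUTOR_DECOMMISSION_KILL_INTERVAL (5 ms) = 15 ms.
    assert(manager.tidToExecutorKillTimeMapping.size === 2)
    assert(manager.tidToExecutorKillTimeMapping(2) === 15)
    assert(manager.tidToExecutorKillTimeMapping(3) === 15)
    ```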

##########
File path: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
##########
@@ -1042,7 +1045,19 @@ private[spark] class TaskSetManager(
       // bound based on that.
       logDebug("Task length threshold for speculation: " + threshold)
       for (tid <- runningTasksSet) {
-        foundTasks |= checkAndSubmitSpeculatableTask(tid, time, threshold)
+        var speculated = checkAndSubmitSpeculatableTask(tid, time, threshold)
+        if (!speculated && tidToExecutorKillTimeMapping.contains(tid)) {
+          // Check whether this task will finish before the exectorKillTime assuming
+          // it will take medianDuration overall. If this task cannot finish within
+          // executorKillInterval, then this task is a candidate for speculation
+          val taskEndTimeBasedOnMedianDuration = taskInfos(tid).launchTime + medianDuration
+          val canExceedDeadline = tidToExecutorKillTimeMapping(tid) <
+            taskEndTimeBasedOnMedianDuration
+          if (canExceedDeadline) {
+            speculated = checkAndSubmitSpeculatableTask(tid, time, 0)

Review comment:
       Do we need to remove the corresponding task from `tidToExecutorKillTimeMapping` if `speculated` is true?
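   
    A minimal sketch of that, reusing the names from the diff (`tid`, `speculated`):
   
    ```scala
    // Once a speculative copy has been submitted for this tid, the kill-time entry has
    // served its purpose; dropping it avoids re-checking the deadline on every pass.
    if (speculated) {
      tidToExecutorKillTimeMapping.remove(tid)
    }
    ```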

##########
File path: core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
##########
@@ -1892,6 +1892,156 @@ class TaskSetManagerSuite
     testSpeculationDurationThreshold(true, 2, 1)
   }
 
+  test("SPARK-21040: Check speculative tasks are launched when an executor is 
decommissioned" +
+    " and the tasks running on it cannot finish within 
EXECUTOR_DECOMMISSION_KILL_INTERVAL") {
+    sc = new SparkContext("local", "test")
+    sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"), ("exec3", "host3"))
+    val taskSet = FakeTask.createTaskSet(4)
+    sc.conf.set(config.SPECULATION_ENABLED, true)
+    sc.conf.set(config.SPECULATION_MULTIPLIER, 1.5)
+    sc.conf.set(config.SPECULATION_QUANTILE, 0.5)
+    sc.conf.set(config.EXECUTOR_DECOMMISSION_KILL_INTERVAL.key, "5ms")
+    val clock = new ManualClock()
+    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
+    val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = taskSet.tasks.map { task =>
+      task.metrics.internalAccums
+    }
+
+    // Start TASK 0,1 on exec1, Task 2 on exec2
+    (0 until 2).foreach { _ =>
+      val taskOption = manager.resourceOffer("exec1", "host1", NO_PREF)._1
+      assert(taskOption.isDefined)
+      assert(taskOption.get.executorId === "exec1")
+    }
+    val taskOption2 = manager.resourceOffer("exec2", "host2", NO_PREF)._1
+    assert(taskOption2.isDefined)
+    assert(taskOption2.get.executorId === "exec2")
+
+    clock.advance(6) // time = 6ms
+    // Start TASK 3 on exec2 after some delay
+    val taskOption3 = manager.resourceOffer("exec2", "host2", NO_PREF)._1
+    assert(taskOption3.isDefined)
+    assert(taskOption3.get.executorId === "exec2")
+
+    assert(sched.startedTasks.toSet === Set(0, 1, 2, 3))

Review comment:
       I see. You're right.

##########
File path: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
##########
@@ -1042,7 +1045,19 @@ private[spark] class TaskSetManager(
       // bound based on that.
       logDebug("Task length threshold for speculation: " + threshold)
       for (tid <- runningTasksSet) {
-        foundTasks |= checkAndSubmitSpeculatableTask(tid, time, threshold)
+        var speculated = checkAndSubmitSpeculatableTask(tid, time, threshold)
+        if (!speculated && tidToExecutorKillTimeMapping.contains(tid)) {
+          // Check whether this task will finish before the exectorKillTime assuming
+          // it will take medianDuration overall. If this task cannot finish within
+          // executorKillInterval, then this task is a candidate for speculation
+          val taskEndTimeBasedOnMedianDuration = taskInfos(tid).launchTime + medianDuration
+          val canExceedDeadline = tidToExecutorKillTimeMapping(tid) <
+            taskEndTimeBasedOnMedianDuration
+          if (canExceedDeadline) {
+            speculated = checkAndSubmitSpeculatableTask(tid, time, 0)

Review comment:
       Then, is it possible that we add redundant speculative tasks for the same tid?

##########
File path: core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
##########
@@ -1892,6 +1892,156 @@ class TaskSetManagerSuite
     testSpeculationDurationThreshold(true, 2, 1)
   }
 
+  test("SPARK-21040: Check speculative tasks are launched when an executor is 
decommissioned" +
+    " and the tasks running on it cannot finish within 
EXECUTOR_DECOMMISSION_KILL_INTERVAL") {
+    sc = new SparkContext("local", "test")
+    sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"), ("exec3", "host3"))
+    val taskSet = FakeTask.createTaskSet(4)
+    sc.conf.set(config.SPECULATION_ENABLED, true)
+    sc.conf.set(config.SPECULATION_MULTIPLIER, 1.5)
+    sc.conf.set(config.SPECULATION_QUANTILE, 0.5)
+    sc.conf.set(config.EXECUTOR_DECOMMISSION_KILL_INTERVAL.key, "5ms")
+    val clock = new ManualClock()
+    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
+    val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = taskSet.tasks.map { task =>
+      task.metrics.internalAccums
+    }
+
+    // Start TASK 0,1 on exec1, Task 2 on exec2
+    (0 until 2).foreach { _ =>
+      val taskOption = manager.resourceOffer("exec1", "host1", NO_PREF)._1
+      assert(taskOption.isDefined)
+      assert(taskOption.get.executorId === "exec1")
+    }
+    val taskOption2 = manager.resourceOffer("exec2", "host2", NO_PREF)._1
+    assert(taskOption2.isDefined)
+    assert(taskOption2.get.executorId === "exec2")
+
+    clock.advance(6) // time = 6ms
+    // Start TASK 3 on exec2 after some delay
+    val taskOption3 = manager.resourceOffer("exec2", "host2", NO_PREF)._1
+    assert(taskOption3.isDefined)
+    assert(taskOption3.get.executorId === "exec2")
+
+    assert(sched.startedTasks.toSet === Set(0, 1, 2, 3))
+
+    clock.advance(4) // time = 10ms
+    // Complete the first 2 tasks and leave the other 2 tasks in running
+    for (id <- Set(0, 1)) {
+      manager.handleSuccessfulTask(id, createTaskResult(id, accumUpdatesByTask(id)))
+      assert(sched.endedTasks(id) === Success)
+    }
+
+    // checkSpeculatableTasks checks that the task runtime is greater than the threshold for
+    // speculating. Since we use a SPECULATION_MULTIPLIER of 1.5, So tasks need to be running for
+    // > 15ms for speculation
+    assert(!manager.checkSpeculatableTasks(0))
+    assert(sched.speculativeTasks.toSet === Set())
+
+    // decommission exec-2. All tasks running on exec-2 (i.e. TASK 2,3)  will be added to
+    //   executorDecommissionSpeculationTriggerTimeoutOpt
+    // (TASK2 -> 15, TASK3 -> 15)
+    manager.executorDecommission("exec2")
+
+    assert(manager.checkSpeculatableTasks(0))
+    // Task2 started at t=0, so it can still finish before t=15 (Median task runtime = 10)
+    // Task3 started at t=6, so it might not finish before t=15. So Task 3 should be part
+    // of speculativeTasks
+    assert(sched.speculativeTasks.toSet === Set(3))
+    assert(manager.copiesRunning(3) === 1)
+
+    // Offer resource to start the speculative attempt for the running task
+    val taskOption3New = manager.resourceOffer("exec3", "host3", NO_PREF)._1
+    // Offer more resources. Noting should get scheduled now.
+    assert(manager.resourceOffer("exec3", "host3", NO_PREF)._1.isEmpty)
+    assert(taskOption3New.isDefined)
+
+    // Assert info about the newly launched speculative task
+    val speculativeTask3 = taskOption3New.get
+    assert(speculativeTask3.index === 3)
+    assert(speculativeTask3.taskId === 4)
+    assert(speculativeTask3.executorId === "exec3")
+    assert(speculativeTask3.attemptNumber === 1)
+
+    clock.advance(1) // time = 11 ms
+    // Running checkSpeculatableTasks again should return false
+    assert(!manager.checkSpeculatableTasks(0))
+    assert(manager.copiesRunning(2) === 1)
+    assert(manager.copiesRunning(3) === 2)
+
+    clock.advance(5) // time = 16 ms
+    // At t=16 ms, Task 4 has completed 16 ms. It is more than the
+    // SPECULATION_MULTIPLIER * medianRuntime = 1.5 * 10 = 15 ms. So now Task 4 will
+    // be selected for speculation. Here we are verifying that regular speculation configs
+    // should still take effect even when a EXECUTOR_DECOMMISSION_KILL_INTERVAL is provided and
+    // corresponding executor is decommissioned
+    assert(manager.checkSpeculatableTasks(0))
+    assert(sched.speculativeTasks.toSet === Set(2, 3))
+    assert(manager.copiesRunning(2) === 1)
+    assert(manager.copiesRunning(3) === 2)
+    val taskOption2New = manager.resourceOffer("exec3", "host3", NO_PREF)._1
+    assert(taskOption2New.isDefined)
+    val speculativeTask2 = taskOption2New.get
+    // Ensure that task index 2 is launched on exec3, host3
+    assert(speculativeTask2.index === 2)
+    assert(speculativeTask2.taskId === 5)
+    assert(speculativeTask2.executorId === "exec3")
+    assert(speculativeTask2.attemptNumber === 1)
+
+    assert(manager.copiesRunning(2) === 2)
+    assert(manager.copiesRunning(3) === 2)
+
+    // Offering additional resources should not lead to any speculative tasks being respawned
+    assert(manager.resourceOffer("exec1", "host1", ANY)._1.isEmpty)
+    assert(manager.resourceOffer("exec2", "host2", ANY)._1.isEmpty)
+    assert(manager.resourceOffer("exec3", "host3", ANY)._1.isEmpty)
+  }
+
+  test("SPARK-21040: Check speculative tasks are not launched when an 
executor" +

Review comment:
       Yes, please.

##########
File path: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
##########
@@ -1042,7 +1045,19 @@ private[spark] class TaskSetManager(
       // bound based on that.
       logDebug("Task length threshold for speculation: " + threshold)
       for (tid <- runningTasksSet) {
-        foundTasks |= checkAndSubmitSpeculatableTask(tid, time, threshold)
+        var speculated = checkAndSubmitSpeculatableTask(tid, time, threshold)
+        if (!speculated && tidToExecutorKillTimeMapping.contains(tid)) {
+          // Check whether this task will finish before the exectorKillTime assuming
+          // it will take medianDuration overall. If this task cannot finish within
+          // executorKillInterval, then this task is a candidate for speculation
+          val taskEndTimeBasedOnMedianDuration = taskInfos(tid).launchTime + medianDuration
+          val canExceedDeadline = tidToExecutorKillTimeMapping(tid) <
+            taskEndTimeBasedOnMedianDuration
+          if (canExceedDeadline) {
+            speculated = checkAndSubmitSpeculatableTask(tid, time, 0)

Review comment:
       I mean whether it's possible to add redundant speculative tasks to `speculatableTasks`, rather than running multiple speculative tasks for the same tid at the same time.
   
    I think the actual prevention is `!speculatableTasks.contains(index)` in `checkAndSubmitSpeculatableTask`, but never mind, since `speculatableTasks` is a `HashSet`.
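   
    (A tiny illustration of that last point, with a made-up index: even without the `contains` guard, re-adding the same index to a `HashSet` is a no-op, so the set itself can never hold duplicates.)
   
    ```scala
    import scala.collection.mutable
   
    // speculatableTasks holds task indexes; inserting the same index twice leaves one entry.
    val speculatableTasks = mutable.HashSet[Int]()
    speculatableTasks += 3
    speculatableTasks += 3
    assert(speculatableTasks.size == 1)
    ```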




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
