Ngone51 commented on a change in pull request #28619:
URL: https://github.com/apache/spark/pull/28619#discussion_r430165260
##########
File path: core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
##########
@@ -1892,6 +1892,156 @@ class TaskSetManagerSuite
testSpeculationDurationThreshold(true, 2, 1)
}
+ test("SPARK-21040: Check speculative tasks are launched when an executor is
decommissioned" +
+ " and the tasks running on it cannot finish within
EXECUTOR_DECOMMISSION_KILL_INTERVAL") {
+ sc = new SparkContext("local", "test")
+ sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"),
("exec3", "host3"))
+ val taskSet = FakeTask.createTaskSet(4)
+ sc.conf.set(config.SPECULATION_ENABLED, true)
+ sc.conf.set(config.SPECULATION_MULTIPLIER, 1.5)
+ sc.conf.set(config.SPECULATION_QUANTILE, 0.5)
+ sc.conf.set(config.EXECUTOR_DECOMMISSION_KILL_INTERVAL.key, "5ms")
+ val clock = new ManualClock()
+ val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock
= clock)
+ val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] =
taskSet.tasks.map { task =>
+ task.metrics.internalAccums
+ }
+
+ // Start TASK 0,1 on exec1, Task 2 on exec2
+ (0 until 2).foreach { _ =>
+ val taskOption = manager.resourceOffer("exec1", "host1", NO_PREF)._1
+ assert(taskOption.isDefined)
+ assert(taskOption.get.executorId === "exec1")
+ }
+ val taskOption2 = manager.resourceOffer("exec2", "host2", NO_PREF)._1
+ assert(taskOption2.isDefined)
+ assert(taskOption2.get.executorId === "exec2")
+
+ clock.advance(6) // time = 6ms
+ // Start TASK 3 on exec2 after some delay
+ val taskOption3 = manager.resourceOffer("exec2", "host2", NO_PREF)._1
+ assert(taskOption3.isDefined)
+ assert(taskOption3.get.executorId === "exec2")
+
+ assert(sched.startedTasks.toSet === Set(0, 1, 2, 3))
+
+ clock.advance(4) // time = 10ms
+ // Complete the first 2 tasks and leave the other 2 tasks in running
+ for (id <- Set(0, 1)) {
+ manager.handleSuccessfulTask(id, createTaskResult(id,
accumUpdatesByTask(id)))
+ assert(sched.endedTasks(id) === Success)
+ }
+
+ // checkSpeculatableTasks checks that the task runtime is greater than the
threshold for
+ // speculating. Since we use a SPECULATION_MULTIPLIER of 1.5, So tasks
need to be running for
+ // > 15ms for speculation
+ assert(!manager.checkSpeculatableTasks(0))
+ assert(sched.speculativeTasks.toSet === Set())
+
+ // decommission exec-2. All tasks running on exec-2 (i.e. TASK 2,3) will
be added to
+ // executorDecommissionSpeculationTriggerTimeoutOpt
+ // (TASK2 -> 15, TASK3 -> 15)
+ manager.executorDecommission("exec2")
+
+ assert(manager.checkSpeculatableTasks(0))
+ // Task2 started at t=0, so it can still finish before t=15 (Median task
runtime = 10)
+ // Task3 started at t=6, so it might not finish before t=15. So Task 3
should be part
+ // of speculativeTasks
+ assert(sched.speculativeTasks.toSet === Set(3))
+ assert(manager.copiesRunning(3) === 1)
+
+ // Offer resource to start the speculative attempt for the running task
+ val taskOption3New = manager.resourceOffer("exec3", "host3", NO_PREF)._1
+ // Offer more resources. Noting should get scheduled now.
Review comment:
nit: `Noting` -> `Nothing`
##########
File path: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
##########
@@ -1100,8 +1115,12 @@ private[spark] class TaskSetManager(
def executorDecommission(execId: String): Unit = {
recomputeLocality()
- // Future consideration: if an executor is decommissioned it may make sense to add the current
- // tasks to the spec exec queue.
+ if (executorDecommissionKillInterval.nonEmpty) {
+ val executorKillTime = clock.getTimeMillis() + executorDecommissionKillInterval.get
+ runningTasksSet.filter(taskInfos(_).executorId == execId).foreach { tid =>
+ tidToExecutorKillTimeMapping(tid) = executorKillTime
+ }
+ }
Review comment:
nit:
```suggestion
    executorDecommissionKillInterval.foreach { interval =>
      val executorKillTime = clock.getTimeMillis() + interval
      taskInfos.values.filter(info => info.executorId == execId && info.running).foreach { info =>
        tidToExecutorKillTimeMapping(info.taskId) = executorKillTime
      }
    }
```
##########
File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
##########
@@ -1842,6 +1842,17 @@ package object config {
.timeConf(TimeUnit.MILLISECONDS)
.createOptional
+ private[spark] val EXECUTOR_DECOMMISSION_KILL_INTERVAL =
+ ConfigBuilder("spark.executor.decommission.killInterval")
+ .doc("Duration after which a decommissioned executor will be killed
forcefully." +
+ "This config is useful for cloud environments where we know in advance
when " +
+ "an executor is going to go down after decommissioning signal Ex-
around 2 mins " +
+ "in aws spot nodes, 1/2 hrs in spot block nodes etc. This config is
currently " +
+ "used to decide what tasks running on decommission executors to
speculate")
+ .version("3.1.0")
+ .timeConf(TimeUnit.MILLISECONDS)
Review comment:
Is milliseconds too small a unit here, given that the examples you give are at least on the order of minutes?
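For reference, a minute-level value still works with the millisecond base unit because time configs accept unit suffixes. A hypothetical setting for the spot-instance example (assuming a `SparkConf` named `conf`):
```scala
// Hypothetical: spot instances that give roughly 2 minutes of notice.
// The "2m" suffix is parsed by the time config, i.e. 120000 ms.
conf.set("spark.executor.decommission.killInterval", "2m")
```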
##########
File path: core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
##########
@@ -1892,6 +1892,156 @@ class TaskSetManagerSuite
testSpeculationDurationThreshold(true, 2, 1)
}
+ test("SPARK-21040: Check speculative tasks are launched when an executor is
decommissioned" +
+ " and the tasks running on it cannot finish within
EXECUTOR_DECOMMISSION_KILL_INTERVAL") {
+ sc = new SparkContext("local", "test")
+ sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"),
("exec3", "host3"))
+ val taskSet = FakeTask.createTaskSet(4)
+ sc.conf.set(config.SPECULATION_ENABLED, true)
+ sc.conf.set(config.SPECULATION_MULTIPLIER, 1.5)
+ sc.conf.set(config.SPECULATION_QUANTILE, 0.5)
+ sc.conf.set(config.EXECUTOR_DECOMMISSION_KILL_INTERVAL.key, "5ms")
+ val clock = new ManualClock()
+ val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock
= clock)
+ val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] =
taskSet.tasks.map { task =>
+ task.metrics.internalAccums
+ }
+
+ // Start TASK 0,1 on exec1, Task 2 on exec2
+ (0 until 2).foreach { _ =>
+ val taskOption = manager.resourceOffer("exec1", "host1", NO_PREF)._1
+ assert(taskOption.isDefined)
+ assert(taskOption.get.executorId === "exec1")
+ }
+ val taskOption2 = manager.resourceOffer("exec2", "host2", NO_PREF)._1
+ assert(taskOption2.isDefined)
+ assert(taskOption2.get.executorId === "exec2")
+
+ clock.advance(6) // time = 6ms
+ // Start TASK 3 on exec2 after some delay
+ val taskOption3 = manager.resourceOffer("exec2", "host2", NO_PREF)._1
+ assert(taskOption3.isDefined)
+ assert(taskOption3.get.executorId === "exec2")
+
+ assert(sched.startedTasks.toSet === Set(0, 1, 2, 3))
+
+ clock.advance(4) // time = 10ms
+ // Complete the first 2 tasks and leave the other 2 tasks in running
+ for (id <- Set(0, 1)) {
+ manager.handleSuccessfulTask(id, createTaskResult(id,
accumUpdatesByTask(id)))
+ assert(sched.endedTasks(id) === Success)
+ }
+
+ // checkSpeculatableTasks checks that the task runtime is greater than the
threshold for
+ // speculating. Since we use a SPECULATION_MULTIPLIER of 1.5, So tasks
need to be running for
+ // > 15ms for speculation
+ assert(!manager.checkSpeculatableTasks(0))
+ assert(sched.speculativeTasks.toSet === Set())
+
+ // decommission exec-2. All tasks running on exec-2 (i.e. TASK 2,3) will
be added to
+ // executorDecommissionSpeculationTriggerTimeoutOpt
+ // (TASK2 -> 15, TASK3 -> 15)
+ manager.executorDecommission("exec2")
+
+ assert(manager.checkSpeculatableTasks(0))
+ // Task2 started at t=0, so it can still finish before t=15 (Median task
runtime = 10)
+ // Task3 started at t=6, so it might not finish before t=15. So Task 3
should be part
+ // of speculativeTasks
+ assert(sched.speculativeTasks.toSet === Set(3))
+ assert(manager.copiesRunning(3) === 1)
+
+ // Offer resource to start the speculative attempt for the running task
+ val taskOption3New = manager.resourceOffer("exec3", "host3", NO_PREF)._1
+ // Offer more resources. Noting should get scheduled now.
+ assert(manager.resourceOffer("exec3", "host3", NO_PREF)._1.isEmpty)
+ assert(taskOption3New.isDefined)
+
+ // Assert info about the newly launched speculative task
+ val speculativeTask3 = taskOption3New.get
+ assert(speculativeTask3.index === 3)
+ assert(speculativeTask3.taskId === 4)
+ assert(speculativeTask3.executorId === "exec3")
+ assert(speculativeTask3.attemptNumber === 1)
+
+ clock.advance(1) // time = 11 ms
+ // Running checkSpeculatableTasks again should return false
+ assert(!manager.checkSpeculatableTasks(0))
+ assert(manager.copiesRunning(2) === 1)
+ assert(manager.copiesRunning(3) === 2)
+
+ clock.advance(5) // time = 16 ms
+ // At t=16 ms, Task 4 has completed 16 ms. It is more than the
+ // SPECULATION_MULTIPLIER * medianRuntime = 1.5 * 10 = 15 ms. So now Task 4 will
+ // be selected for speculation. Here we are verifying that regular speculation configs
+ // should still take effect even when a EXECUTOR_DECOMMISSION_KILL_INTERVAL is provided and
+ // corresponding executor is decommissioned
+ assert(manager.checkSpeculatableTasks(0))
+ assert(sched.speculativeTasks.toSet === Set(2, 3))
+ assert(manager.copiesRunning(2) === 1)
+ assert(manager.copiesRunning(3) === 2)
+ val taskOption2New = manager.resourceOffer("exec3", "host3", NO_PREF)._1
+ assert(taskOption2New.isDefined)
+ val speculativeTask2 = taskOption2New.get
+ // Ensure that task index 2 is launched on exec3, host3
+ assert(speculativeTask2.index === 2)
+ assert(speculativeTask2.taskId === 5)
+ assert(speculativeTask2.executorId === "exec3")
+ assert(speculativeTask2.attemptNumber === 1)
+
+ assert(manager.copiesRunning(2) === 2)
+ assert(manager.copiesRunning(3) === 2)
+
+ // Offering additional resources should not lead to any speculative tasks being respawned
+ assert(manager.resourceOffer("exec1", "host1", ANY)._1.isEmpty)
+ assert(manager.resourceOffer("exec2", "host2", ANY)._1.isEmpty)
Review comment:
Do we need to call resourceOffer for exec2 here, given that it is decommissioning?
##########
File path: core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
##########
@@ -1892,6 +1892,156 @@ class TaskSetManagerSuite
testSpeculationDurationThreshold(true, 2, 1)
}
+ test("SPARK-21040: Check speculative tasks are launched when an executor is
decommissioned" +
+ " and the tasks running on it cannot finish within
EXECUTOR_DECOMMISSION_KILL_INTERVAL") {
+ sc = new SparkContext("local", "test")
+ sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"),
("exec3", "host3"))
+ val taskSet = FakeTask.createTaskSet(4)
+ sc.conf.set(config.SPECULATION_ENABLED, true)
+ sc.conf.set(config.SPECULATION_MULTIPLIER, 1.5)
+ sc.conf.set(config.SPECULATION_QUANTILE, 0.5)
+ sc.conf.set(config.EXECUTOR_DECOMMISSION_KILL_INTERVAL.key, "5ms")
+ val clock = new ManualClock()
+ val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock
= clock)
+ val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] =
taskSet.tasks.map { task =>
+ task.metrics.internalAccums
+ }
+
+ // Start TASK 0,1 on exec1, Task 2 on exec2
+ (0 until 2).foreach { _ =>
+ val taskOption = manager.resourceOffer("exec1", "host1", NO_PREF)._1
+ assert(taskOption.isDefined)
+ assert(taskOption.get.executorId === "exec1")
+ }
+ val taskOption2 = manager.resourceOffer("exec2", "host2", NO_PREF)._1
+ assert(taskOption2.isDefined)
+ assert(taskOption2.get.executorId === "exec2")
+
+ clock.advance(6) // time = 6ms
+ // Start TASK 3 on exec2 after some delay
+ val taskOption3 = manager.resourceOffer("exec2", "host2", NO_PREF)._1
+ assert(taskOption3.isDefined)
+ assert(taskOption3.get.executorId === "exec2")
+
+ assert(sched.startedTasks.toSet === Set(0, 1, 2, 3))
+
+ clock.advance(4) // time = 10ms
+ // Complete the first 2 tasks and leave the other 2 tasks in running
+ for (id <- Set(0, 1)) {
+ manager.handleSuccessfulTask(id, createTaskResult(id,
accumUpdatesByTask(id)))
+ assert(sched.endedTasks(id) === Success)
+ }
+
+ // checkSpeculatableTasks checks that the task runtime is greater than the
threshold for
+ // speculating. Since we use a SPECULATION_MULTIPLIER of 1.5, So tasks
need to be running for
+ // > 15ms for speculation
+ assert(!manager.checkSpeculatableTasks(0))
+ assert(sched.speculativeTasks.toSet === Set())
+
+ // decommission exec-2. All tasks running on exec-2 (i.e. TASK 2,3) will
be added to
+ // executorDecommissionSpeculationTriggerTimeoutOpt
+ // (TASK2 -> 15, TASK3 -> 15)
+ manager.executorDecommission("exec2")
+
+ assert(manager.checkSpeculatableTasks(0))
+ // Task2 started at t=0, so it can still finish before t=15 (Median task
runtime = 10)
+ // Task3 started at t=6, so it might not finish before t=15. So Task 3
should be part
+ // of speculativeTasks
+ assert(sched.speculativeTasks.toSet === Set(3))
+ assert(manager.copiesRunning(3) === 1)
+
+ // Offer resource to start the speculative attempt for the running task
+ val taskOption3New = manager.resourceOffer("exec3", "host3", NO_PREF)._1
+ // Offer more resources. Noting should get scheduled now.
+ assert(manager.resourceOffer("exec3", "host3", NO_PREF)._1.isEmpty)
+ assert(taskOption3New.isDefined)
+
+ // Assert info about the newly launched speculative task
+ val speculativeTask3 = taskOption3New.get
+ assert(speculativeTask3.index === 3)
+ assert(speculativeTask3.taskId === 4)
+ assert(speculativeTask3.executorId === "exec3")
+ assert(speculativeTask3.attemptNumber === 1)
+
+ clock.advance(1) // time = 11 ms
+ // Running checkSpeculatableTasks again should return false
+ assert(!manager.checkSpeculatableTasks(0))
+ assert(manager.copiesRunning(2) === 1)
+ assert(manager.copiesRunning(3) === 2)
+
+ clock.advance(5) // time = 16 ms
+ // At t=16 ms, Task 4 has completed 16 ms. It is more than the
Review comment:
BTW, should it be TASK 2 instead of TASK 4?
##########
File path: core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
##########
@@ -1892,6 +1892,156 @@ class TaskSetManagerSuite
testSpeculationDurationThreshold(true, 2, 1)
}
+ test("SPARK-21040: Check speculative tasks are launched when an executor is
decommissioned" +
+ " and the tasks running on it cannot finish within
EXECUTOR_DECOMMISSION_KILL_INTERVAL") {
+ sc = new SparkContext("local", "test")
+ sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"),
("exec3", "host3"))
+ val taskSet = FakeTask.createTaskSet(4)
+ sc.conf.set(config.SPECULATION_ENABLED, true)
+ sc.conf.set(config.SPECULATION_MULTIPLIER, 1.5)
+ sc.conf.set(config.SPECULATION_QUANTILE, 0.5)
+ sc.conf.set(config.EXECUTOR_DECOMMISSION_KILL_INTERVAL.key, "5ms")
+ val clock = new ManualClock()
+ val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock
= clock)
+ val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] =
taskSet.tasks.map { task =>
+ task.metrics.internalAccums
+ }
+
+ // Start TASK 0,1 on exec1, Task 2 on exec2
+ (0 until 2).foreach { _ =>
+ val taskOption = manager.resourceOffer("exec1", "host1", NO_PREF)._1
+ assert(taskOption.isDefined)
+ assert(taskOption.get.executorId === "exec1")
+ }
+ val taskOption2 = manager.resourceOffer("exec2", "host2", NO_PREF)._1
+ assert(taskOption2.isDefined)
+ assert(taskOption2.get.executorId === "exec2")
+
+ clock.advance(6) // time = 6ms
+ // Start TASK 3 on exec2 after some delay
+ val taskOption3 = manager.resourceOffer("exec2", "host2", NO_PREF)._1
+ assert(taskOption3.isDefined)
+ assert(taskOption3.get.executorId === "exec2")
+
+ assert(sched.startedTasks.toSet === Set(0, 1, 2, 3))
+
+ clock.advance(4) // time = 10ms
+ // Complete the first 2 tasks and leave the other 2 tasks in running
+ for (id <- Set(0, 1)) {
+ manager.handleSuccessfulTask(id, createTaskResult(id,
accumUpdatesByTask(id)))
+ assert(sched.endedTasks(id) === Success)
+ }
+
+ // checkSpeculatableTasks checks that the task runtime is greater than the
threshold for
+ // speculating. Since we use a SPECULATION_MULTIPLIER of 1.5, So tasks
need to be running for
+ // > 15ms for speculation
+ assert(!manager.checkSpeculatableTasks(0))
+ assert(sched.speculativeTasks.toSet === Set())
+
+ // decommission exec-2. All tasks running on exec-2 (i.e. TASK 2,3) will
be added to
+ // executorDecommissionSpeculationTriggerTimeoutOpt
+ // (TASK2 -> 15, TASK3 -> 15)
+ manager.executorDecommission("exec2")
+
+ assert(manager.checkSpeculatableTasks(0))
+ // Task2 started at t=0, so it can still finish before t=15 (Median task
runtime = 10)
+ // Task3 started at t=6, so it might not finish before t=15. So Task 3
should be part
+ // of speculativeTasks
+ assert(sched.speculativeTasks.toSet === Set(3))
+ assert(manager.copiesRunning(3) === 1)
+
+ // Offer resource to start the speculative attempt for the running task
+ val taskOption3New = manager.resourceOffer("exec3", "host3", NO_PREF)._1
+ // Offer more resources. Noting should get scheduled now.
+ assert(manager.resourceOffer("exec3", "host3", NO_PREF)._1.isEmpty)
+ assert(taskOption3New.isDefined)
+
+ // Assert info about the newly launched speculative task
+ val speculativeTask3 = taskOption3New.get
+ assert(speculativeTask3.index === 3)
+ assert(speculativeTask3.taskId === 4)
+ assert(speculativeTask3.executorId === "exec3")
+ assert(speculativeTask3.attemptNumber === 1)
+
+ clock.advance(1) // time = 11 ms
+ // Running checkSpeculatableTasks again should return false
+ assert(!manager.checkSpeculatableTasks(0))
+ assert(manager.copiesRunning(2) === 1)
+ assert(manager.copiesRunning(3) === 2)
+
+ clock.advance(5) // time = 16 ms
+ // At t=16 ms, Task 4 has completed 16 ms. It is more than the
Review comment:
> Task 4 has completed 16 ms
Do you mean task 4 has been running for 16ms?
##########
File path: core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
##########
@@ -1892,6 +1892,156 @@ class TaskSetManagerSuite
testSpeculationDurationThreshold(true, 2, 1)
}
+ test("SPARK-21040: Check speculative tasks are launched when an executor is
decommissioned" +
+ " and the tasks running on it cannot finish within
EXECUTOR_DECOMMISSION_KILL_INTERVAL") {
+ sc = new SparkContext("local", "test")
+ sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"),
("exec3", "host3"))
+ val taskSet = FakeTask.createTaskSet(4)
+ sc.conf.set(config.SPECULATION_ENABLED, true)
+ sc.conf.set(config.SPECULATION_MULTIPLIER, 1.5)
+ sc.conf.set(config.SPECULATION_QUANTILE, 0.5)
+ sc.conf.set(config.EXECUTOR_DECOMMISSION_KILL_INTERVAL.key, "5ms")
+ val clock = new ManualClock()
+ val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock
= clock)
+ val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] =
taskSet.tasks.map { task =>
+ task.metrics.internalAccums
+ }
+
+ // Start TASK 0,1 on exec1, Task 2 on exec2
+ (0 until 2).foreach { _ =>
+ val taskOption = manager.resourceOffer("exec1", "host1", NO_PREF)._1
+ assert(taskOption.isDefined)
+ assert(taskOption.get.executorId === "exec1")
+ }
+ val taskOption2 = manager.resourceOffer("exec2", "host2", NO_PREF)._1
+ assert(taskOption2.isDefined)
+ assert(taskOption2.get.executorId === "exec2")
+
+ clock.advance(6) // time = 6ms
+ // Start TASK 3 on exec2 after some delay
+ val taskOption3 = manager.resourceOffer("exec2", "host2", NO_PREF)._1
+ assert(taskOption3.isDefined)
+ assert(taskOption3.get.executorId === "exec2")
+
+ assert(sched.startedTasks.toSet === Set(0, 1, 2, 3))
+
+ clock.advance(4) // time = 10ms
+ // Complete the first 2 tasks and leave the other 2 tasks in running
+ for (id <- Set(0, 1)) {
+ manager.handleSuccessfulTask(id, createTaskResult(id,
accumUpdatesByTask(id)))
+ assert(sched.endedTasks(id) === Success)
+ }
+
+ // checkSpeculatableTasks checks that the task runtime is greater than the
threshold for
+ // speculating. Since we use a SPECULATION_MULTIPLIER of 1.5, So tasks
need to be running for
+ // > 15ms for speculation
+ assert(!manager.checkSpeculatableTasks(0))
+ assert(sched.speculativeTasks.toSet === Set())
+
+ // decommission exec-2. All tasks running on exec-2 (i.e. TASK 2,3) will
be added to
+ // executorDecommissionSpeculationTriggerTimeoutOpt
+ // (TASK2 -> 15, TASK3 -> 15)
+ manager.executorDecommission("exec2")
+
+ assert(manager.checkSpeculatableTasks(0))
+ // Task2 started at t=0, so it can still finish before t=15 (Median task
runtime = 10)
+ // Task3 started at t=6, so it might not finish before t=15. So Task 3
should be part
+ // of speculativeTasks
+ assert(sched.speculativeTasks.toSet === Set(3))
+ assert(manager.copiesRunning(3) === 1)
+
+ // Offer resource to start the speculative attempt for the running task
+ val taskOption3New = manager.resourceOffer("exec3", "host3", NO_PREF)._1
+ // Offer more resources. Noting should get scheduled now.
+ assert(manager.resourceOffer("exec3", "host3", NO_PREF)._1.isEmpty)
+ assert(taskOption3New.isDefined)
+
+ // Assert info about the newly launched speculative task
+ val speculativeTask3 = taskOption3New.get
+ assert(speculativeTask3.index === 3)
+ assert(speculativeTask3.taskId === 4)
+ assert(speculativeTask3.executorId === "exec3")
+ assert(speculativeTask3.attemptNumber === 1)
+
+ clock.advance(1) // time = 11 ms
+ // Running checkSpeculatableTasks again should return false
+ assert(!manager.checkSpeculatableTasks(0))
+ assert(manager.copiesRunning(2) === 1)
+ assert(manager.copiesRunning(3) === 2)
+
+ clock.advance(5) // time = 16 ms
+ // At t=16 ms, Task 4 has completed 16 ms. It is more than the
+ // SPECULATION_MULTIPLIER * medianRuntime = 1.5 * 10 = 15 ms. So now Task
4 will
+ // be selected for speculation. Here we are verifying that regular
speculation configs
+ // should still take effect even when a
EXECUTOR_DECOMMISSION_KILL_INTERVAL is provided and
+ // corresponding executor is decommissioned
+ assert(manager.checkSpeculatableTasks(0))
+ assert(sched.speculativeTasks.toSet === Set(2, 3))
+ assert(manager.copiesRunning(2) === 1)
+ assert(manager.copiesRunning(3) === 2)
+ val taskOption2New = manager.resourceOffer("exec3", "host3", NO_PREF)._1
+ assert(taskOption2New.isDefined)
+ val speculativeTask2 = taskOption2New.get
+ // Ensure that task index 2 is launched on exec3, host3
+ assert(speculativeTask2.index === 2)
+ assert(speculativeTask2.taskId === 5)
+ assert(speculativeTask2.executorId === "exec3")
+ assert(speculativeTask2.attemptNumber === 1)
+
+ assert(manager.copiesRunning(2) === 2)
+ assert(manager.copiesRunning(3) === 2)
+
+ // Offering additional resources should not lead to any speculative tasks
being respawned
+ assert(manager.resourceOffer("exec1", "host1", ANY)._1.isEmpty)
+ assert(manager.resourceOffer("exec2", "host2", ANY)._1.isEmpty)
+ assert(manager.resourceOffer("exec3", "host3", ANY)._1.isEmpty)
+ }
+
+ test("SPARK-21040: Check speculative tasks are not launched when an
executor" +
Review comment:
I think this test may not be necessary because the above test has already covered it:
```
// Task2 started at t=0, so it can still finish before t=15 (Median task runtime = 10)
// Task3 started at t=6, so it might not finish before t=15. So Task 3 should be part
// of speculativeTasks
```
?
##########
File path: core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
##########
@@ -1892,6 +1892,156 @@ class TaskSetManagerSuite
testSpeculationDurationThreshold(true, 2, 1)
}
+ test("SPARK-21040: Check speculative tasks are launched when an executor is
decommissioned" +
+ " and the tasks running on it cannot finish within
EXECUTOR_DECOMMISSION_KILL_INTERVAL") {
+ sc = new SparkContext("local", "test")
+ sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"),
("exec3", "host3"))
+ val taskSet = FakeTask.createTaskSet(4)
+ sc.conf.set(config.SPECULATION_ENABLED, true)
+ sc.conf.set(config.SPECULATION_MULTIPLIER, 1.5)
+ sc.conf.set(config.SPECULATION_QUANTILE, 0.5)
+ sc.conf.set(config.EXECUTOR_DECOMMISSION_KILL_INTERVAL.key, "5ms")
+ val clock = new ManualClock()
+ val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock
= clock)
+ val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] =
taskSet.tasks.map { task =>
+ task.metrics.internalAccums
+ }
+
+ // Start TASK 0,1 on exec1, Task 2 on exec2
+ (0 until 2).foreach { _ =>
+ val taskOption = manager.resourceOffer("exec1", "host1", NO_PREF)._1
+ assert(taskOption.isDefined)
+ assert(taskOption.get.executorId === "exec1")
+ }
+ val taskOption2 = manager.resourceOffer("exec2", "host2", NO_PREF)._1
+ assert(taskOption2.isDefined)
+ assert(taskOption2.get.executorId === "exec2")
+
+ clock.advance(6) // time = 6ms
+ // Start TASK 3 on exec2 after some delay
+ val taskOption3 = manager.resourceOffer("exec2", "host2", NO_PREF)._1
+ assert(taskOption3.isDefined)
+ assert(taskOption3.get.executorId === "exec2")
+
+ assert(sched.startedTasks.toSet === Set(0, 1, 2, 3))
Review comment:
It seems `startedTasks` is updated asynchronously by `FakeDAGScheduler`. Maybe we could use `eventually` to prevent a potential flaky failure.
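A minimal sketch of that, assuming the suite mixes in (or imports) ScalaTest's `Eventually`; the timeout and interval values are placeholders:
```scala
import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._

// Retry the assertion until FakeDAGScheduler has recorded all task starts,
// instead of asserting once on a value that is updated asynchronously.
eventually(timeout(5.seconds), interval(10.milliseconds)) {
  assert(sched.startedTasks.toSet === Set(0, 1, 2, 3))
}
```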
##########
File path: core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
##########
@@ -1892,6 +1892,156 @@ class TaskSetManagerSuite
testSpeculationDurationThreshold(true, 2, 1)
}
+ test("SPARK-21040: Check speculative tasks are launched when an executor is
decommissioned" +
+ " and the tasks running on it cannot finish within
EXECUTOR_DECOMMISSION_KILL_INTERVAL") {
+ sc = new SparkContext("local", "test")
+ sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"),
("exec3", "host3"))
+ val taskSet = FakeTask.createTaskSet(4)
+ sc.conf.set(config.SPECULATION_ENABLED, true)
+ sc.conf.set(config.SPECULATION_MULTIPLIER, 1.5)
+ sc.conf.set(config.SPECULATION_QUANTILE, 0.5)
+ sc.conf.set(config.EXECUTOR_DECOMMISSION_KILL_INTERVAL.key, "5ms")
+ val clock = new ManualClock()
+ val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock
= clock)
+ val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] =
taskSet.tasks.map { task =>
+ task.metrics.internalAccums
+ }
+
+ // Start TASK 0,1 on exec1, Task 2 on exec2
+ (0 until 2).foreach { _ =>
+ val taskOption = manager.resourceOffer("exec1", "host1", NO_PREF)._1
+ assert(taskOption.isDefined)
+ assert(taskOption.get.executorId === "exec1")
+ }
+ val taskOption2 = manager.resourceOffer("exec2", "host2", NO_PREF)._1
+ assert(taskOption2.isDefined)
+ assert(taskOption2.get.executorId === "exec2")
+
+ clock.advance(6) // time = 6ms
+ // Start TASK 3 on exec2 after some delay
+ val taskOption3 = manager.resourceOffer("exec2", "host2", NO_PREF)._1
+ assert(taskOption3.isDefined)
+ assert(taskOption3.get.executorId === "exec2")
+
+ assert(sched.startedTasks.toSet === Set(0, 1, 2, 3))
+
+ clock.advance(4) // time = 10ms
+ // Complete the first 2 tasks and leave the other 2 tasks in running
+ for (id <- Set(0, 1)) {
+ manager.handleSuccessfulTask(id, createTaskResult(id,
accumUpdatesByTask(id)))
+ assert(sched.endedTasks(id) === Success)
+ }
+
+ // checkSpeculatableTasks checks that the task runtime is greater than the
threshold for
+ // speculating. Since we use a SPECULATION_MULTIPLIER of 1.5, So tasks
need to be running for
+ // > 15ms for speculation
+ assert(!manager.checkSpeculatableTasks(0))
+ assert(sched.speculativeTasks.toSet === Set())
+
+ // decommission exec-2. All tasks running on exec-2 (i.e. TASK 2,3) will be added to
+ // executorDecommissionSpeculationTriggerTimeoutOpt
+ // (TASK2 -> 15, TASK3 -> 15)
+ manager.executorDecommission("exec2")
Review comment:
Could you assert `tidToExecutorKillTimeMapping` here?
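Something like the sketch below, assuming the mapping is visible to the test. At this point the clock is at 10 ms and the kill interval is 5 ms, so both running tids on exec2 should map to 15:
```scala
// Sketch: tasks 2 and 3 are still running on exec2 when it is decommissioned
// at t=10ms, so both entries should carry kill time 10 + 5 = 15.
assert(manager.tidToExecutorKillTimeMapping.keySet === Set(2, 3))
assert(manager.tidToExecutorKillTimeMapping.values.forall(_ == 15))
```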
##########
File path: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
##########
@@ -1042,7 +1045,19 @@ private[spark] class TaskSetManager(
// bound based on that.
logDebug("Task length threshold for speculation: " + threshold)
for (tid <- runningTasksSet) {
- foundTasks |= checkAndSubmitSpeculatableTask(tid, time, threshold)
+ var speculated = checkAndSubmitSpeculatableTask(tid, time, threshold)
+ if (!speculated && tidToExecutorKillTimeMapping.contains(tid)) {
+ // Check whether this task will finish before the exectorKillTime assuming
+ // it will take medianDuration overall. If this task cannot finish within
+ // executorKillInterval, then this task is a candidate for speculation
+ val taskEndTimeBasedOnMedianDuration = taskInfos(tid).launchTime + medianDuration
+ val canExceedDeadline = tidToExecutorKillTimeMapping(tid) <
+ taskEndTimeBasedOnMedianDuration
+ if (canExceedDeadline) {
+ speculated = checkAndSubmitSpeculatableTask(tid, time, 0)
Review comment:
Do we need to remove the corresponding task from `tidToExecutorKillTimeMapping` if `speculated` is true?
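For example, a minimal sketch of that cleanup inside the loop from the diff above (names taken from the diff; whether entries should also be pruned when a task finishes is a separate question):
```scala
if (canExceedDeadline) {
  speculated = checkAndSubmitSpeculatableTask(tid, time, 0)
  // Sketch: once a speculative copy has been submitted for this tid, drop the
  // entry so later passes don't re-evaluate the decommission deadline for it.
  if (speculated) {
    tidToExecutorKillTimeMapping.remove(tid)
  }
}
```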
##########
File path: core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
##########
@@ -1892,6 +1892,156 @@ class TaskSetManagerSuite
testSpeculationDurationThreshold(true, 2, 1)
}
+ test("SPARK-21040: Check speculative tasks are launched when an executor is
decommissioned" +
+ " and the tasks running on it cannot finish within
EXECUTOR_DECOMMISSION_KILL_INTERVAL") {
+ sc = new SparkContext("local", "test")
+ sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"),
("exec3", "host3"))
+ val taskSet = FakeTask.createTaskSet(4)
+ sc.conf.set(config.SPECULATION_ENABLED, true)
+ sc.conf.set(config.SPECULATION_MULTIPLIER, 1.5)
+ sc.conf.set(config.SPECULATION_QUANTILE, 0.5)
+ sc.conf.set(config.EXECUTOR_DECOMMISSION_KILL_INTERVAL.key, "5ms")
+ val clock = new ManualClock()
+ val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock
= clock)
+ val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] =
taskSet.tasks.map { task =>
+ task.metrics.internalAccums
+ }
+
+ // Start TASK 0,1 on exec1, Task 2 on exec2
+ (0 until 2).foreach { _ =>
+ val taskOption = manager.resourceOffer("exec1", "host1", NO_PREF)._1
+ assert(taskOption.isDefined)
+ assert(taskOption.get.executorId === "exec1")
+ }
+ val taskOption2 = manager.resourceOffer("exec2", "host2", NO_PREF)._1
+ assert(taskOption2.isDefined)
+ assert(taskOption2.get.executorId === "exec2")
+
+ clock.advance(6) // time = 6ms
+ // Start TASK 3 on exec2 after some delay
+ val taskOption3 = manager.resourceOffer("exec2", "host2", NO_PREF)._1
+ assert(taskOption3.isDefined)
+ assert(taskOption3.get.executorId === "exec2")
+
+ assert(sched.startedTasks.toSet === Set(0, 1, 2, 3))
Review comment:
I see. You're right.
##########
File path: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
##########
@@ -1042,7 +1045,19 @@ private[spark] class TaskSetManager(
// bound based on that.
logDebug("Task length threshold for speculation: " + threshold)
for (tid <- runningTasksSet) {
+ if (canExceedDeadline) {
+ speculated = checkAndSubmitSpeculatableTask(tid, time, 0)
Review comment:
Then, is it possible that we add redundant speculative tasks for the same tid?
##########
File path: core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
##########
@@ -1892,6 +1892,156 @@ class TaskSetManagerSuite
testSpeculationDurationThreshold(true, 2, 1)
}
+ test("SPARK-21040: Check speculative tasks are launched when an executor is
decommissioned" +
+ " and the tasks running on it cannot finish within
EXECUTOR_DECOMMISSION_KILL_INTERVAL") {
+ sc = new SparkContext("local", "test")
+ sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"),
("exec3", "host3"))
+ val taskSet = FakeTask.createTaskSet(4)
+ sc.conf.set(config.SPECULATION_ENABLED, true)
+ sc.conf.set(config.SPECULATION_MULTIPLIER, 1.5)
+ sc.conf.set(config.SPECULATION_QUANTILE, 0.5)
+ sc.conf.set(config.EXECUTOR_DECOMMISSION_KILL_INTERVAL.key, "5ms")
+ val clock = new ManualClock()
+ val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock
= clock)
+ val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] =
taskSet.tasks.map { task =>
+ task.metrics.internalAccums
+ }
+
+ // Start TASK 0,1 on exec1, Task 2 on exec2
+ (0 until 2).foreach { _ =>
+ val taskOption = manager.resourceOffer("exec1", "host1", NO_PREF)._1
+ assert(taskOption.isDefined)
+ assert(taskOption.get.executorId === "exec1")
+ }
+ val taskOption2 = manager.resourceOffer("exec2", "host2", NO_PREF)._1
+ assert(taskOption2.isDefined)
+ assert(taskOption2.get.executorId === "exec2")
+
+ clock.advance(6) // time = 6ms
+ // Start TASK 3 on exec2 after some delay
+ val taskOption3 = manager.resourceOffer("exec2", "host2", NO_PREF)._1
+ assert(taskOption3.isDefined)
+ assert(taskOption3.get.executorId === "exec2")
+
+ assert(sched.startedTasks.toSet === Set(0, 1, 2, 3))
+
+ clock.advance(4) // time = 10ms
+ // Complete the first 2 tasks and leave the other 2 tasks in running
+ for (id <- Set(0, 1)) {
+ manager.handleSuccessfulTask(id, createTaskResult(id,
accumUpdatesByTask(id)))
+ assert(sched.endedTasks(id) === Success)
+ }
+
+ // checkSpeculatableTasks checks that the task runtime is greater than the
threshold for
+ // speculating. Since we use a SPECULATION_MULTIPLIER of 1.5, So tasks
need to be running for
+ // > 15ms for speculation
+ assert(!manager.checkSpeculatableTasks(0))
+ assert(sched.speculativeTasks.toSet === Set())
+
+ // decommission exec-2. All tasks running on exec-2 (i.e. TASK 2,3) will
be added to
+ // executorDecommissionSpeculationTriggerTimeoutOpt
+ // (TASK2 -> 15, TASK3 -> 15)
+ manager.executorDecommission("exec2")
+
+ assert(manager.checkSpeculatableTasks(0))
+ // Task2 started at t=0, so it can still finish before t=15 (Median task
runtime = 10)
+ // Task3 started at t=6, so it might not finish before t=15. So Task 3
should be part
+ // of speculativeTasks
+ assert(sched.speculativeTasks.toSet === Set(3))
+ assert(manager.copiesRunning(3) === 1)
+
+ // Offer resource to start the speculative attempt for the running task
+ val taskOption3New = manager.resourceOffer("exec3", "host3", NO_PREF)._1
+ // Offer more resources. Noting should get scheduled now.
+ assert(manager.resourceOffer("exec3", "host3", NO_PREF)._1.isEmpty)
+ assert(taskOption3New.isDefined)
+
+ // Assert info about the newly launched speculative task
+ val speculativeTask3 = taskOption3New.get
+ assert(speculativeTask3.index === 3)
+ assert(speculativeTask3.taskId === 4)
+ assert(speculativeTask3.executorId === "exec3")
+ assert(speculativeTask3.attemptNumber === 1)
+
+ clock.advance(1) // time = 11 ms
+ // Running checkSpeculatableTasks again should return false
+ assert(!manager.checkSpeculatableTasks(0))
+ assert(manager.copiesRunning(2) === 1)
+ assert(manager.copiesRunning(3) === 2)
+
+ clock.advance(5) // time = 16 ms
+ // At t=16 ms, Task 4 has completed 16 ms. It is more than the
+ // SPECULATION_MULTIPLIER * medianRuntime = 1.5 * 10 = 15 ms. So now Task
4 will
+ // be selected for speculation. Here we are verifying that regular
speculation configs
+ // should still take effect even when a
EXECUTOR_DECOMMISSION_KILL_INTERVAL is provided and
+ // corresponding executor is decommissioned
+ assert(manager.checkSpeculatableTasks(0))
+ assert(sched.speculativeTasks.toSet === Set(2, 3))
+ assert(manager.copiesRunning(2) === 1)
+ assert(manager.copiesRunning(3) === 2)
+ val taskOption2New = manager.resourceOffer("exec3", "host3", NO_PREF)._1
+ assert(taskOption2New.isDefined)
+ val speculativeTask2 = taskOption2New.get
+ // Ensure that task index 2 is launched on exec3, host3
+ assert(speculativeTask2.index === 2)
+ assert(speculativeTask2.taskId === 5)
+ assert(speculativeTask2.executorId === "exec3")
+ assert(speculativeTask2.attemptNumber === 1)
+
+ assert(manager.copiesRunning(2) === 2)
+ assert(manager.copiesRunning(3) === 2)
+
+ // Offering additional resources should not lead to any speculative tasks
being respawned
+ assert(manager.resourceOffer("exec1", "host1", ANY)._1.isEmpty)
+ assert(manager.resourceOffer("exec2", "host2", ANY)._1.isEmpty)
+ assert(manager.resourceOffer("exec3", "host3", ANY)._1.isEmpty)
+ }
+
+ test("SPARK-21040: Check speculative tasks are not launched when an
executor" +
Review comment:
Yes, please.
##########
File path: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
##########
@@ -1042,7 +1045,19 @@ private[spark] class TaskSetManager(
// bound based on that.
logDebug("Task length threshold for speculation: " + threshold)
for (tid <- runningTasksSet) {
+ if (canExceedDeadline) {
+ speculated = checkAndSubmitSpeculatableTask(tid, time, 0)
Review comment:
I mean whether it's possible to add redundant speculative tasks to `speculatableTasks`, rather than running multiple speculative tasks for the same tid at the same time.
I think the actual prevention is the `!speculatableTasks.contains(index)` check in `checkAndSubmitSpeculatableTask`, but never mind, since `speculatableTasks` is a `HashSet`.
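For reference, the property relied on here is just that inserting the same index into a `HashSet` twice is a no-op, e.g.:
```scala
import scala.collection.mutable

val speculatableTasks = mutable.HashSet[Int]()
speculatableTasks += 3
speculatableTasks += 3 // no-op: the index is only tracked once
assert(speculatableTasks.size == 1)
```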
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.