prakharjain09 commented on a change in pull request #28619:
URL: https://github.com/apache/spark/pull/28619#discussion_r430474538
##########
File path: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
##########
@@ -1100,8 +1115,12 @@ private[spark] class TaskSetManager(
def executorDecommission(execId: String): Unit = {
recomputeLocality()
- // Future consideration: if an executor is decommissioned it may make sense to add the current
- // tasks to the spec exec queue.
+ if (executorDecommissionKillInterval.nonEmpty) {
+ val executorKillTime = clock.getTimeMillis() + executorDecommissionKillInterval.get
+ runningTasksSet.filter(taskInfos(_).executorId == execId).foreach { tid =>
+ tidToExecutorKillTimeMapping(tid) = executorKillTime
+ }
+ }
Review comment:
1. Added `executorDecommissionKillInterval.foreach { interval =>` to remove the if check.
2. It's better to filter `runningTasksSet` rather than `taskInfos`, since `runningTasksSet` is going to be relatively small.
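For clarity, a minimal sketch of how the reworked block could look after both changes (it assumes the members shown in the quoted diff, i.e. `executorDecommissionKillInterval`, `runningTasksSet`, `taskInfos`, `tidToExecutorKillTimeMapping` and `clock`, and is illustrative rather than the exact patch):

```scala
def executorDecommission(execId: String): Unit = {
  recomputeLocality()
  // foreach on the Option replaces the explicit nonEmpty/get check.
  executorDecommissionKillInterval.foreach { interval =>
    val executorKillTime = clock.getTimeMillis() + interval
    // Filter the (relatively small) runningTasksSet instead of scanning all taskInfos.
    runningTasksSet.filter(taskInfos(_).executorId == execId).foreach { tid =>
      tidToExecutorKillTimeMapping(tid) = executorKillTime
    }
  }
}
```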
##########
File path: core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
##########
@@ -1892,6 +1892,156 @@ class TaskSetManagerSuite
testSpeculationDurationThreshold(true, 2, 1)
}
+ test("SPARK-21040: Check speculative tasks are launched when an executor is
decommissioned" +
+ " and the tasks running on it cannot finish within
EXECUTOR_DECOMMISSION_KILL_INTERVAL") {
+ sc = new SparkContext("local", "test")
+ sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"), ("exec3", "host3"))
+ val taskSet = FakeTask.createTaskSet(4)
+ sc.conf.set(config.SPECULATION_ENABLED, true)
+ sc.conf.set(config.SPECULATION_MULTIPLIER, 1.5)
+ sc.conf.set(config.SPECULATION_QUANTILE, 0.5)
+ sc.conf.set(config.EXECUTOR_DECOMMISSION_KILL_INTERVAL.key, "5ms")
+ val clock = new ManualClock()
+ val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
+ val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = taskSet.tasks.map { task =>
+ task.metrics.internalAccums
+ }
+
+ // Start TASK 0,1 on exec1, Task 2 on exec2
+ (0 until 2).foreach { _ =>
+ val taskOption = manager.resourceOffer("exec1", "host1", NO_PREF)._1
+ assert(taskOption.isDefined)
+ assert(taskOption.get.executorId === "exec1")
+ }
+ val taskOption2 = manager.resourceOffer("exec2", "host2", NO_PREF)._1
+ assert(taskOption2.isDefined)
+ assert(taskOption2.get.executorId === "exec2")
+
+ clock.advance(6) // time = 6ms
+ // Start TASK 3 on exec2 after some delay
+ val taskOption3 = manager.resourceOffer("exec2", "host2", NO_PREF)._1
+ assert(taskOption3.isDefined)
+ assert(taskOption3.get.executorId === "exec2")
+
+ assert(sched.startedTasks.toSet === Set(0, 1, 2, 3))
+
+ clock.advance(4) // time = 10ms
+ // Complete the first 2 tasks and leave the other 2 tasks in running
+ for (id <- Set(0, 1)) {
+ manager.handleSuccessfulTask(id, createTaskResult(id, accumUpdatesByTask(id)))
+ assert(sched.endedTasks(id) === Success)
+ }
+
+ // checkSpeculatableTasks checks that the task runtime is greater than the threshold for
+ // speculating. Since we use a SPECULATION_MULTIPLIER of 1.5, So tasks need to be running for
+ // > 15ms for speculation
+ assert(!manager.checkSpeculatableTasks(0))
+ assert(sched.speculativeTasks.toSet === Set())
+
+ // decommission exec-2. All tasks running on exec-2 (i.e. TASK 2,3) will be added to
+ // executorDecommissionSpeculationTriggerTimeoutOpt
+ // (TASK2 -> 15, TASK3 -> 15)
+ manager.executorDecommission("exec2")
+
+ assert(manager.checkSpeculatableTasks(0))
+ // Task2 started at t=0, so it can still finish before t=15 (Median task runtime = 10)
+ // Task3 started at t=6, so it might not finish before t=15. So Task 3 should be part
+ // of speculativeTasks
+ assert(sched.speculativeTasks.toSet === Set(3))
+ assert(manager.copiesRunning(3) === 1)
+
+ // Offer resource to start the speculative attempt for the running task
+ val taskOption3New = manager.resourceOffer("exec3", "host3", NO_PREF)._1
+ // Offer more resources. Noting should get scheduled now.
Review comment:
done.
##########
File path: core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
##########
@@ -1892,6 +1892,156 @@ class TaskSetManagerSuite
testSpeculationDurationThreshold(true, 2, 1)
}
+ test("SPARK-21040: Check speculative tasks are launched when an executor is
decommissioned" +
+ " and the tasks running on it cannot finish within
EXECUTOR_DECOMMISSION_KILL_INTERVAL") {
+ sc = new SparkContext("local", "test")
+ sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"), ("exec3", "host3"))
+ val taskSet = FakeTask.createTaskSet(4)
+ sc.conf.set(config.SPECULATION_ENABLED, true)
+ sc.conf.set(config.SPECULATION_MULTIPLIER, 1.5)
+ sc.conf.set(config.SPECULATION_QUANTILE, 0.5)
+ sc.conf.set(config.EXECUTOR_DECOMMISSION_KILL_INTERVAL.key, "5ms")
+ val clock = new ManualClock()
+ val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
+ val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = taskSet.tasks.map { task =>
+ task.metrics.internalAccums
+ }
+
+ // Start TASK 0,1 on exec1, Task 2 on exec2
+ (0 until 2).foreach { _ =>
+ val taskOption = manager.resourceOffer("exec1", "host1", NO_PREF)._1
+ assert(taskOption.isDefined)
+ assert(taskOption.get.executorId === "exec1")
+ }
+ val taskOption2 = manager.resourceOffer("exec2", "host2", NO_PREF)._1
+ assert(taskOption2.isDefined)
+ assert(taskOption2.get.executorId === "exec2")
+
+ clock.advance(6) // time = 6ms
+ // Start TASK 3 on exec2 after some delay
+ val taskOption3 = manager.resourceOffer("exec2", "host2", NO_PREF)._1
+ assert(taskOption3.isDefined)
+ assert(taskOption3.get.executorId === "exec2")
+
+ assert(sched.startedTasks.toSet === Set(0, 1, 2, 3))
+
+ clock.advance(4) // time = 10ms
+ // Complete the first 2 tasks and leave the other 2 tasks in running
+ for (id <- Set(0, 1)) {
+ manager.handleSuccessfulTask(id, createTaskResult(id, accumUpdatesByTask(id)))
+ assert(sched.endedTasks(id) === Success)
+ }
+
+ // checkSpeculatableTasks checks that the task runtime is greater than the threshold for
+ // speculating. Since we use a SPECULATION_MULTIPLIER of 1.5, So tasks need to be running for
+ // > 15ms for speculation
+ assert(!manager.checkSpeculatableTasks(0))
+ assert(sched.speculativeTasks.toSet === Set())
+
+ // decommission exec-2. All tasks running on exec-2 (i.e. TASK 2,3) will be added to
+ // executorDecommissionSpeculationTriggerTimeoutOpt
+ // (TASK2 -> 15, TASK3 -> 15)
+ manager.executorDecommission("exec2")
+
+ assert(manager.checkSpeculatableTasks(0))
+ // Task2 started at t=0, so it can still finish before t=15 (Median task runtime = 10)
+ // Task3 started at t=6, so it might not finish before t=15. So Task 3 should be part
+ // of speculativeTasks
+ assert(sched.speculativeTasks.toSet === Set(3))
+ assert(manager.copiesRunning(3) === 1)
+
+ // Offer resource to start the speculative attempt for the running task
+ val taskOption3New = manager.resourceOffer("exec3", "host3", NO_PREF)._1
+ // Offer more resources. Noting should get scheduled now.
+ assert(manager.resourceOffer("exec3", "host3", NO_PREF)._1.isEmpty)
+ assert(taskOption3New.isDefined)
+
+ // Assert info about the newly launched speculative task
+ val speculativeTask3 = taskOption3New.get
+ assert(speculativeTask3.index === 3)
+ assert(speculativeTask3.taskId === 4)
+ assert(speculativeTask3.executorId === "exec3")
+ assert(speculativeTask3.attemptNumber === 1)
+
+ clock.advance(1) // time = 11 ms
+ // Running checkSpeculatableTasks again should return false
+ assert(!manager.checkSpeculatableTasks(0))
+ assert(manager.copiesRunning(2) === 1)
+ assert(manager.copiesRunning(3) === 2)
+
+ clock.advance(5) // time = 16 ms
+ // At t=16 ms, Task 4 has completed 16 ms. It is more than the
+ // SPECULATION_MULTIPLIER * medianRuntime = 1.5 * 10 = 15 ms. So now Task 4 will
+ // be selected for speculation. Here we are verifying that regular speculation configs
+ // should still take effect even when a EXECUTOR_DECOMMISSION_KILL_INTERVAL is provided and
+ // corresponding executor is decommissioned
+ assert(manager.checkSpeculatableTasks(0))
+ assert(sched.speculativeTasks.toSet === Set(2, 3))
+ assert(manager.copiesRunning(2) === 1)
+ assert(manager.copiesRunning(3) === 2)
+ val taskOption2New = manager.resourceOffer("exec3", "host3", NO_PREF)._1
+ assert(taskOption2New.isDefined)
+ val speculativeTask2 = taskOption2New.get
+ // Ensure that task index 2 is launched on exec3, host3
+ assert(speculativeTask2.index === 2)
+ assert(speculativeTask2.taskId === 5)
+ assert(speculativeTask2.executorId === "exec3")
+ assert(speculativeTask2.attemptNumber === 1)
+
+ assert(manager.copiesRunning(2) === 2)
+ assert(manager.copiesRunning(3) === 2)
+
+ // Offering additional resources should not lead to any speculative tasks being respawned
+ assert(manager.resourceOffer("exec1", "host1", ANY)._1.isEmpty)
+ assert(manager.resourceOffer("exec2", "host2", ANY)._1.isEmpty)
Review comment:
Yes - we should not apply it to exec2. Removed it.
##########
File path: core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
##########
@@ -1892,6 +1892,156 @@ class TaskSetManagerSuite
testSpeculationDurationThreshold(true, 2, 1)
}
+ test("SPARK-21040: Check speculative tasks are launched when an executor is
decommissioned" +
+ " and the tasks running on it cannot finish within
EXECUTOR_DECOMMISSION_KILL_INTERVAL") {
+ sc = new SparkContext("local", "test")
+ sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"), ("exec3", "host3"))
+ val taskSet = FakeTask.createTaskSet(4)
+ sc.conf.set(config.SPECULATION_ENABLED, true)
+ sc.conf.set(config.SPECULATION_MULTIPLIER, 1.5)
+ sc.conf.set(config.SPECULATION_QUANTILE, 0.5)
+ sc.conf.set(config.EXECUTOR_DECOMMISSION_KILL_INTERVAL.key, "5ms")
+ val clock = new ManualClock()
+ val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
+ val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = taskSet.tasks.map { task =>
+ task.metrics.internalAccums
+ }
+
+ // Start TASK 0,1 on exec1, Task 2 on exec2
+ (0 until 2).foreach { _ =>
+ val taskOption = manager.resourceOffer("exec1", "host1", NO_PREF)._1
+ assert(taskOption.isDefined)
+ assert(taskOption.get.executorId === "exec1")
+ }
+ val taskOption2 = manager.resourceOffer("exec2", "host2", NO_PREF)._1
+ assert(taskOption2.isDefined)
+ assert(taskOption2.get.executorId === "exec2")
+
+ clock.advance(6) // time = 6ms
+ // Start TASK 3 on exec2 after some delay
+ val taskOption3 = manager.resourceOffer("exec2", "host2", NO_PREF)._1
+ assert(taskOption3.isDefined)
+ assert(taskOption3.get.executorId === "exec2")
+
+ assert(sched.startedTasks.toSet === Set(0, 1, 2, 3))
Review comment:
`dagScheduler.taskStarted` is called as part of `resourceOffer`, and the overridden taskStarted method of FakeDAGScheduler appends the task to the `startedTasks` set. So it seems to be synchronous?
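For context, the test double looks roughly like the sketch below (a paraphrase, so treat the exact signature as an assumption). Because `taskStarted` runs on the caller's thread inside `resourceOffer`, `startedTasks` is already populated when the assertion executes:

```scala
// Paraphrased sketch of the fake DAG scheduler used by TaskSetManagerSuite.
class FakeDAGScheduler(sc: SparkContext, taskScheduler: FakeTaskScheduler)
  extends DAGScheduler(sc) {
  override def taskStarted(task: Task[_], taskInfo: TaskInfo): Unit = {
    // Invoked synchronously from resourceOffer, on the calling thread.
    taskScheduler.startedTasks += taskInfo.index
  }
}
```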
##########
File path: core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
##########
@@ -1892,6 +1892,156 @@ class TaskSetManagerSuite
testSpeculationDurationThreshold(true, 2, 1)
}
+ test("SPARK-21040: Check speculative tasks are launched when an executor is
decommissioned" +
+ " and the tasks running on it cannot finish within
EXECUTOR_DECOMMISSION_KILL_INTERVAL") {
+ sc = new SparkContext("local", "test")
+ sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"), ("exec3", "host3"))
+ val taskSet = FakeTask.createTaskSet(4)
+ sc.conf.set(config.SPECULATION_ENABLED, true)
+ sc.conf.set(config.SPECULATION_MULTIPLIER, 1.5)
+ sc.conf.set(config.SPECULATION_QUANTILE, 0.5)
+ sc.conf.set(config.EXECUTOR_DECOMMISSION_KILL_INTERVAL.key, "5ms")
+ val clock = new ManualClock()
+ val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
+ val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = taskSet.tasks.map { task =>
+ task.metrics.internalAccums
+ }
+
+ // Start TASK 0,1 on exec1, Task 2 on exec2
+ (0 until 2).foreach { _ =>
+ val taskOption = manager.resourceOffer("exec1", "host1", NO_PREF)._1
+ assert(taskOption.isDefined)
+ assert(taskOption.get.executorId === "exec1")
+ }
+ val taskOption2 = manager.resourceOffer("exec2", "host2", NO_PREF)._1
+ assert(taskOption2.isDefined)
+ assert(taskOption2.get.executorId === "exec2")
+
+ clock.advance(6) // time = 6ms
+ // Start TASK 3 on exec2 after some delay
+ val taskOption3 = manager.resourceOffer("exec2", "host2", NO_PREF)._1
+ assert(taskOption3.isDefined)
+ assert(taskOption3.get.executorId === "exec2")
+
+ assert(sched.startedTasks.toSet === Set(0, 1, 2, 3))
+
+ clock.advance(4) // time = 10ms
+ // Complete the first 2 tasks and leave the other 2 tasks in running
+ for (id <- Set(0, 1)) {
+ manager.handleSuccessfulTask(id, createTaskResult(id, accumUpdatesByTask(id)))
+ assert(sched.endedTasks(id) === Success)
+ }
+
+ // checkSpeculatableTasks checks that the task runtime is greater than the threshold for
+ // speculating. Since we use a SPECULATION_MULTIPLIER of 1.5, So tasks need to be running for
+ // > 15ms for speculation
+ assert(!manager.checkSpeculatableTasks(0))
+ assert(sched.speculativeTasks.toSet === Set())
+
+ // decommission exec-2. All tasks running on exec-2 (i.e. TASK 2,3) will be added to
+ // executorDecommissionSpeculationTriggerTimeoutOpt
+ // (TASK2 -> 15, TASK3 -> 15)
+ manager.executorDecommission("exec2")
Review comment:
done.
##########
File path: core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
##########
@@ -1892,6 +1892,156 @@ class TaskSetManagerSuite
testSpeculationDurationThreshold(true, 2, 1)
}
+ test("SPARK-21040: Check speculative tasks are launched when an executor is
decommissioned" +
+ " and the tasks running on it cannot finish within
EXECUTOR_DECOMMISSION_KILL_INTERVAL") {
+ sc = new SparkContext("local", "test")
+ sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"), ("exec3", "host3"))
+ val taskSet = FakeTask.createTaskSet(4)
+ sc.conf.set(config.SPECULATION_ENABLED, true)
+ sc.conf.set(config.SPECULATION_MULTIPLIER, 1.5)
+ sc.conf.set(config.SPECULATION_QUANTILE, 0.5)
+ sc.conf.set(config.EXECUTOR_DECOMMISSION_KILL_INTERVAL.key, "5ms")
+ val clock = new ManualClock()
+ val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
+ val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = taskSet.tasks.map { task =>
+ task.metrics.internalAccums
+ }
+
+ // Start TASK 0,1 on exec1, Task 2 on exec2
+ (0 until 2).foreach { _ =>
+ val taskOption = manager.resourceOffer("exec1", "host1", NO_PREF)._1
+ assert(taskOption.isDefined)
+ assert(taskOption.get.executorId === "exec1")
+ }
+ val taskOption2 = manager.resourceOffer("exec2", "host2", NO_PREF)._1
+ assert(taskOption2.isDefined)
+ assert(taskOption2.get.executorId === "exec2")
+
+ clock.advance(6) // time = 6ms
+ // Start TASK 3 on exec2 after some delay
+ val taskOption3 = manager.resourceOffer("exec2", "host2", NO_PREF)._1
+ assert(taskOption3.isDefined)
+ assert(taskOption3.get.executorId === "exec2")
+
+ assert(sched.startedTasks.toSet === Set(0, 1, 2, 3))
+
+ clock.advance(4) // time = 10ms
+ // Complete the first 2 tasks and leave the other 2 tasks in running
+ for (id <- Set(0, 1)) {
+ manager.handleSuccessfulTask(id, createTaskResult(id, accumUpdatesByTask(id)))
+ assert(sched.endedTasks(id) === Success)
+ }
+
+ // checkSpeculatableTasks checks that the task runtime is greater than the threshold for
+ // speculating. Since we use a SPECULATION_MULTIPLIER of 1.5, So tasks need to be running for
+ // > 15ms for speculation
+ assert(!manager.checkSpeculatableTasks(0))
+ assert(sched.speculativeTasks.toSet === Set())
+
+ // decommission exec-2. All tasks running on exec-2 (i.e. TASK 2,3) will be added to
+ // executorDecommissionSpeculationTriggerTimeoutOpt
+ // (TASK2 -> 15, TASK3 -> 15)
+ manager.executorDecommission("exec2")
+
+ assert(manager.checkSpeculatableTasks(0))
+ // Task2 started at t=0, so it can still finish before t=15 (Median task runtime = 10)
+ // Task3 started at t=6, so it might not finish before t=15. So Task 3 should be part
+ // of speculativeTasks
+ assert(sched.speculativeTasks.toSet === Set(3))
+ assert(manager.copiesRunning(3) === 1)
+
+ // Offer resource to start the speculative attempt for the running task
+ val taskOption3New = manager.resourceOffer("exec3", "host3", NO_PREF)._1
+ // Offer more resources. Noting should get scheduled now.
+ assert(manager.resourceOffer("exec3", "host3", NO_PREF)._1.isEmpty)
+ assert(taskOption3New.isDefined)
+
+ // Assert info about the newly launched speculative task
+ val speculativeTask3 = taskOption3New.get
+ assert(speculativeTask3.index === 3)
+ assert(speculativeTask3.taskId === 4)
+ assert(speculativeTask3.executorId === "exec3")
+ assert(speculativeTask3.attemptNumber === 1)
+
+ clock.advance(1) // time = 11 ms
+ // Running checkSpeculatableTasks again should return false
+ assert(!manager.checkSpeculatableTasks(0))
+ assert(manager.copiesRunning(2) === 1)
+ assert(manager.copiesRunning(3) === 2)
+
+ clock.advance(5) // time = 16 ms
+ // At t=16 ms, Task 4 has completed 16 ms. It is more than the
Review comment:
Yea, I meant Task 2. Sorry for the confusion - updated the line to `At t=16 ms, Task 2 has been running for 16 ms`.
##########
File path: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
##########
@@ -1042,7 +1045,19 @@ private[spark] class TaskSetManager(
// bound based on that.
logDebug("Task length threshold for speculation: " + threshold)
for (tid <- runningTasksSet) {
- foundTasks |= checkAndSubmitSpeculatableTask(tid, time, threshold)
+ var speculated = checkAndSubmitSpeculatableTask(tid, time, threshold)
+ if (!speculated && tidToExecutorKillTimeMapping.contains(tid)) {
+ // Check whether this task will finish before the exectorKillTime assuming
+ // it will take medianDuration overall. If this task cannot finish within
+ // executorKillInterval, then this task is a candidate for speculation
+ val taskEndTimeBasedOnMedianDuration = taskInfos(tid).launchTime + medianDuration
+ val canExceedDeadline = tidToExecutorKillTimeMapping(tid) < taskEndTimeBasedOnMedianDuration
+ if (canExceedDeadline) {
+ speculated = checkAndSubmitSpeculatableTask(tid, time, 0)
Review comment:
We remove entries from `tidToExecutorKillTimeMapping` at the time the task is removed from `runningTasksSet`.
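As an illustration, the cleanup can piggyback on the same path that drops the tid from `runningTasksSet`; a rough sketch along the lines of `TaskSetManager.removeRunningTask` (the exact hook point is an assumption, not necessarily where the PR places it):

```scala
def removeRunningTask(tid: Long): Unit = {
  if (runningTasksSet.remove(tid) && parent != null) {
    parent.decreaseRunningTasks(1)
  }
  // Drop the kill-time entry once the task is no longer running, so the map
  // does not retain entries for finished or failed tasks.
  tidToExecutorKillTimeMapping.remove(tid)
}
```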
##########
File path: core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
##########
@@ -1892,6 +1892,156 @@ class TaskSetManagerSuite
testSpeculationDurationThreshold(true, 2, 1)
}
+ test("SPARK-21040: Check speculative tasks are launched when an executor is
decommissioned" +
+ " and the tasks running on it cannot finish within
EXECUTOR_DECOMMISSION_KILL_INTERVAL") {
+ sc = new SparkContext("local", "test")
+ sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"), ("exec3", "host3"))
+ val taskSet = FakeTask.createTaskSet(4)
+ sc.conf.set(config.SPECULATION_ENABLED, true)
+ sc.conf.set(config.SPECULATION_MULTIPLIER, 1.5)
+ sc.conf.set(config.SPECULATION_QUANTILE, 0.5)
+ sc.conf.set(config.EXECUTOR_DECOMMISSION_KILL_INTERVAL.key, "5ms")
+ val clock = new ManualClock()
+ val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
+ val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = taskSet.tasks.map { task =>
+ task.metrics.internalAccums
+ }
+
+ // Start TASK 0,1 on exec1, Task 2 on exec2
+ (0 until 2).foreach { _ =>
+ val taskOption = manager.resourceOffer("exec1", "host1", NO_PREF)._1
+ assert(taskOption.isDefined)
+ assert(taskOption.get.executorId === "exec1")
+ }
+ val taskOption2 = manager.resourceOffer("exec2", "host2", NO_PREF)._1
+ assert(taskOption2.isDefined)
+ assert(taskOption2.get.executorId === "exec2")
+
+ clock.advance(6) // time = 6ms
+ // Start TASK 3 on exec2 after some delay
+ val taskOption3 = manager.resourceOffer("exec2", "host2", NO_PREF)._1
+ assert(taskOption3.isDefined)
+ assert(taskOption3.get.executorId === "exec2")
+
+ assert(sched.startedTasks.toSet === Set(0, 1, 2, 3))
+
+ clock.advance(4) // time = 10ms
+ // Complete the first 2 tasks and leave the other 2 tasks in running
+ for (id <- Set(0, 1)) {
+ manager.handleSuccessfulTask(id, createTaskResult(id, accumUpdatesByTask(id)))
+ assert(sched.endedTasks(id) === Success)
+ }
+
+ // checkSpeculatableTasks checks that the task runtime is greater than the threshold for
+ // speculating. Since we use a SPECULATION_MULTIPLIER of 1.5, So tasks need to be running for
+ // > 15ms for speculation
+ assert(!manager.checkSpeculatableTasks(0))
+ assert(sched.speculativeTasks.toSet === Set())
+
+ // decommission exec-2. All tasks running on exec-2 (i.e. TASK 2,3) will be added to
+ // executorDecommissionSpeculationTriggerTimeoutOpt
+ // (TASK2 -> 15, TASK3 -> 15)
+ manager.executorDecommission("exec2")
+
+ assert(manager.checkSpeculatableTasks(0))
+ // Task2 started at t=0, so it can still finish before t=15 (Median task runtime = 10)
+ // Task3 started at t=6, so it might not finish before t=15. So Task 3 should be part
+ // of speculativeTasks
+ assert(sched.speculativeTasks.toSet === Set(3))
+ assert(manager.copiesRunning(3) === 1)
+
+ // Offer resource to start the speculative attempt for the running task
+ val taskOption3New = manager.resourceOffer("exec3", "host3", NO_PREF)._1
+ // Offer more resources. Noting should get scheduled now.
+ assert(manager.resourceOffer("exec3", "host3", NO_PREF)._1.isEmpty)
+ assert(taskOption3New.isDefined)
+
+ // Assert info about the newly launched speculative task
+ val speculativeTask3 = taskOption3New.get
+ assert(speculativeTask3.index === 3)
+ assert(speculativeTask3.taskId === 4)
+ assert(speculativeTask3.executorId === "exec3")
+ assert(speculativeTask3.attemptNumber === 1)
+
+ clock.advance(1) // time = 11 ms
+ // Running checkSpeculatableTasks again should return false
+ assert(!manager.checkSpeculatableTasks(0))
+ assert(manager.copiesRunning(2) === 1)
+ assert(manager.copiesRunning(3) === 2)
+
+ clock.advance(5) // time = 16 ms
+ // At t=16 ms, Task 4 has completed 16 ms. It is more than the
+ // SPECULATION_MULTIPLIER * medianRuntime = 1.5 * 10 = 15 ms. So now Task 4 will
+ // be selected for speculation. Here we are verifying that regular speculation configs
+ // should still take effect even when a EXECUTOR_DECOMMISSION_KILL_INTERVAL is provided and
+ // corresponding executor is decommissioned
+ assert(manager.checkSpeculatableTasks(0))
+ assert(sched.speculativeTasks.toSet === Set(2, 3))
+ assert(manager.copiesRunning(2) === 1)
+ assert(manager.copiesRunning(3) === 2)
+ val taskOption2New = manager.resourceOffer("exec3", "host3", NO_PREF)._1
+ assert(taskOption2New.isDefined)
+ val speculativeTask2 = taskOption2New.get
+ // Ensure that task index 2 is launched on exec3, host3
+ assert(speculativeTask2.index === 2)
+ assert(speculativeTask2.taskId === 5)
+ assert(speculativeTask2.executorId === "exec3")
+ assert(speculativeTask2.attemptNumber === 1)
+
+ assert(manager.copiesRunning(2) === 2)
+ assert(manager.copiesRunning(3) === 2)
+
+ // Offering additional resources should not lead to any speculative tasks being respawned
+ assert(manager.resourceOffer("exec1", "host1", ANY)._1.isEmpty)
+ assert(manager.resourceOffer("exec2", "host2", ANY)._1.isEmpty)
+ assert(manager.resourceOffer("exec3", "host3", ANY)._1.isEmpty)
+ }
+
+ test("SPARK-21040: Check speculative tasks are not launched when an
executor" +
Review comment:
Agreed. The above test checks this functionality as well. Should I remove this?
##########
File path: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
##########
@@ -1042,7 +1045,19 @@ private[spark] class TaskSetManager(
// bound based on that.
logDebug("Task length threshold for speculation: " + threshold)
for (tid <- runningTasksSet) {
- foundTasks |= checkAndSubmitSpeculatableTask(tid, time, threshold)
+ var speculated = checkAndSubmitSpeculatableTask(tid, time, threshold)
+ if (!speculated && tidToExecutorKillTimeMapping.contains(tid)) {
+ // Check whether this task will finish before the exectorKillTime assuming
+ // it will take medianDuration overall. If this task cannot finish within
+ // executorKillInterval, then this task is a candidate for speculation
+ val taskEndTimeBasedOnMedianDuration = taskInfos(tid).launchTime + medianDuration
+ val canExceedDeadline = tidToExecutorKillTimeMapping(tid) < taskEndTimeBasedOnMedianDuration
+ if (canExceedDeadline) {
+ speculated = checkAndSubmitSpeculatableTask(tid, time, 0)
Review comment:
The `copiesRunning(index) == 1` condition inside `checkAndSubmitSpeculatableTask` will prevent it. Irrespective of executor decommissioning, that check ensures that only 1 speculatable copy can run for each task.
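For reference, the guard in question has roughly the shape below (a simplified sketch of `checkAndSubmitSpeculatableTask`, not the exact method body):

```scala
// Simplified sketch: regardless of why the threshold was lowered, a task is only
// marked speculatable if exactly one copy is running and it has not already been
// marked, so at most one speculative copy is launched per task index.
private def checkAndSubmitSpeculatableTask(
    tid: Long,
    currentTimeMillis: Long,
    threshold: Double): Boolean = {
  val info = taskInfos(tid)
  val index = info.index
  if (!successful(index) && copiesRunning(index) == 1 &&
      info.timeRunning(currentTimeMillis) > threshold &&
      !speculatableTasks.contains(index)) {
    speculatableTasks += index
    sched.dagScheduler.speculativeTaskSubmitted(tasks(index))
    true
  } else {
    false
  }
}
```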
##########
File path: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
##########
@@ -1042,7 +1045,19 @@ private[spark] class TaskSetManager(
// bound based on that.
logDebug("Task length threshold for speculation: " + threshold)
for (tid <- runningTasksSet) {
- foundTasks |= checkAndSubmitSpeculatableTask(tid, time, threshold)
+ var speculated = checkAndSubmitSpeculatableTask(tid, time, threshold)
+ if (!speculated && tidToExecutorKillTimeMapping.contains(tid)) {
+ // Check whether this task will finish before the exectorKillTime assuming
+ // it will take medianDuration overall. If this task cannot finish within
+ // executorKillInterval, then this task is a candidate for speculation
+ val taskEndTimeBasedOnMedianDuration = taskInfos(tid).launchTime + medianDuration
+ val canExceedDeadline = tidToExecutorKillTimeMapping(tid) < taskEndTimeBasedOnMedianDuration
+ if (canExceedDeadline) {
+ speculated = checkAndSubmitSpeculatableTask(tid, time, 0)
Review comment:
The `copiesRunning(index) == 1` condition inside `checkAndSubmitSpeculatableTask` will prevent it. Irrespective of executor decommissioning, that check makes sure that only 1 speculatable task can run for each unique index.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]