GitHub user squito commented on a diff in the pull request:
https://github.com/apache/spark/pull/13603#discussion_r68819144
--- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala ---
@@ -281,6 +281,96 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
assert(!failedTaskSet)
}
+ test("abort stage if executor loss results in unschedulability from
previously failed tasks") {
+ // Make sure we can detect when a taskset becomes unschedulability
from a blacklisting. This
+ // test explores a particular corner case -- you may have one task
fail, but still be
+ // schedulable on another executor. However, that executor may fail
later on, leaving the
+ // first task with no place to run.
+    val taskScheduler = setupScheduler(
+      // set this to something much longer than the test duration
+      "spark.scheduler.executorTaskBlacklistTime" -> "10000000"
+    )
+
+    val taskSet = FakeTask.createTaskSet(2)
+    taskScheduler.submitTasks(taskSet)
+    val tsm = taskScheduler.taskSetManagerForAttempt(taskSet.stageId, taskSet.stageAttemptId).get
+
+    val firstTaskAttempts = taskScheduler.resourceOffers(Seq(
+      new WorkerOffer("executor0", "host0", 1),
+      new WorkerOffer("executor1", "host1", 1)
+    )).flatten
+    assert(Set("executor0", "executor1") === firstTaskAttempts.map(_.executorId).toSet)
+
+    // fail one of the tasks, but leave the other running
+    val failedTask = firstTaskAttempts.find(_.executorId == "executor0").get
+    taskScheduler.handleFailedTask(tsm, failedTask.taskId, TaskState.FAILED, TaskResultLost)
+    // at this point, our failed task could run on the other executor, so don't give up the task
+    // set yet.
+    assert(!failedTaskSet)
+
+    // Now we fail our second executor. The other task can still run on executor0, so make an offer
+    // on that executor, and make sure that the other task (not the failed one) is assigned there.
+    taskScheduler.executorLost("executor1", SlaveLost("oops"))
+    val nextTaskAttempts =
+      taskScheduler.resourceOffers(Seq(new WorkerOffer("executor0", "host0", 1))).flatten
+    // Note: it's OK if some future change makes this already realize the taskset has become
+    // unschedulable at this point (though in the current implementation, we're sure it will not)
+    assert(nextTaskAttempts.size === 1)
+    assert(nextTaskAttempts.head.executorId === "executor0")
+    assert(nextTaskAttempts.head.attemptNumber === 1)
+    assert(nextTaskAttempts.head.index != failedTask.index)
+
+    // now we should definitely realize that our task set is unschedulable, because the only
+    // task left can't be scheduled on any executors due to the blacklist
+    taskScheduler.resourceOffers(Seq(new WorkerOffer("executor0", "host0", 1)))
+    sc.listenerBus.waitUntilEmpty(100000)
+    assert(tsm.isZombie)
+    assert(failedTaskSet)
+    val idx = failedTask.index
+    assert(failedTaskSetReason.contains(s"Aborting TaskSet 0.0 because Task $idx (partition $idx)" +
+      s" cannot be scheduled on any executor due to blacklists."))
+  }
+
+ test("don't abort if there is an executor available, though it hasn't
had scheduled tasks yet") {
+ // interaction of SPARK-15865 & SPARK-16106
+ // if we have a small number of tasks, we might be able to schedule
them all on the first
+ // executor. But if those tasks fail, we should still realize there
is another executor
+ // available and not bail on the job
+
+    val taskScheduler = setupScheduler(
+      // set this to something much longer than the test duration
+      "spark.scheduler.executorTaskBlacklistTime" -> "10000000"
+    )
+
+    val taskSet = FakeTask.createTaskSet(2, (0 until 2).map { _ => Seq(TaskLocation("host0")) }: _*)
+    taskScheduler.submitTasks(taskSet)
+    val tsm = taskScheduler.taskSetManagerForAttempt(taskSet.stageId, taskSet.stageAttemptId).get
+
+    val offers = Seq(
+      // each offer has more than enough free cores for the entire task set, so we schedule
+      // all tasks on one executor
--- End diff ---
Yes, because of the locality preference. I'll update the comment to make that clear.
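
To spell out what "because of locality preference" means here, a minimal sketch reusing the helpers from the diff above (taskScheduler, WorkerOffer, and the task set built with TaskLocation("host0")); the core count of 4 is an assumption standing in for "more than enough free cores":

    // both tasks prefer host0 (see the createTaskSet call above), so a single
    // offer on host0 with enough free cores should receive both of them
    val offers = Seq(new WorkerOffer("executor0", "host0", 4))
    val attempts = taskScheduler.resourceOffers(offers).flatten
    assert(attempts.size === 2)
    assert(attempts.map(_.executorId).toSet === Set("executor0"))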