Github user kayousterhout commented on a diff in the pull request:
https://github.com/apache/spark/pull/15644#discussion_r88524633
--- Diff:
core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala ---
@@ -282,6 +317,188 @@ class TaskSchedulerImplSuite extends SparkFunSuite
with LocalSparkContext with B
assert(!failedTaskSet)
}
+ test("scheduled tasks obey task and stage blacklists") {
+ taskScheduler = setupSchedulerWithMockTaskSetBlacklist()
+ (0 to 2).foreach {stageId =>
+ val taskSet = FakeTask.createTaskSet(numTasks = 2, stageId =
stageId, stageAttemptId = 0)
+ taskScheduler.submitTasks(taskSet)
+ }
+
+ // Setup our mock blacklist:
+ // * stage 0 is blacklisted on node "host1"
+ // * stage 1 is blacklisted on executor "executor3"
+ // * stage 0, partition 0 is blacklisted on executor 0
+ // (mocked methods default to returning false, i.e. no blacklisting)
+
when(stageToMockTaskSetBlacklist(0).isNodeBlacklistedForTaskSet("host1")).thenReturn(true)
+
when(stageToMockTaskSetBlacklist(1).isExecutorBlacklistedForTaskSet("executor3"))
+ .thenReturn(true)
+
when(stageToMockTaskSetBlacklist(0).isExecutorBlacklistedForTask("executor0",
0))
+ .thenReturn(true)
+
+ val offers = IndexedSeq(
+ new WorkerOffer("executor0", "host0", 1),
+ new WorkerOffer("executor1", "host1", 1),
+ new WorkerOffer("executor2", "host1", 1),
+ new WorkerOffer("executor3", "host2", 10)
+ )
+ val firstTaskAttempts = taskScheduler.resourceOffers(offers).flatten
+ // We should schedule all tasks.
+ assert(firstTaskAttempts.size === 6)
+ // Whenever we schedule a task, we must consult the node and executor
blacklist. (The test
+ // doesn't check exactly what checks are made because the offers get
shuffled.)
+ (0 to 2).foreach { stageId =>
+ verify(stageToMockTaskSetBlacklist(stageId), atLeast(1))
+ .isNodeBlacklistedForTaskSet(anyString())
+ verify(stageToMockTaskSetBlacklist(stageId), atLeast(1))
+ .isExecutorBlacklistedForTaskSet(anyString())
+ }
+
+ def tasksForStage(stageId: Int): Seq[TaskDescription] = {
+ firstTaskAttempts.filter{_.name.contains(s"stage $stageId")}
+ }
+ tasksForStage(0).foreach { task =>
+ // executors 1 & 2 blacklisted for node
+ // executor 0 blacklisted just for partition 0
+ if (task.index == 0) {
+ assert(task.executorId === "executor3")
+ } else {
+ assert(Set("executor0", "executor3").contains(task.executorId))
+ }
+ }
+ tasksForStage(1).foreach { task =>
+ // executor 3 blacklisted
+ assert("executor3" != task.executorId)
+ }
+ // no restrictions on stage 2
+
+ // Finally, just make sure that we can still complete tasks as usual
with blacklisting
+ // in effect. Finish each of the tasksets -- taskset 0 & 1 complete
successfully, taskset 2
+ // fails.
+ (0 to 2).foreach { stageId =>
+ val tasks = tasksForStage(stageId)
+ val tsm = taskScheduler.taskSetManagerForAttempt(stageId, 0).get
+ val valueSer = SparkEnv.get.serializer.newInstance()
+ if (stageId == 2) {
+ // Just need to make one task fail 4 times.
+ var task = tasks(0)
+ val taskIndex = task.index
+ (0 until 4).foreach { attempt =>
+ assert(task.attemptNumber === attempt)
+ tsm.handleFailedTask(task.taskId, TaskState.FAILED,
TaskResultLost)
+ val nextAttempts =
+
taskScheduler.resourceOffers(IndexedSeq(WorkerOffer("executor4", "host4",
1))).flatten
+ if (attempt < 3) {
+ assert(nextAttempts.size === 1)
+ task = nextAttempts(0)
+ assert(task.index === taskIndex)
+ } else {
+ assert(nextAttempts.size === 0)
+ }
+ }
+ // End the other task of the taskset, doesn't matter whether it
succeeds or fails.
+ val otherTask = tasks(1)
+ val result = new
DirectTaskResult[Int](valueSer.serialize(otherTask.taskId), Seq())
+ tsm.handleSuccessfulTask(otherTask.taskId, result)
+ } else {
+ tasks.foreach { task =>
+ val result = new
DirectTaskResult[Int](valueSer.serialize(task.taskId), Seq())
+ tsm.handleSuccessfulTask(task.taskId, result)
+ }
+ }
+ assert(tsm.isZombie)
+ }
+ }
+
+ /**
+ * Helper for performance tests. Takes the explicitly blacklisted nodes
and executors; verifies
+ * that the blacklists are used efficiently to ensure scheduling is not
O(numPendingTasks).
+ */
--- End diff --
Can you add the setup of the test (executor1 and executor2 on host1,
executor3 on host2) to this comment, and say that the passed-in nodes and
executors should be in that list?
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]