Github user kayousterhout commented on a diff in the pull request:
https://github.com/apache/spark/pull/15644#discussion_r88755426
--- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala ---
@@ -282,6 +316,211 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     assert(!failedTaskSet)
   }
+  test("scheduled tasks obey task and stage blacklists") {
+    taskScheduler = setupSchedulerWithMockTaskSetBlacklist()
+    (0 to 2).foreach { stageId =>
+      val taskSet = FakeTask.createTaskSet(numTasks = 2, stageId = stageId, stageAttemptId = 0)
+      taskScheduler.submitTasks(taskSet)
+    }
+
+    // Setup our mock blacklist:
+    // * stage 0 is blacklisted on node "host1"
+    // * stage 1 is blacklisted on executor "executor3"
+    // * stage 0, partition 0 is blacklisted on executor 0
+    // (mocked methods default to returning false, ie. no blacklisting)
+    when(stageToMockTaskSetBlacklist(0).isNodeBlacklistedForTaskSet("host1")).thenReturn(true)
+    when(stageToMockTaskSetBlacklist(1).isExecutorBlacklistedForTaskSet("executor3"))
+      .thenReturn(true)
+    when(stageToMockTaskSetBlacklist(0).isExecutorBlacklistedForTask("executor0", 0))
+      .thenReturn(true)
+
+    val offers = IndexedSeq(
+      new WorkerOffer("executor0", "host0", 1),
+      new WorkerOffer("executor1", "host1", 1),
+      new WorkerOffer("executor2", "host1", 1),
+      new WorkerOffer("executor3", "host2", 10)
+    )
+    val firstTaskAttempts = taskScheduler.resourceOffers(offers).flatten
+    // We should schedule all tasks.
+    assert(firstTaskAttempts.size === 6)
+    // Whenever we schedule a task, we must consult the node and executor blacklist. (The test
+    // doesn't check exactly what checks are made because the offers get shuffled.)
+    (0 to 2).foreach { stageId =>
+      verify(stageToMockTaskSetBlacklist(stageId), atLeast(1))
+        .isNodeBlacklistedForTaskSet(anyString())
+      verify(stageToMockTaskSetBlacklist(stageId), atLeast(1))
+        .isExecutorBlacklistedForTaskSet(anyString())
+    }
+
+    def tasksForStage(stageId: Int): Seq[TaskDescription] = {
+      firstTaskAttempts.filter { _.name.contains(s"stage $stageId") }
+    }
+    tasksForStage(0).foreach { task =>
+      // executors 1 & 2 blacklisted for node
+      // executor 0 blacklisted just for partition 0
+      if (task.index == 0) {
+        assert(task.executorId === "executor3")
+      } else {
+        assert(Set("executor0", "executor3").contains(task.executorId))
+      }
+    }
+    tasksForStage(1).foreach { task =>
+      // executor 3 blacklisted
+      assert("executor3" != task.executorId)
+    }
+    // no restrictions on stage 2
+
+    // Finally, just make sure that we can still complete tasks as usual with blacklisting
+    // in effect. Finish each of the tasksets -- taskset 0 & 1 complete successfully, taskset 2
+    // fails.
+    (0 to 2).foreach { stageId =>
+      val tasks = tasksForStage(stageId)
+      val tsm = taskScheduler.taskSetManagerForAttempt(stageId, 0).get
+      val valueSer = SparkEnv.get.serializer.newInstance()
+      if (stageId == 2) {
+        // Just need to make one task fail 4 times.
+        var task = tasks(0)
+        val taskIndex = task.index
+        (0 until 4).foreach { attempt =>
+          assert(task.attemptNumber === attempt)
+          tsm.handleFailedTask(task.taskId, TaskState.FAILED, TaskResultLost)
+          val nextAttempts =
+            taskScheduler.resourceOffers(IndexedSeq(WorkerOffer("executor4", "host4", 1))).flatten
+          if (attempt < 3) {
+            assert(nextAttempts.size === 1)
+            task = nextAttempts(0)
+            assert(task.index === taskIndex)
+          } else {
+            assert(nextAttempts.size === 0)
+          }
+        }
+        // End the other task of the taskset, doesn't matter whether it succeeds or fails.
+        val otherTask = tasks(1)
+        val result = new DirectTaskResult[Int](valueSer.serialize(otherTask.taskId), Seq())
+        tsm.handleSuccessfulTask(otherTask.taskId, result)
+      } else {
+        tasks.foreach { task =>
+          val result = new DirectTaskResult[Int](valueSer.serialize(task.taskId), Seq())
+          tsm.handleSuccessfulTask(task.taskId, result)
+        }
+      }
+      assert(tsm.isZombie)
+    }
+  }
+
+  /**
+   * Helper for performance tests. Takes the explicitly blacklisted nodes and executors; verifies
+   * that the blacklists are used efficiently to ensure scheduling is not O(numPendingTasks).
+   * Creates 1 offer on executor[1-3]. Executor1 & 2 are on host1, executor3 is on host2. Passed
+   * in nodes and executors should be on that list.
+   */
+  private def testBlacklistPerformance(
+      testName: String,
+      nodeBlacklist: Seq[String],
+      execBlacklist: Seq[String]): Unit = {
+    // Because scheduling involves shuffling the order of offers around, we run this test a few
+    // times to cover more possibilities. There are only 3 offers, which means 6 permutations,
+    // so 10 iterations is pretty good.
+    (0 until 10).foreach { testItr =>
+      test(s"$testName: iteration $testItr") {
+        // When an executor or node is blacklisted, we want to make sure that we don't try
+        // scheduling each pending task, one by one, to discover they are all blacklisted. This is
+        // important for performance -- if we did check each task one-by-one, then responding to a
+        // resource offer (which is usually O(1)-ish) would become O(numPendingTasks), which would
+        // slow down scheduler throughput and slow down scheduling even on healthy executors.
+        // Here, we check a proxy for the runtime -- we make sure the scheduling is short-circuited
+        // at the node or executor blacklist, so we never check the per-task blacklist. We also
+        // make sure we don't check the node & executor blacklist for the entire taskset
+        // O(numPendingTasks) times.
+
+        taskScheduler = setupSchedulerWithMockTaskSetBlacklist()
+        // we schedule 500 tasks so we can clearly distinguish anything that is O(numPendingTasks)
+        val taskSet = FakeTask.createTaskSet(numTasks = 500, stageId = 0, stageAttemptId = 0)
+        taskScheduler.submitTasks(taskSet)
+
+        val offers = IndexedSeq(
+          new WorkerOffer("executor1", "host1", 1),
+          new WorkerOffer("executor2", "host1", 1),
+          new WorkerOffer("executor3", "host2", 1)
+        )
+        // We should check the node & exec blacklists, but only O(numOffers), not O(numPendingTasks)
+        // times. In the worst case, after shuffling, we offer our blacklisted resource first, and
+        // then offer other resources which do get used. The taskset blacklist is consulted
+        // repeatedly as we offer resources to the taskset -- each iteration either schedules
+        // something, or it terminates that locality level, so the maximum number of checks is
+        // numCores + numLocalityLevels
+        val numCoresOnAllOffers = offers.map(_.cores).sum
+        val numLocalityLevels = TaskLocality.values.size
+        val maxBlacklistChecks = numCoresOnAllOffers + numLocalityLevels
+
+        // Setup the blacklist
+        nodeBlacklist.foreach { node =>
+          when(stageToMockTaskSetBlacklist(0).isNodeBlacklistedForTaskSet(node)).thenReturn(true)
+        }
+        execBlacklist.foreach { exec =>
+          when(stageToMockTaskSetBlacklist(0).isExecutorBlacklistedForTaskSet(exec))
+            .thenReturn(true)
+        }
+
+        // Figure out which nodes have any effective blacklisting on them. This means all nodes
+        // that are explicitly blacklisted, plus those that have *any* executors blacklisted.
+        val nodesForBlacklistedExecutors = offers.filter { offer =>
+          execBlacklist.contains(offer.executorId)
+        }.map(_.host).toSet.toSeq
+        val nodesWithAnyBlacklisting = nodeBlacklist ++ nodesForBlacklistedExecutors
+        // Similarly, figure out which executors have any blacklisting. This means all executors
+        // that are explicitly blacklisted, plus all executors on nodes that are blacklisted.
+        val execsForBlacklistedNodes = offers.filter { offer =>
+          nodeBlacklist.contains(offer.host)
+        }.map(_.executorId).toSeq
+        val executorsWithAnyBlacklisting = execBlacklist ++ execsForBlacklistedNodes
--- End diff --
Should this be a Set? You use its size below, and in theory it could contain duplicates (although not with either of the ways you currently call this). Same with nodesWithAnyBlacklisting.
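
For illustration, here is a minimal sketch of the Set-based variant being suggested (same variable names and inputs as in the diff; untested, just to show the shape):

    // Sketch only: collect into Sets so that .size counts distinct hosts/executors
    // even if the explicit blacklists overlap with the ones derived from the offers.
    val nodesWithAnyBlacklisting: Set[String] =
      nodeBlacklist.toSet ++ offers.filter { offer =>
        execBlacklist.contains(offer.executorId)
      }.map(_.host)
    val executorsWithAnyBlacklisting: Set[String] =
      execBlacklist.toSet ++ offers.filter { offer =>
        nodeBlacklist.contains(offer.host)
      }.map(_.executorId)

With Sets the intermediate .toSet.toSeq on nodesForBlacklistedExecutors also becomes unnecessary.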