Github user squito commented on a diff in the pull request:
https://github.com/apache/spark/pull/15644#discussion_r89144137
--- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala ---
@@ -282,6 +316,211 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     assert(!failedTaskSet)
   }
+ test("scheduled tasks obey task and stage blacklists") {
+ taskScheduler = setupSchedulerWithMockTaskSetBlacklist()
+ (0 to 2).foreach {stageId =>
+ val taskSet = FakeTask.createTaskSet(numTasks = 2, stageId =
stageId, stageAttemptId = 0)
+ taskScheduler.submitTasks(taskSet)
+ }
+
+ // Setup our mock blacklist:
+ // * stage 0 is blacklisted on node "host1"
+ // * stage 1 is blacklisted on executor "executor3"
+ // * stage 0, partition 0 is blacklisted on executor 0
+ // (mocked methods default to returning false, ie. no blacklisting)
+
when(stageToMockTaskSetBlacklist(0).isNodeBlacklistedForTaskSet("host1")).thenReturn(true)
+
when(stageToMockTaskSetBlacklist(1).isExecutorBlacklistedForTaskSet("executor3"))
+ .thenReturn(true)
+
when(stageToMockTaskSetBlacklist(0).isExecutorBlacklistedForTask("executor0",
0))
+ .thenReturn(true)
+
+ val offers = IndexedSeq(
+ new WorkerOffer("executor0", "host0", 1),
+ new WorkerOffer("executor1", "host1", 1),
+ new WorkerOffer("executor2", "host1", 1),
+ new WorkerOffer("executor3", "host2", 10)
+ )
+ val firstTaskAttempts = taskScheduler.resourceOffers(offers).flatten
+ // We should schedule all tasks.
+ assert(firstTaskAttempts.size === 6)
+ // Whenever we schedule a task, we must consult the node and executor
blacklist. (The test
+ // doesn't check exactly what checks are made because the offers get
shuffled.)
+ (0 to 2).foreach { stageId =>
+ verify(stageToMockTaskSetBlacklist(stageId), atLeast(1))
+ .isNodeBlacklistedForTaskSet(anyString())
+ verify(stageToMockTaskSetBlacklist(stageId), atLeast(1))
+ .isExecutorBlacklistedForTaskSet(anyString())
+ }
+
+ def tasksForStage(stageId: Int): Seq[TaskDescription] = {
+ firstTaskAttempts.filter{_.name.contains(s"stage $stageId")}
+ }
+ tasksForStage(0).foreach { task =>
+ // executors 1 & 2 blacklisted for node
+ // executor 0 blacklisted just for partition 0
+ if (task.index == 0) {
+ assert(task.executorId === "executor3")
+ } else {
+ assert(Set("executor0", "executor3").contains(task.executorId))
+ }
+ }
+ tasksForStage(1).foreach { task =>
+ // executor 3 blacklisted
+ assert("executor3" != task.executorId)
+ }
+ // no restrictions on stage 2
+
+ // Finally, just make sure that we can still complete tasks as usual
with blacklisting
+ // in effect. Finish each of the tasksets -- taskset 0 & 1 complete
successfully, taskset 2
+ // fails.
+ (0 to 2).foreach { stageId =>
+ val tasks = tasksForStage(stageId)
+ val tsm = taskScheduler.taskSetManagerForAttempt(stageId, 0).get
+ val valueSer = SparkEnv.get.serializer.newInstance()
+ if (stageId == 2) {
+ // Just need to make one task fail 4 times.
+ var task = tasks(0)
+ val taskIndex = task.index
+ (0 until 4).foreach { attempt =>
+ assert(task.attemptNumber === attempt)
+ tsm.handleFailedTask(task.taskId, TaskState.FAILED,
TaskResultLost)
+ val nextAttempts =
+
taskScheduler.resourceOffers(IndexedSeq(WorkerOffer("executor4", "host4",
1))).flatten
+ if (attempt < 3) {
+ assert(nextAttempts.size === 1)
+ task = nextAttempts(0)
+ assert(task.index === taskIndex)
+ } else {
+ assert(nextAttempts.size === 0)
+ }
+ }
+ // End the other task of the taskset, doesn't matter whether it
succeeds or fails.
+ val otherTask = tasks(1)
+ val result = new
DirectTaskResult[Int](valueSer.serialize(otherTask.taskId), Seq())
+ tsm.handleSuccessfulTask(otherTask.taskId, result)
+ } else {
+ tasks.foreach { task =>
+ val result = new
DirectTaskResult[Int](valueSer.serialize(task.taskId), Seq())
+ tsm.handleSuccessfulTask(task.taskId, result)
+ }
+ }
+ assert(tsm.isZombie)
+ }
+ }
+
+ /**
+ * Helper for performance tests. Takes the explicitly blacklisted nodes
and executors; verifies
+ * that the blacklists are used efficiently to ensure scheduling is not
O(numPendingTasks).
+ * Creates 1 offer on executor[1-3]. Executor1 & 2 are on host1,
executor3 is on host2. Passed
+ * in nodes and executors should be on that list.
+ */
+ private def testBlacklistPerformance(
+ testName: String,
+ nodeBlacklist: Seq[String],
+ execBlacklist: Seq[String]): Unit = {
+ // Because scheduling involves shuffling the order of offers around,
we run this test a few
+ // times to cover more possibilities. There are only 3 offers, which
means 6 permutations,
+ // so 10 iterations is pretty good.
+ (0 until 10).foreach { testItr =>
+ test(s"$testName: iteration $testItr") {
--- End diff ---
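(For context on the helper in the diff above: it would be driven by class-level call sites roughly like the sketch below. The test names and blacklist values here are purely illustrative, not taken from the PR; per the helper's doc comment, the nodes and executors passed in should be among host1/host2 and executor1-3.)

```scala
// Hypothetical call sites for testBlacklistPerformance -- illustrative only.
testBlacklistPerformance(
  testName = "taskset blacklisted on a node",
  nodeBlacklist = Seq("host1"),
  execBlacklist = Seq())

testBlacklistPerformance(
  testName = "taskset blacklisted on an executor",
  nodeBlacklist = Seq(),
  execBlacklist = Seq("executor3"))
```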
In this case, the main reason is to get `beforeEach` and `afterEach` called between iterations. We could do it ourselves, but it seems better to let it happen automatically. If you are just referring to giving each iteration a unique name, that is because it's a requirement of scalatest.

I'm also in the habit of doing this because it's useful when trying to flush out flaky tests. If you put the loop *inside* the test, then the whole test dies at the first exception, so it's harder to figure out how often the test fails (I'd rather have it do a full 1k iterations and count how many failed, rather than have it fail at iteration 235). Also, the unit test logs are a little more verbose, but in a good way -- each iteration is clearly labeled, so it's a lot easier to jump to the failed iteration and compare it to a good one. But that is more useful while developing, not necessarily something to check in. I dunno if there is a convention here.

The downside is that there are 10 entries in the test report, which is perhaps a little more confusing, but it doesn't really seem so bad. I suppose one problem is that flaky test reports might not work, since each time it'll fail on a different iteration.

I don't have strong feelings on this -- I'll push one final change which just puts the loop inside the test, so it's easy for you to choose.
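To make the trade-off concrete, here is a minimal, self-contained sketch of the two patterns (the suite and helper names are hypothetical, not from this PR). The first variant registers one scalatest test per iteration, so `beforeEach`/`afterEach` run around every iteration, each iteration needs (and gets) a unique name, and a failure in one iteration doesn't stop the rest; the second loops inside a single test and aborts at the first failing iteration.

```scala
import org.scalatest.{BeforeAndAfterEach, FunSuite}

// Hypothetical suite, just to illustrate the two looping patterns discussed above.
class IterationPatternSuite extends FunSuite with BeforeAndAfterEach {

  override def beforeEach(): Unit = { /* reset fixture */ }
  override def afterEach(): Unit = { /* tear down fixture */ }

  // Pattern 1: loop *outside* the test.  Each iteration is its own test, so it
  // needs a unique name (a scalatest requirement), gets beforeEach/afterEach
  // around it, shows up separately in the logs and report, and a failure in
  // one iteration doesn't prevent the others from running.
  (0 until 10).foreach { testItr =>
    test(s"shuffled-offer scheduling: iteration $testItr") {
      assert(runOnce())  // hypothetical per-iteration check
    }
  }

  // Pattern 2: loop *inside* a single test.  Only one entry in the test
  // report, but the test dies at the first failing iteration and the fixture
  // is not reset between iterations.
  test("shuffled-offer scheduling: all iterations") {
    (0 until 10).foreach { _ =>
      assert(runOnce())
    }
  }

  // Stand-in for the real scheduling check.
  private def runOnce(): Boolean = true
}
```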