Repository: spark Updated Branches: refs/heads/master ad67993b7 -> 8b1609beb
[SPARK-18117][CORE] Add test for TaskSetBlacklist ## What changes were proposed in this pull request? This adds tests to verify the interaction between TaskSetBlacklist and TaskSchedulerImpl. TaskSetBlacklist was introduced by SPARK-17675 but it neglected to add these tests. This change does not fix any bugs -- it is just for increasing test coverage. ## How was this patch tested? Jenkins Author: Imran Rashid <iras...@cloudera.com> Closes #15644 from squito/taskset_blacklist_test_update. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8b1609be Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8b1609be Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8b1609be Branch: refs/heads/master Commit: 8b1609bebe489b2ef78db4be6e9836687089fe3d Parents: ad67993 Author: Imran Rashid <iras...@cloudera.com> Authored: Mon Nov 28 13:47:09 2016 -0600 Committer: Imran Rashid <iras...@cloudera.com> Committed: Mon Nov 28 13:47:09 2016 -0600 ---------------------------------------------------------------------- .../apache/spark/scheduler/TaskSetManager.scala | 2 +- .../scheduler/TaskSchedulerImplSuite.scala | 254 ++++++++++++++++++- .../spark/scheduler/TaskSetManagerSuite.scala | 45 +++- 3 files changed, 292 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/8b1609be/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index b766e41..f2a432c 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -84,7 +84,7 @@ private[spark] class TaskSetManager( var totalResultSize = 0L var calculatedTasks = 0 - private val taskSetBlacklistHelperOpt: Option[TaskSetBlacklist] = { + private[scheduler] val taskSetBlacklistHelperOpt: Option[TaskSetBlacklist] = { if (BlacklistTracker.isBlacklistEnabled(conf)) { Some(new TaskSetBlacklist(conf, stageId, clock)) } else { http://git-wip-us.apache.org/repos/asf/spark/blob/8b1609be/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala index f5f1947..5dc7708 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala @@ -17,7 +17,12 @@ package org.apache.spark.scheduler +import scala.collection.mutable.HashMap + +import org.mockito.Matchers.{anyInt, anyString, eq => meq} +import org.mockito.Mockito.{atLeast, atMost, never, spy, verify, when} import org.scalatest.BeforeAndAfterEach +import org.scalatest.mock.MockitoSugar import org.apache.spark._ import org.apache.spark.internal.config @@ -31,7 +36,7 @@ class FakeSchedulerBackend extends SchedulerBackend { } class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with BeforeAndAfterEach - with Logging { + with Logging with MockitoSugar { var failedTaskSetException: Option[Throwable] = None var failedTaskSetReason: String = null @@ -40,11 +45,16 
@@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B var taskScheduler: TaskSchedulerImpl = null var dagScheduler: DAGScheduler = null + val stageToMockTaskSetBlacklist = new HashMap[Int, TaskSetBlacklist]() + val stageToMockTaskSetManager = new HashMap[Int, TaskSetManager]() + override def beforeEach(): Unit = { super.beforeEach() failedTaskSet = false failedTaskSetException = None failedTaskSetReason = null + stageToMockTaskSetBlacklist.clear() + stageToMockTaskSetManager.clear() } override def afterEach(): Unit = { @@ -66,6 +76,30 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B } sc = new SparkContext(conf) taskScheduler = new TaskSchedulerImpl(sc) + setupHelper() + } + + def setupSchedulerWithMockTaskSetBlacklist(): TaskSchedulerImpl = { + val conf = new SparkConf().setMaster("local").setAppName("TaskSchedulerImplSuite") + conf.set(config.BLACKLIST_ENABLED, true) + sc = new SparkContext(conf) + taskScheduler = + new TaskSchedulerImpl(sc, sc.conf.getInt("spark.task.maxFailures", 4)) { + override def createTaskSetManager(taskSet: TaskSet, maxFailures: Int): TaskSetManager = { + val tsm = super.createTaskSetManager(taskSet, maxFailures) + // we need to create a spied tsm just so we can set the TaskSetBlacklist + val tsmSpy = spy(tsm) + val taskSetBlacklist = mock[TaskSetBlacklist] + when(tsmSpy.taskSetBlacklistHelperOpt).thenReturn(Some(taskSetBlacklist)) + stageToMockTaskSetManager(taskSet.stageId) = tsmSpy + stageToMockTaskSetBlacklist(taskSet.stageId) = taskSetBlacklist + tsmSpy + } + } + setupHelper() + } + + def setupHelper(): TaskSchedulerImpl = { taskScheduler.initialize(new FakeSchedulerBackend) // Need to initialize a DAGScheduler for the taskScheduler to use for callbacks. dagScheduler = new DAGScheduler(sc, taskScheduler) { @@ -282,6 +316,211 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B assert(!failedTaskSet) } + test("scheduled tasks obey task and stage blacklists") { + taskScheduler = setupSchedulerWithMockTaskSetBlacklist() + (0 to 2).foreach {stageId => + val taskSet = FakeTask.createTaskSet(numTasks = 2, stageId = stageId, stageAttemptId = 0) + taskScheduler.submitTasks(taskSet) + } + + // Set up our mock blacklist: + // * stage 0 is blacklisted on node "host1" + // * stage 1 is blacklisted on executor "executor3" + // * stage 0, partition 0 is blacklisted on executor 0 + // (mocked methods default to returning false, i.e. no blacklisting) + when(stageToMockTaskSetBlacklist(0).isNodeBlacklistedForTaskSet("host1")).thenReturn(true) + when(stageToMockTaskSetBlacklist(1).isExecutorBlacklistedForTaskSet("executor3")) + .thenReturn(true) + when(stageToMockTaskSetBlacklist(0).isExecutorBlacklistedForTask("executor0", 0)) + .thenReturn(true) + + val offers = IndexedSeq( + new WorkerOffer("executor0", "host0", 1), + new WorkerOffer("executor1", "host1", 1), + new WorkerOffer("executor2", "host1", 1), + new WorkerOffer("executor3", "host2", 10) + ) + val firstTaskAttempts = taskScheduler.resourceOffers(offers).flatten + // We should schedule all tasks. + assert(firstTaskAttempts.size === 6) + // Whenever we schedule a task, we must consult the node and executor blacklist. (The test + // doesn't check exactly what checks are made because the offers get shuffled.)
+ (0 to 2).foreach { stageId => + verify(stageToMockTaskSetBlacklist(stageId), atLeast(1)) + .isNodeBlacklistedForTaskSet(anyString()) + verify(stageToMockTaskSetBlacklist(stageId), atLeast(1)) + .isExecutorBlacklistedForTaskSet(anyString()) + } + + def tasksForStage(stageId: Int): Seq[TaskDescription] = { + firstTaskAttempts.filter{_.name.contains(s"stage $stageId")} + } + tasksForStage(0).foreach { task => + // executors 1 & 2 blacklisted for node + // executor 0 blacklisted just for partition 0 + if (task.index == 0) { + assert(task.executorId === "executor3") + } else { + assert(Set("executor0", "executor3").contains(task.executorId)) + } + } + tasksForStage(1).foreach { task => + // executor 3 blacklisted + assert("executor3" != task.executorId) + } + // no restrictions on stage 2 + + // Finally, just make sure that we can still complete tasks as usual with blacklisting + // in effect. Finish each of the tasksets -- taskset 0 & 1 complete successfully, taskset 2 + // fails. + (0 to 2).foreach { stageId => + val tasks = tasksForStage(stageId) + val tsm = taskScheduler.taskSetManagerForAttempt(stageId, 0).get + val valueSer = SparkEnv.get.serializer.newInstance() + if (stageId == 2) { + // Just need to make one task fail 4 times. + var task = tasks(0) + val taskIndex = task.index + (0 until 4).foreach { attempt => + assert(task.attemptNumber === attempt) + tsm.handleFailedTask(task.taskId, TaskState.FAILED, TaskResultLost) + val nextAttempts = + taskScheduler.resourceOffers(IndexedSeq(WorkerOffer("executor4", "host4", 1))).flatten + if (attempt < 3) { + assert(nextAttempts.size === 1) + task = nextAttempts(0) + assert(task.index === taskIndex) + } else { + assert(nextAttempts.size === 0) + } + } + // End the other task of the taskset, doesn't matter whether it succeeds or fails. + val otherTask = tasks(1) + val result = new DirectTaskResult[Int](valueSer.serialize(otherTask.taskId), Seq()) + tsm.handleSuccessfulTask(otherTask.taskId, result) + } else { + tasks.foreach { task => + val result = new DirectTaskResult[Int](valueSer.serialize(task.taskId), Seq()) + tsm.handleSuccessfulTask(task.taskId, result) + } + } + assert(tsm.isZombie) + } + } + + /** + * Helper for performance tests. Takes the explicitly blacklisted nodes and executors; verifies + * that the blacklists are used efficiently to ensure scheduling is not O(numPendingTasks). + * Creates 1 offer on executor[1-3]. Executor1 & 2 are on host1, executor3 is on host2. Passed + * in nodes and executors should be on that list. + */ + private def testBlacklistPerformance( + testName: String, + nodeBlacklist: Seq[String], + execBlacklist: Seq[String]): Unit = { + // Because scheduling involves shuffling the order of offers around, we run this test a few + // times to cover more possibilities. There are only 3 offers, which means 6 permutations, + // so 10 iterations is pretty good. + (0 until 10).foreach { testItr => + test(s"$testName: iteration $testItr") { + // When an executor or node is blacklisted, we want to make sure that we don't try + // scheduling each pending task, one by one, to discover they are all blacklisted. This is + // important for performance -- if we did check each task one-by-one, then responding to a + // resource offer (which is usually O(1)-ish) would become O(numPendingTasks), which would + // slow down scheduler throughput and slow down scheduling even on healthy executors. 
+ // Here, we check a proxy for the runtime -- we make sure the scheduling is short-circuited + // at the node or executor blacklist, so we never check the per-task blacklist. We also + // make sure we don't check the node & executor blacklist for the entire taskset + // O(numPendingTasks) times. + + taskScheduler = setupSchedulerWithMockTaskSetBlacklist() + // we schedule 500 tasks so we can clearly distinguish anything that is O(numPendingTasks) + val taskSet = FakeTask.createTaskSet(numTasks = 500, stageId = 0, stageAttemptId = 0) + taskScheduler.submitTasks(taskSet) + + val offers = IndexedSeq( + new WorkerOffer("executor1", "host1", 1), + new WorkerOffer("executor2", "host1", 1), + new WorkerOffer("executor3", "host2", 1) + ) + // We should check the node & exec blacklists, but only O(numOffers), not O(numPendingTasks) + // times. In the worst case, after shuffling, we offer our blacklisted resource first, and + // then offer other resources which do get used. The taskset blacklist is consulted + // repeatedly as we offer resources to the taskset -- each iteration either schedules + // something, or it terminates that locality level, so the maximum number of checks is + // numCores + numLocalityLevels + val numCoresOnAllOffers = offers.map(_.cores).sum + val numLocalityLevels = TaskLocality.values.size + val maxBlacklistChecks = numCoresOnAllOffers + numLocalityLevels + + // Set up the blacklist + nodeBlacklist.foreach { node => + when(stageToMockTaskSetBlacklist(0).isNodeBlacklistedForTaskSet(node)).thenReturn(true) + } + execBlacklist.foreach { exec => + when(stageToMockTaskSetBlacklist(0).isExecutorBlacklistedForTaskSet(exec)) + .thenReturn(true) + } + + // Figure out which nodes have any effective blacklisting on them. This means all nodes + // that are explicitly blacklisted, plus those that have *any* executors blacklisted. + val nodesForBlacklistedExecutors = offers.filter { offer => + execBlacklist.contains(offer.executorId) + }.map(_.host).toSet.toSeq + val nodesWithAnyBlacklisting = (nodeBlacklist ++ nodesForBlacklistedExecutors).toSet + // Similarly, figure out which executors have any blacklisting. This means all executors + // that are explicitly blacklisted, plus all executors on nodes that are blacklisted. + val execsForBlacklistedNodes = offers.filter { offer => + nodeBlacklist.contains(offer.host) + }.map(_.executorId).toSeq + val executorsWithAnyBlacklisting = (execBlacklist ++ execsForBlacklistedNodes).toSet + + // Schedule a taskset, and make sure our test setup is correct -- we are able to schedule + // a task on all executors that aren't blacklisted (whether that executor is explicitly + // blacklisted, or implicitly blacklisted via the node blacklist). + val firstTaskAttempts = taskScheduler.resourceOffers(offers).flatten + assert(firstTaskAttempts.size === offers.size - executorsWithAnyBlacklisting.size) + + // Now check that we haven't made too many calls to any of the blacklist methods. + // We should be checking our node blacklist, but it should be within the bound we defined + // above. + verify(stageToMockTaskSetBlacklist(0), atMost(maxBlacklistChecks)) + .isNodeBlacklistedForTaskSet(anyString()) + // We shouldn't ever consult the per-task blacklist for the nodes that have been blacklisted + // for the entire taskset, since the taskset level blacklisting should prevent scheduling + // from ever looking at specific tasks.
+ nodesWithAnyBlacklisting.foreach { node => + verify(stageToMockTaskSetBlacklist(0), never) + .isNodeBlacklistedForTask(meq(node), anyInt()) + } + executorsWithAnyBlacklisting.foreach { exec => + // We should be checking our executor blacklist, but it should be within the bound defined + // above. It's possible that this will be significantly fewer calls, maybe even 0, if + // there is also a node-blacklist which takes effect first. But this assert is all we + // need to avoid an O(numPendingTasks) slowdown. + verify(stageToMockTaskSetBlacklist(0), atMost(maxBlacklistChecks)) + .isExecutorBlacklistedForTaskSet(exec) + // We shouldn't ever consult the per-task blacklist for executors that have been + // blacklisted for the entire taskset, since the taskset level blacklisting should prevent + // scheduling from ever looking at specific tasks. + verify(stageToMockTaskSetBlacklist(0), never) + .isExecutorBlacklistedForTask(meq(exec), anyInt()) + } + } + } + } + + testBlacklistPerformance( + testName = "Blacklisted node for entire task set prevents per-task blacklist checks", + nodeBlacklist = Seq("host1"), + execBlacklist = Seq()) + + testBlacklistPerformance( + testName = "Blacklisted executor for entire task set prevents per-task blacklist checks", + nodeBlacklist = Seq(), + execBlacklist = Seq("executor3") + ) + test("abort stage if executor loss results in unschedulability from previously failed tasks") { // Make sure we can detect when a taskset becomes unschedulable from a blacklisting. This // test explores a particular corner case -- you may have one task fail, but still be @@ -301,27 +540,27 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B )).flatten assert(Set("executor0", "executor1") === firstTaskAttempts.map(_.executorId).toSet) - // fail one of the tasks, but leave the other running + // Fail one of the tasks, but leave the other running. val failedTask = firstTaskAttempts.find(_.executorId == "executor0").get taskScheduler.handleFailedTask(tsm, failedTask.taskId, TaskState.FAILED, TaskResultLost) - // at this point, our failed task could run on the other executor, so don't give up the task + // At this point, our failed task could run on the other executor, so don't give up the task // set yet. assert(!failedTaskSet) // Now we fail our second executor. The other task can still run on executor1, so make an offer - // on that executor, and make sure that the other task (not the failed one) is assigned there + // on that executor, and make sure that the other task (not the failed one) is assigned there. taskScheduler.executorLost("executor1", SlaveLost("oops")) val nextTaskAttempts = taskScheduler.resourceOffers(IndexedSeq(new WorkerOffer("executor0", "host0", 1))).flatten // Note: It's OK if some future change makes this already realize the taskset has become - // unschedulable at this point (though in the current implementation, we're sure it will not) + // unschedulable at this point (though in the current implementation, we're sure it will not).
assert(nextTaskAttempts.size === 1) assert(nextTaskAttempts.head.executorId === "executor0") assert(nextTaskAttempts.head.attemptNumber === 1) assert(nextTaskAttempts.head.index != failedTask.index) - // now we should definitely realize that our task set is unschedulable, because the only - // task left can't be scheduled on any executors due to the blacklist + // Now we should definitely realize that our task set is unschedulable, because the only + // task left can't be scheduled on any executors due to the blacklist. taskScheduler.resourceOffers(IndexedSeq(new WorkerOffer("executor0", "host0", 1))) sc.listenerBus.waitUntilEmpty(100000) assert(tsm.isZombie) @@ -408,4 +647,5 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B assert(thirdTaskDescs.size === 0) assert(taskScheduler.getExecutorsAliveOnHost("host1") === Some(Set("executor1", "executor3"))) } + } http://git-wip-us.apache.org/repos/asf/spark/blob/8b1609be/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index 1b1a764..abc8fff 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -22,11 +22,13 @@ import java.util.Random import scala.collection.mutable import scala.collection.mutable.ArrayBuffer -import org.mockito.Mockito.{mock, verify} +import org.mockito.Matchers.{anyInt, anyString} +import org.mockito.Mockito.{mock, never, spy, verify, when} import org.apache.spark._ import org.apache.spark.internal.config import org.apache.spark.internal.Logging +import org.apache.spark.storage.BlockManagerId import org.apache.spark.util.{AccumulatorV2, ManualClock} class FakeDAGScheduler(sc: SparkContext, taskScheduler: FakeTaskScheduler) @@ -992,6 +994,47 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg assert(manager3.name === "TaskSet_1.1") } + test("don't update blacklist for shuffle-fetch failures, preemption, denied commits, " + + "or killed tasks") { + // Set up a taskset, and fail some tasks for a fetch failure, preemption, denied commit, + // and killed task. + val conf = new SparkConf().
+ set(config.BLACKLIST_ENABLED, true) + sc = new SparkContext("local", "test", conf) + sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2")) + val taskSet = FakeTask.createTaskSet(4) + val tsm = new TaskSetManager(sched, taskSet, 4) + // we need a spy so we can attach our mock blacklist + val tsmSpy = spy(tsm) + val blacklist = mock(classOf[TaskSetBlacklist]) + when(tsmSpy.taskSetBlacklistHelperOpt).thenReturn(Some(blacklist)) + + // make some offers to our taskset, to get tasks we will fail + val taskDescs = Seq( + "exec1" -> "host1", + "exec2" -> "host1" + ).flatMap { case (exec, host) => + // offer each executor twice (simulating 2 cores per executor) + (0 until 2).flatMap{ _ => tsmSpy.resourceOffer(exec, host, TaskLocality.ANY)} + } + assert(taskDescs.size === 4) + + // now fail those tasks + tsmSpy.handleFailedTask(taskDescs(0).taskId, TaskState.FAILED, + FetchFailed(BlockManagerId(taskDescs(0).executorId, "host1", 12345), 0, 0, 0, "ignored")) + tsmSpy.handleFailedTask(taskDescs(1).taskId, TaskState.FAILED, + ExecutorLostFailure(taskDescs(1).executorId, exitCausedByApp = false, reason = None)) + tsmSpy.handleFailedTask(taskDescs(2).taskId, TaskState.FAILED, + TaskCommitDenied(0, 2, 0)) + tsmSpy.handleFailedTask(taskDescs(3).taskId, TaskState.KILLED, + TaskKilled) + + // Make sure that the blacklist ignored all of the task failures above, since they aren't + // the fault of the executor where the task was running. + verify(blacklist, never()) + .updateBlacklistForFailedTask(anyString(), anyString(), anyInt()) + } + private def createTaskResult( id: Int, accumUpdates: Seq[AccumulatorV2[_, _]] = Seq.empty): DirectTaskResult[Int] = {
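The hook both suites rely on is the one-line visibility change in TaskSetManager.scala above: taskSetBlacklistHelperOpt is now private[scheduler], so a test in the same package can spy a real TaskSetManager and substitute a Mockito mock for its TaskSetBlacklist. A condensed sketch of that pattern, reduced from the test code above for illustration only; it assumes the snippet lives in the org.apache.spark.scheduler package and that sched and taskSet are prepared as in TaskSetManagerSuite:

    import org.mockito.Mockito.{mock, spy, when}

    // Build a real TaskSetManager, then spy it so the blacklist accessor can be stubbed.
    val tsm = new TaskSetManager(sched, taskSet, 4)
    val tsmSpy = spy(tsm)
    // Swap in a mock for the TaskSetBlacklist; this only compiles from the
    // org.apache.spark.scheduler package because the accessor is private[scheduler].
    val blacklist = mock(classOf[TaskSetBlacklist])
    when(tsmSpy.taskSetBlacklistHelperOpt).thenReturn(Some(blacklist))
    // Tests can then stub node-, executor-, or task-level checks and verify how often they run:
    when(blacklist.isNodeBlacklistedForTaskSet("host1")).thenReturn(true)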