Github user squito commented on a diff in the pull request:
https://github.com/apache/spark/pull/22288#discussion_r228609353
--- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala ---
@@ -503,6 +507,181 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
verify(tsm).abort(anyString(), anyObject())
}
+  test("SPARK-22148 abort timer should kick in when task is completely blacklisted & no new " +
+      "executor can be acquired") {
+    // Set the abort timer to fire immediately.
+    taskScheduler = setupSchedulerWithMockTaskSetBlacklist(
+      config.UNSCHEDULABLE_TASKSET_TIMEOUT.key -> "0")
+
+    // We have only 1 task remaining with 1 executor.
+    val taskSet = FakeTask.createTaskSet(numTasks = 1, stageAttemptId = 0)
+    taskScheduler.submitTasks(taskSet)
+    val tsm = stageToMockTaskSetManager(0)
+
+    // Submit an offer with one executor.
+    val firstTaskAttempts = taskScheduler.resourceOffers(IndexedSeq(
+      WorkerOffer("executor0", "host0", 1)
+    )).flatten
+
+    // Fail the running task.
+    val failedTask = firstTaskAttempts.find(_.executorId == "executor0")
+      .getOrElse(fail("expected a task to be launched on executor0"))
+    taskScheduler.statusUpdate(failedTask.taskId, TaskState.FAILED, ByteBuffer.allocate(0))
+    // Call handleFailedTask explicitly rather than adding a sleep to the test suite:
+    // handleFailedTask is normally run by an executor service, and the momentary delay before
+    // it is launched would make the assertions below fail.
+    tsm.handleFailedTask(failedTask.taskId, TaskState.FAILED, UnknownReason)
+    // From here on, report executor0 as blacklisted for the failed task.
+    when(tsm.taskSetBlacklistHelperOpt.get.isExecutorBlacklistedForTask(
+      "executor0", failedTask.index)).thenReturn(true)
+
+    // Make an offer on the blacklisted executor. We won't schedule anything, and the abort
+    // timer is set to kick in immediately (timeout 0).
+    assert(taskScheduler.resourceOffers(IndexedSeq(
+      WorkerOffer("executor0", "host0", 1)
+    )).flatten.size === 0)
+    // Even with a 0 timeout the abort timer fires asynchronously, after resourceOffers has
+    // returned, so poll with `eventually` (instead of sleeping) until the task set manager
+    // becomes a zombie.
+    eventually(timeout(500.milliseconds)) {
+      assert(tsm.isZombie)
+    }
+  }
+
+  test("SPARK-22148 try to acquire a new executor when task is unschedulable with 1 executor") {
+    taskScheduler = setupSchedulerWithMockTaskSetBlacklist(
+      config.UNSCHEDULABLE_TASKSET_TIMEOUT.key -> "10")
+
+    // We have only 1 task remaining with 1 executor.
+    val taskSet = FakeTask.createTaskSet(numTasks = 1, stageAttemptId = 0)
+    taskScheduler.submitTasks(taskSet)
+    val tsm = stageToMockTaskSetManager(0)
+
+    // Submit an offer with one executor.
+    val firstTaskAttempts = taskScheduler.resourceOffers(IndexedSeq(
+      WorkerOffer("executor0", "host0", 1)
+    )).flatten
+
+    // Fail the running task.
+    val failedTask = firstTaskAttempts.find(_.executorId == "executor0")
+      .getOrElse(fail("expected a task to be launched on executor0"))
+    taskScheduler.statusUpdate(failedTask.taskId, TaskState.FAILED, ByteBuffer.allocate(0))
+    // Call handleFailedTask explicitly rather than adding a sleep to the test suite:
+    // handleFailedTask is normally run by an executor service, and the momentary delay before
+    // it is launched would make the assertions below fail.
+    tsm.handleFailedTask(failedTask.taskId, TaskState.FAILED, UnknownReason)
+    // From here on, report executor0 as blacklisted for the failed task.
+    when(tsm.taskSetBlacklistHelperOpt.get.isExecutorBlacklistedForTask(
+      "executor0", failedTask.index)).thenReturn(true)
+
+    // Make an offer on the blacklisted executor. We won't schedule anything, and the abort
+    // timer is set to expire if no new executor can be acquired. The scheduler is expected to
+    // kill the idle blacklisted executor and try to acquire a new one (not asserted here).
+    assert(taskScheduler.resourceOffers(IndexedSeq(
+      WorkerOffer("executor0", "host0", 1)
+    )).flatten.size === 0)
+    // The timeout is non-zero, so the task set must not have been aborted yet.
+    assert(!tsm.isZombie)
+
+    // Offer a new executor, which should be accepted.
+    assert(taskScheduler.resourceOffers(IndexedSeq(
+      WorkerOffer("executor1", "host0", 1)
+    )).flatten.size === 1)
+
+    assert(!tsm.isZombie)
+  }
+
+  // This tests a scenario where two taskSets are completely blacklisted: on acquiring a new
+  // executor, we don't want the abort timer for the second taskSet to expire and abort the job.
+  test("SPARK-22148 abort timer should clear unschedulableTaskSetToExpiryTime for all TaskSets") {
+    taskScheduler = setupSchedulerWithMockTaskSetBlacklist(
+      config.UNSCHEDULABLE_TASKSET_TIMEOUT.key -> "10")
+
+    // We have 2 taskSets with 1 task remaining in each, and 1 executor that will end up
+    // completely blacklisted.
+    val taskSet1 = FakeTask.createTaskSet(numTasks = 1, stageId = 0, stageAttemptId = 0)
+    taskScheduler.submitTasks(taskSet1)
+    val taskSet2 = FakeTask.createTaskSet(numTasks = 1, stageId = 1, stageAttemptId = 0)
+    taskScheduler.submitTasks(taskSet2)
+    val tsm = stageToMockTaskSetManager(0)
+
+    // Submit an offer with one executor.
+    val firstTaskAttempts = taskScheduler.resourceOffers(IndexedSeq(
+      WorkerOffer("executor0", "host0", 1)
+    )).flatten
+
+    assert(taskScheduler.unschedulableTaskSetToExpiryTime.size === 0)
+
+    // Fail the running task (from taskSet1) and report executor0 as blacklisted for it.
+    val failedTask = firstTaskAttempts.find(_.executorId == "executor0")
+      .getOrElse(fail("expected a task to be launched on executor0"))
+    taskScheduler.statusUpdate(failedTask.taskId, TaskState.FAILED, ByteBuffer.allocate(0))
+    tsm.handleFailedTask(failedTask.taskId, TaskState.FAILED, UnknownReason)
+    when(tsm.taskSetBlacklistHelperOpt.get.isExecutorBlacklistedForTask(
+      "executor0", failedTask.index)).thenReturn(true)
+
+    // Make an offer. We will schedule the task from the second taskSet. Since a task was
+    // scheduled, we do not kick off the abort timer for taskSet1.
+    val secondTaskAttempts = taskScheduler.resourceOffers(IndexedSeq(
+      WorkerOffer("executor0", "host0", 1)
+    )).flatten
+
+    assert(taskScheduler.unschedulableTaskSetToExpiryTime.size === 0)
+
+    // Fail the running task from taskSet2 as well and report executor0 as blacklisted for it.
+    val tsm2 = stageToMockTaskSetManager(1)
+    val failedTask2 = secondTaskAttempts.find(_.executorId == "executor0")
+      .getOrElse(fail("expected a task to be launched on executor0"))
+    taskScheduler.statusUpdate(failedTask2.taskId, TaskState.FAILED, ByteBuffer.allocate(0))
+    tsm2.handleFailedTask(failedTask2.taskId, TaskState.FAILED, UnknownReason)
+    when(tsm2.taskSetBlacklistHelperOpt.get.isExecutorBlacklistedForTask(
+      "executor0", failedTask2.index)).thenReturn(true)
+
+    // Make an offer on the blacklisted executor. We won't schedule anything, and the abort
+    // timer is set for both taskSet1 and taskSet2.
+    assert(taskScheduler.resourceOffers(IndexedSeq(
+      WorkerOffer("executor0", "host0", 1)
+    )).flatten.size === 0)
+
+    assert(taskScheduler.unschedulableTaskSetToExpiryTime.size === 2)
+
+    // Offer a new executor, which should be accepted.
+    assert(taskScheduler.resourceOffers(IndexedSeq(
+      WorkerOffer("executor1", "host1", 1)
+    )).flatten.size === 1)
+
+    // The expiry times of all the taskSets must be cleared, not just the one that got a task
+    // scheduled, and neither taskSet may have been aborted.
+    assert(taskScheduler.unschedulableTaskSetToExpiryTime.size === 0)
+    assert(!tsm.isZombie)
+    assert(!tsm2.isZombie)
+  }
+
+ // this test is to check that we don't abort a taskSet which is not
being scheduled on other
+ // executors as it is waiting on locality timeout and not being aborted
because it is still not
+ // completely blacklisted.
+ test("SPARK-22148 Ensure we don't abort the taskSet if we haven't been
completely blacklisted") {
+ taskScheduler = setupSchedulerWithMockTaskSetBlacklist(
+ config.UNSCHEDULABLE_TASKSET_TIMEOUT.key -> "0")
+
+ val preferredLocation = Seq(ExecutorCacheTaskLocation("host0",
"executor0"))
+ val taskSet1 = FakeTask.createTaskSet(numTasks = 1, stageId = 0,
stageAttemptId = 0,
+ preferredLocation)
+ taskScheduler.submitTasks(taskSet1)
+
+ val tsm = stageToMockTaskSetManager(0)
+
+ // submit an offer with one executor
+ var taskAttempts = taskScheduler.resourceOffers(IndexedSeq(
+ WorkerOffer("executor0", "host0", 1)
+ )).flatten
+
+ // Fail the running task
+ val failedTask = taskAttempts.find(_.executorId == "executor0").get
--- End diff --
same here
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]