Github user dhruve commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22288#discussion_r227080844
  
    --- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala ---
    @@ -503,6 +507,145 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
         verify(tsm).abort(anyString(), anyObject())
       }
     
    +  test("SPARK-22148 abort timer should kick in when task is completely 
blacklisted & no new " +
    +    "executor can be acquired") {
    +    // set the abort timer to fail immediately
    +    taskScheduler = setupSchedulerWithMockTaskSetBlacklist(
    +      config.UNSCHEDULABLE_TASKSET_TIMEOUT.key -> "0")
    +
    +    // We have only 1 task remaining with 1 executor
    +    val taskSet = FakeTask.createTaskSet(numTasks = 1, stageAttemptId = 0)
    +    taskScheduler.submitTasks(taskSet)
    +    val tsm = stageToMockTaskSetManager(0)
    +
    +    // submit an offer with one executor
    +    val firstTaskAttempts = taskScheduler.resourceOffers(IndexedSeq(
    +      WorkerOffer("executor0", "host0", 1)
    +    )).flatten
    +
    +    // Fail the running task
    +    val failedTask = firstTaskAttempts.find(_.executorId == 
"executor0").get
    +    taskScheduler.statusUpdate(failedTask.taskId, TaskState.FAILED, 
ByteBuffer.allocate(0))
    +    // we explicitly call the handleFailedTask method here to avoid adding a sleep in the test suite
    +    // Reason being - handleFailedTask is run by an executor service and there is a momentary delay
    +    // before it is launched and this fails the assertion check.
    +    tsm.handleFailedTask(failedTask.taskId, TaskState.FAILED, 
UnknownReason)
    +    when(tsm.taskSetBlacklistHelperOpt.get.isExecutorBlacklistedForTask(
    +      "executor0", failedTask.index)).thenReturn(true)
    +
    +    // make an offer on the blacklisted executor.  We won't schedule 
anything, and set the abort
    +    // timer to kick in immediately
    +    assert(taskScheduler.resourceOffers(IndexedSeq(
    +      WorkerOffer("executor0", "host0", 1)
    +    )).flatten.size === 0)
    +    // Wait for the abort timer to kick in. Without sleep the test exits before the timer is
    +    // triggered.
    +    eventually(timeout(500.milliseconds)) {
    +      assert(tsm.isZombie)
    +    }
    +  }
    +
    +  test("SPARK-22148 try to acquire a new executor when task is 
unschedulable with 1 executor") {
    +    taskScheduler = setupSchedulerWithMockTaskSetBlacklist(
    +      config.UNSCHEDULABLE_TASKSET_TIMEOUT.key -> "10")
    +
    +    // We have only 1 task remaining with 1 executor
    +    val taskSet = FakeTask.createTaskSet(numTasks = 1, stageAttemptId = 0)
    +    taskScheduler.submitTasks(taskSet)
    +    val tsm = stageToMockTaskSetManager(0)
    +
    +    // submit an offer with one executor
    +    val firstTaskAttempts = taskScheduler.resourceOffers(IndexedSeq(
    +      WorkerOffer("executor0", "host0", 1)
    +    )).flatten
    +
    +    // Fail the running task
    +    val failedTask = firstTaskAttempts.find(_.executorId == 
"executor0").get
    +    taskScheduler.statusUpdate(failedTask.taskId, TaskState.FAILED, 
ByteBuffer.allocate(0))
    +    // we explicitly call the handleFailedTask method here to avoid adding a sleep in the test suite
    +    // Reason being - handleFailedTask is run by an executor service and there is a momentary delay
    +    // before it is launched and this fails the assertion check.
    +    tsm.handleFailedTask(failedTask.taskId, TaskState.FAILED, 
UnknownReason)
    +    when(tsm.taskSetBlacklistHelperOpt.get.isExecutorBlacklistedForTask(
    +      "executor0", failedTask.index)).thenReturn(true)
    +
    +    // make an offer on the blacklisted executor.  We won't schedule anything, and set the abort
    +    // timer to expire if no new executors could be acquired. We kill the existing idle blacklisted
    +    // executor and try to acquire a new one.
    +    assert(taskScheduler.resourceOffers(IndexedSeq(
    +      WorkerOffer("executor0", "host0", 1)
    +    )).flatten.size === 0)
    +    assert(!tsm.isZombie)
    +
    +    // Offer a new executor which should be accepted
    +    assert(taskScheduler.resourceOffers(IndexedSeq(
    +      WorkerOffer("executor1", "host0", 1)
    +    )).flatten.size === 1)
    +
    +    assert(!tsm.isZombie)
    +  }
    +
    +  // This is to test a scenario where we have two taskSets completely blacklisted and on acquiring
    +  // a new executor we don't want the abort timer for the second taskSet to expire and abort the job
    +  test("SPARK-22148 abort timer should clear 
unschedulableTaskSetToExpiryTime for all TaskSets") {
    +    taskScheduler = setupSchedulerWithMockTaskSetBlacklist(
    +      config.UNSCHEDULABLE_TASKSET_TIMEOUT.key -> "10")
    +
    +    // We have 2 taskSets with 1 task remaining in each with 1 executor 
completely blacklisted
    +    val taskSet1 = FakeTask.createTaskSet(numTasks = 1, stageId = 0, 
stageAttemptId = 0)
    +    taskScheduler.submitTasks(taskSet1)
    +    val taskSet2 = FakeTask.createTaskSet(numTasks = 1, stageId = 1, 
stageAttemptId = 0)
    +    taskScheduler.submitTasks(taskSet2)
    +    val tsm = stageToMockTaskSetManager(0)
    +
    +    // submit an offer with one executor
    +    val firstTaskAttempts = taskScheduler.resourceOffers(IndexedSeq(
    +      WorkerOffer("executor0", "host0", 1)
    +    )).flatten
    +
    +    // Fail the running task
    +    val failedTask = firstTaskAttempts.find(_.executorId == 
"executor0").get
    +    taskScheduler.statusUpdate(failedTask.taskId, TaskState.FAILED, 
ByteBuffer.allocate(0))
    +    when(tsm.taskSetBlacklistHelperOpt.get.isExecutorBlacklistedForTask(
    +      "executor0", failedTask.index)).thenReturn(true)
    +
    +    // make an offer. We will schedule the task from the second taskSet. 
Since a task was scheduled
    +    // we do not kick off the abort timer for taskSet1
    +    val secondTaskAttempts = taskScheduler.resourceOffers(IndexedSeq(
    +      WorkerOffer("executor0", "host0", 1)
    +    )).flatten
    --- End diff --
    
    Okay.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to