Github user tgravescs commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22288#discussion_r223421728
  
    --- Diff: 
core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala ---
    @@ -503,6 +505,92 @@ class TaskSchedulerImplSuite extends SparkFunSuite 
with LocalSparkContext with B
         verify(tsm).abort(anyString(), anyObject())
       }
     
    +  test("SPARK-22148 abort timer should kick in when task is completely 
blacklisted & no new " +
    +    "executor can be acquired") {
    +    // set the abort timer to fail immediately
    +    taskScheduler = setupSchedulerWithMockTaskSetBlacklist(
    +      config.UNSCHEDULABLE_TASKSET_TIMEOUT.key -> "0")
    +
    +    // We have only 1 task remaining with 1 executor
    +    val taskSet = FakeTask.createTaskSet(numTasks = 1, stageAttemptId = 0)
    +    taskScheduler.submitTasks(taskSet)
    +    val tsm = stageToMockTaskSetManager(0)
    +
    +    // submit an offer with one executor
    +    val firstTaskAttempts = taskScheduler.resourceOffers(IndexedSeq(
    +      WorkerOffer("executor0", "host0", 1)
    +    )).flatten
    +
    +    // Fail the running task
    +    val failedTask = firstTaskAttempts.find(_.executorId == 
"executor0").get
    +    taskScheduler.statusUpdate(
    +      tid = failedTask.taskId,
    +      state = TaskState.FAILED,
    +      serializedData = ByteBuffer.allocate(0)
    +    )
    +    // Wait for the failed task to propagate.
    +    Thread.sleep(500)
    +    //    taskScheduler.handleFailedTask(tsm, failedTask.taskId, 
TaskState.FAILED, TaskResultLost)
    +    //    tsm.handleFailedTask(failedTask.taskId, TaskState.FAILED, 
TaskResultLost)
    +
    +    
when(stageToMockTaskSetBlacklist(0).isExecutorBlacklistedForTask("executor0", 
failedTask.index))
    +      .thenReturn(true)
    +
    +    // make an offer on the blacklisted executor.  We won't schedule 
anything, and set the abort
    +    // timer to kick in immediately
    +    assert(taskScheduler.resourceOffers(IndexedSeq(
    +      WorkerOffer("executor0", "host0", 1)
    +    )).flatten.size === 0)
    +    // Wait for the abort timer to kick in. Without sleep the test exits 
before the timer is
    +    // triggered.
    +    Thread.sleep(500)
    +    assert(tsm.isZombie)
    +  }
    +
    +  test("SPARK-22148 try to acquire a new executor when task is 
unschedulable with 1 executor") {
    +
    --- End diff --
    
    Please remove the extra blank line right after the test declaration.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org

Reply via email to