Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22288#discussion_r228606711
  
    --- Diff: 
core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala ---
    @@ -503,6 +507,181 @@ class TaskSchedulerImplSuite extends SparkFunSuite 
with LocalSparkContext with B
         verify(tsm).abort(anyString(), anyObject())
       }
     
    +  test("SPARK-22148 abort timer should kick in when task is completely 
blacklisted & no new " +
    +      "executor can be acquired") {
    +    // set the abort timer to fail immediately
    +    taskScheduler = setupSchedulerWithMockTaskSetBlacklist(
    +      config.UNSCHEDULABLE_TASKSET_TIMEOUT.key -> "0")
    +
    +    // We have only 1 task remaining with 1 executor
    +    val taskSet = FakeTask.createTaskSet(numTasks = 1, stageAttemptId = 0)
    +    taskScheduler.submitTasks(taskSet)
    +    val tsm = stageToMockTaskSetManager(0)
    +
    +    // submit an offer with one executor
    +    val firstTaskAttempts = taskScheduler.resourceOffers(IndexedSeq(
    +      WorkerOffer("executor0", "host0", 1)
    +    )).flatten
    +
    +    // Fail the running task
    +    val failedTask = firstTaskAttempts.find(_.executorId == 
"executor0").get
    +    taskScheduler.statusUpdate(failedTask.taskId, TaskState.FAILED, 
ByteBuffer.allocate(0))
    +    // we explicitly call the handleFailedTask method here to avoid adding 
a sleep in the test suite
    +    // Reason being - handleFailedTask is run by an executor service and 
there is a momentary delay
    +    // before it is launched and this fails the assertion check.
    +    tsm.handleFailedTask(failedTask.taskId, TaskState.FAILED, 
UnknownReason)
    +    when(tsm.taskSetBlacklistHelperOpt.get.isExecutorBlacklistedForTask(
    +      "executor0", failedTask.index)).thenReturn(true)
    +
    +    // make an offer on the blacklisted executor.  We won't schedule 
anything, and set the abort
    +    // timer to kick in immediately
    +    assert(taskScheduler.resourceOffers(IndexedSeq(
    +      WorkerOffer("executor0", "host0", 1)
    +    )).flatten.size === 0)
    +    // Wait for the abort timer to kick in. Without sleep the test exits 
before the timer is
    +    // triggered.
    --- End diff --
    
    comment is out of date, there is no sleep anymore.  But it is still worth 
explaining that even though there is no configured delay, we still have to wait 
for the abort timer to run in a separate thread.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to