Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22288#discussion_r228608360
  
    --- Diff: 
core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala ---
    @@ -503,6 +507,145 @@ class TaskSchedulerImplSuite extends SparkFunSuite 
with LocalSparkContext with B
         verify(tsm).abort(anyString(), anyObject())
       }
     
    +  test("SPARK-22148 abort timer should kick in when task is completely 
blacklisted & no new " +
    +    "executor can be acquired") {
    +    // set the abort timer to fail immediately
    +    taskScheduler = setupSchedulerWithMockTaskSetBlacklist(
    +      config.UNSCHEDULABLE_TASKSET_TIMEOUT.key -> "0")
    +
    +    // We have only 1 task remaining with 1 executor
    +    val taskSet = FakeTask.createTaskSet(numTasks = 1, stageAttemptId = 0)
    +    taskScheduler.submitTasks(taskSet)
    +    val tsm = stageToMockTaskSetManager(0)
    +
    +    // submit an offer with one executor
    +    val firstTaskAttempts = taskScheduler.resourceOffers(IndexedSeq(
    +      WorkerOffer("executor0", "host0", 1)
    +    )).flatten
    +
    +    // Fail the running task
    +    val failedTask = firstTaskAttempts.find(_.executorId == 
"executor0").get
    +    taskScheduler.statusUpdate(failedTask.taskId, TaskState.FAILED, 
ByteBuffer.allocate(0))
    +    // we explicitly call the handleFailedTask method here to avoid adding 
a sleep in the test suite
    +    // Reason being - handleFailedTask is run by an executor service and 
there is a momentary delay
    +    // before it is launched and this fails the assertion check.
    +    tsm.handleFailedTask(failedTask.taskId, TaskState.FAILED, 
UnknownReason)
    +    when(tsm.taskSetBlacklistHelperOpt.get.isExecutorBlacklistedForTask(
    +      "executor0", failedTask.index)).thenReturn(true)
    +
    +    // make an offer on the blacklisted executor.  We won't schedule 
anything, and set the abort
    +    // timer to kick in immediately
    +    assert(taskScheduler.resourceOffers(IndexedSeq(
    +      WorkerOffer("executor0", "host0", 1)
    +    )).flatten.size === 0)
    +    // Wait for the abort timer to kick in. Without sleep the test exits 
before the timer is
    +    // triggered.
    +    eventually(timeout(500.milliseconds)) {
    +      assert(tsm.isZombie)
    +    }
    +  }
    +
    +  test("SPARK-22148 try to acquire a new executor when task is 
unschedulable with 1 executor") {
    +    taskScheduler = setupSchedulerWithMockTaskSetBlacklist(
    +      config.UNSCHEDULABLE_TASKSET_TIMEOUT.key -> "10")
    +
    +    // We have only 1 task remaining with 1 executor
    +    val taskSet = FakeTask.createTaskSet(numTasks = 1, stageAttemptId = 0)
    +    taskScheduler.submitTasks(taskSet)
    +    val tsm = stageToMockTaskSetManager(0)
    +
    +    // submit an offer with one executor
    +    val firstTaskAttempts = taskScheduler.resourceOffers(IndexedSeq(
    +      WorkerOffer("executor0", "host0", 1)
    +    )).flatten
    +
    +    // Fail the running task
    +    val failedTask = firstTaskAttempts.find(_.executorId == 
"executor0").get
    +    taskScheduler.statusUpdate(failedTask.taskId, TaskState.FAILED, 
ByteBuffer.allocate(0))
    +    // we explicitly call the handleFailedTask method here to avoid adding 
a sleep in the test suite
    +    // Reason being - handleFailedTask is run by an executor service and 
there is a momentary delay
    +    // before it is launched and this fails the assertion check.
    +    tsm.handleFailedTask(failedTask.taskId, TaskState.FAILED, 
UnknownReason)
    +    when(tsm.taskSetBlacklistHelperOpt.get.isExecutorBlacklistedForTask(
    +      "executor0", failedTask.index)).thenReturn(true)
    +
    +    // make an offer on the blacklisted executor.  We won't schedule 
anything, and set the abort
    +    // timer to expire if no new executors could be acquired. We kill the 
existing idle blacklisted
    +    // executor and try to acquire a new one.
    +    assert(taskScheduler.resourceOffers(IndexedSeq(
    +      WorkerOffer("executor0", "host0", 1)
    +    )).flatten.size === 0)
    +    assert(!tsm.isZombie)
    +
    +    // Offer a new executor which should be accepted
    +    assert(taskScheduler.resourceOffers(IndexedSeq(
    +      WorkerOffer("executor1", "host0", 1)
    +    )).flatten.size === 1)
    --- End diff --
    
    ok yeah in that case, might as well leave it as the default, does this help 
at all?
    
    Also I'd add asserts on `unschedulableTaskSetToExpiryTime` here -- 
`assert(unschedulableTaskSetToExpiryTime.contains(tsm))` after the first one 
and `assert(unschedulableTaskSetToExpiryTime.isEmpty)` after the second one, 
thats the really important part, as we're not expecting to cross the timeout 
and make it a zombie anyway.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to