Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22288#discussion_r230682694
  
    --- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala ---
    @@ -503,6 +507,182 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
         verify(tsm).abort(anyString(), anyObject())
       }
     
    +  test("SPARK-22148 abort timer should kick in when task is completely blacklisted & no new " +
    +      "executor can be acquired") {
    +    // Set the abort timer to fire immediately (timeout of 0) once the only task is completely
    +    // blacklisted and no other executor is available.
    +    taskScheduler = setupSchedulerWithMockTaskSetBlacklist(
    +      config.UNSCHEDULABLE_TASKSET_TIMEOUT.key -> "0")
    +
    +    // We have only 1 task remaining with 1 executor
    +    val taskSet = FakeTask.createTaskSet(numTasks = 1, stageAttemptId = 0)
    +    taskScheduler.submitTasks(taskSet)
    +    val tsm = stageToMockTaskSetManager(0)
    +
    +    // submit an offer with one executor
    +    val firstTaskAttempts = taskScheduler.resourceOffers(IndexedSeq(
    +      WorkerOffer("executor0", "host0", 1)
    +    )).flatten
    +
    +    // Fail the running task. The blacklist stub below is keyed on executor0, so make sure the
    +    // attempt really ran there, and fail with a readable message otherwise.
    +    val failedTask = firstTaskAttempts.find(_.executorId == "executor0").getOrElse(
    +      fail("expected the first task attempt to be scheduled on executor0"))
    +    taskScheduler.statusUpdate(failedTask.taskId, TaskState.FAILED, ByteBuffer.allocate(0))
    +    // We explicitly call handleFailedTask here to avoid adding a sleep to the test: it is
    +    // normally run by an executor service, and the momentary delay before it is launched
    +    // would otherwise make the assertions below fail.
    +    tsm.handleFailedTask(failedTask.taskId, TaskState.FAILED, UnknownReason)
    +    when(tsm.taskSetBlacklistHelperOpt.get.isExecutorBlacklistedForTask(
    +      "executor0", failedTask.index)).thenReturn(true)
    +
    +    // Make an offer on the blacklisted executor. We won't schedule anything, and the abort
    +    // timer is set to kick in immediately.
    +    assert(taskScheduler.resourceOffers(IndexedSeq(
    +      WorkerOffer("executor0", "host0", 1)
    +    )).flatten.size === 0)
    +    // Wait for the abort timer to kick in. Even though the timeout is 0, the abort runs in a
    +    // separate thread, so there is a slight delay. `eventually` returns as soon as the assertion
    +    // passes, so a generous bound costs nothing on a healthy run while tolerating long pauses
    +    // on a loaded CI machine.
    +    eventually(timeout(10.seconds)) {
    +      assert(tsm.isZombie)
    +    }
    +  }
    +
    +  test("SPARK-22148 try to acquire a new executor when task is unschedulable with 1 executor") {
    +    taskScheduler = setupSchedulerWithMockTaskSetBlacklist(
    +      config.UNSCHEDULABLE_TASKSET_TIMEOUT.key -> "10")
    +
    +    // Builds a fresh single-core offer for the given executor/host pair.
    +    def singleCoreOffer(execId: String, hostName: String): IndexedSeq[WorkerOffer] =
    +      IndexedSeq(WorkerOffer(execId, hostName, 1))
    +
    +    // A single stage with one task; only executor0 is available at first.
    +    taskScheduler.submitTasks(FakeTask.createTaskSet(numTasks = 1, stageAttemptId = 0))
    +    val manager = stageToMockTaskSetManager(0)
    +
    +    val initialAttempts =
    +      taskScheduler.resourceOffers(singleCoreOffer("executor0", "host0")).flatten
    +
    +    // Fail that first attempt. handleFailedTask is normally run by an executor service, so
    +    // there is a short delay before it takes effect; invoking it directly avoids a sleep and
    +    // keeps the assertions below deterministic.
    +    val attemptToFail = initialAttempts.head
    +    taskScheduler.statusUpdate(attemptToFail.taskId, TaskState.FAILED, ByteBuffer.allocate(0))
    +    manager.handleFailedTask(attemptToFail.taskId, TaskState.FAILED, UnknownReason)
    +    when(manager.taskSetBlacklistHelperOpt.get.isExecutorBlacklistedForTask(
    +      "executor0", attemptToFail.index)).thenReturn(true)
    +
    +    // Offering only the blacklisted executor schedules nothing. Rather than aborting at once,
    +    // the scheduler records an expiry time for the task set, which only fires if no new
    +    // executor can be acquired in place of the idle blacklisted one.
    +    val blacklistedRound = taskScheduler.resourceOffers(singleCoreOffer("executor0", "host0"))
    +    assert(blacklistedRound.flatten.size === 0)
    +    assert(taskScheduler.unschedulableTaskSetToExpiryTime.contains(manager))
    +    assert(!manager.isZombie)
    +
    +    // A brand-new executor can run the task, which clears the pending expiry time.
    +    val freshRound = taskScheduler.resourceOffers(singleCoreOffer("executor1", "host0"))
    +    assert(freshRound.flatten.size === 1)
    +    assert(taskScheduler.unschedulableTaskSetToExpiryTime.isEmpty)
    +    assert(!manager.isZombie)
    +  }
    +
    +  // Two task sets end up completely blacklisted on the only executor. Once a new executor is
    +  // acquired and a task is scheduled, the abort timers for *all* task sets must be cleared, so
    +  // the second task set is not aborted (failing the job) while it waits for a free slot.
    +  test("SPARK-22148 abort timer should clear unschedulableTaskSetToExpiryTime for all TaskSets") {
    +    taskScheduler = setupSchedulerWithMockTaskSetBlacklist()
    +
    +    // We have 2 taskSets with 1 task remaining in each with 1 executor completely blacklisted
    +    val taskSet1 = FakeTask.createTaskSet(numTasks = 1, stageId = 0, stageAttemptId = 0)
    +    taskScheduler.submitTasks(taskSet1)
    +    val taskSet2 = FakeTask.createTaskSet(numTasks = 1, stageId = 1, stageAttemptId = 0)
    +    taskScheduler.submitTasks(taskSet2)
    +    val tsm = stageToMockTaskSetManager(0)
    +
    +    // submit an offer with one executor
    +    val firstTaskAttempts = taskScheduler.resourceOffers(IndexedSeq(
    +      WorkerOffer("executor0", "host0", 1)
    +    )).flatten
    +
    +    assert(taskScheduler.unschedulableTaskSetToExpiryTime.isEmpty)
    +
    +    // Fail the running task of taskSet1 and blacklist executor0 for it. handleFailedTask is
    +    // called directly so the failure is applied synchronously rather than after a short delay.
    +    val failedTask = firstTaskAttempts.head
    +    taskScheduler.statusUpdate(failedTask.taskId, TaskState.FAILED, ByteBuffer.allocate(0))
    +    tsm.handleFailedTask(failedTask.taskId, TaskState.FAILED, UnknownReason)
    +    when(tsm.taskSetBlacklistHelperOpt.get.isExecutorBlacklistedForTask(
    +      "executor0", failedTask.index)).thenReturn(true)
    +
    +    // make an offer. We will schedule the task from the second taskSet. Since a task was scheduled
    +    // we do not kick off the abort timer for taskSet1
    +    val secondTaskAttempts = taskScheduler.resourceOffers(IndexedSeq(
    +      WorkerOffer("executor0", "host0", 1)
    +    )).flatten
    +
    +    assert(taskScheduler.unschedulableTaskSetToExpiryTime.isEmpty)
    +
    +    // Fail taskSet2's task on executor0 as well, so both task sets are now blacklisted there.
    +    val tsm2 = stageToMockTaskSetManager(1)
    +    val failedTask2 = secondTaskAttempts.head
    +    taskScheduler.statusUpdate(failedTask2.taskId, TaskState.FAILED, ByteBuffer.allocate(0))
    +    tsm2.handleFailedTask(failedTask2.taskId, TaskState.FAILED, UnknownReason)
    +    when(tsm2.taskSetBlacklistHelperOpt.get.isExecutorBlacklistedForTask(
    +      "executor0", failedTask2.index)).thenReturn(true)
    +
    +    // make an offer on the blacklisted executor.  We won't schedule anything, and set the abort
    +    // timer for taskSet1 and taskSet2
    +    assert(taskScheduler.resourceOffers(IndexedSeq(
    +      WorkerOffer("executor0", "host0", 1)
    +    )).flatten.size === 0)
    +    assert(taskScheduler.unschedulableTaskSetToExpiryTime.contains(tsm))
    +    assert(taskScheduler.unschedulableTaskSetToExpiryTime.contains(tsm2))
    +    assert(taskScheduler.unschedulableTaskSetToExpiryTime.size === 2)
    +
    +    // Offer a new executor which should be accepted
    +    assert(taskScheduler.resourceOffers(IndexedSeq(
    +      WorkerOffer("executor1", "host1", 1)
    +    )).flatten.size === 1)
    +
    +    // Check if all the taskSets are cleared
    +    assert(taskScheduler.unschedulableTaskSetToExpiryTime.isEmpty)
    +
    +    // Neither task set may have been aborted: clearing the expiry times must keep taskSet2
    +    // alive even though it could not be scheduled on this single-core offer.
    +    assert(!tsm.isZombie)
    +    assert(!tsm2.isZombie)
    +  }
    +
    +  // this test is to check that we don't abort a taskSet which is not 
being scheduled on other
    +  // executors as it is waiting on locality timeout and not being aborted 
because it is still not
    +  // completely blacklisted.
    +  test("SPARK-22148 Ensure we don't abort the taskSet if we haven't been 
completely blacklisted") {
    +    taskScheduler = setupSchedulerWithMockTaskSetBlacklist(
    +      config.UNSCHEDULABLE_TASKSET_TIMEOUT.key -> "0")
    +
    +    val preferredLocation = Seq(ExecutorCacheTaskLocation("host0", 
"executor0"))
    +    val taskSet1 = FakeTask.createTaskSet(numTasks = 1, stageId = 0, 
stageAttemptId = 0,
    +      preferredLocation)
    +    taskScheduler.submitTasks(taskSet1)
    +
    +    val tsm = stageToMockTaskSetManager(0)
    +
    +    // submit an offer with one executor
    +    var taskAttempts = taskScheduler.resourceOffers(IndexedSeq(
    +      WorkerOffer("executor0", "host0", 1)
    +    )).flatten
    +
    +    // Fail the running task
    +    val failedTask = taskAttempts.head
    +    taskScheduler.statusUpdate(failedTask.taskId, TaskState.FAILED, 
ByteBuffer.allocate(0))
    +    tsm.handleFailedTask(failedTask.taskId, TaskState.FAILED, 
UnknownReason)
    +    when(tsm.taskSetBlacklistHelperOpt.get.isExecutorBlacklistedForTask(
    +      "executor0", failedTask.index)).thenReturn(true)
    +
    +    // make an offer but we won't schedule anything yet as scheduler 
locality is still PROCESS_LOCAL
    +    assert(taskScheduler.resourceOffers(IndexedSeq(
    +      WorkerOffer("executor1", "host0", 1)
    +    )).flatten.isEmpty)
    --- End diff --
    
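    This assertion depends on the system clock not advancing past the locality wait before this
    offer is made. If it does, the scheduler relaxes locality from PROCESS_LOCAL, places the task
    on executor1 (which is also on host0), and the `isEmpty` check fails. I've seen pauses of over
    5 seconds on Jenkins in flaky tests -- either put in a manual clock or just increase the
    locality wait in this test to avoid flakiness here.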


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to