Github user dhruve commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22288#discussion_r224167756
  
    --- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
    @@ -415,9 +419,61 @@ private[spark] class TaskSchedulerImpl(
                 launchedAnyTask |= launchedTaskAtCurrentMaxLocality
               } while (launchedTaskAtCurrentMaxLocality)
             }
    +
             if (!launchedAnyTask) {
    -          taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
    +          taskSet.getCompletelyBlacklistedTaskIfAny(hostToExecutors) match 
{
    +            case taskIndex: Some[Int] => // Returns the taskIndex which 
was unschedulable
    +
    +              // If the taskSet is unschedulable we try to find an 
existing idle blacklisted
    +              // executor. If we cannot find one, we abort immediately. 
Else we kill the idle
    +              // executor and kick off an abortTimer which if it doesn't 
schedule a task within the
    +              // the timeout will abort the taskSet if we were unable to 
schedule any task from the
    +              // taskSet.
    +              // Note 1: We keep track of schedulability on a per taskSet 
basis rather than on a per
    +              // task basis.
    +              // Note 2: The taskSet can still be aborted when there are 
more than one idle
    +              // blacklisted executors and dynamic allocation is on. This 
can happen when a killed
    +              // idle executor isn't replaced in time by 
ExecutorAllocationManager as it relies on
    +              // pending tasks and doesn't kill executors on idle 
timeouts, resulting in the abort
    +              // timer to expire and abort the taskSet.
    +              executorIdToRunningTaskIds.find(x => !isExecutorBusy(x._1)) 
match {
    +                case Some (x) =>
    +                  val executorId = x._1
    +                  if (!unschedulableTaskSetToExpiryTime.contains(taskSet)) 
{
    +                    blacklistTrackerOpt.foreach(blt => 
blt.killBlacklistedIdleExecutor(executorId))
    +
    +                    unschedulableTaskSetToExpiryTime(taskSet) = 
clock.getTimeMillis()
    +                    val timeout = 
conf.get(config.UNSCHEDULABLE_TASKSET_TIMEOUT) * 1000
    +                    logInfo(s"Waiting for $timeout ms for completely "
    +                      + s"blacklisted task to be schedulable again before 
aborting $taskSet.")
    +                    abortTimer.schedule(new TimerTask() {
    +                      override def run() {
    +                        if 
(unschedulableTaskSetToExpiryTime.contains(taskSet) &&
    +                          (unschedulableTaskSetToExpiryTime(taskSet) + 
timeout)
    +                            <= clock.getTimeMillis()
    --- End diff --
    
    it doesn't fit within the 100 char limit


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to