Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22288#discussion_r228788350
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
    @@ -415,9 +420,54 @@ private[spark] class TaskSchedulerImpl(
                 launchedAnyTask |= launchedTaskAtCurrentMaxLocality
               } while (launchedTaskAtCurrentMaxLocality)
             }
    +
             if (!launchedAnyTask) {
    -          taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
    +          taskSet.getCompletelyBlacklistedTaskIfAny(hostToExecutors) match {
    +            case Some(taskIndex) => // Returns the taskIndex which was unschedulable
    +
    +              // If the taskSet is unschedulable we try to find an existing idle blacklisted
    +              // executor. If we cannot find one, we abort immediately. Else we kill the idle
    +              // executor and kick off an abortTimer which, if it doesn't schedule a task within
    +              // the timeout, will abort the taskSet if we were unable to schedule any task from
    +              // the taskSet.
    +              // Note 1: We keep track of schedulability on a per-taskSet basis rather than on a
    +              // per-task basis.
    +              // Note 2: The taskSet can still be aborted when there is more than one idle
    +              // blacklisted executor and dynamic allocation is on. This can happen when a killed
    +              // idle executor isn't replaced in time by ExecutorAllocationManager, as it relies
    +              // on pending tasks and doesn't kill executors on idle timeouts, causing the abort
    +              // timer to expire and abort the taskSet.
    +              executorIdToRunningTaskIds.find(x => !isExecutorBusy(x._1)) match {
    +                case Some((executorId, _)) =>
    +                  if (!unschedulableTaskSetToExpiryTime.contains(taskSet)) {
    +                    blacklistTrackerOpt.foreach(blt => blt.killBlacklistedIdleExecutor(executorId))
    +
    +                    val timeout = conf.get(config.UNSCHEDULABLE_TASKSET_TIMEOUT) * 1000
    +                    unschedulableTaskSetToExpiryTime(taskSet) = clock.getTimeMillis() + timeout
    +                    logInfo(s"Waiting for $timeout ms for completely "
    +                      + s"blacklisted task to be schedulable again before aborting $taskSet.")
    +                    abortTimer.schedule(
    +                      createUnschedulableTaskSetAbortTimer(taskSet, taskIndex), timeout)
    +                  }
    +                case _ => // Abort immediately
    +                  logInfo("Cannot schedule any task because of complete blacklisting. No idle" +
    +                    s" executors can be found to kill. Aborting $taskSet.")
    +                  taskSet.abortSinceCompletelyBlacklisted(taskIndex)
    +              }
    +            case _ => // Do nothing if no tasks completely blacklisted.
    +          }
    +        } else {
    +          // We want to defer killing any taskSets as long as we have a non-blacklisted executor
    +          // which can be used to schedule a task from any active taskSets. This ensures that the
    +          // job can make progress and if we encounter a flawed taskSet it will eventually either
    +          // fail or abort due to being completely blacklisted.
    --- End diff --
    
    so it's a bit worse than regular starvation from competing tasksets: in this case you
    might actually have resources available on your cluster, but you never ask for them,
    because the executor allocation manager thinks you have enough executors based on the
    number of pending tasks.
    
    In any case, I agree this is a stretch, and overall it's an improvement, so I'm OK
    with it.
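    
    To make that scenario concrete, here is a rough sketch of the heuristic (my own
    simplified model, not Spark's actual ExecutorAllocationManager code; the object,
    method names, and numbers are hypothetical). A target derived only from pending and
    running task counts never grows when the pending tasks are unschedulable purely
    because of blacklisting, so no new executors get requested:
    
        // Hypothetical, simplified model -- not Spark's real ExecutorAllocationManager.
        object AllocationSketch {
    
          // Spark-style dynamic allocation sizes its executor target roughly as
          // ceil((pending + running tasks) / tasks per executor). Blacklisting is
          // invisible to this formula.
          def targetExecutors(pendingTasks: Int, runningTasks: Int, tasksPerExecutor: Int): Int =
            math.ceil((pendingTasks + runningTasks).toDouble / tasksPerExecutor).toInt
    
          def main(args: Array[String]): Unit = {
            // One taskSet with 4 pending tasks and nothing running.
            val target = targetExecutors(pendingTasks = 4, runningTasks = 0, tasksPerExecutor = 4)
    
            // Suppose we already hold `target` executors, but every one of them is
            // blacklisted for these 4 tasks. The target says we have "enough", so
            // nothing more is requested and the tasks never launch -- the hang this
            // PR's abort timer guards against.
            val allocatedExecutors = 1
            println(s"target = $target, allocated = $allocatedExecutors")
            println(s"requests more executors? ${target > allocatedExecutors}")  // false
          }
        }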


---
