Github user tgravescs commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22288#discussion_r228678209
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
    @@ -415,9 +420,54 @@ private[spark] class TaskSchedulerImpl(
                 launchedAnyTask |= launchedTaskAtCurrentMaxLocality
               } while (launchedTaskAtCurrentMaxLocality)
             }
    +
             if (!launchedAnyTask) {
    -          taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
    +          taskSet.getCompletelyBlacklistedTaskIfAny(hostToExecutors) match {
    +            case Some(taskIndex) => // Returns the taskIndex which was unschedulable
    +
    +              // If the taskSet is unschedulable we try to find an existing idle blacklisted
    +              // executor. If we cannot find one, we abort immediately. Else we kill the idle
    +              // executor and kick off an abortTimer, which aborts the taskSet if we were unable
    +              // to schedule any task from the taskSet within the timeout.
    +              // Note 1: We keep track of schedulability on a per-taskSet basis rather than on a
    +              // per-task basis.
    +              // Note 2: The taskSet can still be aborted when there is more than one idle
    +              // blacklisted executor and dynamic allocation is on. This can happen when a killed
    +              // idle executor isn't replaced in time by ExecutorAllocationManager, as it relies
    +              // on pending tasks and doesn't kill executors on idle timeouts, causing the abort
    +              // timer to expire and abort the taskSet.
    +              executorIdToRunningTaskIds.find(x => !isExecutorBusy(x._1)) match {
    +                case Some((executorId, _)) =>
    +                  if (!unschedulableTaskSetToExpiryTime.contains(taskSet)) {
    +                    blacklistTrackerOpt.foreach(blt => blt.killBlacklistedIdleExecutor(executorId))
    +
    +                    val timeout = conf.get(config.UNSCHEDULABLE_TASKSET_TIMEOUT) * 1000
    +                    unschedulableTaskSetToExpiryTime(taskSet) = clock.getTimeMillis() + timeout
    +                    logInfo(s"Waiting for $timeout ms for completely "
    +                      + s"blacklisted task to be schedulable again before aborting $taskSet.")
    +                    abortTimer.schedule(
    +                      createUnschedulableTaskSetAbortTimer(taskSet, taskIndex), timeout)
    +                  }
    +                case _ => // Abort immediately
    +                  logInfo("Cannot schedule any task because of complete blacklisting. No idle" +
    +                    s" executors can be found to kill. Aborting $taskSet.")
    +                  taskSet.abortSinceCompletelyBlacklisted(taskIndex)
    +              }
    +            case _ => // Do nothing if no tasks are completely blacklisted.
    +          }
    +        } else {
    +          // We want to defer killing any taskSets as long as we have a non-blacklisted executor
    +          // which can be used to schedule a task from any active taskSet. This ensures that the
    +          // job can make progress and, if we encounter a flawed taskSet, it will eventually
    +          // either fail or abort due to being completely blacklisted.
    --- End diff --
    
    ok, yeah, it seems like it would have to be very timing-dependent for taskset1 to never get a chance at that executor; really, that would just be a normal indefinite-postponement problem in the scheduler, regardless of blacklisting. I don't think it's a problem with FIFO, since the first taskset should always go first. With the Fair scheduler it perhaps could be, but that probably depends on a much more specific scenario.
    
    I guess I'm ok with this if you are.
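
    For reference, the expiry-map-plus-timer flow in the diff above can be sketched in isolation. This is a hypothetical, simplified model, not the actual Spark code: `AbortTimerSketch`, `onCompletelyBlacklisted`, `onTaskLaunched`, and the 1-second timeout are all stand-in names, and a plain `java.util.Timer` stands in for Spark's `abortTimer`. The key property it illustrates is that launching any task from the set before the timer fires clears the expiry entry, so the timer becomes a no-op.

    ```scala
    import java.util.{Timer, TimerTask}
    import scala.collection.mutable

    // Hypothetical minimal model of the abort-timer pattern in the diff:
    // when a task set is completely blacklisted, record an expiry time and
    // schedule a timer task that aborts the set only if it is still
    // unschedulable (i.e. still in the expiry map) when the timer fires.
    object AbortTimerSketch {
      val timeoutMs = 1000L // stand-in for config.UNSCHEDULABLE_TASKSET_TIMEOUT
      val unschedulableTaskSetToExpiryTime = mutable.HashMap.empty[String, Long]
      val abortTimer = new Timer(true) // daemon thread
      @volatile var aborted = false

      def onCompletelyBlacklisted(taskSet: String): Unit = {
        // Only arm one timer per task set, mirroring the contains() guard above.
        if (!unschedulableTaskSetToExpiryTime.contains(taskSet)) {
          unschedulableTaskSetToExpiryTime(taskSet) =
            System.currentTimeMillis() + timeoutMs
          abortTimer.schedule(new TimerTask {
            override def run(): Unit = {
              // Abort only if the set is still marked unschedulable and its
              // recorded expiry time has actually passed.
              if (unschedulableTaskSetToExpiryTime.get(taskSet)
                    .exists(_ <= System.currentTimeMillis())) {
                aborted = true
              }
            }
          }, timeoutMs)
        }
      }

      // Called when any task from the set is launched: clearing the expiry
      // entry defuses the pending abort.
      def onTaskLaunched(taskSet: String): Unit =
        unschedulableTaskSetToExpiryTime.remove(taskSet)
    }
    ```

    In the real patch the timer task is built by `createUnschedulableTaskSetAbortTimer` and the abort goes through `taskSet.abortSinceCompletelyBlacklisted(taskIndex)`; this sketch only mirrors the arm/defuse bookkeeping.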

