Github user tgravescs commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22288#discussion_r222816658
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
    @@ -415,9 +421,63 @@ private[spark] class TaskSchedulerImpl(
                 launchedAnyTask |= launchedTaskAtCurrentMaxLocality
               } while (launchedTaskAtCurrentMaxLocality)
             }
    +
             if (!launchedAnyTask) {
    -          taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
    -        }
    +          taskSet.getCompletelyBlacklistedTaskIfAny(hostToExecutors) match {
    +            case taskIndex: Some[Int] => // Returns the taskIndex which was unschedulable
    +
    +              // If the taskSet is unschedulable we try to find an existing idle blacklisted
    +              // executor. If we cannot find one, we abort immediately. Else we kill the idle
    +              // executor and kick off an abortTimer which after waiting will abort the taskSet if
    +              // we were unable to schedule any task from the taskSet.
    +              // Note 1: We keep a track of schedulability on a per taskSet basis rather than on a
    +              // per task basis.
    +              // Note 2: The taskSet can still be aborted when there are more than one idle
    +              // blacklisted executors and dynamic allocation is on. This is because we rely on the
    +              // ExecutorAllocationManager to acquire a new executor based on the pending tasks and
    +              // it won't release any blacklisted executors which idle timeout after we kill an
    +              // executor to acquire a new one, resulting in the abort timer to expire and abort the
    +              // taskSet.
    +              executorIdToRunningTaskIds.find(x => !isExecutorBusy(x._1)) match {
    +                case Some (x) =>
    +                  val executorId = x._1
    +                  if (!unschedulableTaskSetToExpiryTime.contains(taskSet)) {
    +                    blacklistTrackerOpt.foreach(blt => blt.killBlacklistedIdleExecutor(executorId))
    +
    +                    unschedulableTaskSetToExpiryTime(taskSet) = clock.getTimeMillis()
    +                    val timeout = conf.get(config.UNSCHEDULABLE_TASKSET_TIMEOUT) * 1000
    +                    logInfo(s"Waiting for $timeout ms for completely "
    +                      + s"blacklisted task to be schedulable again before aborting $taskSet.")
    +                    abortTimer.schedule(new TimerTask() {
    +                      override def run() {
    +                        if (unschedulableTaskSetToExpiryTime.contains(taskSet) &&
    +                          (unschedulableTaskSetToExpiryTime(taskSet) + timeout)
    +                            <= clock.getTimeMillis()
    +                        ) {
    +                          logInfo("Cannot schedule any task because of complete blacklisting. " +
    +                            s"Wait time for scheduling expired. Aborting $taskSet.")
    +                          taskSet.abortSinceCompletelyBlacklisted(taskIndex.get)
    +                        } else {
    +                          this.cancel()
    +                        }
    +                      }
    +                    }, timeout)
    +                  }
    +                case _ => // Abort Immediately
    +                  logInfo("Cannot schedule any task because of complete blacklisting. No idle" +
    +                  s" executors could be found. Aborting $taskSet." )
    +                  taskSet.abortSinceCompletelyBlacklisted(taskIndex.get)
    +              }
    +            case _ => // Do nothing.
    --- End diff --
    
    Perhaps expand this comment to say "do nothing if no tasks are completely
    blacklisted". The indentation also looks off here, but that might just be an
    artifact of the diff and comments.
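
    To illustrate the suggested wording, here is a minimal sketch rather than
    the actual patch. `BlacklistMatchSketch` and `describe` are hypothetical
    names, and the `Option[Int]` argument stands in for the result of
    `taskSet.getCompletelyBlacklistedTaskIfAny(hostToExecutors)`; the real
    handling from the diff is reduced to a returned string.

```scala
// Hypothetical stand-in for the match in TaskSchedulerImpl; not Spark code.
object BlacklistMatchSketch {
  // completelyBlacklistedTask plays the role of the Option[Int] returned by
  // taskSet.getCompletelyBlacklistedTaskIfAny(hostToExecutors) in the diff.
  def describe(completelyBlacklistedTask: Option[Int]): String =
    completelyBlacklistedTask match {
      case Some(taskIndex) =>
        // A task is blacklisted everywhere: the kill-idle-executor and
        // abort-timer handling from the diff would go here.
        s"task $taskIndex is completely blacklisted"
      case None =>
        // Do nothing if no tasks are completely blacklisted.
        "nothing to do"
    }
}
```

    Matching on `Some(taskIndex)` and `None` in this sketch also avoids the
    later `taskIndex.get` calls, though that is a separate point from the
    comment wording.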

