GitHub user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13603#discussion_r69005566
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
    @@ -576,6 +576,59 @@ private[spark] class TaskSetManager(
       }
     
       /**
    +   * Check whether the given task set has been blacklisted to the point that it can't run anywhere.
    +   *
    +   * It is possible that this taskset has become impossible to schedule *anywhere* due to the
    +   * blacklist.  The most common scenario would be if there are fewer executors than
    +   * spark.task.maxFailures. We need to detect this so we can fail the task set, otherwise the job
    +   * will hang.
    +   *
    +   * There's a tradeoff here: we could make sure all tasks in the task set are schedulable, but that
    +   * would add extra time to each iteration of the scheduling loop. Here, we take the approach of
    +   * making sure at least one of the unscheduled tasks is schedulable. This means we may not detect
    +   * the hang as quickly as we could have, but we'll always detect the hang eventually, and the
    +   * method is faster in the typical case. In the worst case, this method can take
    +   * O(maxTaskFailures + numTasks) time, but it will be faster when there haven't been any task
    +   * failures (this is because the method picks one unscheduled task, and then iterates through each
    +   * executor until it finds one that the task hasn't failed on already).
    +   */
    +  private[scheduler] def abortIfCompletelyBlacklisted(executors: Iterable[String]): Unit = {
    +
    +    def pendingTask: Option[Int] = {
    +      // usually this will just take the last pending task, but because of the lazy removal
    +      // from each list, we may need to go deeper in the list.  We poll from the end because
    +      // failed tasks are put back at the end of allPendingTasks, so we're more likely to find
    +      // an unschedulable task this way.
    +      var indexOffset = allPendingTasks.size
    +      while (indexOffset > 0) {
    +        indexOffset -= 1
    +        val indexInTaskSet = allPendingTasks(indexOffset)
    +        if (copiesRunning(indexInTaskSet) == 0 && !successful(indexInTaskSet)) {
    +          return Some(indexInTaskSet)
    +        }
    +      }
    +      None
    +    }
    +
    +    // If no executors have registered yet, don't abort the stage, just wait.  We probably
    +    // got here because a task set was added before the executors registered.
    +    if (executors.nonEmpty) {
    +      // take any task that needs to be scheduled, and see if we can find some executor it *could*
    +      // run on
    +      pendingTask.foreach { taskId =>
    +        executors.foreach { exec =>
    --- End diff --
    
    Good point; in fact, I can just use `executors.forall`. Sorry, I keep working on the new blacklist version in between and sometimes miss obvious simplifications like this in the current version. Thanks for catching them.
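    
    Roughly what I have in mind (just a toy sketch, not the exact diff; `executorIsBlacklisted` here is a stand-in for this class's real per-task blacklist check, and the names are illustrative):
    
    ```scala
    // Standalone sketch of the forall-based check: a pending task is
    // unschedulable iff *every* known executor is blacklisted for it.
    object ForallSketch {
    
      // stand-in for TaskSetManager's real blacklist lookup
      def executorIsBlacklisted(
          failedExecutors: Map[Int, Set[String]],
          exec: String,
          indexInTaskSet: Int): Boolean = {
        failedExecutors.getOrElse(indexInTaskSet, Set.empty).contains(exec)
      }
    
      def cannotRunAnywhere(
          executors: Iterable[String],
          indexInTaskSet: Int,
          failedExecutors: Map[Int, Set[String]]): Boolean = {
        // replaces the foreach-plus-mutable-flag loop: true iff no executor
        // remains that this task could still run on
        executors.forall { exec =>
          executorIsBlacklisted(failedExecutors, exec, indexInTaskSet)
        }
      }
    
      def main(args: Array[String]): Unit = {
        val failed = Map(3 -> Set("exec1", "exec2"))
        println(cannotRunAnywhere(Seq("exec1", "exec2"), 3, failed))          // true  -> abort
        println(cannotRunAnywhere(Seq("exec1", "exec2", "exec3"), 3, failed)) // false -> schedulable
      }
    }
    ```
    
    A nice side effect is that `forall` short-circuits, so the common case (the first executor can run the task) stays cheap.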

