Github user kayousterhout commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13603#discussion_r67772866
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
    @@ -576,6 +576,62 @@ private[spark] class TaskSetManager(
       }
     
       /**
    +   * Check whether the given task set has been blacklisted to the point that it can't run anywhere.
    +   *
    +   * It is possible that this taskset has become impossible to schedule *anywhere* due to the
    +   * blacklist.  The most common scenario would be if there are fewer executors than
    +   * spark.task.maxFailures. We need to detect this so we can fail the task set, otherwise the job
    +   * will hang.
    +   *
    +   * The check here is a balance between being sure to catch the issue, but not wasting
    +   * too much time inside the scheduling loop.  Just check if the last task is schedulable
    +   * on any of the available executors.  So this is O(numExecutors) worst-case, but it'll
    +   * really be fast unless you've got a bunch of things blacklisted.  It's possible it won't detect
    +   * the unschedulable task immediately, but if it returns false, there is at least *some* task
    +   * that is schedulable, and after scheduling all of those, we'll eventually find the unschedulable
    +   * task.
    +   */
    +  private[scheduler] def abortIfTaskSetCompletelyBlacklisted(
    +      executorsByHost: HashMap[String, HashSet[String]]): Unit = {
    +    // If no executors have registered yet, don't abort the stage, just wait.  We probably
    +    // got here because a task set was added before the executors registered.
    +    if (executorsByHost.nonEmpty) {
    +      // take any task that needs to be scheduled, and see if we can find some executor it *could*
    +      // run on
    +      pollPendingTask.foreach { task =>
    +        executorsByHost.foreach { case (host, execs) =>
    +          execs.foreach { exec =>
    +            if (!executorIsBlacklisted(exec, task)) {
    +              return
    +            }
    +          }
    +        }
    +        abort(s"Aborting ${taskSet} because it has a task which cannot be scheduled on any" +
    +          s" executor due to blacklists.")
    +      }
    +    }
    +  }
    +
    +  /**
    +   * Return some task which is pending, but do not remove it from the list of pending tasks.
    +   * Used as a simple way to test if this task set is schedulable anywhere, or if it has been
    +   * completely blacklisted.
    +   */
    +  private def pollPendingTask: Option[Int] = {
    +    // usually this will just take the last pending task, but because of the lazy removal
    +    // from each list, we may need to go deeper in the list
    +    var indexOffset = allPendingTasks.size
    --- End diff --
    
    Why do you start at the end here? Is the idea to find the task that's least likely to have been scheduled? (Can you add a brief comment to this effect?)
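For discussion, here is a minimal, self-contained Scala sketch of the two helpers in this diff. It is a simplified model, not Spark's code: `BlacklistSketch`, `shouldAbort`, the `launched` predicate, and the `isBlacklisted(exec, task)` function are hypothetical stand-ins for `TaskSetManager` state. It assumes that pending indices are consumed from the tail of the list, so entries for already-launched tasks can linger there until they are lazily skipped. Under that assumption, `pollPendingTask` walks backward from the end and returns the first index that has not been launched, without modifying the list.

```scala
import scala.collection.mutable.{HashMap, HashSet}

// Hypothetical, simplified model of the helpers in the diff; not Spark's actual code.
object BlacklistSketch {

  /** Returns the pending task nearest the end of the list that has not been launched yet.
    * Stale (already-launched) entries are skipped but not removed, so `pending` is untouched. */
  def pollPendingTask(
      pending: scala.collection.IndexedSeq[Int],
      launched: Int => Boolean): Option[Int] =
    pending.reverseIterator.find(index => !launched(index))

  /** True if executors are registered but the polled pending task is blacklisted on all of them.
    * `isBlacklisted` is a hypothetical stand-in for executorIsBlacklisted(exec, task). */
  def shouldAbort(
      executorsByHost: HashMap[String, HashSet[String]],
      pending: scala.collection.IndexedSeq[Int],
      launched: Int => Boolean,
      isBlacklisted: (String, Int) => Boolean): Boolean =
    // No executors registered yet: wait instead of aborting.
    executorsByHost.nonEmpty && pollPendingTask(pending, launched).exists { task =>
      // forall over every host's executors replaces the non-local `return` used in the diff.
      executorsByHost.valuesIterator.forall(execs => execs.forall(exec => isBlacklisted(exec, task)))
    }
}
```

Expressing the search with `exists`/`forall` avoids a `return` inside nested `foreach` closures. In Scala 2 that is a non-local return implemented by throwing `NonLocalReturnControl`, and Scala 3 discourages it.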

