Github user squito commented on a diff in the pull request:
https://github.com/apache/spark/pull/13603#discussion_r69005566
--- Diff:
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -576,6 +576,59 @@ private[spark] class TaskSetManager(
}
/**
+ * Check whether the given task set has been blacklisted to the point
that it can't run anywhere.
+ *
+ * It is possible that this taskset has become impossible to schedule
*anywhere* due to the
+ * blacklist. The most common scenario would be if there are fewer
executors than
+ * spark.task.maxFailures. We need to detect this so we can fail the
task set, otherwise the job
+ * will hang.
+ *
+ * There's a tradeoff here: we could make sure all tasks in the task set
are schedulable, but that
+ * would add extra time to each iteration of the scheduling loop. Here,
we take the approach of
+ * making sure at least one of the unscheduled tasks is schedulable.
This means we may not detect
+ * the hang as quickly as we could have, but we'll always detect the
hang eventually, and the
+ * method is faster in the typical case. In the worst case, this method
can take
+ * O(maxTaskFailures + numTasks) time, but it will be faster when there
haven't been any task
+ * failures (this is because the method picks on unscheduled task, and
then iterates through each
+ * executor until it finds one that the task hasn't failed on already).
+ */
+ private[scheduler] def abortIfCompletelyBlacklisted(executors:
Iterable[String]): Unit = {
+
+ def pendingTask: Option[Int] = {
+ // usually this will just take the last pending task, but because of
the lazy removal
+ // from each list, we may need to go deeper in the list. We poll
from the end because
+ // failed tasks are put back at the end of allPendingTasks, so we're
more likely to find
+ // an unschedulable task this way.
+ var indexOffset = allPendingTasks.size
+ while (indexOffset > 0) {
+ indexOffset -= 1
+ val indexInTaskSet = allPendingTasks(indexOffset)
+ if (copiesRunning(indexInTaskSet) == 0 &&
!successful(indexInTaskSet)) {
+ return Some(indexInTaskSet)
+ }
+ }
+ None
+ }
+
+ // If no executors have registered yet, don't abort the stage, just
wait. We probably
+ // got here because a task set was added before the executors
registered.
+ if (executors.nonEmpty) {
+ // take any task that needs to be scheduled, and see if we can find
some executor it *could*
+ // run on
+ pendingTask.foreach { taskId =>
+ executors.foreach { exec =>
--- End diff --
good point, in fact I can just use `executors.forall`. Sorry I keep
working on the new blacklist version in between and sometimes don't see some of
these obvious simplifications in this version, thanks for catching them.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]