Github user squito commented on a diff in the pull request:
https://github.com/apache/spark/pull/22288#discussion_r226477452
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -415,9 +420,65 @@ private[spark] class TaskSchedulerImpl(
launchedAnyTask |= launchedTaskAtCurrentMaxLocality
} while (launchedTaskAtCurrentMaxLocality)
}
+
if (!launchedAnyTask) {
- taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
+ taskSet.getCompletelyBlacklistedTaskIfAny(hostToExecutors) match {
+ case taskIndex: Some[Int] => // Returns the taskIndex which was unschedulable
+
+ // If the taskSet is unschedulable we try to find an existing idle blacklisted
+ // executor. If we cannot find one, we abort immediately. Else we kill the idle
+ // executor and kick off an abortTimer which if it doesn't schedule a task within the
+ // the timeout will abort the taskSet if we were unable to schedule any task from the
+ // taskSet.
+ // Note 1: We keep track of schedulability on a per taskSet basis rather than on a per
+ // task basis.
+ // Note 2: The taskSet can still be aborted when there are more than one idle
+ // blacklisted executors and dynamic allocation is on. This can happen when a killed
+ // idle executor isn't replaced in time by ExecutorAllocationManager as it relies on
+ // pending tasks and doesn't kill executors on idle timeouts, resulting in the abort
+ // timer to expire and abort the taskSet.
+ executorIdToRunningTaskIds.find(x => !isExecutorBusy(x._1)) match {
+ case Some (x) =>
+ val executorId = x._1
+ if (!unschedulableTaskSetToExpiryTime.contains(taskSet)) {
+ blacklistTrackerOpt.foreach(blt => blt.killBlacklistedIdleExecutor(executorId))
+
+ unschedulableTaskSetToExpiryTime(taskSet) = clock.getTimeMillis()
+ val timeout = conf.get(config.UNSCHEDULABLE_TASKSET_TIMEOUT) * 1000
+ logInfo(s"Waiting for $timeout ms for completely "
+ + s"blacklisted task to be schedulable again before aborting $taskSet.")
+ abortTimer.schedule(new TimerTask() {
+ override def run() {
+ if (unschedulableTaskSetToExpiryTime.contains(taskSet) &&
+ (unschedulableTaskSetToExpiryTime(taskSet) + timeout)
+ <= clock.getTimeMillis()
+ ) {
+ logInfo("Cannot schedule any task because of complete blacklisting. " +
+ s"Wait time for scheduling expired. Aborting $taskSet.")
+ taskSet.abortSinceCompletelyBlacklisted(taskIndex.get)
+ } else {
+ this.cancel()
+ }
+ }
+ }, timeout)
+ }
+ case _ => // Abort Immediately
+ logInfo("Cannot schedule any task because of complete blacklisting. No idle" +
+ s" executors can be found to kill. Aborting $taskSet.")
+ taskSet.abortSinceCompletelyBlacklisted(taskIndex.get)
+ }
+ case _ => // Do nothing if no tasks completely blacklisted.
+ }
+ } else {
+ // If a task was scheduled, we clear the expiry time for all the taskSets. This ensures
+ // that we have got atleast a non blacklisted executor and the job can progress. The
+ // abort timer checks this entry to decide if we want to abort the taskSet.
--- End diff --
typo: at least
I want to make sure I understand this part, and why you aren't clearing the
timer only for the taskset you just scheduled a task for. If you have multiple
tasksets running simultaneously, and one is making progress but the other is
totally blacklisted, I guess you do not want to kill anything, because that
would interfere with the taskset that is working correctly? Instead you'll just
let the totally blacklisted taskset eventually fail from the timeout? I guess
that makes sense, because if one taskset is progressing, it means the failing
taskset is probably flawed, not the executors.
If that's right, it would be good to include something along those lines in
the comment (personally, I don't find a comment about how it's related to the
timer that useful; that's obvious from the code).
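
To make the question concrete, here is a minimal sketch of the two clearing policies. It is a toy model, not the code in this PR: `UnschedulableTracker`, `TimerPolicyDemo`, the string taskset names, and the 1000 ms timeout are all hypothetical, and it only mirrors the expiry check the timer performs in the diff above. It does not model later scheduling rounds, in which a still-blacklisted taskset could be marked again.

```scala
import scala.collection.mutable

// Hypothetical stand-in for the scheduler state discussed above; not Spark's classes.
final class UnschedulableTracker(timeoutMs: Long) {
  // taskSet name -> time at which it was first found completely blacklisted
  private val expiryStart = mutable.HashMap.empty[String, Long]

  // Record the first time `taskSet` could not schedulable anything; later calls keep that time.
  def markUnschedulable(taskSet: String, now: Long): Unit =
    if (!expiryStart.contains(taskSet)) expiryStart(taskSet) = now

  // Policy described by the diff's comment: any launched task clears every entry.
  def onTaskLaunchedClearAll(): Unit = expiryStart.clear()

  // Alternative asked about in the review: clear only the taskSet that launched a task.
  def onTaskLaunchedClearOnly(taskSet: String): Unit = expiryStart.remove(taskSet)

  // Same shape as the timer's check: entry still present and timeout elapsed.
  def shouldAbort(taskSet: String, now: Long): Boolean =
    expiryStart.get(taskSet).exists(start => start + timeoutMs <= now)
}

object TimerPolicyDemo {
  // Taskset "A" launches a task while taskset "B" is completely blacklisted.
  // Returns (does B abort under clear-all, does B abort under clear-only-A).
  def run(): (Boolean, Boolean) = {
    val timeoutMs = 1000L

    val clearAll = new UnschedulableTracker(timeoutMs)
    clearAll.markUnschedulable("B", now = 0L)
    clearAll.onTaskLaunchedClearAll()

    val clearOnly = new UnschedulableTracker(timeoutMs)
    clearOnly.markUnschedulable("B", now = 0L)
    clearOnly.onTaskLaunchedClearOnly("A")

    (clearAll.shouldAbort("B", timeoutMs), clearOnly.shouldAbort("B", timeoutMs))
  }
}
```

In this toy model, B's pending timer finds no entry under the clear-all policy and so does not abort, whereas under the clear-only-A policy B's entry survives and the timer aborts it. Whichever behavior is intended, that is the kind of rationale I'd like the code comment to spell out.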