Github user tgravescs commented on a diff in the pull request:
https://github.com/apache/spark/pull/22288#discussion_r222816658
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -415,9 +421,63 @@ private[spark] class TaskSchedulerImpl(
launchedAnyTask |= launchedTaskAtCurrentMaxLocality
} while (launchedTaskAtCurrentMaxLocality)
}
+
if (!launchedAnyTask) {
- taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
- }
+ taskSet.getCompletelyBlacklistedTaskIfAny(hostToExecutors) match {
+ case taskIndex: Some[Int] => // Returns the taskIndex which was unschedulable
+
+ // If the taskSet is unschedulable we try to find an existing idle blacklisted
+ // executor. If we cannot find one, we abort immediately. Else we kill the idle
+ // executor and kick off an abortTimer which after waiting will abort the taskSet if
+ // we were unable to schedule any task from the taskSet.
+ // Note 1: We keep a track of schedulability on a per taskSet basis rather than on a
+ // per task basis.
+ // Note 2: The taskSet can still be aborted when there are more than one idle
+ // blacklisted executors and dynamic allocation is on. This is because we rely on the
+ // ExecutorAllocationManager to acquire a new executor based on the pending tasks and
+ // it won't release any blacklisted executors which idle timeout after we kill an
+ // executor to acquire a new one, resulting in the abort timer to expire and abort the
+ // taskSet.
+ executorIdToRunningTaskIds.find(x => !isExecutorBusy(x._1)) match {
+ case Some (x) =>
+ val executorId = x._1
+ if (!unschedulableTaskSetToExpiryTime.contains(taskSet)) {
+ blacklistTrackerOpt.foreach(blt => blt.killBlacklistedIdleExecutor(executorId))
+
+ unschedulableTaskSetToExpiryTime(taskSet) = clock.getTimeMillis()
+ val timeout = conf.get(config.UNSCHEDULABLE_TASKSET_TIMEOUT) * 1000
+ logInfo(s"Waiting for $timeout ms for completely "
+ + s"blacklisted task to be schedulable again before aborting $taskSet.")
+ abortTimer.schedule(new TimerTask() {
+ override def run() {
+ if (unschedulableTaskSetToExpiryTime.contains(taskSet) &&
+ (unschedulableTaskSetToExpiryTime(taskSet) + timeout)
+ <= clock.getTimeMillis()
+ ) {
+ logInfo("Cannot schedule any task because of complete blacklisting. " +
+ s"Wait time for scheduling expired. Aborting $taskSet.")
+ taskSet.abortSinceCompletelyBlacklisted(taskIndex.get)
+ } else {
+ this.cancel()
+ }
+ }
+ }, timeout)
+ }
+ case _ => // Abort Immediately
+ logInfo("Cannot schedule any task because of complete blacklisting. No idle" +
+ s" executors could be found. Aborting $taskSet." )
+ taskSet.abortSinceCompletelyBlacklisted(taskIndex.get)
+ }
+ case _ => // Do nothing.
--- End diff --
Perhaps expand this comment to say that we do nothing if no tasks are
completely blacklisted. The indentation also looks off here, but that might
just be an artifact of the diff and the comments.
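
To make the suggestion concrete, the branching described in the code comment
of the diff could be modeled in isolation roughly as below. This is only a
sketch, not the PR's code: `UnschedulableSketch`, `Action`, `decide`, and the
parameter names are hypothetical, "idle" is assumed to mean an executor with
no running tasks, and the timer itself is reduced to a boolean flag. The last
`case` carries the expanded comment wording.

```scala
// Hypothetical, simplified model of the branching in the diff. None of these
// names exist in Spark; they are for illustration only.
object UnschedulableSketch {
  sealed trait Action
  case object DoNothing extends Action
  case object TimerAlreadyRunning extends Action
  final case class AbortNow(taskIndex: Int) extends Action
  final case class KillIdleAndStartTimer(executorId: String, taskIndex: Int) extends Action

  def decide(
      completelyBlacklistedTask: Option[Int],
      executorIdToRunningTaskIds: Map[String, Set[Long]],
      abortTimerArmed: Boolean): Action =
    completelyBlacklistedTask match {
      case Some(taskIndex) =>
        // Assumption: an executor is idle when it has no running tasks.
        val idleExecutor = executorIdToRunningTaskIds.collectFirst {
          case (executorId, running) if running.isEmpty => executorId
        }
        idleExecutor match {
          // Kill one idle executor and arm the abort timer, once per taskSet.
          case Some(executorId) if !abortTimerArmed =>
            KillIdleAndStartTimer(executorId, taskIndex)
          case Some(_) => TimerAlreadyRunning
          // No idle executor can be freed up, so abort immediately.
          case None => AbortNow(taskIndex)
        }
      case None => DoNothing // Do nothing if no tasks are completely blacklisted.
    }

  def main(args: Array[String]): Unit = {
    val executors = Map("exec-1" -> Set(7L), "exec-2" -> Set.empty[Long])
    println(decide(Some(3), executors, abortTimerArmed = false))
    println(decide(None, executors, abortTimerArmed = false))
  }
}
```

Matching on `Some(taskIndex)` instead of `taskIndex: Some[Int]` would also
avoid the later `taskIndex.get` calls, though that is a style preference and
not something the diff requires.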
---