Github user squito commented on a diff in the pull request:
https://github.com/apache/spark/pull/13234#discussion_r64698507
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -249,10 +249,16 @@ private[spark] class TaskSchedulerImpl(
availableCpus: Array[Int],
tasks: Seq[ArrayBuffer[TaskDescription]]) : Boolean = {
var launchedTask = false
+ // TODO unit test, and also add executor-stage filtering as well
+ // This is an optimization -- the taskSet might contain a very long list of pending tasks.
+ // Rather than wasting time checking the offer against each task, and then realizing the
+ // executor is blacklisted, just filter out the bad executor immediately.
+ val nodeBlacklist = taskSet.blacklistTracker.map{_.nodeBlacklistForStage(taskSet.stageId)}
+   .getOrElse(Set())
--- End diff --
Before this change, there is an `O(n^2)` cost (where `n` is the number of
pending tasks) when you've got one bad executor. The tasks assigned to
the bad executor fail, but then we get another resource offer for the bad
executor again. So we find another task for the bad executor, it fails, and we
continue the process, going through all of the pending tasks. Each time we
respond to the resource offer, we need to (a) iterate through the list of tasks
to find one that is *not* blacklisted and (b) then remove it from the task
list. Those are both `O(1)` operations when there isn't any blacklisting -- we
just pop the last task off the stack. But as our bad executor makes its way
through the tasks, it has to go deeper into the list each time, and both
searching the list and then removing an element from it become expensive.
After we've gone through *all* of the tasks for the bad executor once, then we
will wait for there to be resource offers from good executors. However, even
though we then start scheduling on the good executors, scheduling as a whole is
still much slower, because we still have an `O(n)` cost at each call to
resourceOffer. The offer still includes the (now idle) bad executor, and we
have to iterate through the entire list of pending tasks to decide that nope,
there aren't any tasks we can schedule on that node.
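To make the cost concrete, here is a minimal, self-contained sketch (hypothetical types and names for illustration, not Spark's actual scheduler code) contrasting the per-offer work with and without filtering the bad node up front:

```scala
import scala.collection.mutable.ArrayBuffer

object BlacklistCostSketch {
  case class Task(id: Int)

  // Stand-in for a stage's pending task list.
  val pendingTasks: ArrayBuffer[Task] = ArrayBuffer.tabulate(3000)(i => Task(i))

  // Hypothetical per-task blacklist check.
  def isBlacklisted(task: Task, host: String): Boolean = host == "bad-node"

  // Without up-front filtering: an offer from a blacklisted node scans the whole
  // list before giving up, and removing a matched task from the middle is O(n) too.
  def findTaskForOffer(host: String): Option[Task] = {
    val idx = pendingTasks.lastIndexWhere(t => !isBlacklisted(t, host))
    if (idx >= 0) Some(pendingTasks.remove(idx)) else None
  }

  // With the node blacklist applied first: a bad-node offer is rejected in O(1),
  // and a good offer just pops the last pending task in O(1).
  def findTaskWithFilter(host: String, nodeBlacklist: Set[String]): Option[Task] = {
    if (nodeBlacklist.contains(host) || pendingTasks.isEmpty) None
    else Some(pendingTasks.remove(pendingTasks.length - 1))
  }
}
```

In the filtered version, an offer from `bad-node` never touches `pendingTasks` at all, which is the difference the numbers below reflect.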
In my performance tests with a 3k task job, this leads to about a 10x
slowdown, though obviously this depends a lot on the number of tasks. And that is
the really scary thing -- it's not a function of how many bad nodes you have,
but how many tasks you are trying to run. So on a large cluster, where a bad
node is more likely, and lots of tasks are more likely, the slowdown will be
much worse.
Note that as implemented in this version of the patch, this slowdown is
only avoided when we blacklist the entire node. But we should add blacklisting
for an executor as well, to avoid the slowdown in that case also.
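To illustrate what that executor-level filtering could look like, here is a small self-contained sketch (the `Offer` type and all names are hypothetical, not this patch's code):

```scala
object OfferFilterSketch {
  // Hypothetical offer shape for illustration only.
  case class Offer(executorId: String, host: String, cores: Int)

  // Drop offers from blacklisted hosts or blacklisted executors before any
  // per-task matching, so the per-offer cost stays independent of how many
  // tasks are pending.
  def usableOffers(
      offers: Seq[Offer],
      nodeBlacklist: Set[String],
      execBlacklist: Set[String]): Seq[Offer] = {
    offers.filterNot { o =>
      nodeBlacklist.contains(o.host) || execBlacklist.contains(o.executorId)
    }
  }

  // Example: the offer from the blacklisted executor is rejected up front.
  val offers = Seq(Offer("exec-1", "host-a", 4), Offer("exec-2", "host-b", 4))
  val filtered = usableOffers(offers, nodeBlacklist = Set.empty, execBlacklist = Set("exec-2"))
  // filtered == Seq(Offer("exec-1", "host-a", 4))
}
```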