Github user squito commented on a diff in the pull request:
https://github.com/apache/spark/pull/15644#discussion_r88615462
--- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala ---
@@ -437,62 +438,76 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
       new WorkerOffer("executor3", "host2", 1)
     )
     // We should check the node & exec blacklists, but only O(numOffers), not O(numPendingTasks)
-    // times. It's O(numOffers), not exactly numOffers, because we offer resources multiple
-    // times. An upper bound on the worst case is -- we check it once for every core at each
-    // locality level. (We could tighten the bound a bit, but that should be a good enough check.)
+    // times. In the worst case, after shuffling, we offer our blacklisted resource first, and then
+    // offer other resources which do get used. The taskset blacklist is consulted repeatedly as
+    // we offer resources to the taskset -- each iteration either schedules something, or it
+    // terminates that locality level, so the maximum number of checks is
+    // numCores + numLocalityLevels
     val numCoresOnAllOffers = offers.map(_.cores).sum
     val numLocalityLevels = TaskLocality.values.size
-    val maxBlacklistChecks = numCoresOnAllOffers * numLocalityLevels
-
-    // Set up the blacklist, and get back a list of the executors & nodes that have any blacklisting
-    // (even implicit blacklisting).
+    val maxBlacklistChecks = numCoresOnAllOffers + numLocalityLevels
+    // Set up the blacklist
     nodeBlacklist.foreach { node =>
       when(stageToMockTaskSetBlacklist(0).isNodeBlacklistedForTaskSet(node)).thenReturn(true)
     }
     execBlacklist.foreach { exec =>
       when(stageToMockTaskSetBlacklist(0).isExecutorBlacklistedForTaskSet(exec)).thenReturn(true)
     }
+
+    // Figure out which nodes have any effective blacklisting on them. This means all nodes that
+    // are explicitly blacklisted, plus those that have *any* executors blacklisted.
     val nodesForBlacklistedExecutors = offers.filter { offer =>
       execBlacklist.contains(offer.executorId)
     }.map(_.host).toSet.toSeq
-    val nodesToCheck = nodeBlacklist ++ nodesForBlacklistedExecutors
+    val nodesWithAnyBlacklisting = nodeBlacklist ++ nodesForBlacklistedExecutors
+    // Similarly, figure out which executors have any blacklisting. This means all executors that
+    // are explicitly blacklisted, plus all executors on nodes that are blacklisted.
     val execsForBlacklistedNodes = offers.filter { offer =>
       nodeBlacklist.contains(offer.host)
     }.map(_.executorId).toSeq
-    val executorsToCheck = execBlacklist ++ execsForBlacklistedNodes
+    val executorsWithAnyBlacklisting = execBlacklist ++ execsForBlacklistedNodes
-    // Schedule a taskset, do a bit of basic sanity checking (that our test is operating the way
-    // it's supposed to).
+    // Schedule a taskset, and make sure our test setup is correct -- we are able to schedule
+    // a task on all executors that aren't blacklisted (even the ones implicitly blacklisted by the
+    // node blacklist).
     val firstTaskAttempts = taskScheduler.resourceOffers(offers).flatten
-    assert(firstTaskAttempts.size === 3 - executorsToCheck.size)
-    assert(firstTaskAttempts.size < offers.size)
+    assert(firstTaskAttempts.size === 3 - executorsWithAnyBlacklisting.size)
     // Now check that we haven't made too many calls to any of the blacklist methods.
+    // We should be checking our node blacklist, but it should be within the bound we defined above.
     verify(stageToMockTaskSetBlacklist(0), atMost(maxBlacklistChecks))
       .isNodeBlacklistedForTaskSet(anyString())
-    nodesToCheck.foreach { node =>
+    // We shouldn't ever consult the per-task blacklist for the nodes that have been blacklisted
+    // for the entire taskset, since the taskset-level blacklisting should prevent scheduling
+    // from ever looking at specific tasks.
+    nodesWithAnyBlacklisting.foreach { node =>
       verify(stageToMockTaskSetBlacklist(0), never)
         .isNodeBlacklistedForTask(meq(node), anyInt())
     }
-    executorsToCheck.foreach { exec =>
-      // If we had a node blacklist, then we could tighten the next check to *never*. But it also
-      // doesn't particularly matter if the executor check happens in addition.
+    executorsWithAnyBlacklisting.foreach { exec =>
+      // We should be checking our executor blacklist, but it should be within the bound defined
+      // above. It's possible that this will be significantly fewer calls, maybe even 0, if there
+      // is also a node blacklist which takes effect first. But this assert is all we need to
+      // avoid an O(numPendingTasks) slowdown.
--- End diff --
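As a side note, the assertions above rely on Mockito's bounded verification modes (`atMost`, `never`) rather than exact call counts. A minimal, self-contained sketch of that pattern, with a made-up `Blacklist` trait standing in for Spark's `TaskSetBlacklist` (Mockito 1.x style, matching the test above):

```scala
import org.mockito.Matchers.anyString
import org.mockito.Mockito.{atMost, mock, never, verify, when}

// Illustrative stand-in for the taskset blacklist -- not Spark's real API.
trait Blacklist {
  def isNodeBlacklistedForTaskSet(node: String): Boolean
}

val bl = mock(classOf[Blacklist])
when(bl.isNodeBlacklistedForTaskSet("host1")).thenReturn(true)

// The code under test may legitimately consult the blacklist a few times
// per offer round (per core, per locality level, etc.).
bl.isNodeBlacklistedForTaskSet("host1")
bl.isNodeBlacklistedForTaskSet("host2")

// Bound the call count instead of pinning it exactly: modest re-checking is
// fine, but an O(numPendingTasks) number of calls would blow past the bound.
verify(bl, atMost(5)).isNodeBlacklistedForTaskSet(anyString())
// And some calls should never happen at all.
verify(bl, never).isNodeBlacklistedForTaskSet("host3")
```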
actually with 3 offers, there are only 6 permutations, so 100 seems like
overkill. Maybe 10?
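To put numbers on that, here's a purely illustrative sketch of how well a handful of random shuffles covers the 6 possible orderings of 3 offers:

```scala
import scala.util.Random

val offers = Seq("executor0", "executor1", "executor2")
assert(offers.permutations.size == 6)  // 3! = 6 orderings

// 10 shuffles hit about 5 of the 6 orderings on average
// (6 * (1 - (5/6)^10) ≈ 5.0); seeing all 6 takes roughly
// 6 * (1 + 1/2 + ... + 1/6) ≈ 14.7 draws in expectation.
val seen = scala.collection.mutable.Set.empty[Seq[String]]
(1 to 10).foreach(_ => seen += Random.shuffle(offers))
println(s"covered ${seen.size} of 6 orderings")
```

So ~10 iterations exercises most orderings at a tenth of the cost of 100, which should be plenty for a check like this.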