Github user kayousterhout commented on a diff in the pull request:
https://github.com/apache/spark/pull/15644#discussion_r88613129
--- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala ---
@@ -437,62 +438,76 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
       new WorkerOffer("executor3", "host2", 1)
     )
     // We should check the node & exec blacklists, but only O(numOffers), not O(numPendingTasks)
-    // times. Its O(numOffers), not exactly numOffers, because we offer resources multiple
-    // times. An upper bound on the worst case is -- we check it once for every core at each
-    // locality level. (We could tighten the bound a bit but that should be a good enough check.)
+    // times. In the worst case, after shuffling, we offer our blacklisted resource first, and then
+    // offer other resources which do get used. The taskset blacklist is consulted repeatedly as
+    // we offer resources to the taskset -- each iteration either schedules something, or it
+    // terminates that locality level, so the maximum number of checks is
+    // numCores + numLocalityLevels
     val numCoresOnAllOffers = offers.map(_.cores).sum
     val numLocalityLevels = TaskLocality.values.size
-    val maxBlacklistChecks = numCoresOnAllOffers * numLocalityLevels
-
-    // Setup the blacklist, and get back a list of the executors & nodes that have any blacklisting
-    // (even implicit blacklisting).
+    val maxBlacklistChecks = numCoresOnAllOffers + numLocalityLevels
+    // Setup the blacklist
     nodeBlacklist.foreach { node =>
       when(stageToMockTaskSetBlacklist(0).isNodeBlacklistedForTaskSet(node)).thenReturn(true)
     }
     execBlacklist.foreach { exec =>
       when(stageToMockTaskSetBlacklist(0).isExecutorBlacklistedForTaskSet(exec)).thenReturn(true)
     }
+
+    // Figure out which nodes have any effective blacklisting on them. This means all nodes that
+    // are explicitly blacklisted, plus those that have *any* executors blacklisted.
     val nodesForBlacklistedExecutors = offers.filter { offer =>
       execBlacklist.contains(offer.executorId)
     }.map(_.host).toSet.toSeq
-    val nodesToCheck = nodeBlacklist ++ nodesForBlacklistedExecutors
+    val nodesWithAnyBlacklisting = nodeBlacklist ++ nodesForBlacklistedExecutors
+    // Similarly, figure out which executors have any blacklisting. This means all executors that
+    // are explicitly blacklisted, plus all executors on nodes that are blacklisted.
     val execsForBlacklistedNodes = offers.filter { offer =>
       nodeBlacklist.contains(offer.host)
     }.map(_.executorId).toSeq
-    val executorsToCheck = execBlacklist ++ execsForBlacklistedNodes
+    val executorsWithAnyBlacklisting = execBlacklist ++ execsForBlacklistedNodes
-    // Schedule a taskset, do a bit of basic sanity checking (that our test is operating the way its
-    // supposed to).
+    // Schedule a taskset, and make sure our test setup is correct -- we are able to schedule
+    // a task on all executors that aren't blacklisted (even the ones implicitly blacklisted by the
+    // node blacklist).
     val firstTaskAttempts = taskScheduler.resourceOffers(offers).flatten
-    assert(firstTaskAttempts.size === 3 - executorsToCheck.size)
-    assert(firstTaskAttempts.size < offers.size)
+    assert(firstTaskAttempts.size === 3 - executorsWithAnyBlacklisting.size)
--- End diff --
Oh, I finally understand the 3 here -- can you do "offers.size" instead? That seems clearer to me (even though I realize an offer could have multiple cores, so that only works because cores = 1 for all offers).
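
For reference, a minimal sketch of what that change could look like in the test above (just a suggestion; it assumes every WorkerOffer in this test is created with a single core, so offers.size equals the total number of schedulable slots):

    // Hypothetical form of the assertion, using offers.size instead of the literal 3.
    // Valid only because each offer in this test has exactly 1 core, so the number of
    // offers equals the number of tasks that can be scheduled in one round.
    val firstTaskAttempts = taskScheduler.resourceOffers(offers).flatten
    assert(firstTaskAttempts.size === offers.size - executorsWithAnyBlacklisting.size)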