Github user squito commented on a diff in the pull request:
https://github.com/apache/spark/pull/16376#discussion_r95446452
--- Diff:
core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala ---
@@ -819,4 +819,84 @@ class TaskSchedulerImplSuite extends SparkFunSuite
with LocalSparkContext with B
assert(!taskScheduler.hasExecutorsAliveOnHost("host0"))
assert(taskScheduler.getExecutorsAliveOnHost("host0").isEmpty)
}
+
+ test("Locality should be used for bulk offers even with delay scheduling
off") {
+ // for testing, we create a task scheduler which lets us control how
offers are shuffled
+ val conf = new SparkConf()
+ .set("spark.locality.wait", "0")
+ sc = new SparkContext("local", "TaskSchedulerImplSuite", conf)
+ // we create a manual clock just so we can be sure the clock doesn't
advance at all in this test
+ val clock = new ManualClock()
+
+ // We customize the task scheduler just to let us control the way
offers are shuffled, so we
+ // can be sure we try both permutations, and to control the clock on
the tasksetmanager.
+ val taskScheduler = new TaskSchedulerImpl(sc) {
+ override def shuffleOffers(offers: IndexedSeq[WorkerOffer]):
IndexedSeq[WorkerOffer] = {
+ // Don't shuffle the offers around for this test. Instead, we'll
just pass in all
+ // the permutations we care about directly.
+ offers
+ }
+ override def createTaskSetManager(taskSet: TaskSet, maxTaskFailures:
Int): TaskSetManager = {
+ new TaskSetManager(this, taskSet, maxTaskFailures,
blacklistTrackerOpt, clock)
+ }
+ }
+ // Need to initialize a DAGScheduler for the taskScheduler to use for
callbacks.
+ new DAGScheduler(sc, taskScheduler) {
+ override def taskStarted(task: Task[_], taskInfo: TaskInfo) {}
+ override def executorAdded(execId: String, host: String) {}
+ }
+ taskScheduler.initialize(new FakeSchedulerBackend)
+
+
+ // Make two different offers -- one in the preferred location, one
that is not.
+ val offers = IndexedSeq(
+ WorkerOffer("exec1", "host1", 1),
+ WorkerOffer("exec2", "host2", 1)
+ )
+ Seq(false, true).foreach { swapOrder =>
+ // Submit a taskset with locality preferences.
+ val taskSet = FakeTask.createTaskSet(
+ 1, stageId = 1, stageAttemptId = 0, Seq(TaskLocation("host1",
"exec1")))
+ taskScheduler.submitTasks(taskSet)
+ val shuffledOffers = if (swapOrder) offers.reverse else offers
+ // Regardless of the order of the offers (after the task scheduler
shuffles them), we should
+ // always take advantage of the local offer.
+ val taskDescs = taskScheduler.resourceOffers(shuffledOffers).flatten
+ withClue(s"swapOrder = $swapOrder") {
+ assert(taskDescs.size === 1)
+ assert(taskDescs.head.executorId === "exec1")
+ }
+ }
+ }
+
+ test("With delay scheduling off, tasks can be run at any locality level
immediately") {
--- End diff --
Yes, you are absolutely right. I've updated it and also added a check to make
sure the tsm (TaskSetManager) includes lower locality levels.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]