attilapiros commented on a change in pull request #24245: 
[SPARK-13704][CORE][YARN] Reduce rack resolution time
URL: https://github.com/apache/spark/pull/24245#discussion_r270942556
 
 

 ##########
 File path: 
core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
 ##########
 @@ -1602,4 +1612,40 @@ class TaskSetManagerSuite extends SparkFunSuite with 
LocalSparkContext with Logg
     verify(sched.dagScheduler).taskEnded(manager.tasks(3), Success, 
result.value(),
       result.accumUpdates, info3)
   }
+
+  test("SPARK-13704 Rack Resolution is done with a batch of de-duped hosts") {
+    val conf = new SparkConf()
+      .set(config.LOCALITY_WAIT.key, "0")
+      .set(config.LOCALITY_WAIT_RACK.key, "1s")
+    sc = new SparkContext("local", "test", conf)
+    // Create a cluster with 20 racks, with hosts spread out among them
+    val execAndHost = (0 to 199).map { i =>
+      FakeRackUtil.assignHostToRack("host" + i, "rack" + (i % 20))
+      ("exec" + i, "host" + i)
+    }
+    sched = new FakeTaskScheduler(sc, execAndHost: _*)
+    // make a taskset with preferred locations on the first 100 hosts in our 
cluster
+    val locations = new ArrayBuffer[Seq[TaskLocation]]()
+    for (i <- 0 to 99) {
+      locations += Seq(TaskLocation("host" + i))
+    }
+    val taskSet = FakeTask.createTaskSet(100, locations: _*)
+    val clock = new ManualClock
+    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock 
= clock)
+    // with rack locality, reject an offer on a host with an unknown rack
+    assert(manager.resourceOffer("otherExec", "otherHost", 
TaskLocality.RACK_LOCAL).isEmpty)
+    (0 until 20).foreach { rackIdx =>
+      (0 until 5).foreach { offerIdx =>
+        // if we offer hosts which are not in preferred locations,
+        // we'll reject them at NODE_LOCAL level,
+        // but accept them at RACK_LOCAL level if they're on OK racks
+        val hostIdx = 100 + rackIdx
+        assert(manager.resourceOffer("exec" + hostIdx, "host" + hostIdx, 
TaskLocality.NODE_LOCAL)
+          .isEmpty)
+        assert(manager.resourceOffer("exec" + hostIdx, "host" + hostIdx, 
TaskLocality.RACK_LOCAL)
+          .isDefined)
+      }
+    }
+    assert(FakeRackUtil.numBatchInvocation === 1)
 
 Review comment:
  As `addPendingTasks()` is called during the construction of the `TaskSetManager`
instance (and that is the only place where multiple hosts can be passed to
`getRacksForHosts()`), I do not see why `numBatchInvocation === 1` is important
enough to be emphasized by this assert.
   
  I assume we are guarding against potential future code that might call
`getRacksForHosts()` (since it is an expensive call) — am I right?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to