bmarcott commented on a change in pull request #27207: [WIP][SPARK-18886][CORE] Make Locality wait time measure resource under utilization due to delay scheduling.
URL: https://github.com/apache/spark/pull/27207#discussion_r375064234
 
 

 ##########
 File path: core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
 ##########
 @@ -195,6 +195,191 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     assert(!failedTaskSet)
   }
 
+  def setupTaskScheduler(clock: ManualClock): TaskSchedulerImpl = {
+    val conf = new SparkConf()
+    sc = new SparkContext("local", "TaskSchedulerImplSuite", conf)
+    val taskScheduler = new TaskSchedulerImpl(sc,
+      sc.conf.get(config.TASK_MAX_FAILURES),
+      clock = clock) {
+      override def createTaskSetManager(taskSet: TaskSet, maxTaskFailures: Int): TaskSetManager = {
+        new TaskSetManager(this, taskSet, maxTaskFailures, blacklistTrackerOpt, clock)
+      }
+      override def shuffleOffers(offers: IndexedSeq[WorkerOffer]): IndexedSeq[WorkerOffer] = {
+        // Don't shuffle the offers around for this test.  Instead, we'll just pass in all
+        // the permutations we care about directly.
+        offers
+      }
+    }
+    // Need to initialize a DAGScheduler for the taskScheduler to use for callbacks.
+    new DAGScheduler(sc, taskScheduler) {
+      override def taskStarted(task: Task[_], taskInfo: TaskInfo): Unit = {}
+
+      override def executorAdded(execId: String, host: String): Unit = {}
+    }
+    taskScheduler.initialize(new FakeSchedulerBackend)
+    val taskSet = FakeTask.createTaskSet(8, 1, 1,
+      Seq(TaskLocation("host1", "exec1")),
+      Seq(TaskLocation("host1", "exec1")),
+      Seq(TaskLocation("host1", "exec1")),
+      Seq(TaskLocation("host1", "exec1")),
+      Seq(TaskLocation("host1", "exec1")),
+      Seq(TaskLocation("host1", "exec1")),
+      Seq(TaskLocation("host1", "exec1")),
+      Seq(TaskLocation("host1", "exec1"))
+    )
+    taskScheduler.resourceOffers(IndexedSeq(WorkerOffer("exec1", "host1", 1)))
+    taskScheduler.submitTasks(taskSet)
+    taskScheduler
+  }
+
+  test("SPARK-18886 -  partial offers (isAllFreeResources = false) reset timer before " +
+    "any resources have been rejected") {
+    val clock = new ManualClock()
+    val taskScheduler = setupTaskScheduler(clock)
+    val advanceAmount = 2000
+
+    // by default, new partial resource (isAllFreeResources = false) offers reset timer
+    // if the resource is accepted
+    clock.advance(advanceAmount)
+    assert(taskScheduler
+      .resourceOffers(
+        IndexedSeq(WorkerOffer("exec1", "host1", 1)),
+        isAllFreeResources = false)
+      .flatten.length === 1)
+
+    // would advance to NODE_LOCAL locality if timer wasn't reset above
+    // Verifying this node local task is not accepted
+    clock.advance(advanceAmount)
+    assert(taskScheduler
+      .resourceOffers(
+        IndexedSeq(WorkerOffer("exec2", "host1", 1)))
+      .flatten.isEmpty)
+  }
+
+  test("SPARK-18886 -  delay scheduling timer is reset when it accepts all resources offered when" +
+    "isAllFreeResources = true") {
+    val clock = new ManualClock()
+    val taskScheduler = setupTaskScheduler(clock)
+    val advanceAmount = 2000
+
+    // timer is reset when tsm accepts all free resources offered to it
+    clock.advance(advanceAmount)
+    assert(taskScheduler
+      .resourceOffers(
+        IndexedSeq(WorkerOffer("exec1", "host1", 1)),
+        isAllFreeResources = true)
+      .flatten.length === 1)
+
+    // would advance to NODE_LOCAL locality if timer wasn't reset above
+    // Verifying this node local task is not accepted
+    clock.advance(advanceAmount)
+    assert(taskScheduler
+      .resourceOffers(
+        IndexedSeq(WorkerOffer("exec2", "host1", 1)))
+      .flatten.isEmpty)
+  }
+
+  test("SPARK-18886 - partial resource offers (isAllFreeResources = false) reset " +
+    "time if last full resource offer (isAllResources = true) was accepted as well as any " +
+    "following partial resource offers") {
+    val clock = new ManualClock()
+    val taskScheduler = setupTaskScheduler(clock)
+    val advanceAmount = 2000
+
+    assert(taskScheduler
+      .resourceOffers(
+        IndexedSeq(WorkerOffer("exec1", "host1", 1)),
+        isAllFreeResources = true)
+      .flatten.length === 1)
+
+    clock.advance(advanceAmount)
+    assert(taskScheduler
+      .resourceOffers(
+        IndexedSeq(WorkerOffer("exec1", "host1", 1)),
+        isAllFreeResources = false)
+      .flatten.length === 1)
+
+    clock.advance(advanceAmount)
+    assert(taskScheduler
+      .resourceOffers(
+        IndexedSeq(WorkerOffer("exec1", "host1", 1)),
+        isAllFreeResources = false)
+      .flatten.length === 1)
+
+    clock.advance(advanceAmount)
+    assert(taskScheduler
+      .resourceOffers(
+        IndexedSeq(WorkerOffer("exec2", "host1", 1)))
 
 Review comment:
   👍 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org

Reply via email to