bmarcott commented on a change in pull request #27207: [SPARK-18886][CORE] Make Locality wait time measure resource under utilization due to delay scheduling.
URL: https://github.com/apache/spark/pull/27207#discussion_r402739744
 
 

 ##########
 File path: core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
 ##########
 @@ -196,6 +196,241 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     assert(!failedTaskSet)
   }
 
+  /**
+   * Builds a TaskSchedulerImpl whose TaskSetManagers run on `clock`, and submits one task set
+   * of 8 tasks that all prefer exec1 on host1. Offers are never shuffled, so each test decides
+   * the exact order in which executors are offered.
+   */
+  def setupTaskScheduler(clock: ManualClock): TaskSchedulerImpl = {
+    sc = new SparkContext("local", "TaskSchedulerImplSuite", new SparkConf())
+    val scheduler = new TaskSchedulerImpl(sc, sc.conf.get(config.TASK_MAX_FAILURES),
+      clock = clock) {
+      override def createTaskSetManager(taskSet: TaskSet, maxTaskFailures: Int): TaskSetManager =
+        new TaskSetManager(this, taskSet, maxTaskFailures, blacklistTrackerOpt, clock)
+
+      // Return the offers untouched; tests pass in the exact orderings they want to exercise.
+      override def shuffleOffers(offers: IndexedSeq[WorkerOffer]): IndexedSeq[WorkerOffer] =
+        offers
+    }
+    // A DAGScheduler must exist for the task scheduler's callbacks; the two callbacks
+    // overridden here (taskStarted, executorAdded) do nothing.
+    new DAGScheduler(sc, scheduler) {
+      override def taskStarted(task: Task[_], taskInfo: TaskInfo): Unit = {}
+
+      override def executorAdded(execId: String, host: String): Unit = {}
+    }
+    scheduler.initialize(new FakeSchedulerBackend)
+
+    // Every task gets the same single preferred location: exec1 on host1.
+    val numTasks = 8
+    val taskSet = FakeTask.createTaskSet(numTasks, 1, 1,
+      Seq.fill(numTasks)(Seq(TaskLocation("host1", "exec1"))): _*)
+
+    // Making an offer before submitting lets the task set compute PROCESS_LOCAL as a valid
+    // locality level (see TaskSetManager.computeValidLocalityLevels()); otherwise ANY would be
+    // its only level. As a result, the task set starts out at PROCESS_LOCAL.
+    scheduler.resourceOffers(IndexedSeq(WorkerOffer("exec1", "host1", 1)))
+    scheduler.submitTasks(taskSet)
+    scheduler
+  }
+
+  test("SPARK-18886 - partial offers (isAllFreeResources = false) reset timer before " +
+    "any resources have been rejected") {
+    val clock = new ManualClock()
+    // Every task prefers exec1 on host1; the task set begins at PROCESS_LOCAL.
+    val taskScheduler = setupTaskScheduler(clock)
+    // Locality levels increase at 3000 ms.
+    val advanceMs = 3000
+
+    // Offers a single core on `execId` (always on host1) and returns every task launched.
+    def offerOneCore(execId: String, isAllFree: Boolean): Seq[TaskDescription] =
+      taskScheduler
+        .resourceOffers(
+          IndexedSeq(WorkerOffer(execId, "host1", 1)),
+          isAllFreeResources = isAllFree)
+        .flatten
+
+    // Moving the clock forward raises the locality level to NODE_LOCAL.
+    clock.advance(advanceMs)
+
+    // No full resource offer has been made yet. An accepted partial offer
+    // (isAllFreeResources = false) therefore resets the delay-scheduling timer, because this
+    // offer and every earlier one were accepted. The locality level drops back to PROCESS_LOCAL.
+    assert(offerOneCore("exec1", isAllFree = false).length === 1)
+
+    // An offer on exec2 is only NODE_LOCAL for these tasks, so nothing is launched on it.
+    assert(offerOneCore("exec2", isAllFree = false).isEmpty)
+  }
+
+  // The test name previously concatenated "...offered when" + "isAllFreeResources = true"
+  // without a separating space; the space is restored below.
+  test("SPARK-18886 - delay scheduling timer is reset when it accepts all resources offered " +
+    "when isAllFreeResources = true") {
+    val clock = new ManualClock()
+    // Every task prefers exec1 on host1; the task set begins at PROCESS_LOCAL.
+    val taskScheduler = setupTaskScheduler(clock)
+    // Locality levels increase at 3000 ms.
+    val advanceAmount = 3000
+
+    // Offers a single core on `execId` (always on host1) and returns every task launched.
+    def offerOneCore(execId: String, isAllFree: Boolean): Seq[TaskDescription] =
+      taskScheduler
+        .resourceOffers(
+          IndexedSeq(WorkerOffer(execId, "host1", 1)),
+          isAllFreeResources = isAllFree)
+        .flatten
+
+    // Advancing the clock raises the locality level to NODE_LOCAL.
+    clock.advance(advanceAmount)
+
+    // A full resource offer (isAllFreeResources = true) that rejects nothing resets delay
+    // scheduling, so the locality level drops back to PROCESS_LOCAL.
+    assert(offerOneCore("exec1", isAllFree = true).length === 1)
+
+    // An offer on exec2 is only NODE_LOCAL for these tasks, so nothing is launched on it.
+    assert(offerOneCore("exec2", isAllFree = false).isEmpty)
+  }
+
+  // The test name previously said "reset time" (meaning the timer) and referred to a
+  // nonexistent `isAllResources` flag; both now match the sibling tests and the real
+  // `isAllFreeResources` parameter.
+  test("SPARK-18886 - partial resource offers (isAllFreeResources = false) reset timer if " +
+    "last full resource offer (isAllFreeResources = true) was accepted as well as any " +
+    "following partial resource offers") {
+    val clock = new ManualClock()
+    // Every task prefers exec1 on host1; the task set begins at PROCESS_LOCAL.
+    val taskScheduler = setupTaskScheduler(clock)
+    // Locality levels increase at 3000 ms.
+    val advanceAmount = 3000
+
+    // Offers a single core on `execId` (always on host1) and returns every task launched.
+    def offerOneCore(execId: String, isAllFree: Boolean): Seq[TaskDescription] =
+      taskScheduler
+        .resourceOffers(
+          IndexedSeq(WorkerOffer(execId, "host1", 1)),
+          isAllFreeResources = isAllFree)
+        .flatten
+
+    // A PROCESS_LOCAL full resource offer is accepted.
+    assert(offerOneCore("exec1", isAllFree = true).length === 1)
+
+    // Advancing the clock raises the locality level to NODE_LOCAL.
+    clock.advance(advanceAmount)
+
+    // A PROCESS_LOCAL partial offer is accepted. Every offer since the last full offer (that
+    // one and this one) was accepted, so delay scheduling is reset to PROCESS_LOCAL.
+    assert(offerOneCore("exec1", isAllFree = false).length === 1)
+
+    // Advancing the clock raises the locality level to NODE_LOCAL again.
+    clock.advance(advanceAmount)
+
+    // Another PROCESS_LOCAL partial offer is accepted. The previous full offer, the previous
+    // partial offer and this partial offer were all accepted, so delay scheduling is reset to
+    // PROCESS_LOCAL once more.
+    assert(offerOneCore("exec1", isAllFree = false).length === 1)
+
+    // An offer on exec2 is only NODE_LOCAL for these tasks, so nothing is launched on it.
+    assert(offerOneCore("exec2", isAllFree = false).isEmpty)
+  }
+
 
 Review comment:
   👍 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to