squito commented on a change in pull request #23677: [SPARK-26755][SCHEDULER] : Optimize Spark Scheduler to dequeue speculative tasks…
URL: https://github.com/apache/spark/pull/23677#discussion_r303610213
 
 

 ##########
 File path: 
core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
 ##########
 @@ -1655,4 +1657,70 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
     // get removed inside TaskSchedulerImpl later.
     assert(availableResources(GPU) sameElements Array("0", "1", "2", "3"))
   }
+
+  test("SPARK-26755 Ensure that a speculative task is submitted only once for execution") {
+    sc = new SparkContext("local", "test")
+    sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
+    val taskSet = FakeTask.createTaskSet(4)
+    // Set the speculation multiplier to be 0 so speculative tasks are launched immediately
+    sc.conf.set(config.SPECULATION_MULTIPLIER, 0.0)
+    sc.conf.set(config.SPECULATION_ENABLED, true)
+    sc.conf.set(config.SPECULATION_QUANTILE, 0.5)
+    val clock = new ManualClock()
+    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
+    val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = taskSet.tasks.map { task =>
+      task.metrics.internalAccums
+    }
+    // Offer resources for 4 tasks to start
+    for ((k, v) <- List(
+      "exec1" -> "host1",
+      "exec1" -> "host1",
+      "exec2" -> "host2",
+      "exec2" -> "host2")) {
+      val taskOption = manager.resourceOffer(k, v, NO_PREF)
+      assert(taskOption.isDefined)
+      val task = taskOption.get
+      assert(task.executorId === k)
+    }
+    assert(sched.startedTasks.toSet === Set(0, 1, 2, 3))
+    clock.advance(1)
+    // Complete the first 2 tasks and leave the other 2 tasks in running
+    for (id <- Set(0, 1)) {
+      manager.handleSuccessfulTask(id, createTaskResult(id, accumUpdatesByTask(id)))
+      assert(sched.endedTasks(id) === Success)
+    }
+    // checkSpeculatableTasks checks that the task runtime is greater than the threshold for
+    // speculating. Since we use a threshold of 0 for speculation, tasks need to be running for
+    // > 0ms, so advance the clock by 1ms here.
+    clock.advance(1)
+    assert(manager.checkSpeculatableTasks(0))
+    assert(sched.speculativeTasks.toSet === Set(2, 3))
+    assert(manager.copiesRunning(2) === 1)
+    assert(manager.copiesRunning(3) === 1)
+
+    // Offer resource to start the speculative attempt for the running task
 
 Review comment:
   Let's expand this comment a bit to explain that we're about to get to the main point of this test, something like:
   
   "We offer more resources, and ensure that speculative tasks get scheduled 
appropriately -- one extra copy per speculatable task, and still obeying the 
original locality preferences"
   
   
   and writing that extended description makes me realize we are actually *not* checking that locality preferences are obeyed for speculative tasks in this test.  To do that, I think you need to expand this a bit:
   
   * the tasks should have locality preferences, say hosts 1 & 3 for task 1, and host 2 for task 2
   * add an exec4 / host4 (with no task that prefers that host)
   * after you've got speculatable tasks, first make an offer with max locality NODE_LOCAL but on exec4, and check it does *not* get scheduled
   * make an offer on exec2, still with NODE_LOCAL -- the task does *not* get scheduled because of the "same host" exclusion for speculative tasks
   * make an offer on exec3, still with NODE_LOCAL -- the task *does* get scheduled, because it's a different host than the one running task1 and falls within the locality prefs
   * make an offer on exec4 with ANY max locality, and task2 should get a speculative copy scheduled there
   * make more offers, nothing scheduled (because copiesRunning == 2 for all tasks already)
   * another call to checkSpeculatableTasks, and more offers, still nothing scheduled
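   For concreteness, a rough sketch of what the expanded test might look like, following the style of the surrounding suite. This is hedged: `exec3`/`host3` and `exec4`/`host4` are the hypothetical additions suggested above, the per-task preferred locations assume the `FakeTask.createTaskSet` overload that accepts them, and the exact offer results depend on TaskSetManager's speculative-task host exclusion -- it is a sketch, not a drop-in patch:
   
   ```scala
   // Sketch only -- reuses the existing test's setup style, with two extra executors.
   sc = new SparkContext("local", "test")
   sched = new FakeTaskScheduler(sc,
     ("exec1", "host1"), ("exec2", "host2"), ("exec3", "host3"), ("exec4", "host4"))
   // Give tasks locality preferences, as suggested above (assumes the
   // createTaskSet overload taking per-task preferred locations).
   val taskSet = FakeTask.createTaskSet(4,
     Seq(TaskLocation("host1"), TaskLocation("host3")), // task 0 prefers hosts 1 & 3
     Seq(TaskLocation("host2")),                        // task 1 prefers host 2
     Seq(TaskLocation("host1")),
     Seq(TaskLocation("host2")))
   
   // ... start all 4 tasks and complete two of them, as in the existing test,
   // then call checkSpeculatableTasks so the remaining tasks become speculatable ...
   
   // NODE_LOCAL offer on exec4 (no task prefers host4): nothing scheduled.
   assert(manager.resourceOffer("exec4", "host4", NODE_LOCAL).isEmpty)
   // NODE_LOCAL offer on exec2: rejected by the "same host" exclusion for a
   // speculative copy of a task already running on host2.
   assert(manager.resourceOffer("exec2", "host2", NODE_LOCAL).isEmpty)
   // NODE_LOCAL offer on exec3: scheduled -- a different host, within the prefs.
   assert(manager.resourceOffer("exec3", "host3", NODE_LOCAL).isDefined)
   // ANY offer on exec4: the remaining speculatable task gets its extra copy here.
   assert(manager.resourceOffer("exec4", "host4", ANY).isDefined)
   // Further offers schedule nothing: copiesRunning == 2 for all running tasks.
   assert(manager.resourceOffer("exec1", "host1", ANY).isEmpty)
   // Even after another speculation check, offers still schedule nothing.
   manager.checkSpeculatableTasks(0)
   assert(manager.resourceOffer("exec2", "host2", ANY).isEmpty)
   ```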

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
