mridulm commented on code in PR #36162:
URL: https://github.com/apache/spark/pull/36162#discussion_r897072986


##########
core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala:
##########
@@ -2245,6 +2250,220 @@ class TaskSetManagerSuite
     assert(sched.speculativeTasks.size == 1)
   }
 
+  private def createTaskMetrics(
+       taskSet: TaskSet,
+       inefficientTaskIds: Seq[Int],
+       inefficientMultiplier: Double = 0.6): Array[TaskMetrics] = {
+    taskSet.tasks.zipWithIndex.map { case (task, index) =>
+      val metrics = task.metrics
+      if (inefficientTaskIds.contains(index)) {
+        metrics.inputMetrics.incRecordsRead((inefficientMultiplier * 
RECORDS_NUM).toLong)
+        metrics.shuffleReadMetrics.incRecordsRead((inefficientMultiplier * 
RECORDS_NUM).toLong)
+      } else {
+        metrics.inputMetrics.incRecordsRead(RECORDS_NUM)
+        metrics.shuffleReadMetrics.incRecordsRead(RECORDS_NUM)

Review Comment:
   nit: 
   Do we want to use `set*` here instead of `inc*`?
   (We would need to add it to `inputMetrics`, though.)



##########
core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala:
##########
@@ -2245,6 +2250,220 @@ class TaskSetManagerSuite
     assert(sched.speculativeTasks.size == 1)
   }
 
+  private def createTaskMetrics(
+       taskSet: TaskSet,
+       inefficientTaskIds: Seq[Int],
+       inefficientMultiplier: Double = 0.6): Array[TaskMetrics] = {
+    taskSet.tasks.zipWithIndex.map { case (task, index) =>
+      val metrics = task.metrics
+      if (inefficientTaskIds.contains(index)) {
+        metrics.inputMetrics.incRecordsRead((inefficientMultiplier * 
RECORDS_NUM).toLong)
+        metrics.shuffleReadMetrics.incRecordsRead((inefficientMultiplier * 
RECORDS_NUM).toLong)
+      } else {
+        metrics.inputMetrics.incRecordsRead(RECORDS_NUM)
+        metrics.shuffleReadMetrics.incRecordsRead(RECORDS_NUM)
+      }
+      metrics.setExecutorRunTime(RUNTIME)
+      metrics
+    }
+  }
+
+  test("SPARK-32170: test speculation for TaskSet with single task") {
+    // set the speculation multiplier to be 0, so speculative tasks are 
launched immediately
+    val conf = new SparkConf()
+      .set(config.SPECULATION_MULTIPLIER.key, "0.0")
+      .set(config.SPECULATION_ENABLED, true)
+      .set(config.SPECULATION_EFFICIENCY_TASK_PROCESS_MULTIPLIER.key, "0.5")
+    sc = new SparkContext("local", "test", conf)
+    Seq(0, 15).foreach { duration =>
+      sc.conf.set(config.SPECULATION_TASK_DURATION_THRESHOLD.key, 
duration.toString)
+      sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
+      val numTasks = 1
+      val taskSet = FakeTask.createTaskSet(numTasks)
+      val clock = new ManualClock()
+      val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, 
clock = clock)
+      for ((k, v) <- List("exec1" -> "host1")) {
+        val taskOption = manager.resourceOffer(k, v, NO_PREF)._1
+        assert(taskOption.isDefined)
+        val task = taskOption.get
+        sched.taskIdToTaskSetManager.put(task.taskId, manager)
+      }
+      clock.advance(RUNTIME)
+      // runtimeMs(20s) > 15s(1 * 15s)
+      if (duration <= 0) {
+        assert(!manager.checkSpeculatableTasks(0))
+        assert(sched.speculativeTasks.toSet === Set.empty)
+      } else {
+        assert(manager.checkSpeculatableTasks(0))
+        assert(sched.speculativeTasks.toSet === Set(0))
+      }
+    }
+  }
+
+  test("SPARK-32170: test SPECULATION_MIN_THRESHOLD for speculating 
inefficient tasks") {
+    // set the speculation multiplier to be 0, so speculative tasks are 
launched immediately
+    val conf = new SparkConf()
+      .set(config.SPECULATION_MULTIPLIER.key, "0.0")
+      .set(config.SPECULATION_ENABLED, true)
+      .set(config.SPECULATION_EFFICIENCY_TASK_PROCESS_MULTIPLIER.key, "0.5")
+      .set(config.SPECULATION_EFFICIENCY_TASK_DURATION_FACTOR.key, 
Int.MaxValue.toString)
+    sc = new SparkContext("local", "test", conf)
+    val ser = sc.env.closureSerializer.newInstance()
+    Seq(0, 10000, 50000).foreach { minDuration =>
+      sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
+      val taskSet = FakeTask.createTaskSet(4)
+      val clock = new ManualClock()
+      val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, 
clock = clock)
+      val taskMetricsByTask = createTaskMetrics(taskSet, Seq(3), 0.4)
+      val blockManagerId = BlockManagerId("exec1", "localhost", 12345)
+      // offer resources for 4 tasks to start
+      for ((k, v) <- List(
+        "exec1" -> "host1",
+        "exec1" -> "host1",
+        "exec2" -> "host2",
+        "exec2" -> "host2")) {
+        val taskOption = manager.resourceOffer(k, v, NO_PREF)._1
+        assert(taskOption.isDefined)
+        val task = taskOption.get
+        sched.taskIdToTaskSetManager.put(task.taskId, manager)
+      }
+      clock.advance(RUNTIME)
+      // complete the 3 tasks and leave 1 task in running
+      val taskMetrics: TaskMetrics =
+        
ser.deserialize(ByteBuffer.wrap(ser.serialize(taskMetricsByTask(3)).array()))
+      sched.executorHeartbeatReceived("exec1", Array((3, 
taskMetrics.internalAccums)),
+        blockManagerId, mutable.Map.empty[(Int, Int), ExecutorMetrics])
+      for (id <- Set(0, 1, 2)) {
+        val resultBytes = ser.serialize(createTaskResult(id, 
taskMetricsByTask(id).internalAccums))
+        sched.statusUpdate(tid = id, state = TaskState.FINISHED, 
serializedData = resultBytes)
+        eventually(timeout(1.second), interval(10.milliseconds)) {
+          assert(sched.endedTasks(id) === Success)
+        }
+      }
+      // 1) when SPECULATION_MIN_THRESHOLD is equal 0s, the task 4 will be 
speculated
+      // by previous strategy.
+      // 2) when SPECULATION_MIN_THRESHOLD is equal 10s, the task 4 
runtime(20s) is
+      // above (10s) and evaluated an inefficient task to speculate.
+      // 3) when SPECULATION_MIN_THRESHOLD is equal 50s, the task 4 
runtime(20s) is
+      // less than (50s) and no needs to speculate.
+      if (Seq(0, 10000).contains(minDuration)) {
+        assert(manager.checkSpeculatableTasks(minDuration))
+        assert(sched.speculativeTasks.toSet === Set(3))
+      } else {
+        assert(!manager.checkSpeculatableTasks(minDuration))
+        assert(sched.speculativeTasks.toSet === Set.empty)
+      }
+    }
+  }
+
+  test("SPARK-32170: test SPECULATION_TASK_PROGRESS_MULTIPLIER for speculating 
" +
+    "inefficient tasks") {
+    // set the speculation multiplier to be 0, so speculative tasks are 
launched immediately
+    val conf = new SparkConf()
+      .set(config.SPECULATION_MULTIPLIER.key, "0.0")
+      .set(config.SPECULATION_ENABLED, true)
+      .set(config.SPECULATION_EFFICIENCY_TASK_DURATION_FACTOR.key, 
Int.MaxValue.toString)
+    sc = new SparkContext("local", "test", conf)
+    val ser = sc.env.closureSerializer.newInstance()
+    Seq(0.5, 0.8).foreach { processMultiplier => {
+      sc.conf.set(config.SPECULATION_EFFICIENCY_TASK_PROCESS_MULTIPLIER.key,
+        processMultiplier.toString)
+      sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
+      val taskSet = FakeTask.createTaskSet(4)
+      val clock = new ManualClock()
+      val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, 
clock = clock)
+      val taskMetricsByTask = createTaskMetrics(taskSet, Seq(3))
+      val blockManagerId = BlockManagerId("exec1", "localhost", 12345)
+      // offer resources for 4 tasks to start
+      for ((k, v) <- List(
+        "exec1" -> "host1",
+        "exec1" -> "host1",
+        "exec2" -> "host2",
+        "exec2" -> "host2")) {
+        val taskOption = manager.resourceOffer(k, v, NO_PREF)._1
+        assert(taskOption.isDefined)
+        val task = taskOption.get
+        sched.taskIdToTaskSetManager.put(task.taskId, manager)
+      }
+      clock.advance(RUNTIME)
+      // complete the 3 tasks and leave 1 task in running
+      val taskMetrics: TaskMetrics =
+        
ser.deserialize(ByteBuffer.wrap(ser.serialize(taskMetricsByTask(3)).array()))
+      sched.executorHeartbeatReceived("exec1", Array((3, 
taskMetrics.internalAccums)),
+        blockManagerId, mutable.Map.empty[(Int, Int), ExecutorMetrics])
+      for (id <- Set(0, 1, 2)) {
+        val resultBytes = ser.serialize(createTaskResult(id, 
taskMetricsByTask(id).internalAccums))
+        sched.statusUpdate(tid = id, state = TaskState.FINISHED, 
serializedData = resultBytes)
+        eventually(timeout(1.second), interval(10.milliseconds)) {
+          assert(sched.endedTasks(id) === Success)
+        }
+      }
+      // 0.5 < 0.6 < 0.8
+      if (processMultiplier == 0.8) {
+        assert(manager.checkSpeculatableTasks(100))
+        assert(sched.speculativeTasks.toSet === Set(3))
+      } else {
+        assert(!manager.checkSpeculatableTasks(100))
+        assert(sched.speculativeTasks.toSet === Set.empty)
+      }
+    }
+    }
+  }
+
+  test("SPARK-32170: test SPECULATION_TASK_DURATION_FACTOR for speculating" +
+    " tasks") {
+    // set the speculation multiplier to be 0, so speculative tasks are 
launched immediately
+    val conf = new SparkConf()
+      .set(config.SPECULATION_MULTIPLIER.key, "0.0")
+      .set(config.SPECULATION_ENABLED, true)
+      .set(config.SPECULATION_EFFICIENCY_TASK_PROCESS_MULTIPLIER.key, "0.5")
+    sc = new SparkContext("local", "test", conf)
+    val ser = sc.env.closureSerializer.newInstance()
+    Seq(1, 2).foreach { factor => {
+      sc.conf.set(config.SPECULATION_EFFICIENCY_TASK_DURATION_FACTOR.key, 
factor.toString)
+      sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
+      val taskSet = FakeTask.createTaskSet(4)
+      val clock = new ManualClock()
+      val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, 
clock = clock)
+      val taskMetricsByTask = createTaskMetrics(taskSet, Seq(3))
+      val blockManagerId = BlockManagerId("exec1", "localhost", 12345)
+      // offer resources for 4 tasks to start
+      for ((k, v) <- List(
+        "exec1" -> "host1",
+        "exec1" -> "host1",
+        "exec2" -> "host2",
+        "exec2" -> "host2")) {
+        val taskOption = manager.resourceOffer(k, v, NO_PREF)._1
+        assert(taskOption.isDefined)
+        val task = taskOption.get
+        sched.taskIdToTaskSetManager.put(task.taskId, manager)
+      }
+      clock.advance(RUNTIME)
+      // complete the 3 tasks and leave 1 task in running
+      val taskMetrics: TaskMetrics =
+        
ser.deserialize(ByteBuffer.wrap(ser.serialize(taskMetricsByTask(3)).array()))
+      sched.executorHeartbeatReceived("exec1", Array((3, 
taskMetrics.internalAccums)),
+        blockManagerId, mutable.Map.empty[(Int, Int), ExecutorMetrics])
+      for (id <- Set(0, 1, 2)) {
+        val resultBytes = ser.serialize(createTaskResult(id, 
taskMetricsByTask(id).internalAccums))
+        sched.statusUpdate(tid = id, state = TaskState.FINISHED, 
serializedData = resultBytes)
+        eventually(timeout(1.second), interval(10.milliseconds)) {
+          assert(sched.endedTasks(id) === Success)
+        }
+      }
+      // runtimeMs(20s) > 15s(1 * 15s)
+      if (factor == 1) {
+        assert(manager.checkSpeculatableTasks(15000))
+        assert(sched.speculativeTasks.toSet === Set(3))
+      } else {
+        // runtimeMs(20s) < 30s(2 * 15s)
+        assert(!manager.checkSpeculatableTasks(15000))
+        assert(sched.speculativeTasks.toSet === Set.empty)
+      }
+    }
+    }

Review Comment:
   Fix the indentation? (It should be fine to move it onto the previous line, here 
and in the previous test.)



##########
core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala:
##########
@@ -2245,6 +2250,220 @@ class TaskSetManagerSuite
     assert(sched.speculativeTasks.size == 1)
   }
 
+  private def createTaskMetrics(
+       taskSet: TaskSet,
+       inefficientTaskIds: Seq[Int],
+       inefficientMultiplier: Double = 0.6): Array[TaskMetrics] = {
+    taskSet.tasks.zipWithIndex.map { case (task, index) =>
+      val metrics = task.metrics
+      if (inefficientTaskIds.contains(index)) {
+        metrics.inputMetrics.incRecordsRead((inefficientMultiplier * 
RECORDS_NUM).toLong)
+        metrics.shuffleReadMetrics.incRecordsRead((inefficientMultiplier * 
RECORDS_NUM).toLong)
+      } else {
+        metrics.inputMetrics.incRecordsRead(RECORDS_NUM)
+        metrics.shuffleReadMetrics.incRecordsRead(RECORDS_NUM)
+      }
+      metrics.setExecutorRunTime(RUNTIME)
+      metrics
+    }
+  }
+
+  test("SPARK-32170: test speculation for TaskSet with single task") {
+    // set the speculation multiplier to be 0, so speculative tasks are 
launched immediately
+    val conf = new SparkConf()
+      .set(config.SPECULATION_MULTIPLIER.key, "0.0")
+      .set(config.SPECULATION_ENABLED, true)
+      .set(config.SPECULATION_EFFICIENCY_TASK_PROCESS_MULTIPLIER.key, "0.5")
+    sc = new SparkContext("local", "test", conf)
+    Seq(0, 15).foreach { duration =>
+      sc.conf.set(config.SPECULATION_TASK_DURATION_THRESHOLD.key, 
duration.toString)
+      sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
+      val numTasks = 1
+      val taskSet = FakeTask.createTaskSet(numTasks)
+      val clock = new ManualClock()
+      val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, 
clock = clock)
+      for ((k, v) <- List("exec1" -> "host1")) {
+        val taskOption = manager.resourceOffer(k, v, NO_PREF)._1
+        assert(taskOption.isDefined)
+        val task = taskOption.get
+        sched.taskIdToTaskSetManager.put(task.taskId, manager)
+      }
+      clock.advance(RUNTIME)
+      // runtimeMs(20s) > 15s(1 * 15s)
+      if (duration <= 0) {
+        assert(!manager.checkSpeculatableTasks(0))
+        assert(sched.speculativeTasks.toSet === Set.empty)
+      } else {
+        assert(manager.checkSpeculatableTasks(0))
+        assert(sched.speculativeTasks.toSet === Set(0))
+      }
+    }
+  }
+
+  test("SPARK-32170: test SPECULATION_MIN_THRESHOLD for speculating 
inefficient tasks") {
+    // set the speculation multiplier to be 0, so speculative tasks are 
launched immediately
+    val conf = new SparkConf()
+      .set(config.SPECULATION_MULTIPLIER.key, "0.0")
+      .set(config.SPECULATION_ENABLED, true)
+      .set(config.SPECULATION_EFFICIENCY_TASK_PROCESS_MULTIPLIER.key, "0.5")
+      .set(config.SPECULATION_EFFICIENCY_TASK_DURATION_FACTOR.key, 
Int.MaxValue.toString)
+    sc = new SparkContext("local", "test", conf)
+    val ser = sc.env.closureSerializer.newInstance()
+    Seq(0, 10000, 50000).foreach { minDuration =>
+      sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
+      val taskSet = FakeTask.createTaskSet(4)
+      val clock = new ManualClock()
+      val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, 
clock = clock)
+      val taskMetricsByTask = createTaskMetrics(taskSet, Seq(3), 0.4)
+      val blockManagerId = BlockManagerId("exec1", "localhost", 12345)
+      // offer resources for 4 tasks to start
+      for ((k, v) <- List(
+        "exec1" -> "host1",
+        "exec1" -> "host1",
+        "exec2" -> "host2",
+        "exec2" -> "host2")) {
+        val taskOption = manager.resourceOffer(k, v, NO_PREF)._1
+        assert(taskOption.isDefined)
+        val task = taskOption.get
+        sched.taskIdToTaskSetManager.put(task.taskId, manager)
+      }
+      clock.advance(RUNTIME)
+      // complete the 3 tasks and leave 1 task in running
+      val taskMetrics: TaskMetrics =
+        
ser.deserialize(ByteBuffer.wrap(ser.serialize(taskMetricsByTask(3)).array()))
+      sched.executorHeartbeatReceived("exec1", Array((3, 
taskMetrics.internalAccums)),
+        blockManagerId, mutable.Map.empty[(Int, Int), ExecutorMetrics])
+      for (id <- Set(0, 1, 2)) {
+        val resultBytes = ser.serialize(createTaskResult(id, 
taskMetricsByTask(id).internalAccums))
+        sched.statusUpdate(tid = id, state = TaskState.FINISHED, 
serializedData = resultBytes)
+        eventually(timeout(1.second), interval(10.milliseconds)) {
+          assert(sched.endedTasks(id) === Success)
+        }
+      }
+      // 1) when SPECULATION_MIN_THRESHOLD is equal 0s, the task 4 will be 
speculated
+      // by previous strategy.
+      // 2) when SPECULATION_MIN_THRESHOLD is equal 10s, the task 4 
runtime(20s) is
+      // above (10s) and evaluated an inefficient task to speculate.
+      // 3) when SPECULATION_MIN_THRESHOLD is equal 50s, the task 4 
runtime(20s) is
+      // less than (50s) and no needs to speculate.
+      if (Seq(0, 10000).contains(minDuration)) {
+        assert(manager.checkSpeculatableTasks(minDuration))
+        assert(sched.speculativeTasks.toSet === Set(3))
+      } else {
+        assert(!manager.checkSpeculatableTasks(minDuration))
+        assert(sched.speculativeTasks.toSet === Set.empty)
+      }
+    }
+  }
+
+  test("SPARK-32170: test SPECULATION_TASK_PROGRESS_MULTIPLIER for speculating 
" +
+    "inefficient tasks") {
+    // set the speculation multiplier to be 0, so speculative tasks are 
launched immediately
+    val conf = new SparkConf()
+      .set(config.SPECULATION_MULTIPLIER.key, "0.0")
+      .set(config.SPECULATION_ENABLED, true)
+      .set(config.SPECULATION_EFFICIENCY_TASK_DURATION_FACTOR.key, 
Int.MaxValue.toString)
+    sc = new SparkContext("local", "test", conf)
+    val ser = sc.env.closureSerializer.newInstance()
+    Seq(0.5, 0.8).foreach { processMultiplier => {
+      sc.conf.set(config.SPECULATION_EFFICIENCY_TASK_PROCESS_MULTIPLIER.key,
+        processMultiplier.toString)
+      sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
+      val taskSet = FakeTask.createTaskSet(4)
+      val clock = new ManualClock()
+      val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, 
clock = clock)
+      val taskMetricsByTask = createTaskMetrics(taskSet, Seq(3))

Review Comment:
   super nit: Explicitly pass `inefficientMultiplier` here, since we are 
depending on that value below? That makes the test clearer.



##########
core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala:
##########
@@ -2245,6 +2250,220 @@ class TaskSetManagerSuite
     assert(sched.speculativeTasks.size == 1)
   }
 
+  private def createTaskMetrics(
+       taskSet: TaskSet,
+       inefficientTaskIds: Seq[Int],
+       inefficientMultiplier: Double = 0.6): Array[TaskMetrics] = {
+    taskSet.tasks.zipWithIndex.map { case (task, index) =>
+      val metrics = task.metrics
+      if (inefficientTaskIds.contains(index)) {
+        metrics.inputMetrics.incRecordsRead((inefficientMultiplier * 
RECORDS_NUM).toLong)
+        metrics.shuffleReadMetrics.incRecordsRead((inefficientMultiplier * 
RECORDS_NUM).toLong)
+      } else {
+        metrics.inputMetrics.incRecordsRead(RECORDS_NUM)
+        metrics.shuffleReadMetrics.incRecordsRead(RECORDS_NUM)
+      }
+      metrics.setExecutorRunTime(RUNTIME)
+      metrics
+    }
+  }
+
+  test("SPARK-32170: test speculation for TaskSet with single task") {
+    // set the speculation multiplier to be 0, so speculative tasks are 
launched immediately

Review Comment:
   nit: 
   `so speculative tasks are launched immediately` -> `so speculative tasks are 
launched based on minTimeToSpeculation parameter to checkSpeculatableTasks` 
(here and in other tests)



##########
core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala:
##########
@@ -2245,6 +2250,220 @@ class TaskSetManagerSuite
     assert(sched.speculativeTasks.size == 1)
   }
 
+  private def createTaskMetrics(
+       taskSet: TaskSet,
+       inefficientTaskIds: Seq[Int],
+       inefficientMultiplier: Double = 0.6): Array[TaskMetrics] = {
+    taskSet.tasks.zipWithIndex.map { case (task, index) =>
+      val metrics = task.metrics
+      if (inefficientTaskIds.contains(index)) {
+        metrics.inputMetrics.incRecordsRead((inefficientMultiplier * 
RECORDS_NUM).toLong)
+        metrics.shuffleReadMetrics.incRecordsRead((inefficientMultiplier * 
RECORDS_NUM).toLong)
+      } else {
+        metrics.inputMetrics.incRecordsRead(RECORDS_NUM)
+        metrics.shuffleReadMetrics.incRecordsRead(RECORDS_NUM)
+      }
+      metrics.setExecutorRunTime(RUNTIME)
+      metrics
+    }
+  }
+
+  test("SPARK-32170: test speculation for TaskSet with single task") {
+    // set the speculation multiplier to be 0, so speculative tasks are 
launched immediately
+    val conf = new SparkConf()
+      .set(config.SPECULATION_MULTIPLIER.key, "0.0")
+      .set(config.SPECULATION_ENABLED, true)
+      .set(config.SPECULATION_EFFICIENCY_TASK_PROCESS_MULTIPLIER.key, "0.5")

Review Comment:
   QQ: Do we need `SPECULATION_MULTIPLIER` and 
`SPECULATION_EFFICIENCY_TASK_PROCESS_MULTIPLIER` for this test ?



##########
core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala:
##########
@@ -2245,6 +2250,220 @@ class TaskSetManagerSuite
     assert(sched.speculativeTasks.size == 1)
   }
 
+  private def createTaskMetrics(
+       taskSet: TaskSet,
+       inefficientTaskIds: Seq[Int],
+       inefficientMultiplier: Double = 0.6): Array[TaskMetrics] = {
+    taskSet.tasks.zipWithIndex.map { case (task, index) =>
+      val metrics = task.metrics
+      if (inefficientTaskIds.contains(index)) {
+        metrics.inputMetrics.incRecordsRead((inefficientMultiplier * 
RECORDS_NUM).toLong)
+        metrics.shuffleReadMetrics.incRecordsRead((inefficientMultiplier * 
RECORDS_NUM).toLong)
+      } else {
+        metrics.inputMetrics.incRecordsRead(RECORDS_NUM)
+        metrics.shuffleReadMetrics.incRecordsRead(RECORDS_NUM)
+      }
+      metrics.setExecutorRunTime(RUNTIME)
+      metrics
+    }
+  }
+
+  test("SPARK-32170: test speculation for TaskSet with single task") {
+    // set the speculation multiplier to be 0, so speculative tasks are 
launched immediately
+    val conf = new SparkConf()
+      .set(config.SPECULATION_MULTIPLIER.key, "0.0")
+      .set(config.SPECULATION_ENABLED, true)
+      .set(config.SPECULATION_EFFICIENCY_TASK_PROCESS_MULTIPLIER.key, "0.5")
+    sc = new SparkContext("local", "test", conf)
+    Seq(0, 15).foreach { duration =>
+      sc.conf.set(config.SPECULATION_TASK_DURATION_THRESHOLD.key, 
duration.toString)
+      sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
+      val numTasks = 1
+      val taskSet = FakeTask.createTaskSet(numTasks)
+      val clock = new ManualClock()
+      val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, 
clock = clock)
+      for ((k, v) <- List("exec1" -> "host1")) {
+        val taskOption = manager.resourceOffer(k, v, NO_PREF)._1
+        assert(taskOption.isDefined)
+        val task = taskOption.get
+        sched.taskIdToTaskSetManager.put(task.taskId, manager)
+      }
+      clock.advance(RUNTIME)
+      // runtimeMs(20s) > 15s(1 * 15s)
+      if (duration <= 0) {
+        assert(!manager.checkSpeculatableTasks(0))
+        assert(sched.speculativeTasks.toSet === Set.empty)
+      } else {
+        assert(manager.checkSpeculatableTasks(0))
+        assert(sched.speculativeTasks.toSet === Set(0))
+      }
+    }
+  }
+
+  test("SPARK-32170: test SPECULATION_MIN_THRESHOLD for speculating 
inefficient tasks") {
+    // set the speculation multiplier to be 0, so speculative tasks are 
launched immediately
+    val conf = new SparkConf()
+      .set(config.SPECULATION_MULTIPLIER.key, "0.0")
+      .set(config.SPECULATION_ENABLED, true)
+      .set(config.SPECULATION_EFFICIENCY_TASK_PROCESS_MULTIPLIER.key, "0.5")
+      .set(config.SPECULATION_EFFICIENCY_TASK_DURATION_FACTOR.key, 
Int.MaxValue.toString)
+    sc = new SparkContext("local", "test", conf)
+    val ser = sc.env.closureSerializer.newInstance()
+    Seq(0, 10000, 50000).foreach { minDuration =>
+      sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
+      val taskSet = FakeTask.createTaskSet(4)
+      val clock = new ManualClock()
+      val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, 
clock = clock)
+      val taskMetricsByTask = createTaskMetrics(taskSet, Seq(3), 0.4)
+      val blockManagerId = BlockManagerId("exec1", "localhost", 12345)
+      // offer resources for 4 tasks to start
+      for ((k, v) <- List(
+        "exec1" -> "host1",
+        "exec1" -> "host1",
+        "exec2" -> "host2",
+        "exec2" -> "host2")) {
+        val taskOption = manager.resourceOffer(k, v, NO_PREF)._1
+        assert(taskOption.isDefined)
+        val task = taskOption.get
+        sched.taskIdToTaskSetManager.put(task.taskId, manager)
+      }
+      clock.advance(RUNTIME)
+      // complete the 3 tasks and leave 1 task in running
+      val taskMetrics: TaskMetrics =
+        
ser.deserialize(ByteBuffer.wrap(ser.serialize(taskMetricsByTask(3)).array()))
+      sched.executorHeartbeatReceived("exec1", Array((3, 
taskMetrics.internalAccums)),
+        blockManagerId, mutable.Map.empty[(Int, Int), ExecutorMetrics])
+      for (id <- Set(0, 1, 2)) {
+        val resultBytes = ser.serialize(createTaskResult(id, 
taskMetricsByTask(id).internalAccums))
+        sched.statusUpdate(tid = id, state = TaskState.FINISHED, 
serializedData = resultBytes)
+        eventually(timeout(1.second), interval(10.milliseconds)) {
+          assert(sched.endedTasks(id) === Success)
+        }
+      }
+      // 1) when SPECULATION_MIN_THRESHOLD is equal 0s, the task 4 will be 
speculated
+      // by previous strategy.
+      // 2) when SPECULATION_MIN_THRESHOLD is equal 10s, the task 4 
runtime(20s) is
+      // above (10s) and evaluated an inefficient task to speculate.
+      // 3) when SPECULATION_MIN_THRESHOLD is equal 50s, the task 4 
runtime(20s) is
+      // less than (50s) and no needs to speculate.
+      if (Seq(0, 10000).contains(minDuration)) {
+        assert(manager.checkSpeculatableTasks(minDuration))
+        assert(sched.speculativeTasks.toSet === Set(3))
+      } else {
+        assert(!manager.checkSpeculatableTasks(minDuration))
+        assert(sched.speculativeTasks.toSet === Set.empty)
+      }
+    }
+  }
+
+  test("SPARK-32170: test SPECULATION_TASK_PROGRESS_MULTIPLIER for speculating 
" +

Review Comment:
   Rename all tests to reflect the modified config names ? 



##########
core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala:
##########
@@ -2245,6 +2250,220 @@ class TaskSetManagerSuite
     assert(sched.speculativeTasks.size == 1)
   }
 
+  private def createTaskMetrics(
+       taskSet: TaskSet,
+       inefficientTaskIds: Seq[Int],
+       inefficientMultiplier: Double = 0.6): Array[TaskMetrics] = {
+    taskSet.tasks.zipWithIndex.map { case (task, index) =>
+      val metrics = task.metrics
+      if (inefficientTaskIds.contains(index)) {
+        metrics.inputMetrics.incRecordsRead((inefficientMultiplier * 
RECORDS_NUM).toLong)
+        metrics.shuffleReadMetrics.incRecordsRead((inefficientMultiplier * 
RECORDS_NUM).toLong)
+      } else {
+        metrics.inputMetrics.incRecordsRead(RECORDS_NUM)
+        metrics.shuffleReadMetrics.incRecordsRead(RECORDS_NUM)
+      }
+      metrics.setExecutorRunTime(RUNTIME)
+      metrics
+    }
+  }
+
+  test("SPARK-32170: test speculation for TaskSet with single task") {
+    // set the speculation multiplier to be 0, so speculative tasks are 
launched immediately
+    val conf = new SparkConf()
+      .set(config.SPECULATION_MULTIPLIER.key, "0.0")
+      .set(config.SPECULATION_ENABLED, true)
+      .set(config.SPECULATION_EFFICIENCY_TASK_PROCESS_MULTIPLIER.key, "0.5")
+    sc = new SparkContext("local", "test", conf)
+    Seq(0, 15).foreach { duration =>
+      sc.conf.set(config.SPECULATION_TASK_DURATION_THRESHOLD.key, 
duration.toString)
+      sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
+      val numTasks = 1
+      val taskSet = FakeTask.createTaskSet(numTasks)
+      val clock = new ManualClock()
+      val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, 
clock = clock)
+      for ((k, v) <- List("exec1" -> "host1")) {
+        val taskOption = manager.resourceOffer(k, v, NO_PREF)._1
+        assert(taskOption.isDefined)
+        val task = taskOption.get
+        sched.taskIdToTaskSetManager.put(task.taskId, manager)
+      }
+      clock.advance(RUNTIME)
+      // runtimeMs(20s) > 15s(1 * 15s)
+      if (duration <= 0) {
+        assert(!manager.checkSpeculatableTasks(0))
+        assert(sched.speculativeTasks.toSet === Set.empty)
+      } else {
+        assert(manager.checkSpeculatableTasks(0))
+        assert(sched.speculativeTasks.toSet === Set(0))
+      }
+    }
+  }
+
+  test("SPARK-32170: test SPECULATION_MIN_THRESHOLD for speculating 
inefficient tasks") {
+    // set the speculation multiplier to be 0, so speculative tasks are 
launched immediately
+    val conf = new SparkConf()
+      .set(config.SPECULATION_MULTIPLIER.key, "0.0")
+      .set(config.SPECULATION_ENABLED, true)
+      .set(config.SPECULATION_EFFICIENCY_TASK_PROCESS_MULTIPLIER.key, "0.5")
+      .set(config.SPECULATION_EFFICIENCY_TASK_DURATION_FACTOR.key, 
Int.MaxValue.toString)
+    sc = new SparkContext("local", "test", conf)
+    val ser = sc.env.closureSerializer.newInstance()
+    Seq(0, 10000, 50000).foreach { minDuration =>
+      sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
+      val taskSet = FakeTask.createTaskSet(4)
+      val clock = new ManualClock()
+      val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, 
clock = clock)
+      val taskMetricsByTask = createTaskMetrics(taskSet, Seq(3), 0.4)
+      val blockManagerId = BlockManagerId("exec1", "localhost", 12345)
+      // offer resources for 4 tasks to start
+      for ((k, v) <- List(
+        "exec1" -> "host1",
+        "exec1" -> "host1",
+        "exec2" -> "host2",
+        "exec2" -> "host2")) {
+        val taskOption = manager.resourceOffer(k, v, NO_PREF)._1
+        assert(taskOption.isDefined)
+        val task = taskOption.get
+        sched.taskIdToTaskSetManager.put(task.taskId, manager)
+      }
+      clock.advance(RUNTIME)
+      // complete the 3 tasks and leave 1 task in running
+      val taskMetrics: TaskMetrics =
+        
ser.deserialize(ByteBuffer.wrap(ser.serialize(taskMetricsByTask(3)).array()))
+      sched.executorHeartbeatReceived("exec1", Array((3, 
taskMetrics.internalAccums)),
+        blockManagerId, mutable.Map.empty[(Int, Int), ExecutorMetrics])
+      for (id <- Set(0, 1, 2)) {
+        val resultBytes = ser.serialize(createTaskResult(id, 
taskMetricsByTask(id).internalAccums))
+        sched.statusUpdate(tid = id, state = TaskState.FINISHED, 
serializedData = resultBytes)
+        eventually(timeout(1.second), interval(10.milliseconds)) {
+          assert(sched.endedTasks(id) === Success)
+        }
+      }
+      // 1) when SPECULATION_MIN_THRESHOLD is equal 0s, the task 4 will be 
speculated
+      // by previous strategy.
+      // 2) when SPECULATION_MIN_THRESHOLD is equal 10s, the task 4 
runtime(20s) is
+      // above (10s) and evaluated an inefficient task to speculate.
+      // 3) when SPECULATION_MIN_THRESHOLD is equal 50s, the task 4 
runtime(20s) is
+      // less than (50s) and no needs to speculate.
+      if (Seq(0, 10000).contains(minDuration)) {
+        assert(manager.checkSpeculatableTasks(minDuration))
+        assert(sched.speculativeTasks.toSet === Set(3))
+      } else {
+        assert(!manager.checkSpeculatableTasks(minDuration))
+        assert(sched.speculativeTasks.toSet === Set.empty)
+      }
+    }
+  }
+
+  test("SPARK-32170: test SPECULATION_TASK_PROGRESS_MULTIPLIER for speculating 
" +
+    "inefficient tasks") {
+    // set the speculation multiplier to be 0, so speculative tasks are 
launched immediately
+    val conf = new SparkConf()
+      .set(config.SPECULATION_MULTIPLIER.key, "0.0")
+      .set(config.SPECULATION_ENABLED, true)
+      .set(config.SPECULATION_EFFICIENCY_TASK_DURATION_FACTOR.key, 
Int.MaxValue.toString)

Review Comment:
   nit: You can use the config entry directly and avoid needing to pass the 
values as strings.
   
   For example, `.set(config.SPECULATION_MULTIPLIER.key, "0.0")` -> 
`.set(config.SPECULATION_MULTIPLIER, 0.0)`
   
   (Here and elsewhere)



##########
core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala:
##########
@@ -2245,6 +2250,220 @@ class TaskSetManagerSuite
     assert(sched.speculativeTasks.size == 1)
   }
 
+  private def createTaskMetrics(
+       taskSet: TaskSet,
+       inefficientTaskIds: Seq[Int],
+       inefficientMultiplier: Double = 0.6): Array[TaskMetrics] = {
+    taskSet.tasks.zipWithIndex.map { case (task, index) =>
+      val metrics = task.metrics
+      if (inefficientTaskIds.contains(index)) {
+        metrics.inputMetrics.incRecordsRead((inefficientMultiplier * 
RECORDS_NUM).toLong)
+        metrics.shuffleReadMetrics.incRecordsRead((inefficientMultiplier * 
RECORDS_NUM).toLong)
+      } else {
+        metrics.inputMetrics.incRecordsRead(RECORDS_NUM)
+        metrics.shuffleReadMetrics.incRecordsRead(RECORDS_NUM)
+      }
+      metrics.setExecutorRunTime(RUNTIME)
+      metrics
+    }
+  }
+
+  test("SPARK-32170: test speculation for TaskSet with single task") {
+    // set the speculation multiplier to be 0, so speculative tasks are 
launched immediately
+    val conf = new SparkConf()
+      .set(config.SPECULATION_MULTIPLIER.key, "0.0")
+      .set(config.SPECULATION_ENABLED, true)
+      .set(config.SPECULATION_EFFICIENCY_TASK_PROCESS_MULTIPLIER.key, "0.5")
+    sc = new SparkContext("local", "test", conf)
+    Seq(0, 15).foreach { duration =>
+      sc.conf.set(config.SPECULATION_TASK_DURATION_THRESHOLD.key, 
duration.toString)
+      sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
+      val numTasks = 1
+      val taskSet = FakeTask.createTaskSet(numTasks)
+      val clock = new ManualClock()
+      val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, 
clock = clock)
+      for ((k, v) <- List("exec1" -> "host1")) {
+        val taskOption = manager.resourceOffer(k, v, NO_PREF)._1
+        assert(taskOption.isDefined)
+        val task = taskOption.get
+        sched.taskIdToTaskSetManager.put(task.taskId, manager)
+      }
+      clock.advance(RUNTIME)
+      // runtimeMs(20s) > 15s(1 * 15s)
+      if (duration <= 0) {
+        assert(!manager.checkSpeculatableTasks(0))
+        assert(sched.speculativeTasks.toSet === Set.empty)
+      } else {
+        assert(manager.checkSpeculatableTasks(0))
+        assert(sched.speculativeTasks.toSet === Set(0))
+      }
+    }
+  }
+
+  test("SPARK-32170: test SPECULATION_MIN_THRESHOLD for speculating 
inefficient tasks") {
+    // set the speculation multiplier to be 0, so speculative tasks are 
launched immediately
+    val conf = new SparkConf()
+      .set(config.SPECULATION_MULTIPLIER.key, "0.0")
+      .set(config.SPECULATION_ENABLED, true)
+      .set(config.SPECULATION_EFFICIENCY_TASK_PROCESS_MULTIPLIER.key, "0.5")
+      .set(config.SPECULATION_EFFICIENCY_TASK_DURATION_FACTOR.key, 
Int.MaxValue.toString)
+    sc = new SparkContext("local", "test", conf)
+    val ser = sc.env.closureSerializer.newInstance()
+    Seq(0, 10000, 50000).foreach { minDuration =>

Review Comment:
   nit: Thoughts on making this something like
   
   ```
   val speculativeDurations = Set(0, 10000)
   val nonSpeculativeDurations = Set(50000)
   
   (speculativeDurations ++ nonSpeculativeDurations).foreach { minDuration =>
   
   ...
   
   // if (Seq(0, 10000).contains(minDuration)) {
   if (speculativeDurations.contains(minDuration)) {
   ...
   ```



##########
core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala:
##########
@@ -2245,6 +2250,220 @@ class TaskSetManagerSuite
     assert(sched.speculativeTasks.size == 1)
   }
 
+  private def createTaskMetrics(
+       taskSet: TaskSet,
+       inefficientTaskIds: Seq[Int],

Review Comment:
   ```suggestion
          inefficientTaskIds: Set[Int],
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to