Ngone51 commented on code in PR #36162:
URL: https://github.com/apache/spark/pull/36162#discussion_r902100661


##########
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala:
##########
@@ -1218,6 +1248,70 @@ private[spark] class TaskSetManager(
   def executorAdded(): Unit = {
     recomputeLocality()
   }
+
+  /**
+   * A class for checking inefficient tasks to be speculated, a task is 
inefficient when its data
+   * process rate is less than the average data process rate of all successful 
tasks in the stage
+   * multiplied by a multiplier.
+   */
+  private[TaskSetManager] class TaskProcessRateCalculator {
+    private var totalRecordsRead = 0L
+    private var totalExecutorRunTime = 0L
+    private var avgTaskProcessRate = 0.0D
+    private val runingTasksProcessRate = new ConcurrentHashMap[Long, Double]()

Review Comment:
   ```suggestion
       private val runningTasksProcessRate = new ConcurrentHashMap[Long, 
Double]()
   ```



##########
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala:
##########
@@ -1218,6 +1248,70 @@ private[spark] class TaskSetManager(
   def executorAdded(): Unit = {
     recomputeLocality()
   }
+
+  /**
+   * A class for checking inefficient tasks to be speculated, a task is 
inefficient when its data
+   * process rate is less than the average data process rate of all successful 
tasks in the stage
+   * multiplied by a multiplier.
+   */
+  private[TaskSetManager] class TaskProcessRateCalculator {
+    private var totalRecordsRead = 0L
+    private var totalExecutorRunTime = 0L
+    private var avgTaskProcessRate = 0.0D
+    private val runingTasksProcessRate = new ConcurrentHashMap[Long, Double]()
+
+    private[TaskSetManager] def updateAvgTaskProcessRate(
+        taskId: Long,
+        result: DirectTaskResult[_]): Unit = {
+      var recordsRead = 0L
+      var executorRunTime = 0L
+      result.accumUpdates.foreach { a =>
+        if (a.name == Some(shuffleRead.RECORDS_READ) ||
+          a.name == Some(input.RECORDS_READ)) {
+          val acc = a.asInstanceOf[LongAccumulator]
+          recordsRead += acc.value
+        } else if (a.name == Some(InternalAccumulator.EXECUTOR_RUN_TIME)) {
+          val acc = a.asInstanceOf[LongAccumulator]
+          executorRunTime = acc.value
+        }
+      }
+      totalRecordsRead += recordsRead
+      totalExecutorRunTime += executorRunTime
+      avgTaskProcessRate = sched.getTaskProcessRate(totalRecordsRead, 
totalExecutorRunTime)
+      runingTasksProcessRate.remove(taskId)
+    }
+
+    private[scheduler] def updateRuningTaskProcessRate(
+        taskId: Long,
+        taskProcessRate: Double): Unit = {
+      runingTasksProcessRate.put(taskId, taskProcessRate)
+    }
+
+    private[TaskSetManager] def isInefficient(

Review Comment:
   Shall we only return the process rate here? And we can define another 
function (maybe inside `checkAndSubmitSpeculatableTasks`) like:
   
   ```scala
   def isInefficient(taskProcessRate: Double): Boolean = {
     (runtimeMs > efficientTaskDurationFactor * threshold) || (taskProcessRate 
< taskProcessRateCalculator.avgTaskProcessRate * multiplier)
   }
   
   ```
   
   



##########
core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala:
##########
@@ -2245,6 +2250,216 @@ class TaskSetManagerSuite
     assert(sched.speculativeTasks.size == 1)
   }
 
+  private def createTaskMetrics(
+       taskSet: TaskSet,
+       inefficientTaskIds: Set[Int],
+       inefficientMultiplier: Double = 0.6): Array[TaskMetrics] = {
+    taskSet.tasks.zipWithIndex.map { case (task, index) =>
+      val metrics = task.metrics
+      if (inefficientTaskIds.contains(index)) {
+        metrics.inputMetrics.setRecordsRead((inefficientMultiplier * 
RECORDS_NUM).toLong)
+        metrics.shuffleReadMetrics.setRecordsRead((inefficientMultiplier * 
RECORDS_NUM).toLong)
+      } else {
+        metrics.inputMetrics.setRecordsRead(RECORDS_NUM)
+        metrics.shuffleReadMetrics.setRecordsRead(RECORDS_NUM)
+      }
+      metrics.setExecutorRunTime(RUNTIME)
+      metrics
+    }
+  }
+
+  test("SPARK-32170: test speculation for TaskSet with single task") {
+    val conf = new SparkConf()
+      .set(config.SPECULATION_ENABLED, true)
+    sc = new SparkContext("local", "test", conf)
+    Seq(0, 15).foreach { duration =>
+      sc.conf.set(config.SPECULATION_TASK_DURATION_THRESHOLD.key, 
duration.toString)
+      sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
+      val numTasks = 1
+      val taskSet = FakeTask.createTaskSet(numTasks)
+      val clock = new ManualClock()
+      val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, 
clock = clock)
+      for ((k, v) <- List("exec1" -> "host1")) {
+        val taskOption = manager.resourceOffer(k, v, NO_PREF)._1
+        assert(taskOption.isDefined)
+        val task = taskOption.get
+        sched.taskIdToTaskSetManager.put(task.taskId, manager)
+      }
+      clock.advance(RUNTIME)
+      // runtimeMs(20s) > 15s(1 * 15s)
+      if (duration <= 0) {
+        assert(!manager.checkSpeculatableTasks(0))
+        assert(sched.speculativeTasks.toSet === Set.empty)
+      } else {
+        assert(manager.checkSpeculatableTasks(0))
+        assert(sched.speculativeTasks.toSet === Set(0))
+      }
+    }
+  }
+
+  test("SPARK-32170: test SPECULATION_MIN_THRESHOLD for speculating 
inefficient tasks") {
+    // set the speculation multiplier to be 0, so speculative tasks are 
launched based on
+    // minTimeToSpeculation parameter to checkSpeculatableTasks
+    val conf = new SparkConf()
+      .set(config.SPECULATION_MULTIPLIER, 0.0)
+      .set(config.SPECULATION_ENABLED, true)
+    sc = new SparkContext("local", "test", conf)
+    val ser = sc.env.closureSerializer.newInstance()
+    val speculativeDurations = Set(0, 10000)
+    val nonSpeculativeDurations = Set(50000)
+    (speculativeDurations ++ nonSpeculativeDurations).foreach { minDuration =>
+      sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
+      val taskSet = FakeTask.createTaskSet(4)
+      val clock = new ManualClock()
+      val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, 
clock = clock)
+      val taskMetricsByTask = createTaskMetrics(taskSet, Set(3), 
inefficientMultiplier = 0.4)
+      val blockManagerId = BlockManagerId("exec1", "localhost", 12345)
+      // offer resources for 4 tasks to start
+      for ((k, v) <- List(
+        "exec1" -> "host1",
+        "exec1" -> "host1",
+        "exec2" -> "host2",
+        "exec2" -> "host2")) {
+        val taskOption = manager.resourceOffer(k, v, NO_PREF)._1
+        assert(taskOption.isDefined)
+        val task = taskOption.get
+        sched.taskIdToTaskSetManager.put(task.taskId, manager)
+      }
+      clock.advance(RUNTIME)
+      // complete the 3 tasks and leave 1 task in running
+      val taskMetrics: TaskMetrics =
+        
ser.deserialize(ByteBuffer.wrap(ser.serialize(taskMetricsByTask(3)).array()))
+      sched.executorHeartbeatReceived("exec1", Array((3, 
taskMetrics.internalAccums)),
+        blockManagerId, mutable.Map.empty[(Int, Int), ExecutorMetrics])
+      for (id <- Set(0, 1, 2)) {
+        val resultBytes = ser.serialize(createTaskResult(id, 
taskMetricsByTask(id).internalAccums))
+        sched.statusUpdate(tid = id, state = TaskState.FINISHED, 
serializedData = resultBytes)
+        eventually(timeout(1.second), interval(10.milliseconds)) {
+          assert(sched.endedTasks(id) === Success)
+        }
+      }
+      // 1) when SPECULATION_MIN_THRESHOLD is equal 0s, the task 4 will be 
speculated
+      // by previous strategy.
+      // 2) when SPECULATION_MIN_THRESHOLD is equal 10s, the task 4 
runtime(20s) is
+      // above (10s) and evaluated an inefficient task to speculate.

Review Comment:
   Could you also test the case where the task runtime has exceeded the 
minDuration but the task isn't speculated because it's efficient?



##########
core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala:
##########
@@ -2245,6 +2250,216 @@ class TaskSetManagerSuite
     assert(sched.speculativeTasks.size == 1)
   }
 
+  private def createTaskMetrics(
+       taskSet: TaskSet,
+       inefficientTaskIds: Set[Int],
+       inefficientMultiplier: Double = 0.6): Array[TaskMetrics] = {
+    taskSet.tasks.zipWithIndex.map { case (task, index) =>
+      val metrics = task.metrics
+      if (inefficientTaskIds.contains(index)) {
+        metrics.inputMetrics.setRecordsRead((inefficientMultiplier * 
RECORDS_NUM).toLong)
+        metrics.shuffleReadMetrics.setRecordsRead((inefficientMultiplier * 
RECORDS_NUM).toLong)
+      } else {
+        metrics.inputMetrics.setRecordsRead(RECORDS_NUM)
+        metrics.shuffleReadMetrics.setRecordsRead(RECORDS_NUM)
+      }
+      metrics.setExecutorRunTime(RUNTIME)
+      metrics
+    }
+  }
+
+  test("SPARK-32170: test speculation for TaskSet with single task") {
+    val conf = new SparkConf()
+      .set(config.SPECULATION_ENABLED, true)
+    sc = new SparkContext("local", "test", conf)
+    Seq(0, 15).foreach { duration =>
+      sc.conf.set(config.SPECULATION_TASK_DURATION_THRESHOLD.key, 
duration.toString)
+      sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
+      val numTasks = 1
+      val taskSet = FakeTask.createTaskSet(numTasks)
+      val clock = new ManualClock()
+      val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, 
clock = clock)
+      for ((k, v) <- List("exec1" -> "host1")) {
+        val taskOption = manager.resourceOffer(k, v, NO_PREF)._1
+        assert(taskOption.isDefined)
+        val task = taskOption.get
+        sched.taskIdToTaskSetManager.put(task.taskId, manager)
+      }
+      clock.advance(RUNTIME)
+      // runtimeMs(20s) > 15s(1 * 15s)
+      if (duration <= 0) {
+        assert(!manager.checkSpeculatableTasks(0))
+        assert(sched.speculativeTasks.toSet === Set.empty)
+      } else {
+        assert(manager.checkSpeculatableTasks(0))
+        assert(sched.speculativeTasks.toSet === Set(0))
+      }
+    }
+  }
+
+  test("SPARK-32170: test SPECULATION_MIN_THRESHOLD for speculating 
inefficient tasks") {
+    // set the speculation multiplier to be 0, so speculative tasks are 
launched based on
+    // minTimeToSpeculation parameter to checkSpeculatableTasks
+    val conf = new SparkConf()
+      .set(config.SPECULATION_MULTIPLIER, 0.0)
+      .set(config.SPECULATION_ENABLED, true)
+    sc = new SparkContext("local", "test", conf)
+    val ser = sc.env.closureSerializer.newInstance()
+    val speculativeDurations = Set(0, 10000)
+    val nonSpeculativeDurations = Set(50000)
+    (speculativeDurations ++ nonSpeculativeDurations).foreach { minDuration =>
+      sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
+      val taskSet = FakeTask.createTaskSet(4)
+      val clock = new ManualClock()
+      val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, 
clock = clock)
+      val taskMetricsByTask = createTaskMetrics(taskSet, Set(3), 
inefficientMultiplier = 0.4)
+      val blockManagerId = BlockManagerId("exec1", "localhost", 12345)
+      // offer resources for 4 tasks to start
+      for ((k, v) <- List(
+        "exec1" -> "host1",
+        "exec1" -> "host1",
+        "exec2" -> "host2",
+        "exec2" -> "host2")) {
+        val taskOption = manager.resourceOffer(k, v, NO_PREF)._1
+        assert(taskOption.isDefined)
+        val task = taskOption.get
+        sched.taskIdToTaskSetManager.put(task.taskId, manager)
+      }
+      clock.advance(RUNTIME)
+      // complete the 3 tasks and leave 1 task in running
+      val taskMetrics: TaskMetrics =
+        
ser.deserialize(ByteBuffer.wrap(ser.serialize(taskMetricsByTask(3)).array()))
+      sched.executorHeartbeatReceived("exec1", Array((3, 
taskMetrics.internalAccums)),
+        blockManagerId, mutable.Map.empty[(Int, Int), ExecutorMetrics])
+      for (id <- Set(0, 1, 2)) {
+        val resultBytes = ser.serialize(createTaskResult(id, 
taskMetricsByTask(id).internalAccums))
+        sched.statusUpdate(tid = id, state = TaskState.FINISHED, 
serializedData = resultBytes)
+        eventually(timeout(1.second), interval(10.milliseconds)) {
+          assert(sched.endedTasks(id) === Success)
+        }
+      }
+      // 1) when SPECULATION_MIN_THRESHOLD is equal 0s, the task 4 will be 
speculated

Review Comment:
   Could you replace "task 4" with "task 3" so it's consistent with the 
assertion below?



##########
core/src/main/scala/org/apache/spark/internal/config/package.scala:
##########
@@ -2073,6 +2073,37 @@ package object config {
       .timeConf(TimeUnit.MILLISECONDS)
       .createOptional
 
+  private[spark] val SPECULATION_EFFICIENCY_ENABLE =
+    ConfigBuilder("spark.speculation.efficiency.enabled")
+      .doc("When set to true, spark will evaluate the efficiency of task 
processing through the " +
+        "stage task metrics and only need to speculate the inefficient tasks. 
A task is " +
+        "inefficient when its data process rate is less than the average data 
process " +
+        "rate of all successful tasks in the stage multiplied by a 
multiplier.")

Review Comment:
   "When set to true, spark will evaluate task data process efficiency by task 
metrics and will only speculate the tasks which have exceeded the time 
threshold and are inefficient at the same time. A task will be considered as 
inefficient when its data process rate is less than the average data process 
rate of all the successful tasks in the same stage attempt * 
${SPECULATION_EFFICIENCY_TASK_PROCESS_MULTIPLIER.key}." 



##########
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala:
##########
@@ -1218,6 +1248,70 @@ private[spark] class TaskSetManager(
   def executorAdded(): Unit = {
     recomputeLocality()
   }
+
+  /**
+   * A class for checking inefficient tasks to be speculated, a task is 
inefficient when its data
+   * process rate is less than the average data process rate of all successful 
tasks in the stage
+   * multiplied by a multiplier.
+   */
+  private[TaskSetManager] class TaskProcessRateCalculator {
+    private var totalRecordsRead = 0L
+    private var totalExecutorRunTime = 0L
+    private var avgTaskProcessRate = 0.0D
+    private val runingTasksProcessRate = new ConcurrentHashMap[Long, Double]()
+
+    private[TaskSetManager] def updateAvgTaskProcessRate(
+        taskId: Long,
+        result: DirectTaskResult[_]): Unit = {
+      var recordsRead = 0L
+      var executorRunTime = 0L
+      result.accumUpdates.foreach { a =>
+        if (a.name == Some(shuffleRead.RECORDS_READ) ||
+          a.name == Some(input.RECORDS_READ)) {
+          val acc = a.asInstanceOf[LongAccumulator]
+          recordsRead += acc.value
+        } else if (a.name == Some(InternalAccumulator.EXECUTOR_RUN_TIME)) {
+          val acc = a.asInstanceOf[LongAccumulator]
+          executorRunTime = acc.value
+        }
+      }
+      totalRecordsRead += recordsRead
+      totalExecutorRunTime += executorRunTime
+      avgTaskProcessRate = sched.getTaskProcessRate(totalRecordsRead, 
totalExecutorRunTime)
+      runingTasksProcessRate.remove(taskId)
+    }
+
+    private[scheduler] def updateRuningTaskProcessRate(

Review Comment:
   ```suggestion
       private[scheduler] def updateRunningTaskProcessRate(
   ```



##########
core/src/main/scala/org/apache/spark/internal/config/package.scala:
##########
@@ -2073,6 +2073,37 @@ package object config {
       .timeConf(TimeUnit.MILLISECONDS)
       .createOptional
 
+  private[spark] val SPECULATION_EFFICIENCY_ENABLE =
+    ConfigBuilder("spark.speculation.efficiency.enabled")
+      .doc("When set to true, spark will evaluate the efficiency of task 
processing through the " +
+        "stage task metrics and only need to speculate the inefficient tasks. 
A task is " +
+        "inefficient when its data process rate is less than the average data 
process " +
+        "rate of all successful tasks in the stage multiplied by a 
multiplier.")
+      .version("3.4.0")
+      .booleanConf
+      .createWithDefault(true)
+
+  private[spark] val SPECULATION_EFFICIENCY_TASK_PROCESS_MULTIPLIER =
+    ConfigBuilder("spark.speculation.efficiency.processMultiplier")
+      .doc("A multiplier for evaluating the efficiency of task processing. A 
task is inefficient " +
+        "when its data process rate is less than the average data process rate 
of all " +
+        "successful tasks in the stage multiplied by the multiplier.")
+      .version("3.4.0")
+      .doubleConf
+      .checkValue(v => v > 0.0 && v <= 1.0, "multiplier must be in (0.0, 1.0]")
+      .createWithDefault(0.75)
+
+  private[spark] val SPECULATION_EFFICIENCY_TASK_DURATION_FACTOR =
+    ConfigBuilder("spark.speculation.efficiency.durationFactor")
+      .doc(s"When a task duration is large than the factor multiplied by the 
threshold which " +
+        s"may be ${SPECULATION_MULTIPLIER.key} * 
successfulTaskDurations.median or " +
+        s"${SPECULATION_MIN_THRESHOLD.key}, and it should be considered for " +
+        s"speculation to avoid that it is too late to launch a necessary 
speculation.")
+      .version("3.4.0")
+      .doubleConf
+      .checkValue(_ >= 1.0, "Duration factor must be >= 1.0")
+      .createWithDefault(2.0)

Review Comment:
   Add to `docs/configuration.md`?



##########
core/src/main/scala/org/apache/spark/internal/config/package.scala:
##########
@@ -2073,6 +2073,37 @@ package object config {
       .timeConf(TimeUnit.MILLISECONDS)
       .createOptional
 
+  private[spark] val SPECULATION_EFFICIENCY_ENABLE =
+    ConfigBuilder("spark.speculation.efficiency.enabled")
+      .doc("When set to true, spark will evaluate the efficiency of task 
processing through the " +
+        "stage task metrics and only need to speculate the inefficient tasks. 
A task is " +
+        "inefficient when its data process rate is less than the average data 
process " +
+        "rate of all successful tasks in the stage multiplied by a 
multiplier.")
+      .version("3.4.0")
+      .booleanConf
+      .createWithDefault(true)
+
+  private[spark] val SPECULATION_EFFICIENCY_TASK_PROCESS_MULTIPLIER =
+    ConfigBuilder("spark.speculation.efficiency.processMultiplier")
+      .doc("A multiplier for evaluating the efficiency of task processing. A 
task is inefficient " +
+        "when its data process rate is less than the average data process rate 
of all " +
+        "successful tasks in the stage multiplied by the multiplier.")
+      .version("3.4.0")
+      .doubleConf
+      .checkValue(v => v > 0.0 && v <= 1.0, "multiplier must be in (0.0, 1.0]")
+      .createWithDefault(0.75)
+
+  private[spark] val SPECULATION_EFFICIENCY_TASK_DURATION_FACTOR =
+    ConfigBuilder("spark.speculation.efficiency.durationFactor")
+      .doc(s"When a task duration is large than the factor multiplied by the 
threshold which " +
+        s"may be ${SPECULATION_MULTIPLIER.key} * 
successfulTaskDurations.median or " +
+        s"${SPECULATION_MIN_THRESHOLD.key}, and it should be considered for " +
+        s"speculation to avoid that it is too late to launch a necessary 
speculation.")

Review Comment:
   "A task will be speculated anyway as long as its duration has exceeded the 
value of multiplying the factor and the time threshold (either 
{SPECULATION_MULTIPLIER.key} \* successfulTaskDurations.median or 
{SPECULATION_MIN_THRESHOLD.key}), regardless of whether it's efficient or not. 
This avoids missing the tasks when task slowness isn't due to the data process 
rate."



##########
core/src/main/scala/org/apache/spark/internal/config/package.scala:
##########
@@ -2073,6 +2073,37 @@ package object config {
       .timeConf(TimeUnit.MILLISECONDS)
       .createOptional
 
+  private[spark] val SPECULATION_EFFICIENCY_ENABLE =
+    ConfigBuilder("spark.speculation.efficiency.enabled")
+      .doc("When set to true, spark will evaluate the efficiency of task 
processing through the " +
+        "stage task metrics and only need to speculate the inefficient tasks. 
A task is " +
+        "inefficient when its data process rate is less than the average data 
process " +
+        "rate of all successful tasks in the stage multiplied by a 
multiplier.")
+      .version("3.4.0")
+      .booleanConf
+      .createWithDefault(true)
+
+  private[spark] val SPECULATION_EFFICIENCY_TASK_PROCESS_MULTIPLIER =
+    ConfigBuilder("spark.speculation.efficiency.processMultiplier")
+      .doc("A multiplier for evaluating the efficiency of task processing. A 
task is inefficient " +
+        "when its data process rate is less than the average data process rate 
of all " +
+        "successful tasks in the stage multiplied by the multiplier.")

Review Comment:
   "A multiplier that is used when evaluating inefficient tasks. The higher the 
multiplier is, the more tasks will possibly be considered inefficient."



##########
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala:
##########
@@ -1218,6 +1248,70 @@ private[spark] class TaskSetManager(
   def executorAdded(): Unit = {
     recomputeLocality()
   }
+
+  /**
+   * A class for checking inefficient tasks to be speculated, a task is 
inefficient when its data
+   * process rate is less than the average data process rate of all successful 
tasks in the stage
+   * multiplied by a multiplier.
+   */
+  private[TaskSetManager] class TaskProcessRateCalculator {
+    private var totalRecordsRead = 0L
+    private var totalExecutorRunTime = 0L
+    private var avgTaskProcessRate = 0.0D
+    private val runingTasksProcessRate = new ConcurrentHashMap[Long, Double]()
+
+    private[TaskSetManager] def updateAvgTaskProcessRate(
+        taskId: Long,
+        result: DirectTaskResult[_]): Unit = {
+      var recordsRead = 0L
+      var executorRunTime = 0L
+      result.accumUpdates.foreach { a =>
+        if (a.name == Some(shuffleRead.RECORDS_READ) ||
+          a.name == Some(input.RECORDS_READ)) {
+          val acc = a.asInstanceOf[LongAccumulator]
+          recordsRead += acc.value
+        } else if (a.name == Some(InternalAccumulator.EXECUTOR_RUN_TIME)) {
+          val acc = a.asInstanceOf[LongAccumulator]
+          executorRunTime = acc.value
+        }
+      }
+      totalRecordsRead += recordsRead
+      totalExecutorRunTime += executorRunTime
+      avgTaskProcessRate = sched.getTaskProcessRate(totalRecordsRead, 
totalExecutorRunTime)
+      runingTasksProcessRate.remove(taskId)
+    }
+
+    private[scheduler] def updateRuningTaskProcessRate(
+        taskId: Long,
+        taskProcessRate: Double): Unit = {
+      runingTasksProcessRate.put(taskId, taskProcessRate)
+    }
+
+    private[TaskSetManager] def isInefficient(
+        tid: Long,
+        runtimeMs: Long,
+        taskInfo: TaskInfo): Boolean = {
+      // Only check inefficient tasks when avgTaskProcessRate > 0, because 
some stage
+      // tasks may have neither input records nor shuffleRead records, so the
+      // avgTaskProcessRate may be zero all the time, this case we should make 
sure
+      // it can be speculated. eg: some spark-sql like that 'msck repair 
table' or 'drop table'
+      // and so on.
+      if (avgTaskProcessRate <= 0.0) return true
+      val currentTaskProcessRate = runingTasksProcessRate.getOrDefault(tid, 
0.0)
+      if (currentTaskProcessRate <= 0.0) {
+        true
+      } else {
+        val taskProcessThreshold = avgTaskProcessRate * 
efficientTaskProcessMultiplier
+        val isInefficientTask = currentTaskProcessRate < taskProcessThreshold
+        if (isInefficientTask) {
+          logInfo(s"Marking task ${taskInfo.index} in stage ${taskSet.id} " +
+            s"(on ${taskInfo.host}) as inenfficient because it ran 
${runtimeMs}ms and " +

Review Comment:
   ```suggestion
               s"(on ${taskInfo.host}) as inefficient because it ran 
${runtimeMs}ms and " +
   ```



##########
core/src/main/scala/org/apache/spark/internal/config/package.scala:
##########
@@ -2073,6 +2073,37 @@ package object config {
       .timeConf(TimeUnit.MILLISECONDS)
       .createOptional
 
+  private[spark] val SPECULATION_EFFICIENCY_ENABLE =
+    ConfigBuilder("spark.speculation.efficiency.enabled")
+      .doc("When set to true, spark will evaluate the efficiency of task 
processing through the " +
+        "stage task metrics and only need to speculate the inefficient tasks. 
A task is " +
+        "inefficient when its data process rate is less than the average data 
process " +
+        "rate of all successful tasks in the stage multiplied by a 
multiplier.")
+      .version("3.4.0")
+      .booleanConf
+      .createWithDefault(true)
+
+  private[spark] val SPECULATION_EFFICIENCY_TASK_PROCESS_MULTIPLIER =
+    ConfigBuilder("spark.speculation.efficiency.processMultiplier")
+      .doc("A multiplier for evaluating the efficiency of task processing. A 
task is inefficient " +
+        "when its data process rate is less than the average data process rate 
of all " +
+        "successful tasks in the stage multiplied by the multiplier.")
+      .version("3.4.0")
+      .doubleConf
+      .checkValue(v => v > 0.0 && v <= 1.0, "multiplier must be in (0.0, 1.0]")
+      .createWithDefault(0.75)
+
+  private[spark] val SPECULATION_EFFICIENCY_TASK_DURATION_FACTOR =
+    ConfigBuilder("spark.speculation.efficiency.durationFactor")

Review Comment:
   ```suggestion
       ConfigBuilder("spark.speculation.efficiency.longRunTaskFactor")
   ```



##########
core/src/main/scala/org/apache/spark/internal/config/package.scala:
##########
@@ -2073,6 +2073,37 @@ package object config {
       .timeConf(TimeUnit.MILLISECONDS)
       .createOptional
 
+  private[spark] val SPECULATION_EFFICIENCY_ENABLE =
+    ConfigBuilder("spark.speculation.efficiency.enabled")
+      .doc("When set to true, spark will evaluate the efficiency of task 
processing through the " +
+        "stage task metrics and only need to speculate the inefficient tasks. 
A task is " +
+        "inefficient when its data process rate is less than the average data 
process " +
+        "rate of all successful tasks in the stage multiplied by a 
multiplier.")
+      .version("3.4.0")
+      .booleanConf
+      .createWithDefault(true)
+
+  private[spark] val SPECULATION_EFFICIENCY_TASK_PROCESS_MULTIPLIER =
+    ConfigBuilder("spark.speculation.efficiency.processMultiplier")

Review Comment:
   ```suggestion
       ConfigBuilder("spark.speculation.efficiency.processRateMultiplier")
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to