mridulm commented on code in PR #36162:
URL: https://github.com/apache/spark/pull/36162#discussion_r861487119


##########
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala:
##########
@@ -80,12 +82,22 @@ private[spark] class TaskSetManager(
   val copiesRunning = new Array[Int](numTasks)
 
   val speculationEnabled = conf.get(SPECULATION_ENABLED)
+  val speculationTaskMinDuration = conf.get(SPECULATION_TASK_MIN_DURATION)
+  val speculationTaskProgressMultiplier = 
conf.get(SPECULATION_TASK_PROGRESS_MULTIPLIER)
+  val speculationTaskDurationFactor = 
conf.get(SPECULATION_TASK_DURATION_FACTOR)
+  val speculationSingleTaskDurationThreshold = 
conf.get(SPECULATION_SINGLE_TASK_DURATION_THRESHOLD)
+
+  val speculationTaskStatsCacheInterval = math.min(3600000, max(100,
+    conf.get(SPECULATION_TASK_STATS_CACHE_DURATION)))

Review Comment:
   Make all of these new variables private.



##########
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala:
##########
@@ -109,6 +121,10 @@ private[spark] class TaskSetManager(
   private val executorDecommissionKillInterval =
     
conf.get(EXECUTOR_DECOMMISSION_KILL_INTERVAL).map(TimeUnit.SECONDS.toMillis)
 
+  private[scheduler] val inefficientTask = new InefficientTask(
+    recomputeIntervalMs = speculationTaskStatsCacheInterval,
+    maxTasksToCheck = max(4096, minFinishedForSpeculation))

Review Comment:
   Why `4096` ?



##########
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala:
##########
@@ -80,12 +82,22 @@ private[spark] class TaskSetManager(
   val copiesRunning = new Array[Int](numTasks)
 
   val speculationEnabled = conf.get(SPECULATION_ENABLED)
+  val speculationTaskMinDuration = conf.get(SPECULATION_TASK_MIN_DURATION)
+  val speculationTaskProgressMultiplier = 
conf.get(SPECULATION_TASK_PROGRESS_MULTIPLIER)
+  val speculationTaskDurationFactor = 
conf.get(SPECULATION_TASK_DURATION_FACTOR)
+  val speculationSingleTaskDurationThreshold = 
conf.get(SPECULATION_SINGLE_TASK_DURATION_THRESHOLD)
+
+  val speculationTaskStatsCacheInterval = math.min(3600000, max(100,
+    conf.get(SPECULATION_TASK_STATS_CACHE_DURATION)))
+
   // Quantile of tasks at which to start speculation
   val speculationQuantile = conf.get(SPECULATION_QUANTILE)
   val speculationMultiplier = conf.get(SPECULATION_MULTIPLIER)
   val minFinishedForSpeculation = math.max((speculationQuantile * 
numTasks).floor.toInt, 1)
   // User provided threshold for speculation regardless of whether the 
quantile has been reached
   val speculationTaskDurationThresOpt = 
conf.get(SPECULATION_TASK_DURATION_THRESHOLD)
+  val isSpeculationThresholdSpecified = 
speculationTaskDurationThresOpt.isDefined &&
+    speculationTaskDurationThresOpt.get > 0

Review Comment:
   nit: `speculationTaskDurationThresOpt.exists(_ > 0)`



##########
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala:
##########
@@ -1108,45 +1164,48 @@ private[spark] class TaskSetManager(
     // `successfulTaskDurations` may not equal to `tasksSuccessful`. Here we 
should only count the
     // tasks that are submitted by this `TaskSetManager` and are completed 
successfully.
     val numSuccessfulTasks = successfulTaskDurations.size()
-    if (numSuccessfulTasks >= minFinishedForSpeculation) {
-      val time = clock.getTimeMillis()
-      val medianDuration = successfulTaskDurations.median
-      val threshold = max(speculationMultiplier * medianDuration, 
minTimeToSpeculation)
-      // TODO: Threshold should also look at standard deviation of task 
durations and have a lower
-      // bound based on that.
-      logDebug("Task length threshold for speculation: " + threshold)
-      for (tid <- runningTasksSet) {
-        var speculated = checkAndSubmitSpeculatableTask(tid, time, threshold)
-        if (!speculated && executorDecommissionKillInterval.isDefined) {
-          val taskInfo = taskInfos(tid)
-          val decomState = 
sched.getExecutorDecommissionState(taskInfo.executorId)
-          if (decomState.isDefined) {
-            // Check if this task might finish after this executor is 
decommissioned.
-            // We estimate the task's finish time by using the median task 
duration.
-            // Whereas the time when the executor might be decommissioned is 
estimated using the
-            // config executorDecommissionKillInterval. If the task is going 
to finish after
-            // decommissioning, then we will eagerly speculate the task.
-            val taskEndTimeBasedOnMedianDuration = taskInfos(tid).launchTime + 
medianDuration
-            val executorDecomTime = decomState.get.startTime + 
executorDecommissionKillInterval.get
-            val canExceedDeadline = executorDecomTime < 
taskEndTimeBasedOnMedianDuration
-            if (canExceedDeadline) {
-              speculated = checkAndSubmitSpeculatableTask(tid, time, 0)
-            }
-          }
+    val timeMs = clock.getTimeMillis()
+    if (numSuccessfulTasks >= minFinishedForSpeculation || numTasks == 1) {
+      val threshold = if (numSuccessfulTasks <= 0) {
+        if (isSpeculationThresholdSpecified) {
+          max(speculationTaskDurationThresOpt.get, 
speculationSingleTaskDurationThreshold)
+        } else {
+          speculationSingleTaskDurationThreshold
         }
-        foundTasks |= speculated
+      } else {
+        val medianDuration = successfulTaskDurations.median
+        max(speculationMultiplier * medianDuration, minTimeToSpeculation)
       }
-    } else if (speculationTaskDurationThresOpt.isDefined && 
speculationTasksLessEqToSlots) {
-      val time = clock.getTimeMillis()
+      // bound based on that.
+      logDebug("Task length threshold for speculation: " + threshold)
+      foundTasks = checkAndSubmitSpeculatableTask(timeMs, threshold, 
numSuccessfulTasks)
+    } else if (isSpeculationThresholdSpecified && 
speculationTasksLessEqToSlots) {
       val threshold = speculationTaskDurationThresOpt.get
       logDebug(s"Tasks taking longer time than provided speculation threshold: 
$threshold")
-      for (tid <- runningTasksSet) {
-        foundTasks |= checkAndSubmitSpeculatableTask(tid, time, threshold)
+      foundTasks = checkAndSubmitSpeculatableTask(timeMs, threshold, 
numSuccessfulTasks,
+        customizedThreshold = true)
+    }
+    // avoid more warning logs.
+    if (foundTasks) {
+      val elapsedMs = clock.getTimeMillis() - timeMs
+      if (elapsedMs > minTimeToSpeculation) {
+        logWarning(s"Time to checkSpeculatableTasks ${elapsedMs}ms > 
${minTimeToSpeculation}ms")
       }
     }
     foundTasks
   }
 
+  private def maybeRecompute(currentTimeMillis: Long): Unit = {

Review Comment:
   Remove this method and move the logic into `inefficientTask.maybeRecompute`?
   `appStatusStore` can be initialized as a field there.



##########
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala:
##########
@@ -1217,6 +1276,127 @@ private[spark] class TaskSetManager(
   def executorAdded(): Unit = {
     recomputeLocality()
   }
+
+  /**
+   * A class for checking inefficient tasks to be speculated, the inefficient 
tasks come from
+   * the tasks which may be speculated by the previous strategy.
+   */
+  private[scheduler] class InefficientTask(
+      val recomputeIntervalMs: Long,
+      val maxTasksToCheck: Int) {
+    var taskData: Map[Long, TaskData] = null
+    var taskProgressThreshold = 0.0
+    private var lastComputeMs = -1L
+    var updateSealed = false
+    private val checkInefficientTask = speculationTaskMinDuration > 0
+
+    def maybeRecompute(
+        appStatusStore: AppStatusStore,
+        nowMs: Long,
+        stageId: Int,
+        stageAttemptId: Int): Unit = {
+      if (!checkInefficientTask || appStatusStore == null) {
+        return
+      }
+      try {
+        this.synchronized {
+          if (!updateSealed &&
+            (lastComputeMs <= 0 || nowMs > lastComputeMs + 
recomputeIntervalMs)) {
+            val (progressRate, numSuccessTasks): (Double, Int) = 
computeSuccessTaskProgress(
+              stageId, stageAttemptId, appStatusStore)
+            if (progressRate > 0.0) {
+              setTaskData(stageId, stageAttemptId, appStatusStore)
+              taskProgressThreshold = progressRate * 
speculationTaskProgressMultiplier
+              if (numSuccessTasks > maxTasksToCheck) {
+                updateSealed = true
+              }
+              lastComputeMs = nowMs
+            }
+          }
+        }
+      } catch {
+        case e: InterruptedException => throw e
+        case t: Throwable => logWarning("Failed to recompute InefficientTask 
state", t)

Review Comment:
   Remove the try/catch (once the dependency on `AppStatusStore` is removed).



##########
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala:
##########
@@ -1069,24 +1085,64 @@ private[spark] class TaskSetManager(
    * speculative run.
    */
   private def checkAndSubmitSpeculatableTask(
-      tid: Long,
       currentTimeMillis: Long,
-      threshold: Double): Boolean = {
-    val info = taskInfos(tid)
-    val index = info.index
-    if (!successful(index) && copiesRunning(index) == 1 &&
-        info.timeRunning(currentTimeMillis) > threshold && 
!speculatableTasks.contains(index)) {
-      addPendingTask(index, speculatable = true)
-      logInfo(
-        ("Marking task %d in stage %s (on %s) as speculatable because it ran 
more" +
-          " than %.0f ms(%d speculatable tasks in this taskset now)")
-          .format(index, taskSet.id, info.host, threshold, 
speculatableTasks.size + 1))
-      speculatableTasks += index
-      sched.dagScheduler.speculativeTaskSubmitted(tasks(index))
-      true
-    } else {
-      false
+      threshold: Double,
+      numSuccessfulTasks: Int,
+      customizedThreshold: Boolean = false): Boolean = {
+    var foundTasksResult = false
+    var recomputeInefficientTasks = true
+    for (tid <- runningTasksSet) {
+      val info = taskInfos(tid)
+      val index = info.index
+      val runtimeMs = info.timeRunning(currentTimeMillis)
+      if (!successful(index) && copiesRunning(index) == 1 && 
!speculatableTasks.contains(index)) {
+
+        def checkMaySpeculate: Boolean = if (customizedThreshold) {
+          true
+        } else {
+          if (recomputeInefficientTasks && numSuccessfulTasks > 0 && sched.sc 
!= null) {
+            this.maybeRecompute(currentTimeMillis)
+            recomputeInefficientTasks = false
+          }
+          val longTimeTask = (numTasks <= 1) ||
+            (runtimeMs > speculationTaskDurationFactor *
+              max(speculationTaskMinDuration, threshold))
+          longTimeTask || inefficientTask.maySpeculateTask(tid, runtimeMs, 
info)
+        }
+
+        val maySpeculate = (runtimeMs > threshold) && checkMaySpeculate

Review Comment:
   As currently formulated, this is going to result in always speculating 
stages with a low number of tasks (specifically `numTasks == 1` right now). Why?



##########
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala:
##########
@@ -1069,24 +1085,64 @@ private[spark] class TaskSetManager(
    * speculative run.
    */
   private def checkAndSubmitSpeculatableTask(
-      tid: Long,
       currentTimeMillis: Long,
-      threshold: Double): Boolean = {
-    val info = taskInfos(tid)
-    val index = info.index
-    if (!successful(index) && copiesRunning(index) == 1 &&
-        info.timeRunning(currentTimeMillis) > threshold && 
!speculatableTasks.contains(index)) {
-      addPendingTask(index, speculatable = true)
-      logInfo(
-        ("Marking task %d in stage %s (on %s) as speculatable because it ran 
more" +
-          " than %.0f ms(%d speculatable tasks in this taskset now)")
-          .format(index, taskSet.id, info.host, threshold, 
speculatableTasks.size + 1))
-      speculatableTasks += index
-      sched.dagScheduler.speculativeTaskSubmitted(tasks(index))
-      true
-    } else {
-      false
+      threshold: Double,
+      numSuccessfulTasks: Int,
+      customizedThreshold: Boolean = false): Boolean = {

Review Comment:
   Rename `customizedThreshold`.
   Probably `enforceThreshold` or some such, so that it conveys what the intent 
of the boolean is.
   



##########
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala:
##########
@@ -1069,24 +1085,64 @@ private[spark] class TaskSetManager(
    * speculative run.
    */
   private def checkAndSubmitSpeculatableTask(
-      tid: Long,
       currentTimeMillis: Long,
-      threshold: Double): Boolean = {
-    val info = taskInfos(tid)
-    val index = info.index
-    if (!successful(index) && copiesRunning(index) == 1 &&
-        info.timeRunning(currentTimeMillis) > threshold && 
!speculatableTasks.contains(index)) {
-      addPendingTask(index, speculatable = true)
-      logInfo(
-        ("Marking task %d in stage %s (on %s) as speculatable because it ran 
more" +
-          " than %.0f ms(%d speculatable tasks in this taskset now)")
-          .format(index, taskSet.id, info.host, threshold, 
speculatableTasks.size + 1))
-      speculatableTasks += index
-      sched.dagScheduler.speculativeTaskSubmitted(tasks(index))
-      true
-    } else {
-      false
+      threshold: Double,
+      numSuccessfulTasks: Int,
+      customizedThreshold: Boolean = false): Boolean = {
+    var foundTasksResult = false
+    var recomputeInefficientTasks = true
+    for (tid <- runningTasksSet) {
+      val info = taskInfos(tid)
+      val index = info.index
+      val runtimeMs = info.timeRunning(currentTimeMillis)
+      if (!successful(index) && copiesRunning(index) == 1 && 
!speculatableTasks.contains(index)) {
+
+        def checkMaySpeculate: Boolean = if (customizedThreshold) {

Review Comment:
   Use braces to surround the method.



##########
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala:
##########
@@ -1069,24 +1085,64 @@ private[spark] class TaskSetManager(
    * speculative run.
    */
   private def checkAndSubmitSpeculatableTask(
-      tid: Long,
       currentTimeMillis: Long,
-      threshold: Double): Boolean = {
-    val info = taskInfos(tid)
-    val index = info.index
-    if (!successful(index) && copiesRunning(index) == 1 &&
-        info.timeRunning(currentTimeMillis) > threshold && 
!speculatableTasks.contains(index)) {
-      addPendingTask(index, speculatable = true)
-      logInfo(
-        ("Marking task %d in stage %s (on %s) as speculatable because it ran 
more" +
-          " than %.0f ms(%d speculatable tasks in this taskset now)")
-          .format(index, taskSet.id, info.host, threshold, 
speculatableTasks.size + 1))
-      speculatableTasks += index
-      sched.dagScheduler.speculativeTaskSubmitted(tasks(index))
-      true
-    } else {
-      false
+      threshold: Double,
+      numSuccessfulTasks: Int,
+      customizedThreshold: Boolean = false): Boolean = {
+    var foundTasksResult = false
+    var recomputeInefficientTasks = true
+    for (tid <- runningTasksSet) {
+      val info = taskInfos(tid)
+      val index = info.index
+      val runtimeMs = info.timeRunning(currentTimeMillis)

Review Comment:
   nit: Move this into the `if` block below



##########
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala:
##########
@@ -1069,24 +1085,64 @@ private[spark] class TaskSetManager(
    * speculative run.
    */
   private def checkAndSubmitSpeculatableTask(
-      tid: Long,
       currentTimeMillis: Long,
-      threshold: Double): Boolean = {
-    val info = taskInfos(tid)
-    val index = info.index
-    if (!successful(index) && copiesRunning(index) == 1 &&
-        info.timeRunning(currentTimeMillis) > threshold && 
!speculatableTasks.contains(index)) {
-      addPendingTask(index, speculatable = true)
-      logInfo(
-        ("Marking task %d in stage %s (on %s) as speculatable because it ran 
more" +
-          " than %.0f ms(%d speculatable tasks in this taskset now)")
-          .format(index, taskSet.id, info.host, threshold, 
speculatableTasks.size + 1))
-      speculatableTasks += index
-      sched.dagScheduler.speculativeTaskSubmitted(tasks(index))
-      true
-    } else {
-      false
+      threshold: Double,
+      numSuccessfulTasks: Int,
+      customizedThreshold: Boolean = false): Boolean = {
+    var foundTasksResult = false
+    var recomputeInefficientTasks = true
+    for (tid <- runningTasksSet) {
+      val info = taskInfos(tid)
+      val index = info.index
+      val runtimeMs = info.timeRunning(currentTimeMillis)
+      if (!successful(index) && copiesRunning(index) == 1 && 
!speculatableTasks.contains(index)) {
+
+        def checkMaySpeculate: Boolean = if (customizedThreshold) {
+          true
+        } else {
+          if (recomputeInefficientTasks && numSuccessfulTasks > 0 && sched.sc 
!= null) {

Review Comment:
   Why `sched.sc != null` check ?



##########
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala:
##########
@@ -1108,45 +1164,48 @@ private[spark] class TaskSetManager(
     // `successfulTaskDurations` may not equal to `tasksSuccessful`. Here we 
should only count the
     // tasks that are submitted by this `TaskSetManager` and are completed 
successfully.
     val numSuccessfulTasks = successfulTaskDurations.size()
-    if (numSuccessfulTasks >= minFinishedForSpeculation) {
-      val time = clock.getTimeMillis()
-      val medianDuration = successfulTaskDurations.median
-      val threshold = max(speculationMultiplier * medianDuration, 
minTimeToSpeculation)
-      // TODO: Threshold should also look at standard deviation of task 
durations and have a lower
-      // bound based on that.
-      logDebug("Task length threshold for speculation: " + threshold)
-      for (tid <- runningTasksSet) {
-        var speculated = checkAndSubmitSpeculatableTask(tid, time, threshold)
-        if (!speculated && executorDecommissionKillInterval.isDefined) {
-          val taskInfo = taskInfos(tid)
-          val decomState = 
sched.getExecutorDecommissionState(taskInfo.executorId)
-          if (decomState.isDefined) {
-            // Check if this task might finish after this executor is 
decommissioned.
-            // We estimate the task's finish time by using the median task 
duration.
-            // Whereas the time when the executor might be decommissioned is 
estimated using the
-            // config executorDecommissionKillInterval. If the task is going 
to finish after
-            // decommissioning, then we will eagerly speculate the task.
-            val taskEndTimeBasedOnMedianDuration = taskInfos(tid).launchTime + 
medianDuration
-            val executorDecomTime = decomState.get.startTime + 
executorDecommissionKillInterval.get
-            val canExceedDeadline = executorDecomTime < 
taskEndTimeBasedOnMedianDuration
-            if (canExceedDeadline) {
-              speculated = checkAndSubmitSpeculatableTask(tid, time, 0)
-            }
-          }
+    val timeMs = clock.getTimeMillis()
+    if (numSuccessfulTasks >= minFinishedForSpeculation || numTasks == 1) {

Review Comment:
   Why special-case `numTasks == 1`?
   If the intent is to catch cases where there is an insufficient number of tasks 
to trigger speculation, check for `(speculationQuantile * numTasks) < 1` 
instead?



##########
core/src/main/scala/org/apache/spark/SparkStatusTracker.scala:
##########
@@ -120,4 +120,8 @@ class SparkStatusTracker private[spark] (sc: SparkContext, 
store: AppStatusStore
         exec.memoryMetrics.map(_.totalOnHeapStorageMemory).getOrElse(0L))
     }.toArray
   }
+
+  def getAppStatusStore: AppStatusStore = {
+    store
+  }

Review Comment:
   We don't need to use `AppStatusStore` for this PR; we can directly fetch the 
stage/task state from the TSM (`TaskSetManager`).



##########
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala:
##########
@@ -1217,6 +1276,127 @@ private[spark] class TaskSetManager(
   def executorAdded(): Unit = {
     recomputeLocality()
   }
+
+  /**
+   * A class for checking inefficient tasks to be speculated, the inefficient 
tasks come from
+   * the tasks which may be speculated by the previous strategy.
+   */
+  private[scheduler] class InefficientTask(
+      val recomputeIntervalMs: Long,
+      val maxTasksToCheck: Int) {
+    var taskData: Map[Long, TaskData] = null
+    var taskProgressThreshold = 0.0
+    private var lastComputeMs = -1L
+    var updateSealed = false
+    private val checkInefficientTask = speculationTaskMinDuration > 0
+
+    def maybeRecompute(
+        appStatusStore: AppStatusStore,
+        nowMs: Long,
+        stageId: Int,
+        stageAttemptId: Int): Unit = {
+      if (!checkInefficientTask || appStatusStore == null) {
+        return
+      }
+      try {
+        this.synchronized {
+          if (!updateSealed &&
+            (lastComputeMs <= 0 || nowMs > lastComputeMs + 
recomputeIntervalMs)) {
+            val (progressRate, numSuccessTasks): (Double, Int) = 
computeSuccessTaskProgress(
+              stageId, stageAttemptId, appStatusStore)
+            if (progressRate > 0.0) {
+              setTaskData(stageId, stageAttemptId, appStatusStore)
+              taskProgressThreshold = progressRate * 
speculationTaskProgressMultiplier
+              if (numSuccessTasks > maxTasksToCheck) {
+                updateSealed = true
+              }
+              lastComputeMs = nowMs
+            }
+          }
+        }
+      } catch {
+        case e: InterruptedException => throw e
+        case t: Throwable => logWarning("Failed to recompute InefficientTask 
state", t)
+      }
+    }
+
+    private def setTaskData(
+        stageId: Int,
+        stageAttemptId: Int,
+        appStatusStore: AppStatusStore): Unit = {
+      try {
+        // The stage entity will be writen into appStatusStore by
+        // 'listener.onStageSubmitted' and updated by 
'listener.onExecutorMetricsUpdate',
+        // it's not going to be in appStatusStore when 
'TaskSetManager.checkSpeculatableTasks'
+        // comes before 'listener.onStageSubmitted' to write it, so we should 
catch that and
+        // fallback.
+        val stageData = appStatusStore.stageAttempt(stageId, stageAttemptId, 
true)
+        if (stageData != null) {
+          taskData = stageData._1.tasks.orNull
+        }
+      } catch {
+        case e: RuntimeException => logWarning("Failed to set taskData", e)
+      }
+    }
+
+    private def computeSuccessTaskProgress(
+        stageId: Int,
+        stageAttemptId: Int,
+        appStatusStore: AppStatusStore): (Double, Int) = {
+      var sumInputRecords, sumShuffleReadRecords, sumExecutorRunTime = 0.0
+      var progressRate = 0.0
+      var numSuccessTasks = 0
+      try {
+        appStatusStore.taskList(stageId, stageAttemptId, 
maxTasksToCheck).filter {

Review Comment:
   This is potentially a very expensive call, particularly for large 
applications - there is no indexing in `InMemoryStore`.
   
   Instead, directly iterate over the tasks via `taskAttempts` and get the 
relevant metrics? Why go through `appStatusStore`?



##########
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala:
##########
@@ -1069,24 +1085,64 @@ private[spark] class TaskSetManager(
    * speculative run.
    */
   private def checkAndSubmitSpeculatableTask(

Review Comment:
   Rename method to `checkAndSubmitSpeculatableTasks` ?
   `checkAndSubmitSpeculatableTask` was checking for a specific identified task.



##########
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala:
##########
@@ -1108,45 +1164,48 @@ private[spark] class TaskSetManager(
     // `successfulTaskDurations` may not equal to `tasksSuccessful`. Here we 
should only count the
     // tasks that are submitted by this `TaskSetManager` and are completed 
successfully.
     val numSuccessfulTasks = successfulTaskDurations.size()
-    if (numSuccessfulTasks >= minFinishedForSpeculation) {
-      val time = clock.getTimeMillis()
-      val medianDuration = successfulTaskDurations.median
-      val threshold = max(speculationMultiplier * medianDuration, 
minTimeToSpeculation)
-      // TODO: Threshold should also look at standard deviation of task 
durations and have a lower
-      // bound based on that.
-      logDebug("Task length threshold for speculation: " + threshold)
-      for (tid <- runningTasksSet) {
-        var speculated = checkAndSubmitSpeculatableTask(tid, time, threshold)
-        if (!speculated && executorDecommissionKillInterval.isDefined) {
-          val taskInfo = taskInfos(tid)
-          val decomState = 
sched.getExecutorDecommissionState(taskInfo.executorId)
-          if (decomState.isDefined) {
-            // Check if this task might finish after this executor is 
decommissioned.
-            // We estimate the task's finish time by using the median task 
duration.
-            // Whereas the time when the executor might be decommissioned is 
estimated using the
-            // config executorDecommissionKillInterval. If the task is going 
to finish after
-            // decommissioning, then we will eagerly speculate the task.
-            val taskEndTimeBasedOnMedianDuration = taskInfos(tid).launchTime + 
medianDuration
-            val executorDecomTime = decomState.get.startTime + 
executorDecommissionKillInterval.get
-            val canExceedDeadline = executorDecomTime < 
taskEndTimeBasedOnMedianDuration
-            if (canExceedDeadline) {
-              speculated = checkAndSubmitSpeculatableTask(tid, time, 0)
-            }
-          }
+    val timeMs = clock.getTimeMillis()
+    if (numSuccessfulTasks >= minFinishedForSpeculation || numTasks == 1) {
+      val threshold = if (numSuccessfulTasks <= 0) {
+        if (isSpeculationThresholdSpecified) {
+          max(speculationTaskDurationThresOpt.get, 
speculationSingleTaskDurationThreshold)
+        } else {
+          speculationSingleTaskDurationThreshold
         }
-        foundTasks |= speculated
+      } else {
+        val medianDuration = successfulTaskDurations.median
+        max(speculationMultiplier * medianDuration, minTimeToSpeculation)
       }
-    } else if (speculationTaskDurationThresOpt.isDefined && 
speculationTasksLessEqToSlots) {
-      val time = clock.getTimeMillis()
+      // bound based on that.
+      logDebug("Task length threshold for speculation: " + threshold)
+      foundTasks = checkAndSubmitSpeculatableTask(timeMs, threshold, 
numSuccessfulTasks)
+    } else if (isSpeculationThresholdSpecified && 
speculationTasksLessEqToSlots) {
       val threshold = speculationTaskDurationThresOpt.get
       logDebug(s"Tasks taking longer time than provided speculation threshold: 
$threshold")
-      for (tid <- runningTasksSet) {
-        foundTasks |= checkAndSubmitSpeculatableTask(tid, time, threshold)
+      foundTasks = checkAndSubmitSpeculatableTask(timeMs, threshold, 
numSuccessfulTasks,
+        customizedThreshold = true)
+    }
+    // avoid more warning logs.
+    if (foundTasks) {
+      val elapsedMs = clock.getTimeMillis() - timeMs
+      if (elapsedMs > minTimeToSpeculation) {
+        logWarning(s"Time to checkSpeculatableTasks ${elapsedMs}ms > 
${minTimeToSpeculation}ms")
       }
     }
     foundTasks
   }
 
+  private def maybeRecompute(currentTimeMillis: Long): Unit = {

Review Comment:
   Remove this method and move it into `inefficientTask.maybeRecompute` ?



##########
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala:
##########
@@ -1217,6 +1276,127 @@ private[spark] class TaskSetManager(
   def executorAdded(): Unit = {
     recomputeLocality()
   }
+
+  /**
+   * A class for checking inefficient tasks to be speculated, the inefficient 
tasks come from
+   * the tasks which may be speculated by the previous strategy.
+   */
+  private[scheduler] class InefficientTask(
+      val recomputeIntervalMs: Long,
+      val maxTasksToCheck: Int) {
+    var taskData: Map[Long, TaskData] = null
+    var taskProgressThreshold = 0.0
+    private var lastComputeMs = -1L
+    var updateSealed = false
+    private val checkInefficientTask = speculationTaskMinDuration > 0
+
+    def maybeRecompute(
+        appStatusStore: AppStatusStore,
+        nowMs: Long,
+        stageId: Int,
+        stageAttemptId: Int): Unit = {
+      if (!checkInefficientTask || appStatusStore == null) {
+        return
+      }
+      try {
+        this.synchronized {

Review Comment:
   You don't need `synchronized` here - `maybeRecompute` is called from 
within the TSM (`TaskSetManager`).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to