Ngone51 commented on code in PR #36162:
URL: https://github.com/apache/spark/pull/36162#discussion_r855732467


##########
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala:
##########
@@ -1217,6 +1276,123 @@ private[spark] class TaskSetManager(
   def executorAdded(): Unit = {
     recomputeLocality()
   }
+
+  /**
+   * A class for checking inefficient tasks to be speculated, the inefficient 
tasks come from
+   * the tasks which may be speculated by the previous strategy.
+   */
+  private[scheduler] class InefficientTask(
+    val recomputeIntervalMs: Long, val maxTasksToCheck: Int) {

Review Comment:
   ```suggestion
         val recomputeIntervalMs: Long,
         val maxTasksToCheck: Int) {
   ```



##########
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala:
##########
@@ -80,12 +82,22 @@ private[spark] class TaskSetManager(
   val copiesRunning = new Array[Int](numTasks)
 
   val speculationEnabled = conf.get(SPECULATION_ENABLED)
+  val speculationTaskMinDuration = conf.get(SPECULATION_TASK_MIN_DURATION)
+  val speculationTaskProgressMultiplier = 
conf.get(SPECULATION_TASK_PROGRESS_MULTIPLIER)
+  val speculationTaskDurationFactor = 
conf.get(SPECULATION_TASK_DURATION_FACTOR)
+  val speculationSingleTaskDurationThreshold = 
conf.get(SPECULATION_SINGLE_TASK_DURATION_THRESHOLD)
+
+  val speculationTaskStatsCacheInterval = math.min(3600000, max(100,
+    conf.get(SPECULATION_TASK_STATS_CACHE_DURATION)))
+
   // Quantile of tasks at which to start speculation
   val speculationQuantile = conf.get(SPECULATION_QUANTILE)
   val speculationMultiplier = conf.get(SPECULATION_MULTIPLIER)
   val minFinishedForSpeculation = math.max((speculationQuantile * 
numTasks).floor.toInt, 1)
   // User provided threshold for speculation regardless of whether the 
quantile has been reached
   val speculationTaskDurationThresOpt = 
conf.get(SPECULATION_TASK_DURATION_THRESHOLD)
+  val isThresholdSpecified = speculationTaskDurationThresOpt.isDefined &&

Review Comment:
   nit: `isSpeculationThresholdSpecified`



##########
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala:
##########
@@ -1217,6 +1276,123 @@ private[spark] class TaskSetManager(
   def executorAdded(): Unit = {
     recomputeLocality()
   }
+
+  /**
+   * A class for checking inefficient tasks to be speculated, the inefficient 
tasks come from
+   * the tasks which may be speculated by the previous strategy.
+   */
+  private[scheduler] class InefficientTask(
+    val recomputeIntervalMs: Long, val maxTasksToCheck: Int) {
+    var taskData: Map[Long, TaskData] = null
+    var taskProgressThreshold = 0.0
+    private var lastComputeMs = -1L
+    var updateSealed = false
+    private val checkInefficientTask = speculationTaskMinDuration > 0
+
+    def maybeRecompute(appStatusStore: AppStatusStore, nowMs: Long,
+      stageId: Int, stageAttemptId: Int): Unit = {

Review Comment:
   nit:
   ```suggestion
       def maybeRecompute(
           appStatusStore: AppStatusStore,
           nowMs: Long,
           stageId: Int,
           stageAttemptId: Int): Unit = {
   ```



##########
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala:
##########
@@ -1217,6 +1276,123 @@ private[spark] class TaskSetManager(
   def executorAdded(): Unit = {
     recomputeLocality()
   }
+
+  /**
+   * A class for checking inefficient tasks to be speculated, the inefficient 
tasks come from
+   * the tasks which may be speculated by the previous strategy.
+   */
+  private[scheduler] class InefficientTask(
+    val recomputeIntervalMs: Long, val maxTasksToCheck: Int) {
+    var taskData: Map[Long, TaskData] = null
+    var taskProgressThreshold = 0.0
+    private var lastComputeMs = -1L
+    var updateSealed = false
+    private val checkInefficientTask = speculationTaskMinDuration > 0
+
+    def maybeRecompute(appStatusStore: AppStatusStore, nowMs: Long,
+      stageId: Int, stageAttemptId: Int): Unit = {
+      if (!checkInefficientTask || appStatusStore == null) {
+        return
+      }
+      try {
+        this.synchronized {
+          if (!updateSealed &&
+            (lastComputeMs <= 0 || nowMs > lastComputeMs + 
recomputeIntervalMs)) {
+            val (progressRate, numSuccessTasks): (Double, Int) = 
computeSuccessTaskProgress(
+              stageId, stageAttemptId, appStatusStore)
+            if (progressRate > 0.0) {
+              setTaskData(stageId, stageAttemptId, appStatusStore)
+              taskProgressThreshold = progressRate * 
speculationTaskProgressMultiplier
+              if (numSuccessTasks > maxTasksToCheck) {
+                updateSealed = true
+              }
+              lastComputeMs = nowMs
+            }
+          }
+        }
+      } catch {
+        case e: InterruptedException => throw e
+        case t: Throwable => logWarning("Failed to recompute InefficientTask 
state", t)
+      }
+    }
+
+    private def setTaskData(
+        stageId: Int,
+        stageAttemptId: Int,
+        appStatusStore: AppStatusStore): Unit = {
+      try {
+        // The stage entity will be writen into appStatusStore by
+        // 'listener.onStageSubmitted' and updated by 
'listener.onExecutorMetricsUpdate',
+        // it's not going to be in appStatusStore when 
'TaskSetManager.checkSpeculatableTasks'
+        // comes before 'listener.onStageSubmitted' to write it, so we should 
catch that and
+        // fallback.
+        val stageData = appStatusStore.stageAttempt(stageId, stageAttemptId, 
true)
+        if (stageData != null) {
+          taskData = stageData._1.tasks.orNull
+        }
+      } catch {
+        case e: RuntimeException => logWarning("Failed to set taskData", e)
+      }
+    }
+
+    private def computeSuccessTaskProgress(
+        stageId: Int,
+        stageAttemptId: Int,
+        appStatusStore: AppStatusStore): (Double, Int) = {
+      var sumInputRecords, sumShuffleReadRecords, sumExecutorRunTime = 0.0
+      var progressRate = 0.0
+      var numSuccessTasks = 0
+      try {
+        appStatusStore.taskList(stageId, stageAttemptId, 
maxTasksToCheck).filter {
+          _.status == "SUCCESS"
+        }.map(_.taskMetrics).filter(_.isDefined).map(_.get).foreach { task =>
+          if (task.inputMetrics != null) {
+            sumInputRecords += task.inputMetrics.recordsRead
+          }
+          if (task.shuffleReadMetrics != null) {
+            sumShuffleReadRecords += task.shuffleReadMetrics.recordsRead
+          }

Review Comment:
   What's the difference between `inputMetrics` and `shuffleReadMetrics`, 
especially for ShuffleMapTask?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to