weixiuli commented on code in PR #36162:
URL: https://github.com/apache/spark/pull/36162#discussion_r862440045


##########
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala:
##########
@@ -1217,6 +1276,127 @@ private[spark] class TaskSetManager(
   def executorAdded(): Unit = {
     recomputeLocality()
   }
+
+  /**
+   * A class for checking inefficient tasks to be speculated, the inefficient 
tasks come from
+   * the tasks which may be speculated by the previous strategy.
+   */
+  private[scheduler] class InefficientTask(
+      val recomputeIntervalMs: Long,
+      val maxTasksToCheck: Int) {
+    var taskData: Map[Long, TaskData] = null
+    var taskProgressThreshold = 0.0
+    private var lastComputeMs = -1L
+    var updateSealed = false
+    private val checkInefficientTask = speculationTaskMinDuration > 0
+
+    def maybeRecompute(
+        appStatusStore: AppStatusStore,
+        nowMs: Long,
+        stageId: Int,
+        stageAttemptId: Int): Unit = {
+      if (!checkInefficientTask || appStatusStore == null) {
+        return
+      }
+      try {
+        this.synchronized {
+          if (!updateSealed &&
+            (lastComputeMs <= 0 || nowMs > lastComputeMs + 
recomputeIntervalMs)) {
+            val (progressRate, numSuccessTasks): (Double, Int) = 
computeSuccessTaskProgress(
+              stageId, stageAttemptId, appStatusStore)
+            if (progressRate > 0.0) {
+              setTaskData(stageId, stageAttemptId, appStatusStore)
+              taskProgressThreshold = progressRate * 
speculationTaskProgressMultiplier
+              if (numSuccessTasks > maxTasksToCheck) {
+                updateSealed = true
+              }
+              lastComputeMs = nowMs
+            }
+          }
+        }
+      } catch {
+        case e: InterruptedException => throw e
+        case t: Throwable => logWarning("Failed to recompute InefficientTask 
state", t)
+      }
+    }
+
+    private def setTaskData(
+        stageId: Int,
+        stageAttemptId: Int,
+        appStatusStore: AppStatusStore): Unit = {
+      try {
+        // The stage entity will be writen into appStatusStore by
+        // 'listener.onStageSubmitted' and updated by 
'listener.onExecutorMetricsUpdate',
+        // it's not going to be in appStatusStore when 
'TaskSetManager.checkSpeculatableTasks'
+        // comes before 'listener.onStageSubmitted' to write it, so we should 
catch that and
+        // fallback.
+        val stageData = appStatusStore.stageAttempt(stageId, stageAttemptId, 
true)
+        if (stageData != null) {
+          taskData = stageData._1.tasks.orNull
+        }
+      } catch {
+        case e: RuntimeException => logWarning("Failed to set taskData", e)
+      }
+    }
+
+    private def computeSuccessTaskProgress(
+        stageId: Int,
+        stageAttemptId: Int,
+        appStatusStore: AppStatusStore): (Double, Int) = {
+      var sumInputRecords, sumShuffleReadRecords, sumExecutorRunTime = 0.0
+      var progressRate = 0.0
+      var numSuccessTasks = 0
+      try {
+        appStatusStore.taskList(stageId, stageAttemptId, 
maxTasksToCheck).filter {

Review Comment:
   > directly iterate over the tasks via taskAttempts and get to the relevant 
metrics ?
   
   I tried various ways to get task metrics, and eventually found that task 
metrics were only stored in the InMemoryStore. So we may use the  existing API 
of AppStatusStore to get task metrics.  
    
   Even for large applications, we only need  care about the active stages and 
get their task metrics through the AppStatusStore, so calls are not expensive. 
   
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to