venkata91 commented on a change in pull request #28994:
URL: https://github.com/apache/spark/pull/28994#discussion_r456228668



##########
File path: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
##########
@@ -1125,6 +1142,78 @@ private[spark] class TaskSetManager(
   def executorAdded(): Unit = {
     recomputeLocality()
   }
+
+  /**
+   * A class for checking whether inefficient tasks should be speculated; the
+   * inefficient tasks are drawn from the tasks that would be speculated by the
+   * existing strategy.
+   */
+  private class InefficientTask {
+    private var taskData: Map[Long, TaskData] = null
+    private var successTaskProgress = 0.0
+    private val checkInefficientTask = speculationTaskMinDuration > 0
+
+    if (checkInefficientTask) {
+      val appStatusStore = sched.sc.statusTracker.getAppStatusStore
+      if (appStatusStore != null) {
+        successTaskProgress =
+          computeSuccessTaskProgress(taskSet.stageId, taskSet.stageAttemptId, appStatusStore)
+        val stageData = appStatusStore.stageAttempt(taskSet.stageId, taskSet.stageAttemptId, true)
+        if (stageData != null) {
+          taskData = stageData._1.tasks.orNull
+        }
+      }
+    }
+
+    private def computeSuccessTaskProgress(stageId: Int, stageAttemptId: Int,
+      appStatusStore: AppStatusStore): Double = {
+      var sumInputRecords, sumShuffleReadRecords, sumExecutorRunTime = 0.0
+      appStatusStore.taskList(stageId, stageAttemptId, Int.MaxValue).filter {
+        _.status == "SUCCESS"
+      }.map(_.taskMetrics).filter(_.isDefined).map(_.get).foreach { task =>
+        if (task.inputMetrics != null) {
+          sumInputRecords += task.inputMetrics.recordsRead
+        }

Review comment:
       How about recordsWritten? Should that also be considered with respect to progress? Same for shuffleRecordsWritten?

##########
File path: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
##########
@@ -1125,6 +1142,78 @@ private[spark] class TaskSetManager(
   def executorAdded(): Unit = {
     recomputeLocality()
   }
+
+  /**
+   * A class for checking whether inefficient tasks should be speculated; the
+   * inefficient tasks are drawn from the tasks that would be speculated by the
+   * existing strategy.
+   */
+  private class InefficientTask {
+    private var taskData: Map[Long, TaskData] = null
+    private var successTaskProgress = 0.0
+    private val checkInefficientTask = speculationTaskMinDuration > 0
+
+    if (checkInefficientTask) {
+      val appStatusStore = sched.sc.statusTracker.getAppStatusStore
+      if (appStatusStore != null) {
+        successTaskProgress =
+          computeSuccessTaskProgress(taskSet.stageId, taskSet.stageAttemptId, appStatusStore)
+        val stageData = appStatusStore.stageAttempt(taskSet.stageId, taskSet.stageAttemptId, true)
+        if (stageData != null) {
+          taskData = stageData._1.tasks.orNull
+        }
+      }
+    }
+
+    private def computeSuccessTaskProgress(stageId: Int, stageAttemptId: Int,
+      appStatusStore: AppStatusStore): Double = {
+      var sumInputRecords, sumShuffleReadRecords, sumExecutorRunTime = 0.0
+      appStatusStore.taskList(stageId, stageAttemptId, Int.MaxValue).filter {
+        _.status == "SUCCESS"
+      }.map(_.taskMetrics).filter(_.isDefined).map(_.get).foreach { task =>
+        if (task.inputMetrics != null) {
+          sumInputRecords += task.inputMetrics.recordsRead
+        }
+        if (task.shuffleReadMetrics != null) {
+          sumShuffleReadRecords += task.shuffleReadMetrics.recordsRead
+        }
+        sumExecutorRunTime += task.executorRunTime
+      }
+      if (sumExecutorRunTime > 0) {
+        (sumInputRecords + sumShuffleReadRecords) / (sumExecutorRunTime / 1000.0)
+      } else 0
+    }
+
+    def maySpeculateTask(tid: Long, runtimeMs: Long, taskInfo: TaskInfo): Boolean = {
+      // note: 1) only check inefficient tasks when 'SPECULATION_TASK_DURATION_THRESHOLD' > 0.
+      // 2) some tasks may have neither input records nor shuffle read records, so
+      // 'successTaskProgress' may stay zero the whole time; we should not consider this case,
+      // e.g. Spark SQL commands such as 'msck repair table' or 'drop table'.
+      if (!checkInefficientTask || successTaskProgress <= 0) {
+        true
+      } else if (runtimeMs < speculationTaskMinDuration) {
+        false
+      } else if (taskData != null && taskData.contains(tid) && taskData(tid) != null &&
+        taskData(tid).taskMetrics.isDefined) {
+        val taskMetrics = taskData(tid).taskMetrics.get
+        val currentTaskProgressRate = (taskMetrics.inputMetrics.recordsRead +

Review comment:
       Would it make sense to add taskProgress as part of taskMetrics so that it can also be shown in the Spark UI? Although taskProgress for tasks that don't involve input/output/shuffle records would be hard to measure?
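
       One way that could look, as a sketch only (the `taskProgressRate` name is an assumption, and wiring the value through the status store into the UI is not shown): derive a records-per-second rate from the existing v1 TaskMetrics fields, and return None for tasks that move no records, which is the hard-to-measure case mentioned above.

       ```scala
       import org.apache.spark.status.api.v1.TaskMetrics

       object TaskProgress {
         // Sketch: records processed per second of executor run time, or None when
         // the task reads no input and no shuffle data (e.g. metadata-only SQL
         // commands), in which case a progress rate is not meaningful.
         def taskProgressRate(m: TaskMetrics): Option[Double] = {
           val records =
             Option(m.inputMetrics).map(_.recordsRead).getOrElse(0L) +
               Option(m.shuffleReadMetrics).map(_.recordsRead).getOrElse(0L)
           val runtimeSec = m.executorRunTime / 1000.0
           if (records > 0 && runtimeSec > 0) Some(records / runtimeSec) else None
         }
       }
       ```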

##########
File path: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
##########
@@ -1125,6 +1142,78 @@ private[spark] class TaskSetManager(
   def executorAdded(): Unit = {
     recomputeLocality()
   }
+
+  /**
+   * A class for checking whether inefficient tasks should be speculated; the
+   * inefficient tasks are drawn from the tasks that would be speculated by the
+   * existing strategy.
+   */
+  private class InefficientTask {
+    private var taskData: Map[Long, TaskData] = null
+    private var successTaskProgress = 0.0
+    private val checkInefficientTask = speculationTaskMinDuration > 0
+
+    if (checkInefficientTask) {
+      val appStatusStore = sched.sc.statusTracker.getAppStatusStore
+      if (appStatusStore != null) {
+        successTaskProgress =
+          computeSuccessTaskProgress(taskSet.stageId, taskSet.stageAttemptId, appStatusStore)
+        val stageData = appStatusStore.stageAttempt(taskSet.stageId, taskSet.stageAttemptId, true)
+        if (stageData != null) {
+          taskData = stageData._1.tasks.orNull
+        }
+      }
+    }
+
+    private def computeSuccessTaskProgress(stageId: Int, stageAttemptId: Int,
+      appStatusStore: AppStatusStore): Double = {
+      var sumInputRecords, sumShuffleReadRecords, sumExecutorRunTime = 0.0
+      appStatusStore.taskList(stageId, stageAttemptId, Int.MaxValue).filter {
+        _.status == "SUCCESS"
+      }.map(_.taskMetrics).filter(_.isDefined).map(_.get).foreach { task =>
+        if (task.inputMetrics != null) {
+          sumInputRecords += task.inputMetrics.recordsRead
+        }

Review comment:
       Even caching can take time when data is written to disk; does that need to be taken into consideration? Similarly, GC time, shuffle read blocked time, etc. could also impact task progress.
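
       A rough sketch of how that adjustment might look (the helper is hypothetical and not in the PR; field names follow the v1 TaskMetrics REST model): discount the time a task spent in GC or blocked waiting on shuffle fetches before judging its progress rate, so a task that is slow for external reasons is not flagged as inefficient.

       ```scala
       import org.apache.spark.status.api.v1.TaskMetrics

       object EffectiveRuntime {
         // Sketch: executor run time minus time spent in JVM GC and blocked on
         // shuffle fetches; using this as the denominator of a progress rate would
         // avoid penalizing tasks that were stalled rather than inefficient.
         def effectiveRunTimeMs(m: TaskMetrics): Long = {
           val blockedMs = m.jvmGcTime +
             Option(m.shuffleReadMetrics).map(_.fetchWaitTime).getOrElse(0L)
           math.max(m.executorRunTime - blockedMs, 0L)
         }
       }
       ```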



