Ngone51 commented on code in PR #36162:
URL: https://github.com/apache/spark/pull/36162#discussion_r891337273


##########
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala:
##########
@@ -1108,45 +1164,48 @@ private[spark] class TaskSetManager(
     // `successfulTaskDurations` may not equal to `tasksSuccessful`. Here we 
should only count the
     // tasks that are submitted by this `TaskSetManager` and are completed 
successfully.
     val numSuccessfulTasks = successfulTaskDurations.size()
-    if (numSuccessfulTasks >= minFinishedForSpeculation) {
-      val time = clock.getTimeMillis()
-      val medianDuration = successfulTaskDurations.median
-      val threshold = max(speculationMultiplier * medianDuration, 
minTimeToSpeculation)
-      // TODO: Threshold should also look at standard deviation of task 
durations and have a lower
-      // bound based on that.
-      logDebug("Task length threshold for speculation: " + threshold)
-      for (tid <- runningTasksSet) {
-        var speculated = checkAndSubmitSpeculatableTask(tid, time, threshold)
-        if (!speculated && executorDecommissionKillInterval.isDefined) {
-          val taskInfo = taskInfos(tid)
-          val decomState = 
sched.getExecutorDecommissionState(taskInfo.executorId)
-          if (decomState.isDefined) {
-            // Check if this task might finish after this executor is 
decommissioned.
-            // We estimate the task's finish time by using the median task 
duration.
-            // Whereas the time when the executor might be decommissioned is 
estimated using the
-            // config executorDecommissionKillInterval. If the task is going 
to finish after
-            // decommissioning, then we will eagerly speculate the task.
-            val taskEndTimeBasedOnMedianDuration = taskInfos(tid).launchTime + 
medianDuration
-            val executorDecomTime = decomState.get.startTime + 
executorDecommissionKillInterval.get
-            val canExceedDeadline = executorDecomTime < 
taskEndTimeBasedOnMedianDuration
-            if (canExceedDeadline) {
-              speculated = checkAndSubmitSpeculatableTask(tid, time, 0)
-            }
-          }
+    val timeMs = clock.getTimeMillis()
+    if (numSuccessfulTasks >= minFinishedForSpeculation || numTasks == 1) {

Review Comment:
   cc @weixiuli 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to