Ngone51 commented on code in PR #36162:
URL: https://github.com/apache/spark/pull/36162#discussion_r891337273
##########
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala:
##########
@@ -1108,45 +1164,48 @@ private[spark] class TaskSetManager(
// `successfulTaskDurations` may not equal to `tasksSuccessful`. Here we
should only count the
// tasks that are submitted by this `TaskSetManager` and are completed
successfully.
val numSuccessfulTasks = successfulTaskDurations.size()
- if (numSuccessfulTasks >= minFinishedForSpeculation) {
- val time = clock.getTimeMillis()
- val medianDuration = successfulTaskDurations.median
- val threshold = max(speculationMultiplier * medianDuration,
minTimeToSpeculation)
- // TODO: Threshold should also look at standard deviation of task
durations and have a lower
- // bound based on that.
- logDebug("Task length threshold for speculation: " + threshold)
- for (tid <- runningTasksSet) {
- var speculated = checkAndSubmitSpeculatableTask(tid, time, threshold)
- if (!speculated && executorDecommissionKillInterval.isDefined) {
- val taskInfo = taskInfos(tid)
- val decomState =
sched.getExecutorDecommissionState(taskInfo.executorId)
- if (decomState.isDefined) {
- // Check if this task might finish after this executor is
decommissioned.
- // We estimate the task's finish time by using the median task
duration.
- // Whereas the time when the executor might be decommissioned is
estimated using the
- // config executorDecommissionKillInterval. If the task is going
to finish after
- // decommissioning, then we will eagerly speculate the task.
- val taskEndTimeBasedOnMedianDuration = taskInfos(tid).launchTime +
medianDuration
- val executorDecomTime = decomState.get.startTime +
executorDecommissionKillInterval.get
- val canExceedDeadline = executorDecomTime <
taskEndTimeBasedOnMedianDuration
- if (canExceedDeadline) {
- speculated = checkAndSubmitSpeculatableTask(tid, time, 0)
- }
- }
+ val timeMs = clock.getTimeMillis()
+ if (numSuccessfulTasks >= minFinishedForSpeculation || numTasks == 1) {
Review Comment:
cc @weixiuli
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org