mridulm commented on code in PR #36162:
URL: https://github.com/apache/spark/pull/36162#discussion_r892802648
##########
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala:
##########
@@ -1068,25 +1086,61 @@ private[spark] class TaskSetManager(
* Check if the task associated with the given tid has past the time
threshold and should be
* speculative run.
*/
- private def checkAndSubmitSpeculatableTask(
- tid: Long,
+ private def checkAndSubmitSpeculatableTasks(
currentTimeMillis: Long,
- threshold: Double): Boolean = {
- val info = taskInfos(tid)
- val index = info.index
- if (!successful(index) && copiesRunning(index) == 1 &&
- info.timeRunning(currentTimeMillis) > threshold &&
!speculatableTasks.contains(index)) {
- addPendingTask(index, speculatable = true)
- logInfo(
- ("Marking task %d in stage %s (on %s) as speculatable because it ran
more" +
- " than %.0f ms(%d speculatable tasks in this taskset now)")
- .format(index, taskSet.id, info.host, threshold,
speculatableTasks.size + 1))
- speculatableTasks += index
- sched.dagScheduler.speculativeTaskSubmitted(tasks(index))
- true
- } else {
- false
+ threshold: Double,
+ numSuccessfulTasks: Int,
+ customizedThreshold: Boolean = false): Boolean = {
+ var foundTasksResult = false
+ for (tid <- runningTasksSet) {
+ val info = taskInfos(tid)
+ val index = info.index
+ if (!successful(index) && copiesRunning(index) == 1 &&
!speculatableTasks.contains(index)) {
+ val runtimeMs = info.timeRunning(currentTimeMillis)
+
+ def checkMaySpeculate(): Boolean = {
+ if (customizedThreshold || !inefficientTaskCalculator.isDefined) {
+ true
+ } else {
+ val longTimeTask = (numTasks <= 1) ||
+ runtimeMs > efficientTaskDurationFactor * threshold
+ longTimeTask ||
inefficientTaskCalculator.get.maySpeculateTask(tid, runtimeMs, info)
+ }
+ }
+
+ val maySpeculate = (runtimeMs > threshold) && checkMaySpeculate()
+ val executorDecommissionSpeculate =
+ (!maySpeculate &&
+ executorDecommissionKillInterval.isDefined &&
!successfulTaskDurations.isEmpty()) && {
+ val taskInfo = taskInfos(tid)
Review Comment:
nit: Use `info` here. (see below btw)
##########
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala:
##########
@@ -1068,25 +1086,61 @@ private[spark] class TaskSetManager(
* Check if the task associated with the given tid has past the time
threshold and should be
* speculative run.
*/
- private def checkAndSubmitSpeculatableTask(
- tid: Long,
+ private def checkAndSubmitSpeculatableTasks(
currentTimeMillis: Long,
- threshold: Double): Boolean = {
- val info = taskInfos(tid)
- val index = info.index
- if (!successful(index) && copiesRunning(index) == 1 &&
- info.timeRunning(currentTimeMillis) > threshold &&
!speculatableTasks.contains(index)) {
- addPendingTask(index, speculatable = true)
- logInfo(
- ("Marking task %d in stage %s (on %s) as speculatable because it ran
more" +
- " than %.0f ms(%d speculatable tasks in this taskset now)")
- .format(index, taskSet.id, info.host, threshold,
speculatableTasks.size + 1))
- speculatableTasks += index
- sched.dagScheduler.speculativeTaskSubmitted(tasks(index))
- true
- } else {
- false
+ threshold: Double,
+ numSuccessfulTasks: Int,
+ customizedThreshold: Boolean = false): Boolean = {
+ var foundTasksResult = false
+ for (tid <- runningTasksSet) {
+ val info = taskInfos(tid)
+ val index = info.index
+ if (!successful(index) && copiesRunning(index) == 1 &&
!speculatableTasks.contains(index)) {
+ val runtimeMs = info.timeRunning(currentTimeMillis)
+
+ def checkMaySpeculate(): Boolean = {
+ if (customizedThreshold || !inefficientTaskCalculator.isDefined) {
+ true
+ } else {
+ val longTimeTask = (numTasks <= 1) ||
+ runtimeMs > efficientTaskDurationFactor * threshold
+ longTimeTask ||
inefficientTaskCalculator.get.maySpeculateTask(tid, runtimeMs, info)
+ }
+ }
+
+ val maySpeculate = (runtimeMs > threshold) && checkMaySpeculate()
+ val executorDecommissionSpeculate =
+ (!maySpeculate &&
+ executorDecommissionKillInterval.isDefined &&
!successfulTaskDurations.isEmpty()) && {
+ val taskInfo = taskInfos(tid)
+ val decomState =
sched.getExecutorDecommissionState(taskInfo.executorId)
+ decomState.isDefined && {
+ // Check if this task might finish after this executor is
decommissioned.
+ // We estimate the task's finish time by using the median task
duration.
+ // Whereas the time when the executor might be decommissioned is
estimated using the
+ // config executorDecommissionKillInterval. If the task is going
to finish after
+ // decommissioning, then we will eagerly speculate the task.
+ val taskEndTimeBasedOnMedianDuration =
+ taskInfos(tid).launchTime + successfulTaskDurations.median
+ val executorDecomTime =
+ decomState.get.startTime + executorDecommissionKillInterval.get
+ executorDecomTime < taskEndTimeBasedOnMedianDuration
+ }
Review Comment:
nit: A simplification here would be:
```suggestion
sched.getExecutorDecommissionState(taskInfo.executorId).exists {
decomState =>
// Check if this task might finish after this executor is
decommissioned.
// We estimate the task's finish time by using the median task
duration.
// Whereas the time when the executor might be decommissioned
is estimated using the
// config executorDecommissionKillInterval. If the task is
going to finish after
// decommissioning, then we will eagerly speculate the task.
val taskEndTimeBasedOnMedianDuration =
info.launchTime + successfulTaskDurations.median
val executorDecomTime =
decomState.startTime + executorDecommissionKillInterval.get
executorDecomTime < taskEndTimeBasedOnMedianDuration
}
```
##########
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala:
##########
@@ -800,6 +814,10 @@ private[spark] class TaskSetManager(
info.markFinished(TaskState.FINISHED, clock.getTimeMillis())
if (speculationEnabled) {
successfulTaskDurations.insert(info.duration)
+ inefficientTaskCalculator.foreach { inefficientTask =>
+ inefficientTask.updateTaskProgressThreshold(result)
+ inefficientTask.removeRuningTasksProgressRate(tid)
Review Comment:
nit: Rename `inefficientTask` -> `calculator` here? (The lambda parameter is the calculator, not a task.)
##########
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala:
##########
@@ -1217,6 +1260,71 @@ private[spark] class TaskSetManager(
def executorAdded(): Unit = {
recomputeLocality()
}
+
+ /**
+ * A class for checking inefficient tasks to be speculated, the inefficient
tasks come from
+ * the tasks which may be speculated by the previous strategy.
+ */
+ private[scheduler] class InefficientTaskCalculator {
+ private var allTotalRecordsRead = 0L
+ private var allTotalExecutorRunTime = 0L
+ @volatile private var successTaskProgressThreshold = 0.0D
+ private val runingTasksProgressRate = new ConcurrentHashMap[Long, Double]()
+
+ private[scheduler] def updateTaskProgressThreshold(result:
DirectTaskResult[_]): Unit = {
+ var totalRecordsRead = 0L
+ var totalExecutorRunTime = 0L
+ result.accumUpdates.foreach { a =>
+ if (a.name == Some(shuffleRead.RECORDS_READ) ||
+ a.name == Some(input.RECORDS_READ)) {
+ val acc = a.asInstanceOf[LongAccumulator]
+ totalRecordsRead += acc.value
+ } else if (a.name == Some(InternalAccumulator.EXECUTOR_RUN_TIME)) {
+ val acc = a.asInstanceOf[LongAccumulator]
+ totalExecutorRunTime = acc.value
+ }
+ }
+ allTotalRecordsRead += totalRecordsRead
+ allTotalExecutorRunTime += totalExecutorRunTime
+ if (allTotalRecordsRead > 0 && allTotalExecutorRunTime > 0) {
+ successTaskProgressThreshold = allTotalRecordsRead /
(allTotalExecutorRunTime / 1000.0)
+ }
+ }
+
+ private[scheduler] def updateRuningTasksProgressRate(
Review Comment:
Other than this method, all other methods in this class can be
`private[TaskSetManager]` to limit visibility.
@Ngone51's comment about `synchronized` access to the state will become
clear in that case (since all uses from `TaskSetManager` would be within the
scheduler-protected lock).
##########
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala:
##########
@@ -1068,25 +1086,61 @@ private[spark] class TaskSetManager(
* Check if the task associated with the given tid has past the time
threshold and should be
* speculative run.
*/
- private def checkAndSubmitSpeculatableTask(
- tid: Long,
+ private def checkAndSubmitSpeculatableTasks(
currentTimeMillis: Long,
- threshold: Double): Boolean = {
- val info = taskInfos(tid)
- val index = info.index
- if (!successful(index) && copiesRunning(index) == 1 &&
- info.timeRunning(currentTimeMillis) > threshold &&
!speculatableTasks.contains(index)) {
- addPendingTask(index, speculatable = true)
- logInfo(
- ("Marking task %d in stage %s (on %s) as speculatable because it ran
more" +
- " than %.0f ms(%d speculatable tasks in this taskset now)")
- .format(index, taskSet.id, info.host, threshold,
speculatableTasks.size + 1))
- speculatableTasks += index
- sched.dagScheduler.speculativeTaskSubmitted(tasks(index))
- true
- } else {
- false
+ threshold: Double,
+ numSuccessfulTasks: Int,
+ customizedThreshold: Boolean = false): Boolean = {
+ var foundTasksResult = false
+ for (tid <- runningTasksSet) {
+ val info = taskInfos(tid)
+ val index = info.index
+ if (!successful(index) && copiesRunning(index) == 1 &&
!speculatableTasks.contains(index)) {
+ val runtimeMs = info.timeRunning(currentTimeMillis)
+
+ def checkMaySpeculate(): Boolean = {
+ if (customizedThreshold || !inefficientTaskCalculator.isDefined) {
+ true
+ } else {
+ val longTimeTask = (numTasks <= 1) ||
+ runtimeMs > efficientTaskDurationFactor * threshold
+ longTimeTask ||
inefficientTaskCalculator.get.maySpeculateTask(tid, runtimeMs, info)
+ }
+ }
+
+ val maySpeculate = (runtimeMs > threshold) && checkMaySpeculate()
+ val executorDecommissionSpeculate =
+ (!maySpeculate &&
+ executorDecommissionKillInterval.isDefined &&
!successfulTaskDurations.isEmpty()) && {
+ val taskInfo = taskInfos(tid)
+ val decomState =
sched.getExecutorDecommissionState(taskInfo.executorId)
+ decomState.isDefined && {
+ // Check if this task might finish after this executor is
decommissioned.
+ // We estimate the task's finish time by using the median task
duration.
+ // Whereas the time when the executor might be decommissioned is
estimated using the
+ // config executorDecommissionKillInterval. If the task is going
to finish after
+ // decommissioning, then we will eagerly speculate the task.
+ val taskEndTimeBasedOnMedianDuration =
+ taskInfos(tid).launchTime + successfulTaskDurations.median
+ val executorDecomTime =
+ decomState.get.startTime + executorDecommissionKillInterval.get
+ executorDecomTime < taskEndTimeBasedOnMedianDuration
+ }
+ }
+ val speculated = maySpeculate || executorDecommissionSpeculate
Review Comment:
Pull the executor-decommissioning-related logic into a private method?
So that we have:
```
val speculated = (runtimeMs > threshold && checkMaySpeculate()) ||
shouldSpeculateForExecutorDecomissioning(info)
```
##########
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala:
##########
@@ -1108,40 +1162,29 @@ private[spark] class TaskSetManager(
// `successfulTaskDurations` may not equal to `tasksSuccessful`. Here we
should only count the
// tasks that are submitted by this `TaskSetManager` and are completed
successfully.
val numSuccessfulTasks = successfulTaskDurations.size()
- if (numSuccessfulTasks >= minFinishedForSpeculation) {
- val time = clock.getTimeMillis()
- val medianDuration = successfulTaskDurations.median
- val threshold = max(speculationMultiplier * medianDuration,
minTimeToSpeculation)
- // TODO: Threshold should also look at standard deviation of task
durations and have a lower
- // bound based on that.
- logDebug("Task length threshold for speculation: " + threshold)
- for (tid <- runningTasksSet) {
- var speculated = checkAndSubmitSpeculatableTask(tid, time, threshold)
- if (!speculated && executorDecommissionKillInterval.isDefined) {
- val taskInfo = taskInfos(tid)
- val decomState =
sched.getExecutorDecommissionState(taskInfo.executorId)
- if (decomState.isDefined) {
- // Check if this task might finish after this executor is
decommissioned.
- // We estimate the task's finish time by using the median task
duration.
- // Whereas the time when the executor might be decommissioned is
estimated using the
- // config executorDecommissionKillInterval. If the task is going
to finish after
- // decommissioning, then we will eagerly speculate the task.
- val taskEndTimeBasedOnMedianDuration = taskInfos(tid).launchTime +
medianDuration
- val executorDecomTime = decomState.get.startTime +
executorDecommissionKillInterval.get
- val canExceedDeadline = executorDecomTime <
taskEndTimeBasedOnMedianDuration
- if (canExceedDeadline) {
- speculated = checkAndSubmitSpeculatableTask(tid, time, 0)
- }
- }
- }
- foundTasks |= speculated
+ val timeMs = clock.getTimeMillis()
+ if (numSuccessfulTasks >= minFinishedForSpeculation || numTasks == 1) {
+ val threshold = if (numSuccessfulTasks <= 0 &&
isSpeculationThresholdSpecified) {
+ speculationTaskDurationThresOpt.get
+ } else {
+ val medianDuration = successfulTaskDurations.median
+ speculationMultiplier * medianDuration
}
- } else if (speculationTaskDurationThresOpt.isDefined &&
speculationTasksLessEqToSlots) {
- val time = clock.getTimeMillis()
- val threshold = speculationTaskDurationThresOpt.get
+ val newThreshold = max(threshold, minTimeToSpeculation)
+ // bound based on that.
+ logDebug("Task length threshold for speculation: " + newThreshold)
+ foundTasks = checkAndSubmitSpeculatableTasks(timeMs, newThreshold,
numSuccessfulTasks)
+ } else if (isSpeculationThresholdSpecified &&
speculationTasksLessEqToSlots) {
+ val threshold = max(speculationTaskDurationThresOpt.get,
minTimeToSpeculation)
logDebug(s"Tasks taking longer time than provided speculation threshold:
$threshold")
- for (tid <- runningTasksSet) {
- foundTasks |= checkAndSubmitSpeculatableTask(tid, time, threshold)
+ foundTasks = checkAndSubmitSpeculatableTasks(timeMs, threshold,
numSuccessfulTasks,
+ customizedThreshold = true)
+ }
+ // avoid more warning logs.
+ if (foundTasks) {
+ val elapsedMs = clock.getTimeMillis() - timeMs
+ if (elapsedMs > minTimeToSpeculation) {
+ logWarning(s"Time to checkSpeculatableTasks ${elapsedMs}ms >
${minTimeToSpeculation}ms")
}
Review Comment:
One of the things that is slightly difficult to reason about in this PR is
the change in semantics around `numTasks == 1`. Can we preserve the existing
behavior w.r.t. `numTasks == 1`? We can expand the functionality related to it
in a follow-up PR, where we focus only on that change in behavior.
That is, we remove all the special casing for `numTasks == 1` introduced in
this change.
##########
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala:
##########
@@ -1217,6 +1260,71 @@ private[spark] class TaskSetManager(
def executorAdded(): Unit = {
recomputeLocality()
}
+
+ /**
+ * A class for checking inefficient tasks to be speculated, the inefficient
tasks come from
+ * the tasks which may be speculated by the previous strategy.
+ */
+ private[scheduler] class InefficientTaskCalculator {
+ private var allTotalRecordsRead = 0L
+ private var allTotalExecutorRunTime = 0L
+ @volatile private var successTaskProgressThreshold = 0.0D
+ private val runingTasksProgressRate = new ConcurrentHashMap[Long, Double]()
+
+ private[scheduler] def updateTaskProgressThreshold(result:
DirectTaskResult[_]): Unit = {
+ var totalRecordsRead = 0L
+ var totalExecutorRunTime = 0L
+ result.accumUpdates.foreach { a =>
+ if (a.name == Some(shuffleRead.RECORDS_READ) ||
+ a.name == Some(input.RECORDS_READ)) {
+ val acc = a.asInstanceOf[LongAccumulator]
+ totalRecordsRead += acc.value
+ } else if (a.name == Some(InternalAccumulator.EXECUTOR_RUN_TIME)) {
+ val acc = a.asInstanceOf[LongAccumulator]
+ totalExecutorRunTime = acc.value
+ }
+ }
+ allTotalRecordsRead += totalRecordsRead
+ allTotalExecutorRunTime += totalExecutorRunTime
+ if (allTotalRecordsRead > 0 && allTotalExecutorRunTime > 0) {
+ successTaskProgressThreshold = allTotalRecordsRead /
(allTotalExecutorRunTime / 1000.0)
+ }
+ }
+
+ private[scheduler] def updateRuningTasksProgressRate(
+ taskId: Long,
+ taskProgressRate: Double): Unit = {
+ runingTasksProgressRate.put(taskId, taskProgressRate)
+ }
+
+ private[scheduler] def removeRuningTasksProgressRate(taskId: Long): Unit =
{
+ runingTasksProgressRate.remove(taskId)
+ }
+
+ private[scheduler] def maySpeculateTask(
+ tid: Long,
+ runtimeMs: Long,
+ taskInfo: TaskInfo): Boolean = {
+ // Only check inefficient tasks when successTaskProgressThreshold > 0,
because some stage
+ // tasks may have neither input records nor shuffleRead records, so the
+ // successTaskProgressThreshold may be zero all the time, this case we
should make sure
+ // it can be speculated. eg: some spark-sql like that 'msck repair
table' or 'drop table'
+ // and so on.
+ lazy val currentTaskProgressRate =
runingTasksProgressRate.getOrDefault(tid, 0.0)
+ if (successTaskProgressThreshold <= 0.0 || currentTaskProgressRate <=
0.0) {
Review Comment:
nit: Reorganize the code so that we remove the use of `lazy val`?
Except for the case of `successTaskProgressThreshold <= 0`, we always use
it immediately.
```suggestion
if (successTaskProgressThreshold <= 0.0) return true
val currentTaskProgressRate =
runingTasksProgressRate.getOrDefault(tid, 0.0)
if (currentTaskProgressRate <= 0.0) {
```
##########
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala:
##########
@@ -1068,25 +1086,61 @@ private[spark] class TaskSetManager(
* Check if the task associated with the given tid has past the time
threshold and should be
* speculative run.
*/
- private def checkAndSubmitSpeculatableTask(
- tid: Long,
+ private def checkAndSubmitSpeculatableTasks(
currentTimeMillis: Long,
- threshold: Double): Boolean = {
- val info = taskInfos(tid)
- val index = info.index
- if (!successful(index) && copiesRunning(index) == 1 &&
- info.timeRunning(currentTimeMillis) > threshold &&
!speculatableTasks.contains(index)) {
- addPendingTask(index, speculatable = true)
- logInfo(
- ("Marking task %d in stage %s (on %s) as speculatable because it ran
more" +
- " than %.0f ms(%d speculatable tasks in this taskset now)")
- .format(index, taskSet.id, info.host, threshold,
speculatableTasks.size + 1))
- speculatableTasks += index
- sched.dagScheduler.speculativeTaskSubmitted(tasks(index))
- true
- } else {
- false
+ threshold: Double,
+ numSuccessfulTasks: Int,
+ customizedThreshold: Boolean = false): Boolean = {
+ var foundTasksResult = false
+ for (tid <- runningTasksSet) {
+ val info = taskInfos(tid)
+ val index = info.index
+ if (!successful(index) && copiesRunning(index) == 1 &&
!speculatableTasks.contains(index)) {
+ val runtimeMs = info.timeRunning(currentTimeMillis)
+
+ def checkMaySpeculate(): Boolean = {
+ if (customizedThreshold || !inefficientTaskCalculator.isDefined) {
Review Comment:
nit: `!inefficientTaskCalculator.isDefined` ->
`inefficientTaskCalculator.isEmpty`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]