mridulm commented on code in PR #36162:
URL: https://github.com/apache/spark/pull/36162#discussion_r882298957
##########
core/src/main/scala/org/apache/spark/internal/config/package.scala:
##########
@@ -2051,6 +2051,47 @@ package object config {
.doubleConf
.createWithDefault(0.75)
+ private[spark] val SPECULATION_INEFFICIENT_ENABLE =
+ ConfigBuilder("spark.speculation.inefficient.enabled")
+ .doc("When set to true, spark will evaluate the efficiency of task
processing through the " +
+ "stage task metrics and only need to speculate the inefficient tasks.")
+ .version("3.4.0")
+ .booleanConf
+ .createWithDefault(true)
+
+ private[spark] val SPECULATION_TASK_PROGRESS_MULTIPLIER =
+ ConfigBuilder("spark.speculation.inefficient.progress.multiplier")
+ .doc("A multiplier for evaluating the efficiency of task processing. A
task will be " +
+ "evaluated an inefficient one when it's progress rate is less than the
" +
+ "successTaskProgressRate * multiplier")
+ .version("3.4.0")
+ .doubleConf
+ .createWithDefault(1.0)
+
+ private[spark] val SPECULATION_TASK_DURATION_FACTOR =
+ ConfigBuilder("spark.speculation.inefficient.duration.factor")
+ .doc("When a task runtime is bigger than the factor * threshold, it
should be considered " +
Review Comment:
In the documentation:
What is `threshold` here?
Also, given that `spark.speculation.multiplier` already exists, it is not
clear why we need this: how is it different, and how should it be configured?
##########
core/src/main/scala/org/apache/spark/internal/config/package.scala:
##########
@@ -2051,6 +2051,47 @@ package object config {
.doubleConf
.createWithDefault(0.75)
+ private[spark] val SPECULATION_INEFFICIENT_ENABLE =
+ ConfigBuilder("spark.speculation.inefficient.enabled")
Review Comment:
What about renaming `inefficient` to `efficiency`, or something better?
Thoughts @Ngone51, @weixiuli ?
##########
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala:
##########
@@ -1217,6 +1289,61 @@ private[spark] class TaskSetManager(
def executorAdded(): Unit = {
recomputeLocality()
}
+
+ /**
+ * A class for checking inefficient tasks to be speculated, the inefficient
tasks come from
+ * the tasks which may be speculated by the previous strategy.
+ */
+ private[scheduler] class InefficientTaskCalculator {
+ var taskProgressThreshold = 0.0
+ var updateSealed = false
+ private var lastComputeMs = -1L
+
+ def maybeRecompute(nowMs: Long): Unit = {
+ if (!updateSealed && (lastComputeMs <= 0 ||
+ nowMs > lastComputeMs + speculationTaskStatsCacheInterval)) {
+ var successRecords = 0L
+ var successRunTime = 0L
+ var numSuccessTasks = 0L
+ taskInfos.values.filter(_.status == "SUCCESS").foreach { taskInfo =>
Review Comment:
`_.status == "SUCCESS"` -> `_.successful`
##########
core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala:
##########
@@ -83,6 +83,27 @@ class TaskInfo(
_accumulables = newAccumulables
}
+ private[spark] var successRecords = 0L
+ private[spark] var successRunTime = 0L
+
+ private[spark] def setRecords(records: Long): Unit = {
+ successRecords = records
+ }
+
+ private[spark] def setRunTime(runTime: Long): Unit = {
+ successRunTime = runTime
+ }
+
+ @volatile private[spark] var taskProgressRate = 0.0D
+
+ private[spark] def getTaskProgressRate(): Double = taskProgressRate
+
+ private[spark] def setRunTaskProgressRate(taskProgressRate: Double): Unit = {
Review Comment:
Make the new methods `private[scheduler]` instead of `private[spark]` ?
##########
core/src/main/scala/org/apache/spark/internal/config/package.scala:
##########
@@ -2051,6 +2051,47 @@ package object config {
.doubleConf
.createWithDefault(0.75)
+ private[spark] val SPECULATION_INEFFICIENT_ENABLE =
+ ConfigBuilder("spark.speculation.inefficient.enabled")
+ .doc("When set to true, spark will evaluate the efficiency of task
processing through the " +
+ "stage task metrics and only need to speculate the inefficient tasks.")
+ .version("3.4.0")
+ .booleanConf
+ .createWithDefault(true)
+
+ private[spark] val SPECULATION_TASK_PROGRESS_MULTIPLIER =
+ ConfigBuilder("spark.speculation.inefficient.progress.multiplier")
+ .doc("A multiplier for evaluating the efficiency of task processing. A
task will be " +
+ "evaluated an inefficient one when it's progress rate is less than the
" +
+ "successTaskProgressRate * multiplier")
+ .version("3.4.0")
+ .doubleConf
+ .createWithDefault(1.0)
+
+ private[spark] val SPECULATION_TASK_DURATION_FACTOR =
+ ConfigBuilder("spark.speculation.inefficient.duration.factor")
+ .doc("When a task runtime is bigger than the factor * threshold, it
should be considered " +
+ "for speculation to avoid that it is too late to launch a necessary
speculation.")
+ .version("3.4.0")
+ .doubleConf
+ .createWithDefault(2.0)
+
+ private[spark] val SPECULATION_TASK_STATS_CACHE_DURATION =
+ ConfigBuilder("spark.speculation.inefficient.stats.cache.duration")
+ .doc("The interval of time between recompute success task progress to
avoid scanning " +
+ "repeatedly.")
+ .version("3.4.0")
+ .timeConf(TimeUnit.MILLISECONDS)
+ .createWithDefaultString("1000ms")
+
+ private[spark] val SPECULATION_SINGLE_TASK_DURATION_THRESHOLD =
+ ConfigBuilder("spark.speculation.singleTask.duration.threshold")
+ .doc("Only one task in a TasksSet should speculative if it is taking
longer time than the " +
+ "threshold.")
Review Comment:
Why not use `spark.speculation.task.duration.threshold` ?
##########
core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala:
##########
@@ -83,6 +83,27 @@ class TaskInfo(
_accumulables = newAccumulables
}
+ private[spark] var successRecords = 0L
+ private[spark] var successRunTime = 0L
+
+ private[spark] def setRecords(records: Long): Unit = {
+ successRecords = records
+ }
+
+ private[spark] def setRunTime(runTime: Long): Unit = {
Review Comment:
Match the method name to the variable it sets (e.g. `setRunTime` updates `successRunTime`).
##########
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala:
##########
@@ -769,6 +785,25 @@ private[spark] class TaskSetManager(
}
}
+ def setTaskRecordsAndRunTime(
+ info: TaskInfo,
+ result: DirectTaskResult[_]): Unit = {
+ var records = 0L
+ var runTime = 0L
+ result.accumUpdates.foreach { a =>
+ if (a.name == Some(shuffleRead.RECORDS_READ) ||
+ a.name == Some(input.RECORDS_READ)) {
+ val acc = a.asInstanceOf[LongAccumulator]
+ records += acc.value
+ } else if (a.name == Some(InternalAccumulator.EXECUTOR_RUN_TIME)) {
+ val acc = a.asInstanceOf[LongAccumulator]
+ runTime = acc.value
+ }
+ }
+ info.setRecords(records)
+ info.setRunTime(runTime)
Review Comment:
This is indeed getting duplicated in multiple places; any thoughts on
minimizing the duplication, @weixiuli?
What I liked about `getTaskAccumulableInfosAndProgressRate`, though, was that
it did not introduce additional iteration cost beyond what already existed; but
perhaps that is fine?
##########
core/src/main/scala/org/apache/spark/internal/config/package.scala:
##########
@@ -2051,6 +2051,47 @@ package object config {
.doubleConf
.createWithDefault(0.75)
+ private[spark] val SPECULATION_INEFFICIENT_ENABLE =
+ ConfigBuilder("spark.speculation.inefficient.enabled")
+ .doc("When set to true, spark will evaluate the efficiency of task
processing through the " +
+ "stage task metrics and only need to speculate the inefficient tasks.")
+ .version("3.4.0")
+ .booleanConf
+ .createWithDefault(true)
+
+ private[spark] val SPECULATION_TASK_PROGRESS_MULTIPLIER =
+ ConfigBuilder("spark.speculation.inefficient.progress.multiplier")
+ .doc("A multiplier for evaluating the efficiency of task processing. A
task will be " +
+ "evaluated an inefficient one when it's progress rate is less than the
" +
+ "successTaskProgressRate * multiplier")
Review Comment:
In the user documentation, we have not defined `successTaskProgressRate`.
Also, please add some (brief) details on what the value for multiplier
should be set to.
##########
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala:
##########
@@ -1217,6 +1289,61 @@ private[spark] class TaskSetManager(
def executorAdded(): Unit = {
recomputeLocality()
}
+
+ /**
+ * A class for checking inefficient tasks to be speculated, the inefficient
tasks come from
+ * the tasks which may be speculated by the previous strategy.
+ */
+ private[scheduler] class InefficientTaskCalculator {
+ var taskProgressThreshold = 0.0
+ var updateSealed = false
+ private var lastComputeMs = -1L
+
+ def maybeRecompute(nowMs: Long): Unit = {
+ if (!updateSealed && (lastComputeMs <= 0 ||
+ nowMs > lastComputeMs + speculationTaskStatsCacheInterval)) {
+ var successRecords = 0L
+ var successRunTime = 0L
+ var numSuccessTasks = 0L
+ taskInfos.values.filter(_.status == "SUCCESS").foreach { taskInfo =>
+ successRecords += taskInfo.successRecords
+ successRunTime += taskInfo.successRunTime
+ numSuccessTasks += 1
+ }
Review Comment:
Do we want to include all successful tasks here (which could include
multiple attempts for a partition)?
Or should we use only one successful task execution per partition (and if so, which one)?
Alternative to `taskInfos.values`, for example, would be:
```
taskAttempts.foreach { p =>
val tasks = p.filter(_.successful)
// if tasks.size > 1, what do we do? Pick first? Pick smallest by duration?
// For example, either:
// a) tasks.headOption.foreach { taskInfo => ..., or
// b) val taskInfo = tasks.minBy(_.successRunTime), ...
}
```
+CC @Ngone51
##########
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala:
##########
@@ -109,6 +118,13 @@ private[spark] class TaskSetManager(
private val executorDecommissionKillInterval =
conf.get(EXECUTOR_DECOMMISSION_KILL_INTERVAL).map(TimeUnit.SECONDS.toMillis)
+ private[scheduler] val inefficientTaskCalculator =
Review Comment:
Make this `Option[InefficientTaskCalculator]` ?
##########
core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala:
##########
@@ -83,6 +83,27 @@ class TaskInfo(
_accumulables = newAccumulables
}
+ private[spark] var successRecords = 0L
+ private[spark] var successRunTime = 0L
Review Comment:
Please add a comment to indicate what these variables (and
`taskProgressRate` below) are about, including when they will be set.
Also, rename them to something like `totalRecordsRead` and
`totalExecutorRunTime`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]