weixiuli commented on code in PR #36162:
URL: https://github.com/apache/spark/pull/36162#discussion_r882699854
##########
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala:
##########
@@ -769,6 +785,25 @@ private[spark] class TaskSetManager(
}
}
+ def setTaskRecordsAndRunTime(
+ info: TaskInfo,
+ result: DirectTaskResult[_]): Unit = {
+ var records = 0L
+ var runTime = 0L
+ result.accumUpdates.foreach { a =>
+ if (a.name == Some(shuffleRead.RECORDS_READ) ||
+ a.name == Some(input.RECORDS_READ)) {
+ val acc = a.asInstanceOf[LongAccumulator]
+ records += acc.value
+ } else if (a.name == Some(InternalAccumulator.EXECUTOR_RUN_TIME)) {
+ val acc = a.asInstanceOf[LongAccumulator]
+ runTime = acc.value
+ }
+ }
+ info.setRecords(records)
+ info.setRunTime(runTime)
Review Comment:
We should ensure that the successRecords and successRunTime for the task are
set only when the task is finished, and using the `setTaskRecordsAndRunTime`
method to do that, which is called only once.
##########
core/src/main/scala/org/apache/spark/internal/config/package.scala:
##########
@@ -2051,6 +2051,47 @@ package object config {
.doubleConf
.createWithDefault(0.75)
+ private[spark] val SPECULATION_INEFFICIENT_ENABLE =
+ ConfigBuilder("spark.speculation.inefficient.enabled")
+ .doc("When set to true, spark will evaluate the efficiency of task
processing through the " +
+ "stage task metrics and only need to speculate the inefficient tasks.")
+ .version("3.4.0")
+ .booleanConf
+ .createWithDefault(true)
+
+ private[spark] val SPECULATION_TASK_PROGRESS_MULTIPLIER =
+ ConfigBuilder("spark.speculation.inefficient.progress.multiplier")
+ .doc("A multiplier for evaluating the efficiency of task processing. A
task will be " +
+ "evaluated an inefficient one when it's progress rate is less than the
" +
+ "successTaskProgressRate * multiplier")
+ .version("3.4.0")
+ .doubleConf
+ .createWithDefault(1.0)
+
+ private[spark] val SPECULATION_TASK_DURATION_FACTOR =
+ ConfigBuilder("spark.speculation.inefficient.duration.factor")
+ .doc("When a task runtime is bigger than the factor * threshold, it
should be considered " +
+ "for speculation to avoid that it is too late to launch a necessary
speculation.")
+ .version("3.4.0")
+ .doubleConf
+ .createWithDefault(2.0)
+
+ private[spark] val SPECULATION_TASK_STATS_CACHE_DURATION =
+ ConfigBuilder("spark.speculation.inefficient.stats.cache.duration")
+ .doc("The interval of time between recompute success task progress to
avoid scanning " +
+ "repeatedly.")
+ .version("3.4.0")
+ .timeConf(TimeUnit.MILLISECONDS)
+ .createWithDefaultString("1000ms")
+
+ private[spark] val SPECULATION_SINGLE_TASK_DURATION_THRESHOLD =
+ ConfigBuilder("spark.speculation.singleTask.duration.threshold")
+ .doc("Only one task in a TasksSet should speculative if it is taking
longer time than the " +
+ "threshold.")
Review Comment:
Ok, we may reuse the `spark.speculation.task.duration.threshold`, which I
will change for the next commit.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]