Ngone51 commented on code in PR #36162:
URL: https://github.com/apache/spark/pull/36162#discussion_r865580173
##########
core/src/main/scala/org/apache/spark/internal/config/package.scala:
##########
@@ -2051,6 +2051,39 @@ package object config {
.doubleConf
.createWithDefault(0.75)
+ private[spark] val SPECULATION_TASK_MIN_DURATION =
+ ConfigBuilder("spark.speculation.task.min.duration")
+      .doc("The minimum duration of a task before it can be considered for speculation.")
+ .version("3.4.0")
+ .timeConf(TimeUnit.MILLISECONDS)
+ .createWithDefaultString("0s")
+
+ private[spark] val SPECULATION_TASK_PROGRESS_MULTIPLIER =
+ ConfigBuilder("spark.speculation.task.progress.multiplier")
+ .version("3.4.0")
+ .doubleConf
+ .createWithDefault(1.0)
+
+ private[spark] val SPECULATION_TASK_DURATION_FACTOR =
+ ConfigBuilder("spark.speculation.task.duration.factor")
+ .version("3.4.0")
+ .doubleConf
+ .createWithDefault(2.0)
+
+ private[spark] val SPECULATION_TASK_STATS_CACHE_DURATION =
+ ConfigBuilder("spark.speculation.task.stats.cache.duration")
+ .version("3.4.0")
+ .timeConf(TimeUnit.MILLISECONDS)
+ .createWithDefaultString("1000ms")
Review Comment:
Could you add doc for these configs?
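One possible shape for the requested docs (the `.doc()` wording below is only a suggestion, not the PR author's text; `ConfigBuilder` and its methods are Spark's real internal config API):

```scala
// Illustrative doc strings only -- wording is a reviewer suggestion.
private[spark] val SPECULATION_TASK_PROGRESS_MULTIPLIER =
  ConfigBuilder("spark.speculation.task.progress.multiplier")
    .doc("A running task is considered inefficient when its data-processing rate is " +
      "below the average rate of successful tasks multiplied by this value.")
    .version("3.4.0")
    .doubleConf
    .createWithDefault(1.0)
```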
##########
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala:
##########
@@ -1217,6 +1269,120 @@ private[spark] class TaskSetManager(
def executorAdded(): Unit = {
recomputeLocality()
}
+
+ /**
+   * A class for checking whether tasks that are candidates for speculation are
+   * inefficient. The candidate tasks come from those that may already be
+   * speculated under the previous strategy.
+ */
+ private[scheduler] class InefficientTask {
Review Comment:
How about `InefficientTaskCalculator`?
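To illustrate the suggested name, here is a self-contained, hypothetical sketch of what an `InefficientTaskCalculator` could look like. The class name follows the reviewer's suggestion, but the field names and the records-per-millisecond heuristic below are illustrative assumptions, not the PR's actual code:

```scala
// Hypothetical sketch: a task is flagged "inefficient" when its
// records-per-millisecond rate falls below the average rate of the
// successful tasks, scaled by a progress multiplier (cf. the proposed
// spark.speculation.task.progress.multiplier config).
class InefficientTaskCalculator(
    successfulTaskRates: Seq[Double], // records/ms of finished tasks
    progressMultiplier: Double) {

  private val avgRate: Double =
    if (successfulTaskRates.isEmpty) 0.0
    else successfulTaskRates.sum / successfulTaskRates.size

  /** True when the running task's rate lags the scaled average. */
  def isInefficient(recordsRead: Long, runTimeMs: Long): Boolean = {
    if (runTimeMs <= 0 || avgRate == 0.0) return false
    val taskRate = recordsRead.toDouble / runTimeMs
    taskRate < avgRate * progressMultiplier
  }
}
```

With successful-task rates of 10, 12, and 8 records/ms (average 10), a running task that has read 500 records in 1000 ms (rate 0.5) would be flagged, while one that has read 20000 records in the same time (rate 20) would not.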
##########
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala:
##########
@@ -80,12 +82,21 @@ private[spark] class TaskSetManager(
val copiesRunning = new Array[Int](numTasks)
val speculationEnabled = conf.get(SPECULATION_ENABLED)
+  private val speculationTaskMinDuration = conf.get(SPECULATION_TASK_MIN_DURATION)
+  private val speculationTaskProgressMultiplier = conf.get(SPECULATION_TASK_PROGRESS_MULTIPLIER)
+  private val speculationTaskDurationFactor = conf.get(SPECULATION_TASK_DURATION_FACTOR)
+  private val speculationSingleTaskDurationThreshold =
+    conf.get(SPECULATION_SINGLE_TASK_DURATION_THRESHOLD)
+  private val speculationTaskStatsCacheInterval = conf.get(SPECULATION_TASK_STATS_CACHE_DURATION)
+
// Quantile of tasks at which to start speculation
val speculationQuantile = conf.get(SPECULATION_QUANTILE)
val speculationMultiplier = conf.get(SPECULATION_MULTIPLIER)
  val minFinishedForSpeculation = math.max((speculationQuantile * numTasks).floor.toInt, 1)
  // User provided threshold for speculation regardless of whether the quantile has been reached
  val speculationTaskDurationThresOpt = conf.get(SPECULATION_TASK_DURATION_THRESHOLD)
+  private val isSpeculationThresholdSpecified = speculationTaskDurationThresOpt.isDefined &&
+    speculationTaskDurationThresOpt.exists(_ > 0)
Review Comment:
```suggestion
  private val isSpeculationThresholdSpecified = speculationTaskDurationThresOpt.exists(_ > 0)
```
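The suggestion is safe because `Option.exists` already returns `false` for `None`, which makes the `isDefined &&` guard redundant. A minimal standalone demonstration:

```scala
// Option.exists never matches on None, so no separate isDefined check
// is needed before it.
object ExistsDemo {
  val noThreshold: Option[Long] = None
  val zeroThreshold: Option[Long] = Some(0L)
  val realThreshold: Option[Long] = Some(60000L)

  def main(args: Array[String]): Unit = {
    println(noThreshold.exists(_ > 0))   // false: None never matches
    println(zeroThreshold.exists(_ > 0)) // false: predicate fails on 0
    println(realThreshold.exists(_ > 0)) // true
  }
}
```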
##########
core/src/main/scala/org/apache/spark/SparkStatusTracker.scala:
##########
@@ -120,4 +120,8 @@ class SparkStatusTracker private[spark] (sc: SparkContext,
store: AppStatusStore
exec.memoryMetrics.map(_.totalOnHeapStorageMemory).getOrElse(0L))
}.toArray
}
+
+ def getAppStatusStore: AppStatusStore = {
+ store
+ }
Review Comment:
This sounds good to me. Just adding one note:
> For completed tasks, this is already available - but will require a scan
through accumulables (and so a performance gain by materializing them).
> For in progress tasks, this is currently not maintained in scheduler - and
so tracking them during heartbeat would allow scheduler to leverage the state.
I think this essentially means we'll maintain intermediate `accumulables` on
`TaskInfo`, rather than only the final `accumulables` for completed tasks as
we have today. And we'll have to track all tasks, since every completed task
was an in-progress task at some point.
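To make the idea concrete, here is a hedged, self-contained sketch of tracking intermediate accumulable values for in-progress tasks, updated on each heartbeat. All names (`AccumSnapshot`, `RunningTaskAccumulables`) and the structure are illustrative, not Spark's actual scheduler code:

```scala
import scala.collection.mutable

// Illustrative snapshot of a couple of accumulable metrics.
final case class AccumSnapshot(recordsRead: Long, bytesRead: Long)

// Sketch: scheduler-side store of the latest accumulable values for
// tasks that are still running, refreshed from executor heartbeats.
class RunningTaskAccumulables {
  private val byTaskId = mutable.Map.empty[Long, AccumSnapshot]

  /** Called on executor heartbeat with the task's latest counters. */
  def updateOnHeartbeat(taskId: Long, snap: AccumSnapshot): Unit =
    byTaskId(taskId) = snap

  /** Scheduler-side read for speculation decisions on live tasks. */
  def latest(taskId: Long): Option[AccumSnapshot] = byTaskId.get(taskId)

  /** Drop state once the task completes and final accumulables exist. */
  def remove(taskId: Long): Unit = byTaskId.remove(taskId)
}
```

This avoids scanning completed-task accumulables while still giving the scheduler per-task progress for live tasks; the completed-task path would keep using the final accumulables as today.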
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]