Ngone51 commented on code in PR #36162:
URL: https://github.com/apache/spark/pull/36162#discussion_r865580173


##########
core/src/main/scala/org/apache/spark/internal/config/package.scala:
##########
@@ -2051,6 +2051,39 @@ package object config {
       .doubleConf
       .createWithDefault(0.75)
 
+  private[spark] val SPECULATION_TASK_MIN_DURATION =
+    ConfigBuilder("spark.speculation.task.min.duration")
+      .doc("The minimum duration of a task can be speculative.")
+      .version("3.4.0")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .createWithDefaultString("0s")
+
+  private[spark] val SPECULATION_TASK_PROGRESS_MULTIPLIER =
+    ConfigBuilder("spark.speculation.task.progress.multiplier")
+      .version("3.4.0")
+      .doubleConf
+      .createWithDefault(1.0)
+
+  private[spark] val SPECULATION_TASK_DURATION_FACTOR =
+    ConfigBuilder("spark.speculation.task.duration.factor")
+      .version("3.4.0")
+      .doubleConf
+      .createWithDefault(2.0)
+
+  private[spark] val SPECULATION_TASK_STATS_CACHE_DURATION =
+    ConfigBuilder("spark.speculation.task.stats.cache.duration")
+      .version("3.4.0")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .createWithDefaultString("1000ms")

Review Comment:
   Could you add doc for these configs?



##########
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala:
##########
@@ -1217,6 +1269,120 @@ private[spark] class TaskSetManager(
   def executorAdded(): Unit = {
     recomputeLocality()
   }
+
+  /**
+   * A class for checking inefficient tasks to be speculated, the inefficient 
tasks come from
+   * the tasks which may be speculated by the previous strategy.
+   */
+  private[scheduler] class InefficientTask {

Review Comment:
   How about `InefficientTaskCalculator`?



##########
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala:
##########
@@ -80,12 +82,21 @@ private[spark] class TaskSetManager(
   val copiesRunning = new Array[Int](numTasks)
 
   val speculationEnabled = conf.get(SPECULATION_ENABLED)
+  private val speculationTaskMinDuration = 
conf.get(SPECULATION_TASK_MIN_DURATION)
+  private val speculationTaskProgressMultiplier = 
conf.get(SPECULATION_TASK_PROGRESS_MULTIPLIER)
+  private val speculationTaskDurationFactor = 
conf.get(SPECULATION_TASK_DURATION_FACTOR)
+  private val speculationSingleTaskDurationThreshold =
+    conf.get(SPECULATION_SINGLE_TASK_DURATION_THRESHOLD)
+  private val speculationTaskStatsCacheInterval = 
conf.get(SPECULATION_TASK_STATS_CACHE_DURATION)
+
   // Quantile of tasks at which to start speculation
   val speculationQuantile = conf.get(SPECULATION_QUANTILE)
   val speculationMultiplier = conf.get(SPECULATION_MULTIPLIER)
   val minFinishedForSpeculation = math.max((speculationQuantile * 
numTasks).floor.toInt, 1)
   // User provided threshold for speculation regardless of whether the 
quantile has been reached
   val speculationTaskDurationThresOpt = 
conf.get(SPECULATION_TASK_DURATION_THRESHOLD)
+  private val isSpeculationThresholdSpecified = 
speculationTaskDurationThresOpt.isDefined &&
+    speculationTaskDurationThresOpt.exists(_ > 0)

Review Comment:
   ```suggestion
     private val isSpeculationThresholdSpecified = 
speculationTaskDurationThresOpt.exists(_ > 0)
   ```



##########
core/src/main/scala/org/apache/spark/SparkStatusTracker.scala:
##########
@@ -120,4 +120,8 @@ class SparkStatusTracker private[spark] (sc: SparkContext, 
store: AppStatusStore
         exec.memoryMetrics.map(_.totalOnHeapStorageMemory).getOrElse(0L))
     }.toArray
   }
+
+  def getAppStatusStore: AppStatusStore = {
+    store
+  }

Review Comment:
   This sounds good to me. Just adds one note:
   
   > For completed tasks, this is already available - but will require a scan 
through accumulables (and so a performance gain by materializing them).
   > For in progress tasks, this is currently not maintained in scheduler - and 
so tracking them during heatbeat would allow scheduler to leverage the state.
   
   I think this essentially means we'll have intermediate `accumulables` for 
`TaskInfo` rather than only final `accumulables` for the completed tasks as 
what we have today. And we'll have to track all tasks since the completed tasks 
were inprogress tasks ever.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to