mridulm commented on code in PR #36162:
URL: https://github.com/apache/spark/pull/36162#discussion_r862545494
##########
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala:
##########
@@ -1217,6 +1276,127 @@ private[spark] class TaskSetManager(
  def executorAdded(): Unit = {
    recomputeLocality()
  }
+
+  /**
+   * A class for checking whether tasks that are candidates for speculation are inefficient.
+   * The candidate tasks come from those that would already be speculated by the existing
+   * speculation strategy.
+   */
+  private[scheduler] class InefficientTask(
+      val recomputeIntervalMs: Long,
+      val maxTasksToCheck: Int) {
+    var taskData: Map[Long, TaskData] = null
+    var taskProgressThreshold = 0.0
+    private var lastComputeMs = -1L
+    var updateSealed = false
+    private val checkInefficientTask = speculationTaskMinDuration > 0
+
+    def maybeRecompute(
+        appStatusStore: AppStatusStore,
+        nowMs: Long,
+        stageId: Int,
+        stageAttemptId: Int): Unit = {
+      if (!checkInefficientTask || appStatusStore == null) {
+        return
+      }
+      try {
+        this.synchronized {
+          if (!updateSealed &&
+              (lastComputeMs <= 0 || nowMs > lastComputeMs + recomputeIntervalMs)) {
+            val (progressRate, numSuccessTasks): (Double, Int) = computeSuccessTaskProgress(
+              stageId, stageAttemptId, appStatusStore)
+            if (progressRate > 0.0) {
+              setTaskData(stageId, stageAttemptId, appStatusStore)
+              taskProgressThreshold = progressRate * speculationTaskProgressMultiplier
+              if (numSuccessTasks > maxTasksToCheck) {
+                updateSealed = true
+              }
+              lastComputeMs = nowMs
+            }
+          }
+        }
+      } catch {
+        case e: InterruptedException => throw e
+        case t: Throwable => logWarning("Failed to recompute InefficientTask state", t)
+      }
+    }
+
+    private def setTaskData(
+        stageId: Int,
+        stageAttemptId: Int,
+        appStatusStore: AppStatusStore): Unit = {
+      try {
+        // The stage entity will be written into appStatusStore by 'listener.onStageSubmitted'
+        // and updated by 'listener.onExecutorMetricsUpdate'. It will not yet be in
+        // appStatusStore when 'TaskSetManager.checkSpeculatableTasks' runs before
+        // 'listener.onStageSubmitted' has written it, so we should catch that and fall back.
+        val stageData = appStatusStore.stageAttempt(stageId, stageAttemptId, true)
+        if (stageData != null) {
+          taskData = stageData._1.tasks.orNull
+        }
+      } catch {
+        case e: RuntimeException => logWarning("Failed to set taskData", e)
+      }
+    }
+
+    private def computeSuccessTaskProgress(
+        stageId: Int,
+        stageAttemptId: Int,
+        appStatusStore: AppStatusStore): (Double, Int) = {
+      var sumInputRecords, sumShuffleReadRecords, sumExecutorRunTime = 0.0
+      var progressRate = 0.0
+      var numSuccessTasks = 0
+      try {
+        appStatusStore.taskList(stageId, stageAttemptId, maxTasksToCheck).filter {
Review Comment:
   Take a look at how `taskList` is implemented - it gets very expensive as the number of entries increases, since the in-memory store always copies before sorting (there is no indexing like in other stores).
   In this case, since we are iterating over all successful tasks for the stage, simply iterate over `taskAttempts` (or even `taskInfos`), filter appropriately (successful, etc.) and pull out the details you need.
   The specific metrics of interest come from `TaskInfo.accumulables` - for example, take a look at `postTaskEnd` in DAGScheduler to see how they are created.
   This will also mean you are not impacted by dropped events, event queue lag, etc. - in addition to the lower cost.
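For illustration only (not code from this PR), here is a rough sketch of the suggested direction: derive the successful tasks' progress rate from the `TaskSetManager`'s own `taskInfos` map and `TaskInfo.accumulables` instead of querying `appStatusStore.taskList`. The helper name `successTaskProgressFromTaskInfos` and the records-per-millisecond progress formula are assumptions made for the sketch, and the metric keys are assumed to follow the `internal.metrics.*` names defined by `InternalAccumulator`:

```scala
// Sketch only: assumes this lives inside TaskSetManager, where the
// `taskInfos` map (taskId -> TaskInfo) is available.
private def successTaskProgressFromTaskInfos(): (Double, Int) = {
  var sumRecords = 0.0
  var sumRunTimeMs = 0.0
  var numSuccessTasks = 0
  // Keep only task attempts that finished successfully.
  taskInfos.values.filter(_.successful).foreach { info =>
    numSuccessTasks += 1
    // TaskInfo.accumulables carries the per-task metric values that
    // DAGScheduler.postTaskEnd reports to listeners at task end.
    info.accumulables.foreach { acc =>
      (acc.name, acc.value) match {
        // Metric names assumed to match InternalAccumulator's keys.
        case (Some("internal.metrics.input.recordsRead"), Some(v: Long)) =>
          sumRecords += v
        case (Some("internal.metrics.shuffle.read.recordsRead"), Some(v: Long)) =>
          sumRecords += v
        case (Some("internal.metrics.executorRunTime"), Some(v: Long)) =>
          sumRunTimeMs += v
        case _ => // ignore other accumulators
      }
    }
  }
  val progressRate = if (sumRunTimeMs > 0) sumRecords / sumRunTimeMs else 0.0
  (progressRate, numSuccessTasks)
}
```

Because this reads the scheduler's own `TaskInfo` objects rather than the status store, it avoids the copy-and-sort cost of `taskList` and is unaffected by dropped listener events or event queue lag, as the comment above points out.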
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]