weixiuli commented on code in PR #36162:
URL: https://github.com/apache/spark/pull/36162#discussion_r861412215
##########
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala:
##########
@@ -1217,6 +1276,123 @@ private[spark] class TaskSetManager(
def executorAdded(): Unit = {
recomputeLocality()
}
+
+ /**
+ * A class for checking inefficient tasks to be speculated, the inefficient
tasks come from
+ * the tasks which may be speculated by the previous strategy.
+ */
+ private[scheduler] class InefficientTask(
+ val recomputeIntervalMs: Long, val maxTasksToCheck: Int) {
+ var taskData: Map[Long, TaskData] = null
+ var taskProgressThreshold = 0.0
+ private var lastComputeMs = -1L
+ var updateSealed = false
+ private val checkInefficientTask = speculationTaskMinDuration > 0
+
+ def maybeRecompute(appStatusStore: AppStatusStore, nowMs: Long,
+ stageId: Int, stageAttemptId: Int): Unit = {
+ if (!checkInefficientTask || appStatusStore == null) {
+ return
+ }
+ try {
+ this.synchronized {
+ if (!updateSealed &&
+ (lastComputeMs <= 0 || nowMs > lastComputeMs +
recomputeIntervalMs)) {
+ val (progressRate, numSuccessTasks): (Double, Int) =
computeSuccessTaskProgress(
+ stageId, stageAttemptId, appStatusStore)
+ if (progressRate > 0.0) {
+ setTaskData(stageId, stageAttemptId, appStatusStore)
+ taskProgressThreshold = progressRate *
speculationTaskProgressMultiplier
+ if (numSuccessTasks > maxTasksToCheck) {
+ updateSealed = true
+ }
+ lastComputeMs = nowMs
+ }
+ }
+ }
+ } catch {
+ case e: InterruptedException => throw e
+ case t: Throwable => logWarning("Failed to recompute InefficientTask
state", t)
+ }
+ }
+
+ private def setTaskData(
+ stageId: Int,
+ stageAttemptId: Int,
+ appStatusStore: AppStatusStore): Unit = {
+    try {
+      // The stage entity will be written into appStatusStore by
+      // 'listener.onStageSubmitted' and updated by
'listener.onExecutorMetricsUpdate',
+ // it's not going to be in appStatusStore when
'TaskSetManager.checkSpeculatableTasks'
+ // comes before 'listener.onStageSubmitted' to write it, so we should
catch that and
+ // fallback.
+ val stageData = appStatusStore.stageAttempt(stageId, stageAttemptId,
true)
+ if (stageData != null) {
+ taskData = stageData._1.tasks.orNull
+ }
+ } catch {
+ case e: RuntimeException => logWarning("Failed to set taskData", e)
+ }
+ }
+
+ private def computeSuccessTaskProgress(
+ stageId: Int,
+ stageAttemptId: Int,
+ appStatusStore: AppStatusStore): (Double, Int) = {
+ var sumInputRecords, sumShuffleReadRecords, sumExecutorRunTime = 0.0
+ var progressRate = 0.0
+ var numSuccessTasks = 0
+ try {
+ appStatusStore.taskList(stageId, stageAttemptId,
maxTasksToCheck).filter {
+ _.status == "SUCCESS"
+ }.map(_.taskMetrics).filter(_.isDefined).map(_.get).foreach { task =>
+ if (task.inputMetrics != null) {
+ sumInputRecords += task.inputMetrics.recordsRead
+ }
+ if (task.shuffleReadMetrics != null) {
+ sumShuffleReadRecords += task.shuffleReadMetrics.recordsRead
+ }
Review Comment:
As discussed offline: `inputMetrics` and `shuffleReadMetrics` cannot both
appear in the same stage at the same time, so using either `inputMetrics` or
`shuffleReadMetrics` to evaluate task efficiency within a stage is reliable
and covers more scenarios, such as map stages and reduce stages.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]