venkata91 commented on a change in pull request #28994:
URL: https://github.com/apache/spark/pull/28994#discussion_r456228668
##########
File path: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
##########
@@ -1125,6 +1142,78 @@ private[spark] class TaskSetManager(
def executorAdded(): Unit = {
recomputeLocality()
}
+
+ /**
+ * A class for checking inefficient tasks to be speculated, the inefficient
tasks come from
+ * the tasks which may be speculated by the previous strategy.
+ */
+ private class InefficientTask {
+ private var taskData: Map[Long, TaskData] = null
+ private var successTaskProgress = 0.0
+ private val checkInefficientTask = speculationTaskMinDuration > 0
+
+ if (checkInefficientTask) {
+ val appStatusStore = sched.sc.statusTracker.getAppStatusStore
+ if (appStatusStore != null) {
+ successTaskProgress =
+ computeSuccessTaskProgress(taskSet.stageId, taskSet.stageAttemptId,
appStatusStore)
+ val stageData = appStatusStore.stageAttempt(taskSet.stageId,
taskSet.stageAttemptId, true)
+ if (stageData != null) {
+ taskData = stageData._1.tasks.orNull
+ }
+ }
+ }
+
+ private def computeSuccessTaskProgress(stageId: Int, stageAttemptId: Int,
+ appStatusStore: AppStatusStore): Double = {
+ var sumInputRecords, sumShuffleReadRecords, sumExecutorRunTime = 0.0
+ appStatusStore.taskList(stageId, stageAttemptId, Int.MaxValue).filter {
+ _.status == "SUCCESS"
+ }.map(_.taskMetrics).filter(_.isDefined).map(_.get).foreach { task =>
+ if (task.inputMetrics != null) {
+ sumInputRecords += task.inputMetrics.recordsRead
+ }
Review comment:
How about `recordsWritten`? Should that also be taken into account when measuring progress?
The same question applies to `shuffleRecordsWritten`.
##########
File path: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
##########
@@ -1125,6 +1142,78 @@ private[spark] class TaskSetManager(
def executorAdded(): Unit = {
recomputeLocality()
}
+
+ /**
+ * A class for checking inefficient tasks to be speculated, the inefficient
tasks come from
+ * the tasks which may be speculated by the previous strategy.
+ */
+ private class InefficientTask {
+ private var taskData: Map[Long, TaskData] = null
+ private var successTaskProgress = 0.0
+ private val checkInefficientTask = speculationTaskMinDuration > 0
+
+ if (checkInefficientTask) {
+ val appStatusStore = sched.sc.statusTracker.getAppStatusStore
+ if (appStatusStore != null) {
+ successTaskProgress =
+ computeSuccessTaskProgress(taskSet.stageId, taskSet.stageAttemptId,
appStatusStore)
+ val stageData = appStatusStore.stageAttempt(taskSet.stageId,
taskSet.stageAttemptId, true)
+ if (stageData != null) {
+ taskData = stageData._1.tasks.orNull
+ }
+ }
+ }
+
+ private def computeSuccessTaskProgress(stageId: Int, stageAttemptId: Int,
+ appStatusStore: AppStatusStore): Double = {
+ var sumInputRecords, sumShuffleReadRecords, sumExecutorRunTime = 0.0
+ appStatusStore.taskList(stageId, stageAttemptId, Int.MaxValue).filter {
+ _.status == "SUCCESS"
+ }.map(_.taskMetrics).filter(_.isDefined).map(_.get).foreach { task =>
+ if (task.inputMetrics != null) {
+ sumInputRecords += task.inputMetrics.recordsRead
+ }
+ if (task.shuffleReadMetrics != null) {
+ sumShuffleReadRecords += task.shuffleReadMetrics.recordsRead
+ }
+ sumExecutorRunTime += task.executorRunTime
+ }
+ if (sumExecutorRunTime > 0) {
+ (sumInputRecords + sumShuffleReadRecords) / (sumExecutorRunTime /
1000.0)
+ } else 0
+ }
+
+ def maySpeculateTask(tid: Long, runtimeMs: Long, taskInfo: TaskInfo):
Boolean = {
+ // note: 1) only check inefficient tasks when
'SPECULATION_TASK_DURATION_THRESHOLD' > 0.
+ // 2) some tasks may have neither input records nor shuffleRead records,
so
+ // the 'successTaskProgress' may be zero all the time, this case we
should not consider,
+ // eg: some spark-sql like that 'msck repair table' or 'drop table' and
so on.
+ if (!checkInefficientTask || successTaskProgress <= 0) {
+ true
+ } else if (runtimeMs < speculationTaskMinDuration) {
+ false
+ } else if (taskData != null && taskData.contains(tid) && taskData(tid)
!= null &&
+ taskData(tid).taskMetrics.isDefined) {
+ val taskMetrics = taskData(tid).taskMetrics.get
+ val currentTaskProgressRate = (taskMetrics.inputMetrics.recordsRead +
Review comment:
Would it make sense to add the task progress to `taskMetrics`, so that it can also be shown
in the Spark UI? That said, wouldn't progress be hard to measure for tasks that don't
involve input/output/shuffle records?
##########
File path: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
##########
@@ -1125,6 +1142,78 @@ private[spark] class TaskSetManager(
def executorAdded(): Unit = {
recomputeLocality()
}
+
+ /**
+ * A class for checking inefficient tasks to be speculated, the inefficient
tasks come from
+ * the tasks which may be speculated by the previous strategy.
+ */
+ private class InefficientTask {
+ private var taskData: Map[Long, TaskData] = null
+ private var successTaskProgress = 0.0
+ private val checkInefficientTask = speculationTaskMinDuration > 0
+
+ if (checkInefficientTask) {
+ val appStatusStore = sched.sc.statusTracker.getAppStatusStore
+ if (appStatusStore != null) {
+ successTaskProgress =
+ computeSuccessTaskProgress(taskSet.stageId, taskSet.stageAttemptId,
appStatusStore)
+ val stageData = appStatusStore.stageAttempt(taskSet.stageId,
taskSet.stageAttemptId, true)
+ if (stageData != null) {
+ taskData = stageData._1.tasks.orNull
+ }
+ }
+ }
+
+ private def computeSuccessTaskProgress(stageId: Int, stageAttemptId: Int,
+ appStatusStore: AppStatusStore): Double = {
+ var sumInputRecords, sumShuffleReadRecords, sumExecutorRunTime = 0.0
+ appStatusStore.taskList(stageId, stageAttemptId, Int.MaxValue).filter {
+ _.status == "SUCCESS"
+ }.map(_.taskMetrics).filter(_.isDefined).map(_.get).foreach { task =>
+ if (task.inputMetrics != null) {
+ sumInputRecords += task.inputMetrics.recordsRead
+ }
Review comment:
Caching can also take time when blocks are written to disk; does that need to be
taken into consideration? Similarly, GC time, shuffle-read blocked time, etc.
could also affect task progress.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]