Ngone51 commented on code in PR #38702:
URL: https://github.com/apache/spark/pull/38702#discussion_r1034655159
##########
core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala:
##########
@@ -1849,6 +1849,68 @@ abstract class AppStatusListenerSuite extends
SparkFunSuite with BeforeAndAfter
checkInfoPopulated(listener, logUrlMap, processId)
}
+ test(s"Stage should be removed from liveStages to avoid deadExecutors
accumulated") {
+
+ val listener = new AppStatusListener(store, conf, true)
+
+ listener.onExecutorAdded(createExecutorAddedEvent(1))
+ listener.onExecutorAdded(createExecutorAddedEvent(2))
+ val stage = new StageInfo(1, 0, "stage", 4, Nil, Nil, "details",
+ resourceProfileId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
+ listener.onJobStart(SparkListenerJobStart(1, time, Seq(stage), null))
+
+ time += 1
+ stage.submissionTime = Some(time)
+ listener.onStageSubmitted(SparkListenerStageSubmitted(stage, new
Properties()))
+
+ val tasks = createTasks(2, Array("1", "2"))
+ tasks.foreach { task =>
+ listener.onTaskStart(SparkListenerTaskStart(stage.stageId,
stage.attemptNumber, task))
+ }
+
+ time += 1
+ tasks(0).markFinished(TaskState.FINISHED, time)
+ listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId,
stage.attemptNumber, "taskType",
+ Success, tasks(0), new ExecutorMetrics, null))
+
+ // executor lost, success task will be resubmitted
+ time += 1
+ listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId,
stage.attemptNumber, "taskType",
+ Resubmitted, tasks(0), new ExecutorMetrics, null))
+
+ // executor lost, running task will be failed and rerun
+ time += 1
+ tasks(1).markFinished(TaskState.FAILED, time)
+ listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId,
stage.attemptNumber, "taskType",
+ ExecutorLostFailure("1", true, Some("Lost executor")), tasks(1), new
ExecutorMetrics,
+ null))
+
+ tasks.foreach { task =>
+ listener.onTaskStart(SparkListenerTaskStart(stage.stageId,
stage.attemptNumber, task))
+ }
+
+ time += 1
+ tasks(0).markFinished(TaskState.FINISHED, time)
+ listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId,
stage.attemptNumber, "taskType",
+ Success, tasks(0), new ExecutorMetrics, null))
+
+ time += 1
+ tasks(1).markFinished(TaskState.FINISHED, time)
+ listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId,
stage.attemptNumber, "taskType",
+ Success, tasks(0), new ExecutorMetrics, null))
+
+ listener.onStageCompleted(SparkListenerStageCompleted(stage))
+ time += 1
+ listener.onJobEnd(SparkListenerJobEnd(1, time, JobSucceeded ))
+
+ time += 1
+ listener.onExecutorRemoved(SparkListenerExecutorRemoved(time, "1", "Test"))
+ time += 1
+ listener.onExecutorRemoved(SparkListenerExecutorRemoved(time, "2", "Test"))
+
+ assert( listener.deadExecutors.size === 0 )
Review Comment:
```suggestion
assert(listener.deadExecutors.size === 0)
```
##########
core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala:
##########
@@ -1849,6 +1849,68 @@ abstract class AppStatusListenerSuite extends
SparkFunSuite with BeforeAndAfter
checkInfoPopulated(listener, logUrlMap, processId)
}
+ test(s"Stage should be removed from liveStages to avoid deadExecutors
accumulated") {
Review Comment:
```suggestion
test(s"SPARK-41187: Stage should be removed from liveStages to avoid
deadExecutors accumulated") {
```
##########
core/src/main/scala/org/apache/spark/status/AppStatusListener.scala:
##########
@@ -645,8 +645,11 @@ private[spark] class AppStatusListener(
}
override def onTaskEnd(event: SparkListenerTaskEnd): Unit = {
- // TODO: can this really happen?
- if (event.taskInfo == null) {
+ // TODO: can taskInfo null really happen?
+ // For resubmitted tasks caused by ExecutorLost, the SparkListenerTaskEnd
is useless and
+ // will make activeTask in stage to be negative, this will cause stage not
be removed in
+ // liveStages, and finally cause executor not removed in deadExecutors
Review Comment:
```suggestion
// SPARK-41187: For `SparkListenerTaskEnd` with `Resubmitted` reason,
which is raised by executor lost,
// it can lead to negative `LiveStage.activeTasks` since there's no
corresponding `SparkListenerTaskStart`
// event for each of them. The negative activeTasks will make the stage
always remains in the live stage list
// as it can never meet the condition activeTasks == 0. This in turn
causes the dead executor to never be
// cleaned up if that live stage's submissionTime is less than the dead
executor's removeTime (see
// isExecutorActiveForLiveStages). Since this kind of
`SparkListenerTaskEnd` is useless here, we simply
// ignore it.
```
##########
core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala:
##########
@@ -1849,6 +1849,68 @@ abstract class AppStatusListenerSuite extends
SparkFunSuite with BeforeAndAfter
checkInfoPopulated(listener, logUrlMap, processId)
}
+ test(s"Stage should be removed from liveStages to avoid deadExecutors
accumulated") {
+
+ val listener = new AppStatusListener(store, conf, true)
+
+ listener.onExecutorAdded(createExecutorAddedEvent(1))
+ listener.onExecutorAdded(createExecutorAddedEvent(2))
+ val stage = new StageInfo(1, 0, "stage", 4, Nil, Nil, "details",
+ resourceProfileId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
+ listener.onJobStart(SparkListenerJobStart(1, time, Seq(stage), null))
+
+ time += 1
+ stage.submissionTime = Some(time)
+ listener.onStageSubmitted(SparkListenerStageSubmitted(stage, new
Properties()))
+
+ val tasks = createTasks(2, Array("1", "2"))
+ tasks.foreach { task =>
+ listener.onTaskStart(SparkListenerTaskStart(stage.stageId,
stage.attemptNumber, task))
+ }
+
+ time += 1
+ tasks(0).markFinished(TaskState.FINISHED, time)
+ listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId,
stage.attemptNumber, "taskType",
+ Success, tasks(0), new ExecutorMetrics, null))
+
+ // executor lost, success task will be resubmitted
Review Comment:
Could you copy-paste your analysis from JIRA into the PR description to
elaborate on the issue more clearly?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]