Github user gengliangwang commented on a diff in the pull request:
https://github.com/apache/spark/pull/23038#discussion_r234728748
--- Diff:
core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala ---
@@ -1275,6 +1275,49 @@ class AppStatusListenerSuite extends SparkFunSuite
with BeforeAndAfter {
assert(allJobs.head.numFailedStages == 1)
}
+ test("SPARK-25451: total tasks in the executor summary should match
total stage tasks") {
+ val testConf = conf.clone.set(LIVE_ENTITY_UPDATE_PERIOD, Long.MaxValue)
+
+ val listener = new AppStatusListener(store, testConf, true)
+
+ val stage = new StageInfo(1, 0, "stage", 4, Nil, Nil, "details")
+ listener.onJobStart(SparkListenerJobStart(1, time, Seq(stage), null))
+ listener.onStageSubmitted(SparkListenerStageSubmitted(stage, new
Properties()))
+
+ val tasks = createTasks(4, Array("1", "2"))
+ tasks.foreach { task =>
+ listener.onTaskStart(SparkListenerTaskStart(stage.stageId,
stage.attemptNumber, task))
+ }
+
+ time += 1
+ tasks(0).markFinished(TaskState.FAILED, time)
+ listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId,
stage.attemptId, "taskType",
+ ExecutorLostFailure("1", true, Some("Lost executor")), tasks(0),
null))
+ time += 1
+ tasks(1).markFinished(TaskState.FAILED, time)
+ listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId,
stage.attemptId, "taskType",
+ ExecutorLostFailure("2", true, Some("Lost executor")), tasks(1),
null))
+
+ stage.failureReason = Some("Failed")
+ listener.onStageCompleted(SparkListenerStageCompleted(stage))
+ time += 1
+ listener.onJobEnd(SparkListenerJobEnd(1, time, JobFailed(new
RuntimeException("Bad Executor"))))
+
+ time += 1
+ tasks(2).markFinished(TaskState.FAILED, time)
+ listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId,
stage.attemptId, "taskType",
+ ExecutorLostFailure("1", true, Some("Lost executor")), tasks(2),
null))
+ time += 1
+ tasks(3).markFinished(TaskState.FAILED, time)
+ listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId,
stage.attemptId, "taskType",
+ ExecutorLostFailure("2", true, Some("Lost executor")), tasks(3),
null))
+
+ val esummary =
store.view(classOf[ExecutorStageSummaryWrapper]).asScala.map(_.info)
+ esummary.foreach { execSummary =>
+ assert(execSummary.failedTasks == 2)
--- End diff --
Nit: in this loop, also check `succeededTasks` and `killedTasks`, not just `failedTasks`.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org