Github user shahidki31 commented on a diff in the pull request:
https://github.com/apache/spark/pull/23038#discussion_r234399018
--- Diff:
core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala ---
@@ -1275,6 +1275,49 @@ class AppStatusListenerSuite extends SparkFunSuite
with BeforeAndAfter {
assert(allJobs.head.numFailedStages == 1)
}
+ test("SPARK-25451: total tasks in the executor summary should match
total stage tasks") {
+ val testConf = conf.clone()
+ .set("spark.ui.liveUpdate.period", s"${Int.MaxValue}s")
+
+ val listener = new AppStatusListener(store, testConf, true)
+
+ val stage = new StageInfo(1, 0, "stage", 4, Nil, Nil, "details")
+ listener.onJobStart(SparkListenerJobStart(1, time, Seq(stage), null))
+ listener.onStageSubmitted(SparkListenerStageSubmitted(stage, new
Properties()))
+
+ val tasks = createTasks(4, Array("1", "2"))
+ tasks.foreach { task =>
+ listener.onTaskStart(SparkListenerTaskStart(stage.stageId,
stage.attemptNumber, task))
+ }
+
+ tasks.filter(_.index < 2).foreach { task =>
+ time += 1
+ var execId = (task.index % 2 + 1).toString
+ tasks(task.index).markFinished(TaskState.FAILED, time)
+ listener.onTaskEnd(
+ SparkListenerTaskEnd(stage.stageId, stage.attemptId, "taskType",
+ ExecutorLostFailure(execId, true, Some("Lost executor")),
tasks(task.index), null))
+ }
+
+ stage.failureReason = Some("Failed")
+ listener.onStageCompleted(SparkListenerStageCompleted(stage))
+ time += 1
+ listener.onJobEnd(SparkListenerJobEnd(1, time, JobFailed(new
RuntimeException("Bad Executor"))))
+
+ tasks.filter(_.index >= 2).foreach { task =>
+ time += 1
--- End diff --
Updated
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]