GitHub user vanzin commented on a diff in the pull request:
https://github.com/apache/spark/pull/22209#discussion_r215343938
--- Diff:
core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala ---
@@ -1190,6 +1190,61 @@ class AppStatusListenerSuite extends SparkFunSuite
with BeforeAndAfter {
assert(appStore.asOption(appStore.lastStageAttempt(3)) === None)
}
+ test("SPARK-24415: update metrics for tasks that finish late") {
+ val listener = new AppStatusListener(store, conf, true)
+
+ val stage1 = new StageInfo(1, 0, "stage1", 4, Nil, Nil, "details1")
+ val stage2 = new StageInfo(2, 0, "stage2", 4, Nil, Nil, "details2")
+
+ // Start job
+ listener.onJobStart(SparkListenerJobStart(1, time, Seq(stage1,
stage2), null))
+
+ // Start 2 stages
+ listener.onStageSubmitted(SparkListenerStageSubmitted(stage1, new
Properties()))
+ listener.onStageSubmitted(SparkListenerStageSubmitted(stage2, new
Properties()))
+
+ // Start 2 Tasks
+ val tasks = createTasks(2, Array("1"))
+ tasks.foreach { task =>
+ listener.onTaskStart(SparkListenerTaskStart(stage1.stageId,
stage1.attemptNumber, task))
+ }
+
+ // Task 1 Finished
+ time += 1
+ tasks(0).markFinished(TaskState.FINISHED, time)
+ listener.onTaskEnd(
+ SparkListenerTaskEnd(stage1.stageId, stage1.attemptId, "taskType",
Success, tasks(0), null))
+
+ // Stage 1 Completed
+ stage1.failureReason = Some("Failed")
+ listener.onStageCompleted(SparkListenerStageCompleted(stage1))
+
+ // Stop job 1
+ time += 1
+ listener.onJobEnd(SparkListenerJobEnd(1, time, JobSucceeded))
+
+ // Task 2 Killed
+ time += 1
+ tasks(1).markFinished(TaskState.FINISHED, time)
+ listener.onTaskEnd(
+ SparkListenerTaskEnd(stage1.stageId, stage1.attemptId, "taskType",
+ TaskKilled(reason = "Killed"), tasks(1), null))
+
+ // Ensure killed task metrics are updated
+ val allStages =
store.view(classOf[StageDataWrapper]).reverse().asScala.map(_.info)
+ val failedStages = allStages.filter(_.status == v1.StageStatus.FAILED)
+ assert(failedStages.size == 1)
+ assert(failedStages.head.numKilledTasks == 1)
+ assert(failedStages.head.numCompleteTasks == 1)
+
+ val allJobs =
store.view(classOf[JobDataWrapper]).reverse().asScala.map(_.info)
+ assert(allJobs.size == 1)
--- End diff --
Ah, missed that. Took another look and it should be fine.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]