Github user vanzin commented on a diff in the pull request:
https://github.com/apache/spark/pull/22209#discussion_r214187951
--- Diff: core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala ---
@@ -1190,6 +1190,61 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
     assert(appStore.asOption(appStore.lastStageAttempt(3)) === None)
   }
+ test("SPARK-24415: update metrics for tasks that finish late") {
+ val listener = new AppStatusListener(store, conf, true)
+
+ val stage1 = new StageInfo(1, 0, "stage1", 4, Nil, Nil, "details1")
+ val stage2 = new StageInfo(2, 0, "stage2", 4, Nil, Nil, "details2")
+
+ // Start job
+ listener.onJobStart(SparkListenerJobStart(1, time, Seq(stage1,
stage2), null))
+
+ // Start 2 stages
+ listener.onStageSubmitted(SparkListenerStageSubmitted(stage1, new
Properties()))
+ listener.onStageSubmitted(SparkListenerStageSubmitted(stage2, new
Properties()))
+
+ // Start 2 Tasks
+ val tasks = createTasks(2, Array("1"))
+ tasks.foreach { task =>
+ listener.onTaskStart(SparkListenerTaskStart(stage1.stageId,
stage1.attemptNumber, task))
+ }
+
+ // Task 1 Finished
+ time += 1
+ tasks(0).markFinished(TaskState.FINISHED, time)
+ listener.onTaskEnd(
+ SparkListenerTaskEnd(stage1.stageId, stage1.attemptId, "taskType",
Success, tasks(0), null))
+
+ // Stage 1 Completed
+ stage1.failureReason = Some("Failed")
+ listener.onStageCompleted(SparkListenerStageCompleted(stage1))
+
+ // Stop job 1
+ time += 1
+ listener.onJobEnd(SparkListenerJobEnd(1, time, JobSucceeded))
+
+ // Task 2 Killed
+ time += 1
+ tasks(1).markFinished(TaskState.FINISHED, time)
+ listener.onTaskEnd(
+ SparkListenerTaskEnd(stage1.stageId, stage1.attemptId, "taskType",
+ TaskKilled(reason = "Killed"), tasks(1), null))
+
+ // Ensure killed task metrics are updated
+ val allStages =
store.view(classOf[StageDataWrapper]).reverse().asScala.map(_.info)
+ val failedStages = allStages.filter(_.status == v1.StageStatus.FAILED)
+ assert(failedStages.size == 1)
+ assert(failedStages.head.numKilledTasks == 1)
+ assert(failedStages.head.numCompleteTasks == 1)
+
+ val allJobs =
store.view(classOf[JobDataWrapper]).reverse().asScala.map(_.info)
+ assert(allJobs.size == 1)
--- End diff ---
I was a little puzzled about why this test works. It turns out that job metrics are updated in `onTaskEnd` based on the jobs that the stage is tracking.

The weirdness is that above, in your test, you end the job before the task end event, yet the job metrics still get updated. That works because of the above, and because `LIVE_ENTITY_UPDATE_PERIOD` is `0` (throttling disabled) in the tests. In a real app you could still miss the updates to the job metrics: the `maybeUpdate` call in the `onTaskEnd` handler may skip updating the job, and since the job is not tracked anymore at that point, it will never be flushed.
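For context, the throttling looks roughly like this (paraphrased from memory, so treat the exact shape as an assumption rather than a quote of the source). With the period set to `0`, the elapsed-time check always passes, which is why the test never observes a skipped write:

```scala
// Rough sketch of the write throttling in AppStatusListener
// (names as in the 2.x code; details may differ from the actual source).
private def maybeUpdate(entity: LiveEntity, now: Long): Unit = {
  // Only write if enough time has passed since the entity's last write.
  // The test suite configures LIVE_ENTITY_UPDATE_PERIOD to 0, so this
  // condition is always true there and no update is ever skipped.
  if (live && liveUpdatePeriodNs >= 0 && now - entity.lastWriteTime > liveUpdatePeriodNs) {
    update(entity, now)
  }
}
```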
Anyway, this is probably minor and could be a separate fix. Could you file a separate bug to audit the remaining event-ordering issues in this code (like the one above), and also what happens when events are dropped? Unless you want to fix that here.
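To make the failure mode concrete, here is a hypothetical sketch of the kind of guard I mean (not what this PR does; `stage.jobs`, `liveJobs` and `update` are the listener's internals as I remember them):

```scala
// Hypothetical change inside the onTaskEnd handler, where job counters
// are bumped via the jobs the stage is tracking.
stage.jobs.foreach { job =>
  if (liveJobs.contains(job.jobId)) {
    // Job still tracked: a throttled write is fine, since a later event
    // (e.g. the job end) will flush the final values anyway.
    maybeUpdate(job, now)
  } else {
    // Job already ended and removed from tracking: if this write were
    // skipped, nothing would ever flush it, so write eagerly.
    update(job, now)
  }
}
```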