Github user ankuriitg commented on a diff in the pull request:
https://github.com/apache/spark/pull/22209#discussion_r215056633
--- Diff:
core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala ---
@@ -1190,6 +1190,61 @@ class AppStatusListenerSuite extends SparkFunSuite
with BeforeAndAfter {
assert(appStore.asOption(appStore.lastStageAttempt(3)) === None)
}
+ test("SPARK-24415: update metrics for tasks that finish late") {
+ val listener = new AppStatusListener(store, conf, true)
+
+ val stage1 = new StageInfo(1, 0, "stage1", 4, Nil, Nil, "details1")
+ val stage2 = new StageInfo(2, 0, "stage2", 4, Nil, Nil, "details2")
+
+ // Start job
+ listener.onJobStart(SparkListenerJobStart(1, time, Seq(stage1,
stage2), null))
+
+ // Start 2 stages
+ listener.onStageSubmitted(SparkListenerStageSubmitted(stage1, new
Properties()))
+ listener.onStageSubmitted(SparkListenerStageSubmitted(stage2, new
Properties()))
+
+ // Start 2 Tasks
+ val tasks = createTasks(2, Array("1"))
+ tasks.foreach { task =>
+ listener.onTaskStart(SparkListenerTaskStart(stage1.stageId,
stage1.attemptNumber, task))
+ }
+
+ // Task 1 Finished
+ time += 1
+ tasks(0).markFinished(TaskState.FINISHED, time)
+ listener.onTaskEnd(
+ SparkListenerTaskEnd(stage1.stageId, stage1.attemptId, "taskType",
Success, tasks(0), null))
+
+ // Stage 1 Completed
+ stage1.failureReason = Some("Failed")
+ listener.onStageCompleted(SparkListenerStageCompleted(stage1))
+
+ // Stop job 1
+ time += 1
+ listener.onJobEnd(SparkListenerJobEnd(1, time, JobSucceeded))
+
+ // Task 2 Killed
+ time += 1
+ tasks(1).markFinished(TaskState.FINISHED, time)
+ listener.onTaskEnd(
+ SparkListenerTaskEnd(stage1.stageId, stage1.attemptId, "taskType",
+ TaskKilled(reason = "Killed"), tasks(1), null))
+
+ // Ensure killed task metrics are updated
+ val allStages =
store.view(classOf[StageDataWrapper]).reverse().asScala.map(_.info)
+ val failedStages = allStages.filter(_.status == v1.StageStatus.FAILED)
+ assert(failedStages.size == 1)
+ assert(failedStages.head.numKilledTasks == 1)
+ assert(failedStages.head.numCompleteTasks == 1)
+
+ val allJobs =
store.view(classOf[JobDataWrapper]).reverse().asScala.map(_.info)
+ assert(allJobs.size == 1)
--- End diff --
Sorry, I couldn't respond on Friday.
So, the above code has been replaced with "conditionalLiveUpdate(job, now,
removeStage)" in my PR. This means that if the taskEnd event is the last event
and the stage has already completed, the job will be updated immediately. If it
is the last event but the stage has not yet completed, then the onStageCompleted
event will update the job immediately.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]