GitHub user ankuriitg commented on a diff in the pull request:
https://github.com/apache/spark/pull/22209#discussion_r212471192
--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
@@ -350,11 +350,16 @@ private[spark] class AppStatusListener(
val e = it.next()
if (job.stageIds.contains(e.getKey()._1)) {
val stage = e.getValue()
- stage.status = v1.StageStatus.SKIPPED
- job.skippedStages += stage.info.stageId
- job.skippedTasks += stage.info.numTasks
- it.remove()
- update(stage, now)
+ // Only update the stage if it has not finished already
+ if (v1.StageStatus.ACTIVE.equals(stage.status) ||
+ v1.StageStatus.PENDING.equals(stage.status)) {
+ stage.status = v1.StageStatus.SKIPPED
+ job.skippedStages += stage.info.stageId
+ job.skippedTasks += stage.info.numTasks
+ job.activeStages -= 1
+ it.remove()
--- End diff --
For stages that have already completed, removal of the stage is left to the
onTaskEnd or onStageCompleted handler. This ensures that stage metrics are
still updated when the onJobEnd event is received before the onTaskEnd event.
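To illustrate the intent, here is a minimal, self-contained sketch of the guard. It does not use Spark's actual classes: `StageStatus`, `LiveStage`, `LiveJob`, and `JobEndSketch` are hypothetical, simplified stand-ins, and the live-stage map is keyed by stage id only, whereas the diff above shows a tuple key. The counter updates mirror the diff as shown.

```scala
import scala.collection.mutable

// Hypothetical stand-ins for the listener's live entities (not Spark's real types).
object StageStatus extends Enumeration {
  val ACTIVE, PENDING, COMPLETE, FAILED, SKIPPED = Value
}

final class LiveStage(val stageId: Int, val numTasks: Int, var status: StageStatus.Value)

final class LiveJob(val stageIds: Set[Int]) {
  var activeStages = 0
  var skippedTasks = 0
  val skippedStages = mutable.Set[Int]()
}

object JobEndSketch {
  // On job end, mark only the job's unfinished stages as skipped and drop them
  // from the live map. Finished stages stay in the map so that a later
  // task-end or stage-completed event can still update their metrics.
  def onJobEnd(job: LiveJob, liveStages: mutable.Map[Int, LiveStage]): Unit = {
    // Collect first, then mutate, to avoid removing entries while iterating.
    val unfinished = liveStages.values.filter { stage =>
      job.stageIds.contains(stage.stageId) &&
        (stage.status == StageStatus.ACTIVE || stage.status == StageStatus.PENDING)
    }.toList

    unfinished.foreach { stage =>
      stage.status = StageStatus.SKIPPED
      job.skippedStages += stage.stageId
      job.skippedTasks += stage.numTasks
      job.activeStages -= 1 // mirrors the diff as shown
      liveStages.remove(stage.stageId)
    }
  }
}
```

With one COMPLETE stage and one ACTIVE stage in the job, only the ACTIVE stage is marked SKIPPED and removed; the COMPLETE stage remains in the map until its own end event is processed.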
---