Github user vanzin commented on a diff in the pull request:
https://github.com/apache/spark/pull/22209#discussion_r212708646
--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
@@ -350,11 +350,16 @@ private[spark] class AppStatusListener(
val e = it.next()
if (job.stageIds.contains(e.getKey()._1)) {
val stage = e.getValue()
- stage.status = v1.StageStatus.SKIPPED
- job.skippedStages += stage.info.stageId
- job.skippedTasks += stage.info.numTasks
- it.remove()
- update(stage, now)
+ // Only update the stage if it has not finished already
+ if (v1.StageStatus.ACTIVE.equals(stage.status) ||
+ v1.StageStatus.PENDING.equals(stage.status)) {
+ stage.status = v1.StageStatus.SKIPPED
+ job.skippedStages += stage.info.stageId
+ job.skippedTasks += stage.info.numTasks
+ job.activeStages -= 1
+ it.remove()
--- End diff --
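For readers following along, here is a minimal sketch of what the guarded loop in the diff does. It uses hypothetical simplified types (`Stage`, `Job`, `StageStatus`, `skipUnfinishedStages`) in place of Spark's `LiveStage`, `LiveJob` and `v1.StageStatus`. It also assumes, as the diff's own comment suggests, that a stage can still be in the live map after it has finished, so only ACTIVE or PENDING stages are turned into SKIPPED.

```scala
import java.util.{HashMap => JHashMap}

// Hypothetical stand-in for v1.StageStatus.
object StageStatus extends Enumeration {
  val ACTIVE, PENDING, COMPLETE, FAILED, SKIPPED = Value
}

// Hypothetical, simplified stand-ins for Spark's LiveStage and LiveJob.
final class Stage(val stageId: Int, val numTasks: Int, var status: StageStatus.Value)

final class Job(val stageIds: Set[Int], var activeStages: Int) {
  var skippedStages: Set[Int] = Set.empty
  var skippedTasks: Int = 0
}

// Follows the shape of the loop in the diff: stages of this job that never
// finished become SKIPPED and are removed; COMPLETE/FAILED stages are left alone.
def skipUnfinishedStages(job: Job, liveStages: JHashMap[(Int, Int), Stage]): Unit = {
  val it = liveStages.entrySet().iterator()
  while (it.hasNext()) {
    val e = it.next()
    if (job.stageIds.contains(e.getKey()._1)) {
      val stage = e.getValue()
      if (stage.status == StageStatus.ACTIVE || stage.status == StageStatus.PENDING) {
        stage.status = StageStatus.SKIPPED
        job.skippedStages += stage.stageId
        job.skippedTasks += stage.numTasks
        job.activeStages -= 1
        it.remove() // safe removal through the entry-set iterator
      }
    }
  }
}

// Stage 1 already failed but is still tracked; stage 2 never ran.
val live = new JHashMap[(Int, Int), Stage]()
live.put((1, 0), new Stage(1, 10, StageStatus.FAILED))
live.put((2, 0), new Stage(2, 5, StageStatus.PENDING))
val job = new Job(Set(1, 2), activeStages = 1)
skipUnfinishedStages(job, live)
```

Stage 1 keeps its FAILED status and stays in the map; only the never-run stage 2 is counted as skipped.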
In your question, this == this PR?
If so, no, that's not what it's fixing. Task end events can "naturally"
arrive after the stage end event when a stage fails, and the existing code
did not handle that case.
When events are dropped, a lot of state gets out of sync, and this change
wouldn't fix that. It could perhaps make things a little worse: if a task end
event never arrives, then with this change the stage may never actually be
removed from the live stages map. I'm not sure how easy it would be to recover
from that, though, since dropped events could probably cause other sorts of
leaks in this class too; I also feel that's a separate issue.
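Both scenarios can be illustrated with a toy model. It assumes, as the comment implies, that with this change a finished stage stays in the live map until its last task end event is processed. `Listener`, `Stage` and the handler names are hypothetical simplifications, not Spark's actual `AppStatusListener` API.

```scala
import scala.collection.mutable

// Hypothetical toy model; not Spark's AppStatusListener.
final class Stage(val stageId: Int) {
  var activeTasks: Int = 0
  var finished: Boolean = false
}

final class Listener {
  val liveStages: mutable.Map[Int, Stage] = mutable.Map.empty

  def onStageSubmitted(stageId: Int): Unit =
    liveStages(stageId) = new Stage(stageId)

  def onTaskStart(stageId: Int): Unit =
    liveStages.get(stageId).foreach(s => s.activeTasks += 1)

  // A stage end can arrive while tasks are still running (e.g. after a failure),
  // so the stage is only dropped once it also has no active tasks left.
  def onStageCompleted(stageId: Int): Unit =
    liveStages.get(stageId).foreach { s =>
      s.finished = true
      maybeRemove(s)
    }

  // A late task end completes the cleanup that onStageCompleted deferred.
  def onTaskEnd(stageId: Int): Unit =
    liveStages.get(stageId).foreach { s =>
      s.activeTasks -= 1
      maybeRemove(s)
    }

  private def maybeRemove(s: Stage): Unit =
    if (s.finished && s.activeTasks == 0) liveStages -= s.stageId
}

// Late task end: the stage survives its stage end event, then is removed.
val ok = new Listener
ok.onStageSubmitted(1)
ok.onTaskStart(1)
ok.onStageCompleted(1)
val trackedAfterStageEnd = ok.liveStages.contains(1) // still waiting for the task end
ok.onTaskEnd(1)

// Dropped task end: the stage end arrives, but the task end never does.
val dropped = new Listener
dropped.onStageSubmitted(2)
dropped.onTaskStart(2)
dropped.onStageCompleted(2)
// No onTaskEnd(2) call, so stage 2 stays in dropped.liveStages.
```

The first listener shows the late task end case that the PR handles; the second shows how a single dropped task end event leaves the stage in `liveStages` indefinitely.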
(Also, hopefully, dropped events for this listener should be less common in
2.3 after the listener bus changes.)