Github user vanzin commented on a diff in the pull request:
https://github.com/apache/spark/pull/22209#discussion_r213143804
--- Diff:
core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
@@ -350,11 +350,22 @@ private[spark] class AppStatusListener(
val e = it.next()
if (job.stageIds.contains(e.getKey()._1)) {
val stage = e.getValue()
- stage.status = v1.StageStatus.SKIPPED
- job.skippedStages += stage.info.stageId
- job.skippedTasks += stage.info.numTasks
- it.remove()
- update(stage, now)
+ // Only update the stage if it has not finished already
+ if (v1.StageStatus.ACTIVE.equals(stage.status) ||
--- End diff ---
So I went back and took a closer look and I think this isn't entirely
correct (and wasn't entirely correct before either).
If I remember the semantics correctly, the stage should be skipped if it is
part of the job's stages, and is in the pending state when the job finishes.
If it's in the active state, it should not be marked as skipped; if you do
mark it as skipped, the update to the skipped task count (in L358) will most
certainly be wrong.
So if the state is still active here, it means some event was missed. The
best we can do in that case, I think, is remove it from the live stages list
and update the pool data, and that's it.
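To make the distinction concrete, here is a minimal, self-contained sketch of that onJobEnd handling. The types (`Stage`, `StageStatus`, `finishJob`) are simplified stand-ins for the real `AppStatusListener` internals, not the actual Spark API:

```scala
import scala.collection.mutable

object SkippedStageSketch {
  sealed trait StageStatus
  case object Pending extends StageStatus
  case object Active  extends StageStatus
  case object Skipped extends StageStatus

  // Simplified stand-in for LiveStage: mutable status plus a task count.
  final case class Stage(id: Int, var status: StageStatus, numTasks: Int)

  /** On job end: stages of this job still pending are marked skipped and
    * counted; stages still active indicate a missed event and are only
    * dropped from the live map (the real listener would also update the
    * pool data). Returns (skipped stage ids, skipped task count). */
  def finishJob(
      jobStageIds: Set[Int],
      liveStages: mutable.Map[Int, Stage]): (Set[Int], Int) = {
    var skippedIds = Set.empty[Int]
    var skippedTasks = 0
    for (id <- jobStageIds; stage <- liveStages.get(id)) {
      stage.status match {
        case Pending =>
          // Pending when the job ends => the stage was skipped.
          stage.status = Skipped
          skippedIds += id
          skippedTasks += stage.numTasks
        case _ =>
          // Active (missed event) or already terminal: don't touch the
          // skipped counters; just remove it from the live list below.
      }
      liveStages.remove(id)
    }
    (skippedIds, skippedTasks)
  }
}
```

Note that only pending stages contribute to the skipped task count, which is exactly why marking an active stage as skipped would corrupt it.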
On a related note, if the "onStageSubmitted" event is missed, the stage
will remain in the "pending" state even after tasks start on it. Perhaps a
check could also be added to the "onTaskStart" handler, just to be sure the
stage is marked as active.
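That guard could look something like the following. Again this is a hedged sketch with hypothetical simplified types, not the real listener code:

```scala
object TaskStartSketch {
  sealed trait StageStatus
  case object Pending extends StageStatus
  case object Active  extends StageStatus

  // Simplified stand-in for the live stage record.
  final case class Stage(id: Int, var status: StageStatus)

  /** onTaskStart guard: if onStageSubmitted was missed, the first task
    * starting on the stage is the cue that it is actually running. */
  def onTaskStart(stage: Stage): Unit =
    if (stage.status == Pending) stage.status = Active
}
```

The check is idempotent, so running it on every task-start event is harmless for stages that are already active.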
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]