Github user vanzin commented on a diff in the pull request:
https://github.com/apache/spark/pull/22209#discussion_r213476910
--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
@@ -350,11 +350,22 @@ private[spark] class AppStatusListener(
val e = it.next()
if (job.stageIds.contains(e.getKey()._1)) {
val stage = e.getValue()
- stage.status = v1.StageStatus.SKIPPED
- job.skippedStages += stage.info.stageId
- job.skippedTasks += stage.info.numTasks
- it.remove()
- update(stage, now)
+ // Only update the stage if it has not finished already
+ if (v1.StageStatus.ACTIVE.equals(stage.status) ||
--- End diff --
I think there are still two issues here:
- according to the code in DAGScheduler#handleTaskCompletion, it seems possible
for a job to be marked finished before all task end events arrive: the job is
marked finished as soon as all of its partitions are computed. So if you have
e.g. speculative tasks, removing the stage here means you may miss those late
task end events.
- re-reading the code, the pool only really needs to be updated in the pending
case, since the other cases are already handled in `onStageCompleted`. This
still leaves the leak I mentioned before, where missing events make entries
stay in the live lists forever. But that's already a problem with this code,
and we should look at all of those cases as a separate task.
So I think it's more correct to only do anything for pending stages here; a
rough sketch is below.
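To make that concrete, here is a minimal sketch of the pending-only branch,
assuming the surrounding `onJobEnd` loop shown in the diff; the pool cleanup
part (`pools`, `schedulingPool`, `pool.stageIds`) is only an approximation of
the listener's fields, not the exact code:

```scala
// Sketch only, not the actual patch: restrict the skip / cleanup logic to
// stages that are still pending; active and finished stages are left to
// onStageCompleted.
val it = liveStages.entrySet().iterator()
while (it.hasNext()) {
  val e = it.next()
  if (job.stageIds.contains(e.getKey()._1)) {
    val stage = e.getValue()
    if (v1.StageStatus.PENDING.equals(stage.status)) {
      stage.status = v1.StageStatus.SKIPPED
      job.skippedStages += stage.info.stageId
      job.skippedTasks += stage.info.numTasks
      // A pending stage never gets an onStageCompleted event, so drop it from
      // its scheduling pool here as well (approximate field names).
      pools.get(stage.schedulingPool).foreach { pool =>
        pool.stageIds = pool.stageIds - stage.info.stageId
        update(pool, now)
      }
      it.remove()
      update(stage, now)
    }
  }
}
```

That way a speculative task end event arriving after the job end still finds
its (non-pending) stage in `liveStages`, and the pool bookkeeping only happens
where it's actually needed.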