Github user JoshRosen commented on the pull request:
https://github.com/apache/spark/pull/3009#issuecomment-63411844
Implementing an intuitive progress bar is turning out to be a lot more
complicated than I originally anticipated.
## Correctness specification for progress bars
Ideally, a job progress bar would have the following behavior:
1. Once a job has completed successfully, its progress bar should never
change.
2. The limit of a job's progress bar should never change; the number of
stages / tasks should be fixed.
3. If a job has successfully completed, its task and stage progress bars
should be at 100%.
4. If a job's task or stage progress bar is at 100%, then the job has
completed successfully.
5. If a task or stage fails, then the progress bar should go backwards to
an earlier value so that it accurately reflects the amount of remaining work.
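To pin these properties down, here is a minimal sketch of what they mean as runtime
checks; the `JobProgress` case class and its fields are hypothetical and not part of
this PR:

```scala
// Hypothetical snapshot of a job's progress, used only to state the properties precisely.
case class JobProgress(
    completedTasks: Int,
    totalTasks: Int,
    completedStages: Int,
    totalStages: Int,
    succeeded: Boolean)

object ProgressInvariants {
  /** Properties 2-4, checked on a single snapshot. */
  def check(p: JobProgress): Unit = {
    require(p.completedTasks <= p.totalTasks)
    require(p.completedStages <= p.totalStages)
    if (p.succeeded) {
      // Property 3: a successful job shows 100% on both bars.
      require(p.completedTasks == p.totalTasks && p.completedStages == p.totalStages)
    }
    if ((p.totalTasks > 0 && p.completedTasks == p.totalTasks) ||
        (p.totalStages > 0 && p.completedStages == p.totalStages)) {
      // Property 4: a bar at 100% implies that the job completed successfully.
      require(p.succeeded)
    }
  }

  /** Properties 1-2, checked across consecutive snapshots of the same job. */
  def checkTransition(before: JobProgress, after: JobProgress): Unit = {
    // Property 2: the goalposts never move.
    require(before.totalTasks == after.totalTasks && before.totalStages == after.totalStages)
    // Property 1: once the job has succeeded, nothing changes.
    if (before.succeeded) require(before == after)
  }
}
```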
## Current anomalies
Unfortunately, the current progress bar suffers from several anomalies:
- At the start of a job, before any tasks have been executed, the progress
bar's "total number of tasks" may be zero / smaller than the actual number of
tasks that will be computed.
- Resubmitted stages cause the total number of tasks to increase.
- Recomputed tasks may cause the number of completed tasks to be greater
than the total number of tasks.
- The progress bar's value never decreases, so it may not accurately
reflect the state of jobs where stages are recomputed.
## Tricky edge-cases
### When starting a job, we do not know the actual number of stages that
will be (re)computed
Let's say that we want to solve the "total number of tasks increases"
anomaly. One approach, suggested upthread, is to modify `DAGScheduler` to compute
the total number of tasks that the job will execute and to include this count
in the `SparkListenerJobStart` event that it sends to the UI. The scheduler
maintains a `jobIdToStageIds` map that tracks a job's stage dependencies.
From the code, it's not immediately clear whether this holds _all_ (transitive)
stage dependencies or only those stages that need to be computed for this job.
This distinction matters because of shared / re-used stages: if two jobs share a
`ShuffleMapStage`, then it's possible that the second job can re-use the output
that the first job already computed for that stage. If `jobIdToStageIds` includes
this stage, then we risk overcounting the number of tasks, because not every
stage in `jobIdToStageIds` will necessarily be run when executing the job.
Now, let's assume the opposite: even if `jobIdToStageIds` only tracks the
missing stages that must be run to complete the job, rather than all of its stage
dependencies, it's still possible that a FetchFailure could trigger a
recomputation of a stage dependency that wasn't included in `jobIdToStageIds`,
so the count would be too small. (**Note**: I haven't figured out which of these
two cases is actually true, but I think this is a convincing argument that we'll
have problems either way).
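For concreteness, the "fixed goalpost" computation sketched above would look roughly
like the following; this is just an illustration (the helper name and the plain-`Map`
signatures are hypothetical, not `DAGScheduler` code), and it inherits exactly the
over/undercounting problems described above:

```scala
// Hypothetical sketch: sum the task counts of the stages that jobIdToStageIds
// associates with a job, so the total can be attached to the job-start event.
// If jobIdToStageIds also contains shared stages whose existing output will be
// re-used, this overcounts; if it omits dependencies that a FetchFailure later
// forces us to recompute, it undercounts.
def totalTasksForJob(
    jobId: Int,
    jobIdToStageIds: Map[Int, Seq[Int]],
    stageIdToNumTasks: Map[Int, Int]): Int = {
  jobIdToStageIds.getOrElse(jobId, Seq.empty)
    .map(stageId => stageIdToNumTasks.getOrElse(stageId, 0))
    .sum
}
```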
In a nutshell, I think that **we may have to abandon the idea of a fixed
goalpost**, because at the time a job starts the scheduler itself doesn't
definitively know the total amount of work needed to compute that job. To
me, it seems more intuitive to have the limit of the progress bar increase once
we discover that there is more work to be done. I think that we should keep
the current behavior of updating the progress bar's limit once
`SparkListenerStageSubmitted` events are received by the UI.
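For reference, the "growing limit" behavior I'm describing boils down to something
like the sketch below. This is a minimal illustration, not the PR's implementation:
it tracks a single job for brevity, whereas the real listener keys its counters by
job ID and drives the UI.

```scala
import org.apache.spark.Success
import org.apache.spark.scheduler._

// Sketch: the progress bar's limit grows as stages are actually submitted,
// instead of being fixed when the job starts.
class GrowingLimitListener extends SparkListener {
  private var totalTasks = 0
  private var completedTasks = 0

  override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = {
    // Each submitted stage extends the goalpost, mirroring the current behavior
    // of updating the limit when stage-submitted events reach the UI.
    totalTasks += stageSubmitted.stageInfo.numTasks
  }

  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    if (taskEnd.reason == Success) {
      completedTasks += 1
    }
  }

  def progress: Double =
    if (totalTasks == 0) 0.0 else completedTasks.toDouble / totalTasks
}
```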
### Stage losses / recomputations should not always cause jobs' progress to
decrease
Multiple active jobs may depend on a stage. Let's say that we want to
cause the stage progress indicator to decrease so that it accurately reflects
the number of remaining stages. As a naive approach, we could use
`JobProgressListener`'s `stageIdToActiveJobIds` to determine which jobs
depend on a failed or recomputed stage and update those jobs' progress bars.
Unfortunately, this may be subtly wrong if there are multiple active jobs that
share the same failed stage. It's possible that one job depended on that stage
but has progressed far past it, so the failure / loss of the stage's output
doesn't immediately impact that job, while some other job's progress is
actually impacted by the stage failure. Essentially, it's not always the case
that a stage failure requires recomputation for all active jobs that depend on
that stage.
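To make the pitfall concrete, the naive approach would look something like this
sketch (the `stageIdToActiveJobIds` name comes from `JobProgressListener`; the
simplified types and everything else here are hypothetical):

```scala
import scala.collection.mutable

// Naive sketch: roll back progress for every active job that depends on the
// failed stage. This is exactly the subtly-wrong behavior described above: a
// job can appear in stageIdToActiveJobIds and yet be unaffected, because it
// already consumed that stage's output and no longer needs it.
def onStageFailed(
    failedStageId: Int,
    numTasksToRecompute: Int,
    stageIdToActiveJobIds: Map[Int, Set[Int]],
    completedTasksPerJob: mutable.Map[Int, Int]): Unit = {
  for (jobId <- stageIdToActiveJobIds.getOrElse(failedStageId, Set.empty)) {
    // Wrong in general: only jobs whose *remaining* work requires this stage's
    // output should move backwards.
    val current = completedTasksPerJob.getOrElse(jobId, 0)
    completedTasksPerJob(jobId) = math.max(0, current - numTasksToRecompute)
  }
}
```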
Similarly, if we wanted to have task progress bars decrease, then we'd have
to be careful to determine which jobs' progress is impacted by a task
recomputation.
## Plan
Given this, my plan is:
- Convince myself that the progress bar can eventually reach 100%, because
every stage listed in `SparkListenerJobStart` will eventually run / complete
(i.e. `SparkListenerJobStart` won't include stages that will never be computed).
- Implement a progress bar that ignores task / stage recomputations; in
these cases, the progress bar will appear to stall during failures /
recomputations.
I think that this is the best that we can do for now (without significant
refactorings / scheduler changes).
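Concretely, the accounting I have in mind behaves roughly like this sketch (names
are illustrative, not the final code): it only ever moves forward, so during
failures / recomputations the bar simply stalls instead of going backwards or
overshooting its limit.

```scala
// Sketch of the planned behavior: monotonic progress that ignores recomputations.
class StallOnRecomputationProgress {
  private var totalTasks = 0
  private var completedTasks = 0

  /** Count a stage's tasks toward the limit only on its first submission (attempt 0). */
  def stageSubmitted(attemptNumber: Int, numTasks: Int): Unit = {
    if (attemptNumber == 0) totalTasks += numTasks
  }

  /** Count successful tasks, but never let the bar pass its current limit. */
  def taskSucceeded(): Unit = {
    completedTasks = math.min(completedTasks + 1, totalTasks)
  }

  def progress: Double =
    if (totalTasks == 0) 0.0 else completedTasks.toDouble / totalTasks
}
```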