mridulm commented on a change in pull request #33896:
URL: https://github.com/apache/spark/pull/33896#discussion_r742415348
##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -1716,7 +1739,31 @@ private[spark] class DAGScheduler(
if (runningStages.contains(shuffleStage) &&
shuffleStage.pendingPartitions.isEmpty) {
if (!shuffleStage.shuffleDep.shuffleMergeFinalized &&
shuffleStage.shuffleDep.getMergerLocs.nonEmpty) {
- scheduleShuffleMergeFinalize(shuffleStage)
+ // Check if a finalize task has already been scheduled. This is to prevent the
+ // following scenario: Stage A attempt 0 fails and gets retried. Stage A attempt 1
+ // succeeded, triggering the scheduling of shuffle merge finalization. However,
+ // tasks from Stage A attempt 0 might still be running and sending task completion
+ // events to DAGScheduler. This check prevents multiple attempts to schedule merge
+ // finalization get triggered due to this.
Review comment:
To elaborate on the above, there are multiple code flows here that need to be looked at (I am assuming the issues I detailed earlier are resolved).
We need to document this so that it is clearer:
#### Case 1: enough task pushes complete (as determined by `shufflePushMinRatio`)
1. `handleShufflePushCompleted` schedules finalization with `delay = 0`.
2. Every subsequent task push completion invokes `scheduleShuffleMergeFinalize` (if `shufflePushMinRatio` < `1.0`).
   * If the stage has not completed, the `task.getDelay > 0` check prevents the finalize task from being cancelled and rescheduled.
   * If the stage has completed, the push completion cancels the finalize task and reschedules it with `delay = 0` (the initial schedule happens via Case 3 below).
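A minimal Scala sketch of the reschedule rule in step 2 follows. It assumes a single finalize task per shuffle, tracked as an `Option[ScheduledFuture[_]]`. `FinalizeTaskTracker`, `requestImmediateFinalize` and `scheduleDelayedFinalize` are hypothetical names, not DAGScheduler methods. Only `ScheduledExecutorService.schedule`, `ScheduledFuture.getDelay` and `cancel` are real JDK APIs. The idea being modelled is that a positive remaining delay marks a delayed task from Case 3 that has not fired yet, which may be replaced. A `delay = 0` task is left alone.

```scala
import java.util.concurrent.{ScheduledExecutorService, ScheduledFuture, TimeUnit}

// Hypothetical model of per-shuffle finalize-task bookkeeping; not Spark code.
class FinalizeTaskTracker(pool: ScheduledExecutorService, finalizeMerge: () => Unit) {
  // At most one pending finalize task for this shuffle.
  private var task: Option[ScheduledFuture[_]] = None

  private def submit(delaySec: Long): Unit = {
    val runnable: Runnable = () => finalizeMerge()
    task = Some(pool.schedule(runnable, delaySec, TimeUnit.SECONDS))
  }

  /** Push-completion path (Case 1): ask for finalization with delay = 0. */
  def requestImmediateFinalize(): String = synchronized {
    task match {
      case None =>
        submit(0)
        "scheduled"
      case Some(t) if t.getDelay(TimeUnit.NANOSECONDS) > 0 && t.cancel(false) =>
        // A delayed task (from stage completion) has not fired yet: replace it.
        submit(0)
        "rescheduled"
      case Some(_) =>
        // Existing task was scheduled with delay = 0 or already ran: do nothing.
        "ignored"
    }
  }

  /** Stage-completion path (Case 3): schedule only if nothing is pending. */
  def scheduleDelayedFinalize(delaySec: Long): Boolean = synchronized {
    if (task.isEmpty) { submit(delaySec); true } else false
  }
}
```

Under this rule, a push completion that arrives before the stage completes finds a `delay = 0` task and is ignored. That is why the `getDelay > 0` check acts as the guard in the first sub-bullet.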
#### Case 2: stage completes, total size < `shuffleMergeWaitMinSizeThreshold`
This applies only if no finalize task has already been scheduled.
1. `scheduleShuffleMergeFinalize` is invoked with `delay = 0`.
2. As no finalize task exists (precondition), we always schedule `finalizeShuffleMerge` with `registerMergeResults = false`; essentially, we ignore all merge output.
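The stage-completion precondition and the size check can be sketched as a small decision function. The `else` branch anticipates Case 3 below. `FinalizePlan` and `StageCompletion.plan` are hypothetical, and the byte and second types are assumptions. The Case 3 branch keeps `registerMergeResults = true` on the assumption that this is the default, since the comment does not state it.

```scala
// Hypothetical plan returned on stage completion; not a Spark type.
final case class FinalizePlan(delaySec: Long, registerMergeResults: Boolean)

object StageCompletion {
  /** Returns None when a finalize task already exists (neither case applies). */
  def plan(
      finalizeTaskExists: Boolean,
      totalShuffleBytes: Long,
      shuffleMergeWaitMinSizeThreshold: Long,
      shuffleMergeFinalizeWaitSec: Long): Option[FinalizePlan] = {
    if (finalizeTaskExists) None
    else if (totalShuffleBytes < shuffleMergeWaitMinSizeThreshold)
      // Case 2: small output, finalize now and ignore merge results.
      Some(FinalizePlan(delaySec = 0, registerMergeResults = false))
    else
      // Case 3: wait for pushes; registerMergeResults = true is an assumed default.
      Some(FinalizePlan(delaySec = shuffleMergeFinalizeWaitSec, registerMergeResults = true))
  }
}
```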
#### Case 3: stage completes, total size >= `shuffleMergeWaitMinSizeThreshold`
This applies only if no finalize task has already been scheduled.
* We invoke `scheduleShuffleMergeFinalize` with `delay = shuffleMergeFinalizeWaitSec`.

There are a couple of paths here.
##### Case 3.1: `shufflePushMinRatio == 1.0` (default)
* If all tasks finish pushing before `shuffleMergeFinalizeWaitSec` expires, this is equivalent to Case 1 (with the stage having already completed).
  * We cancel the finalize task (which was scheduled with `delay = shuffleMergeFinalizeWaitSec`) and schedule `finalizeShuffleMerge` to run with `delay = 0`.
* If the delay expires first, `finalizeShuffleMerge` is invoked by the scheduled finalize task.
  * Subsequent push completions are ignored (once the bug is fixed).

From then on, both paths follow the same code path.
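The two Case 3.1 paths can be made concrete with a hypothetical event-replay model. `Event`, `Outcome` and `Case31.replay` are illustrative names only, not Spark code. The model assumes the bug mentioned above is fixed: finalization happens once, whichever of "last push completed" or "wait timer expired" comes first, and later push completions are dropped.

```scala
// Hypothetical events seen after the stage has completed (Case 3.1).
sealed trait Event
case object PushCompleted extends Event
case object WaitTimerExpired extends Event

final case class Outcome(finalizedBy: Option[String], ignoredPushes: Int)

object Case31 {
  /** Replays events for a stage whose `numTasks` map tasks must all finish pushing. */
  def replay(numTasks: Int, events: Seq[Event]): Outcome = {
    var pushes = 0
    var finalizedBy: Option[String] = None
    var ignored = 0
    events.foreach {
      case PushCompleted if finalizedBy.isDefined =>
        ignored += 1 // late push completion after finalization is dropped
      case PushCompleted =>
        pushes += 1
        // All pushes done before the timer: same as Case 1 (cancel, run with delay 0).
        if (pushes == numTasks) finalizedBy = Some("all-pushes-completed")
      case WaitTimerExpired if finalizedBy.isEmpty =>
        finalizedBy = Some("wait-timer")
      case WaitTimerExpired =>
        () // delayed task was already cancelled; nothing to do
    }
    Outcome(finalizedBy, ignored)
  }
}
```

With two map tasks, replaying `[PushCompleted, PushCompleted, WaitTimerExpired]` finalizes via the last push. Replaying `[PushCompleted, WaitTimerExpired, PushCompleted]` finalizes via the timer and drops the late push.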
##### Case 3.2: `shufflePushMinRatio < 1.0`
This is a variant of Case 3.1, except that only a subset of tasks (as determined by `shufflePushMinRatio`) needs to complete pushing.
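Case 3.2 differs from Case 3.1 only in the threshold. This is a hypothetical helper, assuming the check compares the completed fraction of map tasks against `shufflePushMinRatio`. The exact comparison and rounding used by `handleShufflePushCompleted` may differ, and `PushRatio.enoughPushesCompleted` is not a Spark method.

```scala
// Hypothetical threshold check; the real comparison in Spark may differ.
object PushRatio {
  def enoughPushesCompleted(completed: Int, numMapTasks: Int, shufflePushMinRatio: Double): Boolean = {
    require(numMapTasks > 0, "stage must have at least one map task")
    // Fraction of map tasks whose pushes have completed vs. the configured ratio.
    completed.toDouble / numMapTasks >= shufflePushMinRatio
  }
}
```

Take 10 map tasks and a ratio of `0.8`. The eighth push completion crosses the threshold while two tasks may still be running. This is how Case 1 can be reached before stage completion, which is the first issue below.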
### Issues to discuss
Based on the above, the following paths diverge depending on event ordering:
* Particularly when `shufflePushMinRatio` < `1.0`, if we hit Case 1 before the stage completes, then Case 2 does not run.
  * This means merge handling changes depending on how task and push completion events are ordered.
* If we hit Case 1 and the stage is cancelled or fails (and is subsequently resubmitted), Case 2 and Case 3 are not executed for later stage attempts.
* Also, when `finalizeShuffleMerge` gets invoked in the future task, we are finalizing for a stage attempt that is no longer running or valid.
  * This is because, even though we schedule with `delay = 0`, there can still be a nontrivial delay before the finalize task actually executes, particularly when the pool has other finalize tasks running.
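The last issue can be reproduced in isolation. In this sketch a single-threaded finalize pool is busy with another task, so a task scheduled with `delay = 0` runs only after the stage has been resubmitted. The single thread is an assumption made for determinism; the real pool size may differ. `StaleFinalizeDemo` and `currentAttempt` are hypothetical. The task captures the attempt number only so that the staleness is observable. This is not Spark's code and not a proposed fix.

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}

// Hypothetical demo of a delay = 0 finalize task running after resubmission.
object StaleFinalizeDemo {
  def run(): String = {
    val pool = Executors.newSingleThreadScheduledExecutor()
    val currentAttempt = new AtomicInteger(0)
    val result = new AtomicReference[String]("not-run")
    val release = new CountDownLatch(1)
    val done = new CountDownLatch(1)
    try {
      // Another shuffle's finalize task occupies the pool's only thread.
      val busyTask: Runnable = () => release.await()
      pool.execute(busyTask)

      // Schedule finalization "immediately" for the attempt that is current now.
      val scheduledFor = currentAttempt.get()
      val finalizeTask: Runnable = () => {
        val now = currentAttempt.get()
        result.set(
          if (now == scheduledFor) "finalized"
          else s"stale: scheduled for attempt $scheduledFor, current attempt $now")
        done.countDown()
      }
      pool.schedule(finalizeTask, 0, TimeUnit.SECONDS)

      // Before the pool frees up, the stage fails and attempt 1 is submitted.
      currentAttempt.incrementAndGet()
      release.countDown()

      done.await(5, TimeUnit.SECONDS)
      result.get()
    } finally {
      pool.shutdownNow()
    }
  }
}
```

`StaleFinalizeDemo.run()` returns `"stale: scheduled for attempt 0, current attempt 1"`. The `delay = 0` task is queued behind the busy task, which finishes only after the attempt number has changed.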