venkata91 commented on a change in pull request #33896:
URL: https://github.com/apache/spark/pull/33896#discussion_r768081977
##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -1630,6 +1652,50 @@ private[spark] class DAGScheduler(
}
}
+  private[scheduler] def checkAndScheduleShuffleMergeFinalize(
+      shuffleStage: ShuffleMapStage): Unit = {
+    // Check if a finalize task has already been scheduled. This is to ensure we do not
+    // schedule multiple shuffle merge finalizations, which can happen due to stage retry
+    // or because shufflePushMinRatio was already hit, etc.
+    if (shuffleStage.shuffleDep.getFinalizeTask.isEmpty) {
+      // 1. Stage indeterminate and some map outputs are not available - finalize
+      //    immediately without registering shuffle merge results.
+      // 2. Stage determinate and some map outputs are not available - decide to
+      //    register merge results based on map outputs size available and
+      //    shuffleMergeWaitMinSizeThreshold.
+      // 3. All shuffle outputs available - decide to register merge results based
+      //    on map outputs size available and shuffleMergeWaitMinSizeThreshold.
+      val totalSize = {
+        lazy val computedTotalSize =
+          mapOutputTracker.getStatistics(shuffleStage.shuffleDep).
+            bytesByPartitionId.filter(_ > 0).sum
+        if (shuffleStage.isAvailable) {
+          computedTotalSize
+        } else {
+          if (shuffleStage.isIndeterminate) {
+            0L
+          } else {
+            computedTotalSize
+          }
+        }
+      }
+
+      if (totalSize < shuffleMergeWaitMinSizeThreshold) {
+        // Check if we can process map stage completion. If shuffle merge finalization
+        // is already triggered because the push completion ratio was reached earlier,
+        // we cannot process map stage completion, but have to wait for the finalization
+        // to finish. This is because it's not straightforward to interrupt the
+        // finalization task and undo what it might have already done.
+        if (scheduleShuffleMergeFinalize(shuffleStage, delay = 0,
+          registerMergeResults = false)) {
Review comment:
Yes, that is true, but the advantage here is that since the size of the shuffle
data generated is < `shuffleMergeWaitMinSizeThreshold`, we want to mark the
shuffle merge finalized immediately so that the next stage can proceed right
away, without waiting for all the shuffle mergers (external shuffle services)
to finalize the shuffle merge. This is just an optimization to reduce the
overall delay in starting the next stage. What do you think?
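
To make the trade-off concrete, here is a minimal, self-contained sketch of the
decision described above. The names (`MergeFinalizationSketch`, `decide`,
`FinalizeDecision`, the threshold and wait parameters) are illustrative and not
the actual DAGScheduler API; the "wait and register merge results" branch is an
assumption about what happens when the shuffle output is above the threshold,
based on the comment in the diff.

```scala
// Illustrative sketch only, not Spark's DAGScheduler code.
object MergeFinalizationSketch {

  // What the scheduler would pass to finalization: how long to wait before
  // finalizing, and whether to register merge results with the driver.
  final case class FinalizeDecision(delayMs: Long, registerMergeResults: Boolean)

  def decide(
      totalShuffleBytes: Long,
      minSizeThresholdBytes: Long,
      finalizeWaitMs: Long): FinalizeDecision = {
    if (totalShuffleBytes < minSizeThresholdBytes) {
      // Small shuffle output: finalize right away and skip registering merge
      // results, so the child stage is not blocked on the external shuffle
      // services finishing the merge.
      FinalizeDecision(delayMs = 0L, registerMergeResults = false)
    } else {
      // Large enough shuffle output: give block pushes time to complete, then
      // register merge results so reducers can read merged blocks (assumed
      // behavior of the other branch, which is not shown in the diff).
      FinalizeDecision(delayMs = finalizeWaitMs, registerMergeResults = true)
    }
  }

  def main(args: Array[String]): Unit = {
    // 10 MiB of shuffle data vs. a 500 MiB threshold -> finalize immediately.
    println(decide(10L << 20, 500L << 20, finalizeWaitMs = 10000L))
    // 2 GiB of shuffle data -> wait, then register merge results.
    println(decide(2L << 30, 500L << 20, finalizeWaitMs = 10000L))
  }
}
```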