Ngone51 commented on a change in pull request #33896:
URL: https://github.com/apache/spark/pull/33896#discussion_r765389196
##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -1630,6 +1652,50 @@ private[spark] class DAGScheduler(
}
}
+ private[scheduler] def checkAndScheduleShuffleMergeFinalize(
+ shuffleStage: ShuffleMapStage): Unit = {
+ // Check if a finalize task has already been scheduled. This is to prevent
scenarios
+ // where we don't schedule multiple shuffle merge finalization which can
happen due
+ // stage retry or shufflePushMinRatio is already hit etc.
+ if (shuffleStage.shuffleDep.getFinalizeTask.isEmpty) {
+ // 1. Stage indeterminate and some map outputs are not available -
finalize
+ // immediately without registering shuffle merge results.
+ // 2. Stage determinate and some map outputs are not available - decide
to
+ // register merge results based on map outputs size available and
+ // shuffleMergeWaitMinSizeThreshold.
+ // 3. All shuffle outputs available - decide to register merge results
based
+ // on map outputs size available and shuffleMergeWaitMinSizeThreshold.
+ val totalSize = {
+ lazy val computedTotalSize =
+ mapOutputTracker.getStatistics(shuffleStage.shuffleDep).
+ bytesByPartitionId.filter(_ > 0).sum
+ if (shuffleStage.isAvailable) {
+ computedTotalSize
+ } else {
+ if (shuffleStage.isIndeterminate) {
+ 0L
+ } else {
+ computedTotalSize
+ }
+ }
+ }
+
+ if (totalSize < shuffleMergeWaitMinSizeThreshold) {
+ // Check if we can process map stage completion. If shuffle merge
finalization
+ // is already triggered because push completion ratio was reached
earlier,
+ // we cannot process map stage completion, but have to wait for the
finalization
+ // to finish. This is because it's not straightforward to interrupt the
+ // finalization task and undo what it might have already done.
+ if (scheduleShuffleMergeFinalize(shuffleStage, delay = 0,
+ registerMergeResults = false)) {
Review comment:
Wouldn't `scheduleShuffleMergeFinalize` also trigger a call to
`handleShuffleMergeFinalized` (scheduleShuffleMergeFinalize -> setFinalizeTask
-> finalizeShuffleMerge -> handleShuffleMergeFinalized)?
##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -716,7 +728,9 @@ private[spark] class DAGScheduler(
// Mark mapStage as available with shuffle outputs only after
shuffle merge is
// finalized with push based shuffle. If not, subsequent
ShuffleMapStage won't
// read from merged output as the MergeStatuses are not
available.
- if (!mapStage.isAvailable ||
!mapStage.shuffleDep.shuffleMergeFinalized) {
+ if (!mapStage.isAvailable ||
+ (mapStage.shuffleDep.shuffleMergeEnabled &&
Review comment:
Isn't this a duplicate condition? `shuffleDep.shuffleMergeFinalized`
already takes `shuffleMergeEnabled` into account.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]