mridulm commented on a change in pull request #33896:
URL: https://github.com/apache/spark/pull/33896#discussion_r748806437



##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -1630,6 +1652,53 @@ private[spark] class DAGScheduler(
     }
   }
 
+  private[scheduler] def checkAndScheduleShuffleMergeFinalize(
+      shuffleStage: ShuffleMapStage): Unit = {
+    // Check if a finalize task has already been scheduled. This is to prevent 
the
+    // following scenario: Stage A attempt 0 fails and gets retried. Stage A 
attempt 1
+    // succeeded, triggering the scheduling of shuffle merge finalization. 
However,
+    // tasks from Stage A attempt 0 might still be running and sending task 
completion
+    // events to DAGScheduler. This check prevents multiple attempts to 
schedule merge
+    // finalization get triggered due to this.

Review comment:
       Can you double-check the comment here?
   Two things:
   
   a) Stage A attempt 0's task will get reset when attempt 1 is launched (if indeterminate), or won't have a finalize task (if determinate).
   So the case described in the comment should not happen?
   
   b) The check does handle the case where we decide to finalize earlier, i.e. when `shufflePushMinRatio < 1.0`.

##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -2184,13 +2328,27 @@ private[spark] class DAGScheduler(
     if (runningStages.contains(stage)) {
       stage.shuffleDep.markShuffleMergeFinalized()
       processShuffleMapStageCompletion(stage)
+      logInfo(s"$stage (${stage.name}) shuffle merge is finalized")
     } else {
       // Unregister all merge results if the stage is currently not
       // active (i.e. the stage is cancelled)
       mapOutputTracker.unregisterAllMergeResult(stage.shuffleDep.shuffleId)
     }
   }
 
+  private[scheduler] def handleShufflePushCompleted(shuffleId: Int, mapIndex: 
Int): Unit = {
+    shuffleIdToMapStage.get(shuffleId) match {
+      case Some(mapStage) =>
+        val shuffleDep = mapStage.shuffleDep
+        if (!shuffleDep.shuffleMergeFinalized &&
+          shuffleDep.incPushCompleted(mapIndex) * 1.0 / 
shuffleDep.rdd.partitions.length

Review comment:
       nit: use `.toDouble` instead of `* 1.0`.

##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -2023,71 +2070,140 @@ private[spark] class DAGScheduler(
   }
 
   /**
-   * Schedules shuffle merge finalize.
+   *
+   * Schedules shuffle merge finalization.
+   *
+   * @param stage the stage to finalize shuffle merge
+   * @param delay how long to wait before finalizing shuffle merge
+   * @param registerMergeResults whether to wait for merge results before 
scheduling the next stage
+   * @return whether the caller is able to schedule a finalize task
    */
-  private[scheduler] def scheduleShuffleMergeFinalize(stage: ShuffleMapStage): 
Unit = {
-    // TODO: SPARK-33701: Instead of waiting for a constant amount of time for 
finalization
-    // TODO: for all the stages, adaptively tune timeout for merge finalization
-    logInfo(("%s (%s) scheduled for finalizing" +
-      " shuffle merge in %s s").format(stage, stage.name, 
shuffleMergeFinalizeWaitSec))
-    shuffleMergeFinalizeScheduler.schedule(
-      new Runnable {
-        override def run(): Unit = finalizeShuffleMerge(stage)
-      },
-      shuffleMergeFinalizeWaitSec,
-      TimeUnit.SECONDS
-    )
+  private[scheduler] def scheduleShuffleMergeFinalize(
+      stage: ShuffleMapStage,
+      delay: Long,
+      registerMergeResults: Boolean = true): Boolean = {
+    val shuffleDep = stage.shuffleDep
+    val scheduledTask: Option[ScheduledFuture[_]] = shuffleDep.getFinalizeTask
+    scheduledTask match {
+      case Some(task) =>
+        // If we find an already scheduled task, check if the task has been 
triggered yet.
+        // If it's already triggered, do nothing. Otherwise, cancel it and 
schedule a new
+        // one for immediate execution. Note that we should get here only when
+        // handleShufflePushCompleted schedules a finalize task after the 
shuffle map stage
+        // completed earlier and scheduled a task with default delay.
+        if (task.getDelay(TimeUnit.NANOSECONDS) > 0) {
+          task.cancel(false)
+          // The current task should be coming from 
handleShufflePushCompleted, thus the
+          // delay should be 0 and registerMergeResults should be true.
+          assert(delay == 0 && registerMergeResults)

Review comment:
       Shouldn't this also be the case outside of the `if` itself?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to