otterc commented on a change in pull request #30691:
URL: https://github.com/apache/spark/pull/30691#discussion_r641257596



##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -2000,6 +2023,147 @@ private[spark] class DAGScheduler(
     }
   }
 
+  /**
+   * Schedules shuffle merge finalize.
+   */
+  private[scheduler] def scheduleShuffleMergeFinalize(stage: ShuffleMapStage): 
Unit = {
+    logInfo(("%s (%s) scheduled for finalizing" +
+      " shuffle merge in %s s").format(stage, stage.name, 
shuffleMergeFinalizeWaitSec))
+    shuffleMergeFinalizeScheduler.schedule(
+      new Runnable {
+        override def run(): Unit = finalizeShuffleMerge(stage)
+      },
+      shuffleMergeFinalizeWaitSec,
+      TimeUnit.SECONDS
+    )
+  }
+
+  /**
+   * DAGScheduler notifies all the remote shuffle services chosen to serve 
shuffle merge request for
+   * the given shuffle map stage to finalize the shuffle merge process for 
this shuffle. This is
+   * invoked in a separate thread to reduce the impact on the DAGScheduler 
main thread, as the
+   * scheduler might need to talk to 1000s of shuffle services to finalize 
shuffle merge.
+   */
+  private[scheduler] def finalizeShuffleMerge(stage: ShuffleMapStage): Unit = {
+    logInfo("%s (%s) finalizing the shuffle merge".format(stage, stage.name))
+    externalShuffleClient.foreach { shuffleClient =>
+      val shuffleId = stage.shuffleDep.shuffleId
+      val numMergers = stage.shuffleDep.getMergerLocs.length
+      val numResponses = new AtomicInteger()
+      val results = (0 until numMergers).map(_ => 
SettableFuture.create[Boolean]())
+      val timedOut = new AtomicBoolean()
+
+      def increaseAndCheckResponseCount(): Unit = {
+        if (numResponses.incrementAndGet() == numMergers) {
+          logInfo("%s (%s) shuffle merge finalized".format(stage, stage.name))
+          // Since this runs in the netty client thread and is outside of 
DAGScheduler
+          // event loop, we only post ShuffleMergeFinalized event into the 
event queue.
+          // The processing of this event should be done inside the event 
loop, so it
+          // can safely modify scheduler's internal state.
+          eventProcessLoop.post(ShuffleMergeFinalized(stage))
+        }
+      }
+
+      stage.shuffleDep.getMergerLocs.zipWithIndex.foreach {
+        case (shuffleServiceLoc, index) =>
+          // Sends async request to shuffle service to finalize shuffle merge 
on that host
+          // TODO: SPARK-35536: Cancel finalizeShuffleMerge if the stage is 
cancelled
+          // TODO: during shuffleMergeFinalizeWaitSec
+          shuffleClient.finalizeShuffleMerge(shuffleServiceLoc.host,
+            shuffleServiceLoc.port, shuffleId,
+            new MergeFinalizerListener {
+              override def onShuffleMergeSuccess(statuses: MergeStatuses): 
Unit = {
+                assert(shuffleId == statuses.shuffleId)
+                if (!timedOut.get()) {
+                  eventProcessLoop.post(RegisterMergeStatuses(stage, 
MergeStatus.
+                        convertMergeStatusesToMergeStatusArr(statuses, 
shuffleServiceLoc)))
+                  increaseAndCheckResponseCount()
+                  results(index).set(true)
+                }
+              }
+
+              override def onShuffleMergeFailure(e: Throwable): Unit = {
+                if (!timedOut.get()) {
+                  logWarning(s"Exception encountered when trying to finalize 
shuffle " +
+                    s"merge on ${shuffleServiceLoc.host} for shuffle 
$shuffleId", e)
+                  increaseAndCheckResponseCount()
+                  // Do not fail the future as this would cause dag scheduler 
to prematurely
+                  // give up on waiting for merge results from the remaining 
shuffle services
+                  // if one fails
+                  results(index).set(false)
+                }
+              }
+            })
+      }
+      // DAGScheduler only waits for a limited amount of time for the merge 
results.
+      // It will attempt to submit the next stage(s) irrespective of whether 
merge results
+      // from all shuffle services are received or not.
+      // TODO: SPARK-33701: Instead of waiting for a constant amount of time 
for finalization
+      // TODO: for all the stages, adaptively tune timeout for merge 
finalization
+      try {
+        Futures.allAsList(results: _*).get(shuffleMergeResultsTimeoutSec, 
TimeUnit.SECONDS)
+      } catch {
+        case _: TimeoutException =>
+          logInfo(s"Timed out on waiting for merge results from all " +
+            s"$numMergers mergers for shuffle $shuffleId")
+          timedOut.set(true)
+          eventProcessLoop.post(ShuffleMergeFinalized(stage))
+      }
+    }
+  }
+
+  private def processShuffleMapStageCompletion(shuffleStage: ShuffleMapStage): 
Unit = {
+    markStageAsFinished(shuffleStage)
+    logInfo("looking for newly runnable stages")
+    logInfo("running: " + runningStages)
+    logInfo("waiting: " + waitingStages)
+    logInfo("failed: " + failedStages)
+
+    // This call to increment the epoch may not be strictly necessary, but it 
is retained
+    // for now in order to minimize the changes in behavior from an earlier 
version of the
+    // code. This existing behavior of always incrementing the epoch following 
any
+    // successful shuffle map stage completion may have benefits by causing 
unneeded
+    // cached map outputs to be cleaned up earlier on executors. In the future 
we can
+    // consider removing this call, but this will require some extra 
investigation.
+    // See https://github.com/apache/spark/pull/17955/files#r117385673 for 
more details.
+    mapOutputTracker.incrementEpoch()
+
+    clearCacheLocs()
+
+    if (!shuffleStage.isAvailable) {
+      // Some tasks had failed; let's resubmit this shuffleStage.
+      // TODO: Lower-level scheduler should also deal with this
+      logInfo("Resubmitting " + shuffleStage + " (" + shuffleStage.name +
+        ") because some of its tasks had failed: " +
+        shuffleStage.findMissingPartitions().mkString(", "))
+      submitStage(shuffleStage)
+    } else {
+      markMapStageJobsAsFinished(shuffleStage)
+      submitWaitingChildStages(shuffleStage)
+    }
+  }
+
+  private[scheduler] def handleRegisterMergeStatuses(
+      stage: ShuffleMapStage,
+      mergeStatuses: Seq[(Int, MergeStatus)]): Unit = {
+    // Register merge statuses if the stage is still running and shuffle merge 
is not finalized yet.
+    if (runningStages.contains(stage) && 
!stage.shuffleDep.shuffleMergeFinalized) {
+      mapOutputTracker.registerMergeResults(stage.shuffleDep.shuffleId, 
mergeStatuses)
+    }
+  }
+
+  private[scheduler] def handleShuffleMergeFinalized(stage: ShuffleMapStage): 
Unit = {
+    // Only update MapOutputTracker metadata if the stage is still active. i.e 
not cancelled.
+    if (runningStages.contains(stage)) {
+      stage.shuffleDep.markShuffleMergeFinalized()
+      processShuffleMapStageCompletion(stage)
+    } else {
+      // TODO: SPARK-35549: Currently merge statuses results which come after 
shuffle merge
+      // TODO: is finalized is not registered.

Review comment:
       Just adding more clarification.
   The comment below suggests that when the stage is cancelled, finalizeShuffleMerge will also be cancelled, so there is no guarantee that a ShuffleMergeFinalized event will be posted:
   ```
    // TODO: SPARK-35536: Cancel finalizeShuffleMerge if the stage is cancelled
             // TODO: during shuffleMergeFinalizeWaitSec
   ``` 
   So, unregistering all the merge results should ideally happen when the stage is cancelled.
   Since that will be handled in another JIRA, I think it is better to handle the unregistration of all merge results in that JIRA as well.
   
   If you still want to keep it here, please add a comment saying something like: this targets a stage cancellation that happens before the merge finalization has completed. Also, explain how the merge results left registered after the cancellation would interfere with the new stage, and make the TODO point to `SPARK-35536`, where we will reconsider whether we need to unregister here.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to