venkata91 commented on a change in pull request #30691: URL: https://github.com/apache/spark/pull/30691#discussion_r641707976
########## File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ########## @@ -2000,6 +2023,147 @@ private[spark] class DAGScheduler( } } + /** + * Schedules shuffle merge finalize. + */ + private[scheduler] def scheduleShuffleMergeFinalize(stage: ShuffleMapStage): Unit = { + logInfo(("%s (%s) scheduled for finalizing" + + " shuffle merge in %s s").format(stage, stage.name, shuffleMergeFinalizeWaitSec)) + shuffleMergeFinalizeScheduler.schedule( + new Runnable { + override def run(): Unit = finalizeShuffleMerge(stage) + }, + shuffleMergeFinalizeWaitSec, + TimeUnit.SECONDS + ) + } + + /** + * DAGScheduler notifies all the remote shuffle services chosen to serve shuffle merge request for + * the given shuffle map stage to finalize the shuffle merge process for this shuffle. This is + * invoked in a separate thread to reduce the impact on the DAGScheduler main thread, as the + * scheduler might need to talk to 1000s of shuffle services to finalize shuffle merge. + */ + private[scheduler] def finalizeShuffleMerge(stage: ShuffleMapStage): Unit = { + logInfo("%s (%s) finalizing the shuffle merge".format(stage, stage.name)) + externalShuffleClient.foreach { shuffleClient => + val shuffleId = stage.shuffleDep.shuffleId + val numMergers = stage.shuffleDep.getMergerLocs.length + val numResponses = new AtomicInteger() + val results = (0 until numMergers).map(_ => SettableFuture.create[Boolean]()) + val timedOut = new AtomicBoolean() + + def increaseAndCheckResponseCount(): Unit = { + if (numResponses.incrementAndGet() == numMergers) { + logInfo("%s (%s) shuffle merge finalized".format(stage, stage.name)) + // Since this runs in the netty client thread and is outside of DAGScheduler + // event loop, we only post ShuffleMergeFinalized event into the event queue. + // The processing of this event should be done inside the event loop, so it + // can safely modify scheduler's internal state. 
+ eventProcessLoop.post(ShuffleMergeFinalized(stage)) + } + } + + stage.shuffleDep.getMergerLocs.zipWithIndex.foreach { + case (shuffleServiceLoc, index) => + // Sends async request to shuffle service to finalize shuffle merge on that host + // TODO: SPARK-35536: Cancel finalizeShuffleMerge if the stage is cancelled + // TODO: during shuffleMergeFinalizeWaitSec + shuffleClient.finalizeShuffleMerge(shuffleServiceLoc.host, + shuffleServiceLoc.port, shuffleId, + new MergeFinalizerListener { + override def onShuffleMergeSuccess(statuses: MergeStatuses): Unit = { + assert(shuffleId == statuses.shuffleId) + if (!timedOut.get()) { + eventProcessLoop.post(RegisterMergeStatuses(stage, MergeStatus. + convertMergeStatusesToMergeStatusArr(statuses, shuffleServiceLoc))) + increaseAndCheckResponseCount() + results(index).set(true) + } + } + + override def onShuffleMergeFailure(e: Throwable): Unit = { + if (!timedOut.get()) { + logWarning(s"Exception encountered when trying to finalize shuffle " + + s"merge on ${shuffleServiceLoc.host} for shuffle $shuffleId", e) + increaseAndCheckResponseCount() + // Do not fail the future as this would cause dag scheduler to prematurely + // give up on waiting for merge results from the remaining shuffle services + // if one fails + results(index).set(false) + } + } + }) + } + // DAGScheduler only waits for a limited amount of time for the merge results. + // It will attempt to submit the next stage(s) irrespective of whether merge results + // from all shuffle services are received or not. 
+ // TODO: SPARK-33701: Instead of waiting for a constant amount of time for finalization + // TODO: for all the stages, adaptively tune timeout for merge finalization + try { + Futures.allAsList(results: _*).get(shuffleMergeResultsTimeoutSec, TimeUnit.SECONDS) + } catch { + case _: TimeoutException => + logInfo(s"Timed out on waiting for merge results from all " + + s"$numMergers mergers for shuffle $shuffleId") + timedOut.set(true) + eventProcessLoop.post(ShuffleMergeFinalized(stage)) + } + } + } + + private def processShuffleMapStageCompletion(shuffleStage: ShuffleMapStage): Unit = { + markStageAsFinished(shuffleStage) + logInfo("looking for newly runnable stages") + logInfo("running: " + runningStages) + logInfo("waiting: " + waitingStages) + logInfo("failed: " + failedStages) + + // This call to increment the epoch may not be strictly necessary, but it is retained + // for now in order to minimize the changes in behavior from an earlier version of the + // code. This existing behavior of always incrementing the epoch following any + // successful shuffle map stage completion may have benefits by causing unneeded + // cached map outputs to be cleaned up earlier on executors. In the future we can + // consider removing this call, but this will require some extra investigation. + // See https://github.com/apache/spark/pull/17955/files#r117385673 for more details. + mapOutputTracker.incrementEpoch() + + clearCacheLocs() + + if (!shuffleStage.isAvailable) { + // Some tasks had failed; let's resubmit this shuffleStage. 
+ // TODO: Lower-level scheduler should also deal with this + logInfo("Resubmitting " + shuffleStage + " (" + shuffleStage.name + + ") because some of its tasks had failed: " + + shuffleStage.findMissingPartitions().mkString(", ")) + submitStage(shuffleStage) + } else { + markMapStageJobsAsFinished(shuffleStage) + submitWaitingChildStages(shuffleStage) + } + } + + private[scheduler] def handleRegisterMergeStatuses( + stage: ShuffleMapStage, + mergeStatuses: Seq[(Int, MergeStatus)]): Unit = { + // Register merge statuses if the stage is still running and shuffle merge is not finalized yet. + if (runningStages.contains(stage) && !stage.shuffleDep.shuffleMergeFinalized) { + mapOutputTracker.registerMergeResults(stage.shuffleDep.shuffleId, mergeStatuses) + } + } + + private[scheduler] def handleShuffleMergeFinalized(stage: ShuffleMapStage): Unit = { + // Only update MapOutputTracker metadata if the stage is still active. i.e not cancelled. + if (runningStages.contains(stage)) { + stage.shuffleDep.markShuffleMergeFinalized() + processShuffleMapStageCompletion(stage) + } else { + // TODO: SPARK-35549: Currently merge statuses results which come after shuffle merge + // TODO: is finalized is not registered. Review comment: I think the TODO comment I had was in the wrong place. I have fixed that and also added a comment explaining why we unregister all the merge results. The client-side stage cancellation issue for shuffle merge is handled as part of this JIRA; the server-side stage cancellation issue will be handled as part of SPARK-35536. [SPARK-35536](https://issues.apache.org/jira/browse/SPARK-35536) is a different issue: the stage is cancelled during the `shuffleMergeFinalizeWaitSec` window, after which we send `finalizeShuffleMerge` requests to the individual shuffle services. That JIRA is about cancelling those pending requests. We might also need to ask all the shuffle services to clean up the merged blocks.
[SPARK-35549](https://issues.apache.org/jira/browse/SPARK-35549) aims to address all the other cases, such as the one described in that JIRA (`executor/node loss causing re-computation due to fetch failure and if the merge statuses gets registered very late then that can cause inconsistencies.`). That is why it is safer not to register merge results that arrive after the shuffle merge has been finalized. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org