mridulm commented on a change in pull request #30691: URL: https://github.com/apache/spark/pull/30691#discussion_r629813442
########## File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ########## @@ -2004,6 +2020,131 @@ private[spark] class DAGScheduler( } } + /** + * Schedules shuffle merge finalize. + */ + private[scheduler] def scheduleShuffleMergeFinalize(stage: ShuffleMapStage): Unit = { + logInfo(("%s (%s) scheduled for finalizing" + + " shuffle merge in %s s").format(stage, stage.name, shuffleMergeFinalizeWaitSec)) + shuffleMergeFinalizeScheduler.schedule( + new Runnable { + override def run(): Unit = finalizeShuffleMerge(stage) + }, + shuffleMergeFinalizeWaitSec, + TimeUnit.SECONDS + ) + } + + /** + * DAGScheduler notifies all the remote shuffle services chosen to serve shuffle merge request for + * the given shuffle map stage to finalize the shuffle merge process for this shuffle. This is + * invoked in a separate thread to reduce the impact on the DAGScheduler main thread, as the + * scheduler might need to talk to 1000s of shuffle services to finalize shuffle merge. + */ + private[scheduler] def finalizeShuffleMerge(stage: ShuffleMapStage): Unit = { + logInfo("%s (%s) finalizing the shuffle merge".format(stage, stage.name)) Review comment: What happens if the stage was cancelled during `shuffleMergeFinalizeWaitSec`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org