venkata91 commented on a change in pull request #33896:
URL: https://github.com/apache/spark/pull/33896#discussion_r749635349
##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -2023,71 +2070,140 @@ private[spark] class DAGScheduler(
}
/**
- * Schedules shuffle merge finalize.
+ *
+ * Schedules shuffle merge finalization.
+ *
+ * @param stage the stage to finalize shuffle merge
+ * @param delay how long to wait before finalizing shuffle merge
+ * @param registerMergeResults whether to wait for merge results before scheduling the next stage
+ * @return whether the caller is able to schedule a finalize task
*/
- private[scheduler] def scheduleShuffleMergeFinalize(stage: ShuffleMapStage): Unit = {
- // TODO: SPARK-33701: Instead of waiting for a constant amount of time for finalization
- // TODO: for all the stages, adaptively tune timeout for merge finalization
- logInfo(("%s (%s) scheduled for finalizing" +
- " shuffle merge in %s s").format(stage, stage.name, shuffleMergeFinalizeWaitSec))
- shuffleMergeFinalizeScheduler.schedule(
- new Runnable {
- override def run(): Unit = finalizeShuffleMerge(stage)
- },
- shuffleMergeFinalizeWaitSec,
- TimeUnit.SECONDS
- )
+ private[scheduler] def scheduleShuffleMergeFinalize(
+ stage: ShuffleMapStage,
+ delay: Long,
+ registerMergeResults: Boolean = true): Boolean = {
+ val shuffleDep = stage.shuffleDep
+ val scheduledTask: Option[ScheduledFuture[_]] = shuffleDep.getFinalizeTask
+ scheduledTask match {
+ case Some(task) =>
+ // If we find an already scheduled task, check if the task has been triggered yet.
+ // If it's already triggered, do nothing. Otherwise, cancel it and schedule a new
+ // one for immediate execution. Note that we should get here only when
+ // handleShufflePushCompleted schedules a finalize task after the shuffle map stage
+ // completed earlier and scheduled a task with default delay.
+ if (task.getDelay(TimeUnit.NANOSECONDS) > 0) {
+ task.cancel(false)
+ // The current task should be coming from handleShufflePushCompleted, thus the
+ // delay should be 0 and registerMergeResults should be true.
+ assert(delay == 0 && registerMergeResults)
Review comment:
Yeah, since we schedule the finalize only if there is no already-scheduled
task when the stage completes (the path that checks whether `totalShuffleSize <
shuffleMergeWaitMinSizeThreshold`), we can move this assertion outside. Will
make the change.
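For illustration, a minimal Scala sketch of the reschedule pattern in the hunk above, with the assertion hoisted out of the `getDelay` check as the comment proposes. This is not the PR's code: `FinalizeState`, `FinalizeScheduler`, `scheduleFinalize`, and `doFinalize` are hypothetical names, a plain `java.util.concurrent.ScheduledExecutorService` stands in for `shuffleMergeFinalizeScheduler`, and the size-threshold logic is omitted. Reading "outside" as "outside the `if`, inside `case Some(task)`" is an assumption.

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, ScheduledFuture, TimeUnit}

// Hypothetical stand-in for the per-shuffle finalize-task slot
// (in the PR this state is reached via shuffleDep.getFinalizeTask).
final class FinalizeState {
  @volatile var finalizeTask: Option[ScheduledFuture[_]] = None
}

object FinalizeScheduler {
  private def runnable(f: () => Unit): Runnable = new Runnable {
    override def run(): Unit = f()
  }

  /** Returns true if this call scheduled a new finalize task (hypothetical API). */
  def scheduleFinalize(
      scheduler: ScheduledExecutorService,
      state: FinalizeState,
      delaySec: Long,
      registerMergeResults: Boolean,
      doFinalize: () => Unit): Boolean = state.synchronized {
    state.finalizeTask match {
      case Some(task) =>
        // A task already exists only if stage completion scheduled one, so any
        // second request must be the immediate one: assert once, before the check.
        assert(delaySec == 0 && registerMergeResults)
        // Replace the pending task only if it has not fired yet and cancels cleanly.
        if (task.getDelay(TimeUnit.NANOSECONDS) > 0 && task.cancel(false)) {
          state.finalizeTask =
            Some(scheduler.schedule(runnable(doFinalize), 0, TimeUnit.SECONDS))
          true
        } else {
          false // already triggered: nothing to do
        }
      case None =>
        state.finalizeTask =
          Some(scheduler.schedule(runnable(doFinalize), delaySec, TimeUnit.SECONDS))
        true
    }
  }
}
```

In this sketch, a first call with a long delay followed by a second call with `delaySec = 0` cancels the pending task and runs the finalize immediately; once that task has fired, further calls return `false`.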
--
This is an automated message from the Apache Git Service.