venkata91 commented on a change in pull request #30691: URL: https://github.com/apache/spark/pull/30691#discussion_r629845232
########## File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ########## @@ -1271,21 +1302,28 @@ private[spark] class DAGScheduler( * locations for block push/merge by getting the historical locations of past executors. */ private def prepareShuffleServicesForShuffleMapStage(stage: ShuffleMapStage): Unit = { - // TODO(SPARK-32920) Handle stage reuse/retry cases separately as without finalize - // TODO changes we cannot disable shuffle merge for the retry/reuse cases - val mergerLocs = sc.schedulerBackend.getShufflePushMergerLocations( - stage.shuffleDep.partitioner.numPartitions, stage.resourceProfileId) - - if (mergerLocs.nonEmpty) { - stage.shuffleDep.setMergerLocs(mergerLocs) - logInfo(s"Push-based shuffle enabled for $stage (${stage.name}) with" + - s" ${stage.shuffleDep.getMergerLocs.size} merger locations") - - logDebug("List of shuffle push merger locations " + - s"${stage.shuffleDep.getMergerLocs.map(_.host).mkString(", ")}") - } else { - logInfo("No available merger locations." 
+ - s" Push-based shuffle disabled for $stage (${stage.name})") + if (stage.shuffleDep.shuffleMergeEnabled && !stage.shuffleDep.shuffleMergeFinalized + && stage.shuffleDep.getMergerLocs.isEmpty) { + val mergerLocs = sc.schedulerBackend.getShufflePushMergerLocations( + stage.shuffleDep.partitioner.numPartitions, stage.resourceProfileId) + if (mergerLocs.nonEmpty) { + stage.shuffleDep.setMergerLocs(mergerLocs) + logInfo(s"Push-based shuffle enabled for $stage (${stage.name}) with" + + s" ${stage.shuffleDep.getMergerLocs.size} merger locations") + + logDebug("List of shuffle push merger locations " + + s"${stage.shuffleDep.getMergerLocs.map(_.host).mkString(", ")}") + } else { + stage.shuffleDep.setShuffleMergeEnabled(false) + logInfo("Push-based shuffle disabled for $stage (${stage.name})") + } + } else if (stage.shuffleDep.shuffleMergeFinalized) { + // Disable Shuffle merge for the retry/reuse of the same shuffle dependency if it has + // already been merge finalized. If the shuffle dependency was previously assigned merger + // locations but the corresponding shuffle map stage did not complete successfully, we + // would still enable push for its retry. Review comment: Yes, we are disabling merge in those cases since it is already finalized. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org