venkata91 commented on a change in pull request #30691:
URL: https://github.com/apache/spark/pull/30691#discussion_r629845232



##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -1271,21 +1302,28 @@ private[spark] class DAGScheduler(
    * locations for block push/merge by getting the historical locations of 
past executors.
    */
   private def prepareShuffleServicesForShuffleMapStage(stage: 
ShuffleMapStage): Unit = {
-    // TODO(SPARK-32920) Handle stage reuse/retry cases separately as without 
finalize
-    // TODO changes we cannot disable shuffle merge for the retry/reuse cases
-    val mergerLocs = sc.schedulerBackend.getShufflePushMergerLocations(
-      stage.shuffleDep.partitioner.numPartitions, stage.resourceProfileId)
-
-    if (mergerLocs.nonEmpty) {
-      stage.shuffleDep.setMergerLocs(mergerLocs)
-      logInfo(s"Push-based shuffle enabled for $stage (${stage.name}) with" +
-        s" ${stage.shuffleDep.getMergerLocs.size} merger locations")
-
-      logDebug("List of shuffle push merger locations " +
-        s"${stage.shuffleDep.getMergerLocs.map(_.host).mkString(", ")}")
-    } else {
-      logInfo("No available merger locations." +
-        s" Push-based shuffle disabled for $stage (${stage.name})")
+    if (stage.shuffleDep.shuffleMergeEnabled && 
!stage.shuffleDep.shuffleMergeFinalized
+      && stage.shuffleDep.getMergerLocs.isEmpty) {
+      val mergerLocs = sc.schedulerBackend.getShufflePushMergerLocations(
+        stage.shuffleDep.partitioner.numPartitions, stage.resourceProfileId)
+      if (mergerLocs.nonEmpty) {
+        stage.shuffleDep.setMergerLocs(mergerLocs)
+        logInfo(s"Push-based shuffle enabled for $stage (${stage.name}) with" +
+          s" ${stage.shuffleDep.getMergerLocs.size} merger locations")
+
+        logDebug("List of shuffle push merger locations " +
+          s"${stage.shuffleDep.getMergerLocs.map(_.host).mkString(", ")}")
+      } else {
+        stage.shuffleDep.setShuffleMergeEnabled(false)
+        logInfo("Push-based shuffle disabled for $stage (${stage.name})")
+      }
+    } else if (stage.shuffleDep.shuffleMergeFinalized) {
+      // Disable Shuffle merge for the retry/reuse of the same shuffle 
dependency if it has
+      // already been merge finalized. If the shuffle dependency was 
previously assigned merger
+      // locations but the corresponding shuffle map stage did not complete 
successfully, we
+      // would still enable push for its retry.

Review comment:
       Yes, we are disabling merge in those case since it is already finalized.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to