mridulm commented on code in PR #39011:
URL: https://github.com/apache/spark/pull/39011#discussion_r1045132885


##########
core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala:
##########
@@ -51,6 +52,7 @@ import org.apache.spark.rdd.RDD
  *                  at the same time for a barrier stage.
  */
 private[spark] class ShuffleMapTask(
+    @transient val shuffleId: Int,

Review Comment:
   Do we need to add this? Given a stageId, we can query `DAGScheduler` to find 
the shuffleId (from `ShuffleMapStage.shuffleDep.shuffleId`). This can be done 
once, at `TaskSetManager` creation, and stored in a private field to keep the 
cost minimal.
   
   Thoughts ?
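
A rough sketch of the suggestion above, using simplified stand-in types rather than the real Spark classes. `SchedulerSketch`, `shuffleIdForStage`, and `TaskSetManagerSketch` are hypothetical names for illustration only; the point is just that the lookup happens once at construction and the result is cached:

```scala
// Simplified stand-ins for illustration; these are NOT the real Spark classes.
final case class ShuffleDep(shuffleId: Int)

sealed trait Stage { def id: Int }
final case class ShuffleMapStageSketch(id: Int, shuffleDep: ShuffleDep) extends Stage
final case class ResultStageSketch(id: Int) extends Stage

// Hypothetical view of the scheduler: a stageId -> stage mapping.
final class SchedulerSketch(stages: Map[Int, Stage]) {
  // Hypothetical helper: None for unknown ids and for non-shuffle stages.
  def shuffleIdForStage(stageId: Int): Option[Int] =
    stages.get(stageId).collect {
      case s: ShuffleMapStageSketch => s.shuffleDep.shuffleId
    }
}

final class TaskSetManagerSketch(scheduler: SchedulerSketch, stageId: Int) {
  // Resolved once at construction, so later callers never repeat the lookup.
  private val cachedShuffleId: Option[Int] = scheduler.shuffleIdForStage(stageId)

  def shuffleId: Option[Int] = cachedShuffleId
}
```

With this shape the task class would not need to carry an extra `shuffleId` field; whether the real `DAGScheduler` can expose such a lookup cheaply is the open question.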



##########
core/src/main/scala/org/apache/spark/MapOutputTracker.scala:
##########
@@ -1128,6 +1128,17 @@ private[spark] class MapOutputTrackerMaster(
     }
   }
 
+  /**
+   * Get map output location by (shuffleId, mapId)
+   */
+  def getMapOutputLocation(shuffleId: Int, mapId: Long): Option[BlockManagerId] = {
+    shuffleStatuses.get(shuffleId).flatMap { shuffleStatus =>
+      shuffleStatus.withMapStatuses { mapStatues =>
+        mapStatues.filter(_ != null).find(_.mapId == mapId).map(_.location)
+      }

Review Comment:
   nit: When `SHUFFLE_USE_OLD_FETCH_PROTOCOL` is enabled, we could look up the 
entry directly by index in the array instead of scanning it, right?
   Given that it is turned off by default, I am fine with leaving it as is though ...
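
To illustrate the nit, here is a minimal sketch with stand-in types, assuming that under the old fetch protocol the `mapId` is the same as the index into the map status array. `MapStatusSketch`, `findLocation`, and the `mapIdIsIndex` flag are hypothetical names, not part of the PR:

```scala
// Stand-in for a map status entry; NOT the real Spark MapStatus.
final case class MapStatusSketch(mapId: Long, location: String)

// Hypothetical helper. `mapIdIsIndex` models the old fetch protocol being enabled,
// under the assumption that mapId then equals the position in the array.
def findLocation(
    statuses: Array[MapStatusSketch],
    mapId: Long,
    mapIdIsIndex: Boolean): Option[String] = {
  if (mapIdIsIndex) {
    // Direct lookup: check bounds, then tolerate a null (not yet registered) slot.
    if (mapId >= 0 && mapId < statuses.length) {
      Option(statuses(mapId.toInt)).filter(_.mapId == mapId).map(_.location)
    } else {
      None
    }
  } else {
    // Default path: linear scan that skips null slots, as in the PR.
    statuses.iterator.filter(_ != null).find(_.mapId == mapId).map(_.location)
  }
}
```

The direct path is O(1) instead of O(number of map outputs), but it only applies to a configuration that is off by default.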



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

