mridulm commented on a change in pull request #35719:
URL: https://github.com/apache/spark/pull/35719#discussion_r818966635



##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -1022,101 +1022,105 @@ private[spark] class MapOutputTrackerMaster(
       if (preferredLoc.nonEmpty) {
         preferredLoc
       } else {
-        if (shuffleLocalityEnabled && dep.rdd.partitions.length < 
SHUFFLE_PREF_MAP_THRESHOLD &&
-          dep.partitioner.numPartitions < SHUFFLE_PREF_REDUCE_THRESHOLD) {
-          val blockManagerIds = getLocationsWithLargestOutputs(dep.shuffleId, 
partitionId,
-            dep.partitioner.numPartitions, REDUCER_PREF_LOCS_FRACTION)
-          if (blockManagerIds.nonEmpty) {
-            blockManagerIds.get.map(_.host)
-          } else {
-            Nil
-          }
-        } else {
-          Nil
-        }
+        getLocationsWithLargestOutputs(
+          dep,
+          0,
+          dep.rdd.getNumPartitions,
+          partitionId,
+          partitionId + 1,
+          REDUCER_PREF_LOCS_FRACTION).map(_.host)
       }
     } else {
       Nil
     }
   }
 
   /**
-   * Return a list of locations that each have fraction of map output greater 
than the specified
-   * threshold.
+   * Return the preferred hosts on which to run the given map output partition 
in a given shuffle,
+   * i.e. the nodes that the most outputs for that partition are on.
    *
-   * @param shuffleId id of the shuffle
-   * @param reducerId id of the reduce task
-   * @param numReducers total number of reducers in the shuffle
-   * @param fractionThreshold fraction of total map output size that a 
location must have
-   *                          for it to be considered large.
+   * @param dep shuffle dependency object
+   * @param startMapIndex the start map index (inclusive)
+   * @param endMapIndex the end map index (exclusive)
+   * @param startReducerIndex the start reducer index (inclusive)
+   * @param endReducerIndex the end reducer index (exclusive)
+   * @return a sequence of locations where task runs.
    */
-  def getLocationsWithLargestOutputs(
-      shuffleId: Int,
-      reducerId: Int,
-      numReducers: Int,
-      fractionThreshold: Double)
-    : Option[Array[BlockManagerId]] = {
-
-    val shuffleStatus = shuffleStatuses.get(shuffleId).orNull
-    if (shuffleStatus != null) {
-      shuffleStatus.withMapStatuses { statuses =>
-        if (statuses.nonEmpty) {
-          // HashMap to add up sizes of all blocks at the same location
-          val locs = new HashMap[BlockManagerId, Long]
-          var totalOutputSize = 0L
-          var mapIdx = 0
-          while (mapIdx < statuses.length) {
-            val status = statuses(mapIdx)
-            // status may be null here if we are called between 
registerShuffle, which creates an
-            // array with null entries for each output, and 
registerMapOutputs, which populates it
-            // with valid status entries. This is possible if one thread 
schedules a job which
-            // depends on an RDD which is currently being computed by another 
thread.
-            if (status != null) {
-              val blockSize = status.getSizeForBlock(reducerId)
-              if (blockSize > 0) {
-                locs(status.location) = locs.getOrElse(status.location, 0L) + 
blockSize
-                totalOutputSize += blockSize
-              }
-            }
-            mapIdx = mapIdx + 1
-          }
-          val topLocs = locs.filter { case (loc, size) =>
-            size.toDouble / totalOutputSize >= fractionThreshold
-          }
-          // Return if we have any locations which satisfy the required 
threshold
-          if (topLocs.nonEmpty) {
-            return Some(topLocs.keys.toArray)
-          }
-        }
-      }
-    }
-    None
+  def getPreferredLocationsForShuffle(
+      dep: ShuffleDependency[_, _, _],
+      startMapIndex: Int,
+      endMapIndex: Int,
+      startReducerIndex: Int,
+      endReducerIndex: Int): Seq[String] = {
+    getLocationsWithLargestOutputs(
+      dep,
+      startMapIndex,
+      endMapIndex,
+      startReducerIndex,
+      endReducerIndex,
+      REDUCER_PREF_LOCS_FRACTION).map(_.host)
   }
 
   /**
-   * Return the locations where the Mappers ran. The locations each includes 
both a host and an
-   * executor id on that host.
+   * If shuffle locality is enabled, return a list of locations that each have 
fraction of
+   * map output greater than the specified threshold.
    *
    * @param dep shuffle dependency object
-   * @param startMapIndex the start map index
+   * @param startMapIndex the start map index (inclusive)
    * @param endMapIndex the end map index (exclusive)
-   * @return a sequence of locations where task runs.
+   * @param startReducerIndex the start reducer index (inclusive)
+   * @param endReducerIndex the end reducer index (exclusive)
+   * @param fractionThreshold fraction of total map output size that a 
location must have
+   *                          for it to be considered large.
    */
-  def getMapLocation(
+  def getLocationsWithLargestOutputs(
       dep: ShuffleDependency[_, _, _],
       startMapIndex: Int,
-      endMapIndex: Int): Seq[String] =
-  {
-    val shuffleStatus = shuffleStatuses.get(dep.shuffleId).orNull
-    if (shuffleStatus != null) {
-      shuffleStatus.withMapStatuses { statuses =>
-        if (startMapIndex < endMapIndex &&
-          (startMapIndex >= 0 && endMapIndex <= statuses.length)) {
-          val statusesPicked = statuses.slice(startMapIndex, 
endMapIndex).filter(_ != null)
-          statusesPicked.map(_.location.host).toSeq
-        } else {
-          Nil
+      endMapIndex: Int,
+      startReducerIndex: Int,
+      endReducerIndex: Int,
+      fractionThreshold: Double): Seq[BlockManagerId] = {
+    if (shuffleLocalityEnabled && dep.rdd.getNumPartitions < 
SHUFFLE_PREF_MAP_THRESHOLD &&
+      dep.partitioner.numPartitions < SHUFFLE_PREF_REDUCE_THRESHOLD) {
+      val shuffleStatus = shuffleStatuses.get(dep.shuffleId).orNull
+      if (shuffleStatus != null) {
+        shuffleStatus.withMapStatuses { statuses =>
+          if (statuses.nonEmpty && startMapIndex < endMapIndex &&
+            startMapIndex >= 0 && endMapIndex <= statuses.length) {
+            // HashMap to add up sizes of all blocks at the same location
+            val locs = new HashMap[BlockManagerId, Long]
+            var totalOutputSize = 0L
+            var mapIdx = startMapIndex

Review comment:
       nit: 
   For the common case where `startReducerIndex + 1 == endReducerIndex`, we can 
simply return:
`statuses.slice(startMapIndex, endMapIndex).filter(_ != null).map(_.location.host).toSeq`

##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -1022,101 +1022,105 @@ private[spark] class MapOutputTrackerMaster(
       if (preferredLoc.nonEmpty) {
         preferredLoc
       } else {
-        if (shuffleLocalityEnabled && dep.rdd.partitions.length < 
SHUFFLE_PREF_MAP_THRESHOLD &&
-          dep.partitioner.numPartitions < SHUFFLE_PREF_REDUCE_THRESHOLD) {
-          val blockManagerIds = getLocationsWithLargestOutputs(dep.shuffleId, 
partitionId,
-            dep.partitioner.numPartitions, REDUCER_PREF_LOCS_FRACTION)
-          if (blockManagerIds.nonEmpty) {
-            blockManagerIds.get.map(_.host)
-          } else {
-            Nil
-          }
-        } else {
-          Nil
-        }
+        getLocationsWithLargestOutputs(
+          dep,
+          0,
+          dep.rdd.getNumPartitions,
+          partitionId,
+          partitionId + 1,
+          REDUCER_PREF_LOCS_FRACTION).map(_.host)
       }
     } else {
       Nil
     }
   }
 
   /**
-   * Return a list of locations that each have fraction of map output greater 
than the specified
-   * threshold.
+   * Return the preferred hosts on which to run the given map output partition 
in a given shuffle,
+   * i.e. the nodes that the most outputs for that partition are on.
    *
-   * @param shuffleId id of the shuffle
-   * @param reducerId id of the reduce task
-   * @param numReducers total number of reducers in the shuffle
-   * @param fractionThreshold fraction of total map output size that a 
location must have
-   *                          for it to be considered large.
+   * @param dep shuffle dependency object
+   * @param startMapIndex the start map index (inclusive)
+   * @param endMapIndex the end map index (exclusive)
+   * @param startReducerIndex the start reducer index (inclusive)
+   * @param endReducerIndex the end reducer index (exclusive)
+   * @return a sequence of locations where task runs.
    */
-  def getLocationsWithLargestOutputs(
-      shuffleId: Int,
-      reducerId: Int,
-      numReducers: Int,
-      fractionThreshold: Double)
-    : Option[Array[BlockManagerId]] = {
-
-    val shuffleStatus = shuffleStatuses.get(shuffleId).orNull
-    if (shuffleStatus != null) {
-      shuffleStatus.withMapStatuses { statuses =>
-        if (statuses.nonEmpty) {
-          // HashMap to add up sizes of all blocks at the same location
-          val locs = new HashMap[BlockManagerId, Long]
-          var totalOutputSize = 0L
-          var mapIdx = 0
-          while (mapIdx < statuses.length) {
-            val status = statuses(mapIdx)
-            // status may be null here if we are called between 
registerShuffle, which creates an
-            // array with null entries for each output, and 
registerMapOutputs, which populates it
-            // with valid status entries. This is possible if one thread 
schedules a job which
-            // depends on an RDD which is currently being computed by another 
thread.
-            if (status != null) {
-              val blockSize = status.getSizeForBlock(reducerId)
-              if (blockSize > 0) {
-                locs(status.location) = locs.getOrElse(status.location, 0L) + 
blockSize
-                totalOutputSize += blockSize
-              }
-            }
-            mapIdx = mapIdx + 1
-          }
-          val topLocs = locs.filter { case (loc, size) =>
-            size.toDouble / totalOutputSize >= fractionThreshold
-          }
-          // Return if we have any locations which satisfy the required 
threshold
-          if (topLocs.nonEmpty) {
-            return Some(topLocs.keys.toArray)
-          }
-        }
-      }
-    }
-    None
+  def getPreferredLocationsForShuffle(
+      dep: ShuffleDependency[_, _, _],
+      startMapIndex: Int,
+      endMapIndex: Int,
+      startReducerIndex: Int,
+      endReducerIndex: Int): Seq[String] = {
+    getLocationsWithLargestOutputs(
+      dep,
+      startMapIndex,
+      endMapIndex,
+      startReducerIndex,
+      endReducerIndex,
+      REDUCER_PREF_LOCS_FRACTION).map(_.host)
   }
 
   /**
-   * Return the locations where the Mappers ran. The locations each includes 
both a host and an
-   * executor id on that host.
+   * If shuffle locality is enabled, return a list of locations that each have 
fraction of
+   * map output greater than the specified threshold.
    *
    * @param dep shuffle dependency object
-   * @param startMapIndex the start map index
+   * @param startMapIndex the start map index (inclusive)
    * @param endMapIndex the end map index (exclusive)
-   * @return a sequence of locations where task runs.
+   * @param startReducerIndex the start reducer index (inclusive)
+   * @param endReducerIndex the end reducer index (exclusive)
+   * @param fractionThreshold fraction of total map output size that a 
location must have
+   *                          for it to be considered large.
    */
-  def getMapLocation(
+  def getLocationsWithLargestOutputs(
       dep: ShuffleDependency[_, _, _],
       startMapIndex: Int,
-      endMapIndex: Int): Seq[String] =
-  {
-    val shuffleStatus = shuffleStatuses.get(dep.shuffleId).orNull
-    if (shuffleStatus != null) {
-      shuffleStatus.withMapStatuses { statuses =>
-        if (startMapIndex < endMapIndex &&
-          (startMapIndex >= 0 && endMapIndex <= statuses.length)) {
-          val statusesPicked = statuses.slice(startMapIndex, 
endMapIndex).filter(_ != null)
-          statusesPicked.map(_.location.host).toSeq
-        } else {
-          Nil
+      endMapIndex: Int,
+      startReducerIndex: Int,
+      endReducerIndex: Int,
+      fractionThreshold: Double): Seq[BlockManagerId] = {
+    if (shuffleLocalityEnabled && dep.rdd.getNumPartitions < 
SHUFFLE_PREF_MAP_THRESHOLD &&
+      dep.partitioner.numPartitions < SHUFFLE_PREF_REDUCE_THRESHOLD) {
+      val shuffleStatus = shuffleStatuses.get(dep.shuffleId).orNull
+      if (shuffleStatus != null) {
+        shuffleStatus.withMapStatuses { statuses =>
+          if (statuses.nonEmpty && startMapIndex < endMapIndex &&
+            startMapIndex >= 0 && endMapIndex <= statuses.length) {

Review comment:
       The additional checks on `startMapIndex` and `endMapIndex` can be done 
outside, along with the outer `shuffleLocalityEnabled` `if` condition (leave only 
the `endMapIndex <= statuses.length` check here).




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to