Victsm commented on a change in pull request #30480:
URL: https://github.com/apache/spark/pull/30480#discussion_r598363936



##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -633,23 +810,50 @@ private[spark] class MapOutputTrackerMaster(
 
   /**
    * Return the preferred hosts on which to run the given map output partition in a given shuffle,
-   * i.e. the nodes that the most outputs for that partition are on.
+   * i.e. the nodes that the most outputs for that partition are on. If the map output is
+   * pre-merged, then return the node where the merged block is located if the merge ratio is
+   * above the threshold.
    *
    * @param dep shuffle dependency object
    * @param partitionId map output partition that we want to read
    * @return a sequence of host names
    */
   def getPreferredLocationsForShuffle(dep: ShuffleDependency[_, _, _], partitionId: Int)
       : Seq[String] = {
-    if (shuffleLocalityEnabled && dep.rdd.partitions.length < SHUFFLE_PREF_MAP_THRESHOLD &&
-        dep.partitioner.numPartitions < SHUFFLE_PREF_REDUCE_THRESHOLD) {
-      val blockManagerIds = getLocationsWithLargestOutputs(dep.shuffleId, partitionId,
-        dep.partitioner.numPartitions, REDUCER_PREF_LOCS_FRACTION)
-      if (blockManagerIds.nonEmpty) {
-        blockManagerIds.get.map(_.host)
+    val shuffleStatus = shuffleStatuses.get(dep.shuffleId).orNull
+    if (shuffleStatus != null) {
+      // Check if the map output is pre-merged and if the merge ratio is above the threshold.
+      // If so, the location of the merged block is the preferred location.
+      val preferredLoc = if (pushBasedShuffleEnabled) {
+        shuffleStatus.withMergeStatuses { statuses =>
+          val status = statuses(partitionId)
+          val numMaps = dep.rdd.partitions.length
+          if (status != null && status.getMissingMaps(numMaps).length.toDouble / numMaps
+            <= (1 - REDUCER_PREF_LOCS_FRACTION)) {
+            Seq(status.location.host)
+          } else {
+            Nil
+          }
+        }
       } else {
         Nil
       }
+      if (!preferredLoc.isEmpty) {
+        preferredLoc
+      } else {
+        if (shuffleLocalityEnabled && dep.rdd.partitions.length < SHUFFLE_PREF_MAP_THRESHOLD &&
+          dep.partitioner.numPartitions < SHUFFLE_PREF_REDUCE_THRESHOLD) {
+          val blockManagerIds = getLocationsWithLargestOutputs(dep.shuffleId, partitionId,
+            dep.partitioner.numPartitions, REDUCER_PREF_LOCS_FRACTION)
+          if (blockManagerIds.nonEmpty) {
+            blockManagerIds.get.map(_.host)
+          } else {
+            Nil
+          }
+        } else {
+          Nil
+        }
+      }

Review comment:
       This is true.
   The goal is to optimize both the size of the blocks fetched locally and the reduction in shuffle fetch RPCs.
   Computing the ratio from the number of blocks rather than from their sizes introduces a slight inaccuracy, but it should be negligible, since it is very unlikely that most of the mappers for a skewed partition run on the same host.
   The current approach gives a sufficiently accurate result while keeping the implementation simple.
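
   To make the trade-off concrete, here is a minimal sketch (not the code in this PR) that contrasts the count-based check from the diff with a hypothetical size-based variant. `MergeRatioSketch`, `preferByCount`, `preferBySize`, and the `blockSizes` input are illustrative names only, and the fraction value of 0.2 is an assumption standing in for `REDUCER_PREF_LOCS_FRACTION`.

```scala
// Sketch only: all names here are hypothetical and not part of MapOutputTracker.
object MergeRatioSketch {
  // Assumed stand-in for REDUCER_PREF_LOCS_FRACTION.
  val reducerPrefLocsFraction: Double = 0.2

  /** Count-based check, same shape as the diff: missing / numMaps <= 1 - fraction. */
  def preferByCount(mergedMaps: Set[Int], numMaps: Int): Boolean = {
    val missing = numMaps - mergedMaps.size
    missing.toDouble / numMaps <= (1 - reducerPrefLocsFraction)
  }

  /** Hypothetical size-based variant: merged bytes / total bytes >= fraction. */
  def preferBySize(mergedMaps: Set[Int], blockSizes: IndexedSeq[Long]): Boolean = {
    val total = blockSizes.sum
    val merged = mergedMaps.iterator.map(blockSizes).sum
    total > 0 && merged.toDouble / total >= reducerPrefLocsFraction
  }
}
```

   The two checks only disagree when block sizes are heavily skewed across mappers, for example when one mapper's block dominates the partition. In that case the merged block's host may or may not be the best choice by bytes, which is the slight inaccuracy accepted above in exchange for a simpler implementation.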






