Victsm commented on a change in pull request #30480:
URL: https://github.com/apache/spark/pull/30480#discussion_r616236935



##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -633,23 +887,50 @@ private[spark] class MapOutputTrackerMaster(
 
   /**
    * Return the preferred hosts on which to run the given map output partition in a given shuffle,
-   * i.e. the nodes that the most outputs for that partition are on.
+   * i.e. the nodes that the most outputs for that partition are on. If the map output is
+   * pre-merged, then return the node where the merged block is located if the merge ratio is
+   * above the threshold.
    *
    * @param dep shuffle dependency object
    * @param partitionId map output partition that we want to read
    * @return a sequence of host names
    */
   def getPreferredLocationsForShuffle(dep: ShuffleDependency[_, _, _], partitionId: Int)
       : Seq[String] = {
-    if (shuffleLocalityEnabled && dep.rdd.partitions.length < SHUFFLE_PREF_MAP_THRESHOLD &&
-        dep.partitioner.numPartitions < SHUFFLE_PREF_REDUCE_THRESHOLD) {
-      val blockManagerIds = getLocationsWithLargestOutputs(dep.shuffleId, partitionId,
-        dep.partitioner.numPartitions, REDUCER_PREF_LOCS_FRACTION)
-      if (blockManagerIds.nonEmpty) {
-        blockManagerIds.get.map(_.host)
+    val shuffleStatus = shuffleStatuses.get(dep.shuffleId).orNull
+    if (shuffleStatus != null) {
+      // Check if the map output is pre-merged and if the merge ratio is above the threshold.
+      // If so, the location of the merged block is the preferred location.
+      val preferredLoc = if (pushBasedShuffleEnabled) {

Review comment:
       I agree that we should make it consistent, but there is also a clear difference between the locality calculation for push-based shuffle and the one for the original shuffle.
   My understanding is that this flag was added because computing shuffle locality can be costly in the original shuffle.
   For push-based shuffle, that cost is no longer a concern, and the reducer task can achieve much better locality.
   So for push-based shuffle, always calculating shuffle locality is preferred.
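
   To make the distinction concrete, here is a minimal, self-contained sketch of the decision I have in mind. It is not the actual `MapOutputTrackerMaster` code: `Config`, `PartitionInfo`, `preferredHosts`, and every field name are hypothetical stand-ins, and treating "above the threshold" as a strict `>` is an assumption.

```scala
// Hypothetical, simplified model of the locality decision discussed above.
// None of these types exist in Spark; they only illustrate the control flow.
object ShuffleLocalitySketch {
  final case class Config(
      pushBasedShuffleEnabled: Boolean,
      shuffleLocalityEnabled: Boolean, // the flag guarding the original computation
      mapThreshold: Int,               // stand-in for SHUFFLE_PREF_MAP_THRESHOLD
      reduceThreshold: Int,            // stand-in for SHUFFLE_PREF_REDUCE_THRESHOLD
      mergeRatioThreshold: Double)     // assumed threshold on the merge ratio

  final case class PartitionInfo(
      numMaps: Int,
      numReducers: Int,
      mergedHost: Option[String],           // host of the merged block, if any
      mergeRatio: Double,                   // fraction of map outputs that were merged
      hostsWithLargestOutputs: Seq[String]) // result of the original, costlier lookup

  def preferredHosts(conf: Config, p: PartitionInfo): Seq[String] = {
    // Push-based path: a cheap check, so it is NOT gated by shuffleLocalityEnabled.
    val mergedLoc: Seq[String] =
      if (conf.pushBasedShuffleEnabled && p.mergeRatio > conf.mergeRatioThreshold) {
        p.mergedHost.toList
      } else {
        Seq.empty
      }

    if (mergedLoc.nonEmpty) {
      mergedLoc
    } else if (conf.shuffleLocalityEnabled &&
        p.numMaps < conf.mapThreshold &&
        p.numReducers < conf.reduceThreshold) {
      // Original path: gated by the flag and the size thresholds.
      p.hostsWithLargestOutputs
    } else {
      Seq.empty
    }
  }
}
```

   In this sketch, a partition whose merge ratio clears the threshold gets the merged block's host even when `shuffleLocalityEnabled` is false, while the original lookup still respects the flag and the two size thresholds.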
   






