Github user mateiz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/8844#discussion_r40026880
  
    --- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
    @@ -323,6 +351,30 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
       }
     
       /**
    +   * Return the preferred hosts on which to run the given map output partition in a given shuffle,
    +   * i.e. the nodes that have the most outputs for that partition.
    +   *
    +   * @param dep shuffle dependency object
    +   * @param partitionId map output partition that we want to read
    +   * @return a sequence of host names
    +   */
    +  def getPreferredLocationsForShuffle(dep: ShuffleDependency[_, _, _], partitionId: Int)
    +      : Seq[String] = {
    +    if (shuffleLocalityEnabled && dep.rdd.partitions.length < SHUFFLE_PREF_MAP_THRESHOLD &&
    +        dep.partitioner.numPartitions < SHUFFLE_PREF_REDUCE_THRESHOLD) {
    +      val blockManagerIds = getLocationsWithLargestOutputs(dep.shuffleId, partitionId,
    +        dep.partitioner.numPartitions, REDUCER_PREF_LOCS_FRACTION)
    +      if (blockManagerIds.nonEmpty) {
    +        blockManagerIds.get.map(_.host)
    --- End diff --
    
    Yeah, that's a good point, we lose that because of the design of 
RDD.getPreferredLocations. I'd actually consider extending that API to allow 
passing executor IDs, but probably in a separate JIRA. @shivaram how important 
do you think this is?
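
    For anyone following along, a rough standalone sketch of the heuristic in the diff: a reduce partition "prefers" any host holding at least a given fraction of the total map output destined for it. The names (`preferredHosts`, `bytesByHost`) and the 0.2 threshold here are illustrative assumptions, not the actual `getLocationsWithLargestOutputs` implementation:
    
    ```scala
    // Hypothetical sketch of the reducer-locality heuristic: keep only hosts
    // that hold at least `fraction` of the total map-output bytes for a
    // given reduce partition. The 0.2 used below stands in for
    // REDUCER_PREF_LOCS_FRACTION; it is an assumed value for illustration.
    object ReducerLocalitySketch {
      def preferredHosts(bytesByHost: Map[String, Long], fraction: Double): Seq[String] = {
        val total = bytesByHost.values.sum
        if (total == 0) Seq.empty
        else bytesByHost.collect {
          case (host, bytes) if bytes.toDouble / total >= fraction => host
        }.toSeq
      }
    
      def main(args: Array[String]): Unit = {
        val outputs = Map("host-a" -> 700L, "host-b" -> 250L, "host-c" -> 50L)
        // host-a (70%) and host-b (25%) clear the 20% bar; host-c (5%) does not.
        println(preferredHosts(outputs, 0.2).sorted)
      }
    }
    ```
    
    Note that the result is host names only, which is exactly the limitation discussed above: `RDD.getPreferredLocations` returns `Seq[String]`, so executor-level preferences are lost.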


