GitHub user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/8844#discussion_r39988247
  
    --- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
    @@ -323,6 +351,30 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
       }
     
       /**
    +   * Return the preferred hosts on which to run the given map output partition in a given shuffle,
    +   * i.e. the nodes that the most outputs for that partition are on.
    +   *
    +   * @param dep shuffle dependency object
    +   * @param partitionId map output partition that we want to read
    +   * @return a sequence of host names
    +   */
    +  def getPreferredLocationsForShuffle(dep: ShuffleDependency[_, _, _], partitionId: Int)
    +      : Seq[String] = {
    +    if (shuffleLocalityEnabled && dep.rdd.partitions.length < SHUFFLE_PREF_MAP_THRESHOLD &&
    +        dep.partitioner.numPartitions < SHUFFLE_PREF_REDUCE_THRESHOLD) {
    +      val blockManagerIds = getLocationsWithLargestOutputs(dep.shuffleId, partitionId,
    +        dep.partitioner.numPartitions, REDUCER_PREF_LOCS_FRACTION)
    +      if (blockManagerIds.nonEmpty) {
    +        blockManagerIds.get.map(_.host)
    --- End diff --
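
    For context, a caller such as ShuffledRDD would consume the new API roughly as
    follows (a sketch, not code quoted from the PR; the exact wiring may differ):

        // Sketch: on the driver, ask the MapOutputTrackerMaster for the hosts
        // holding most of this partition's map output.
        override protected def getPreferredLocations(partition: Partition): Seq[String] = {
          val tracker = SparkEnv.get.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
          val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]
          tracker.getPreferredLocationsForShuffle(dep, partition.index)
        }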
    
    The previous implementation could use the executor ID to schedule tasks, which can be
    more effective when there are multiple executors on the same host. Is it possible to
    support that in the new implementation?
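
    For illustration, a minimal sketch of what executor-level preferences could look
    like (this is not the PR's code; the method name getPreferredExecutorsForShuffle is
    hypothetical, and it assumes the scheduler accepts Spark's
    "executor_<host>_<executorId>" task-location strings and that
    getLocationsWithLargestOutputs returns Option[Array[BlockManagerId]]):

        // Sketch: return executor-qualified locations instead of bare host names,
        // so the scheduler can prefer the exact executor holding the output.
        def getPreferredExecutorsForShuffle(
            dep: ShuffleDependency[_, _, _],
            partitionId: Int): Seq[String] = {
          getLocationsWithLargestOutputs(dep.shuffleId, partitionId,
              dep.partitioner.numPartitions, REDUCER_PREF_LOCS_FRACTION)
            .map(_.toSeq.map(id => s"executor_${id.host}_${id.executorId}"))
            .getOrElse(Seq.empty)
        }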

