Github user shivaram commented on a diff in the pull request:
https://github.com/apache/spark/pull/8844#discussion_r40043438
--- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
@@ -323,6 +351,30 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
   }
   /**
+   * Return the preferred hosts on which to run the given map output partition in a given shuffle,
+   * i.e. the nodes that the most outputs for that partition are on.
+   *
+   * @param dep shuffle dependency object
+   * @param partitionId map output partition that we want to read
+   * @return a sequence of host names
+   */
+  def getPreferredLocationsForShuffle(dep: ShuffleDependency[_, _, _], partitionId: Int)
+      : Seq[String] = {
+    if (shuffleLocalityEnabled && dep.rdd.partitions.length < SHUFFLE_PREF_MAP_THRESHOLD &&
+        dep.partitioner.numPartitions < SHUFFLE_PREF_REDUCE_THRESHOLD) {
+      val blockManagerIds = getLocationsWithLargestOutputs(dep.shuffleId, partitionId,
+        dep.partitioner.numPartitions, REDUCER_PREF_LOCS_FRACTION)
+      if (blockManagerIds.nonEmpty) {
+        blockManagerIds.get.map(_.host)
--- End diff --
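For context, since the quoted diff cuts off at the line under review: `getLocationsWithLargestOutputs` keeps only hosts holding at least `REDUCER_PREF_LOCS_FRACTION` of the partition's total map output. Below is a minimal, self-contained sketch of that per-host aggregation; the 0.2 threshold matches the PR's default as far as I recall, and the host names and byte sizes are invented for illustration.

```scala
// Sketch of the locality heuristic: sum the bytes of one reduce partition's
// map output per host, then keep hosts that hold at least `fractionThreshold`
// of the total. Hosts and sizes below are made up for illustration.
object ShuffleLocalitySketch {
  def preferredHosts(sizesByHost: Map[String, Long], fractionThreshold: Double): Seq[String] = {
    val total = sizesByHost.values.sum.toDouble
    sizesByHost.collect {
      case (host, size) if size / total >= fractionThreshold => host
    }.toSeq
  }

  def main(args: Array[String]): Unit = {
    val sizes = Map("host1" -> 800L, "host2" -> 150L, "host3" -> 50L)
    // With a 0.2 threshold, only host1 (80% of the output) qualifies.
    println(preferredHosts(sizes, fractionThreshold = 0.2)) // List(host1)
  }
}
```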
I don't think it's very important given that our shuffle implementations
write out data to local disk -- it might make a difference if we stored
shuffle data in the block manager (deserialized), but given the status quo I
think executor-level locality doesn't really matter.
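
To make the granularity concrete, here is a tiny sketch using a hypothetical stand-in for `BlockManagerId` (not Spark's class): host-level preferences collapse all executors on a node into one entry, which is all we need while shuffle files sit on the node's local disk.

```scala
// Hypothetical stand-in for BlockManagerId, only to contrast the two granularities.
object LocalityGranularitySketch extends App {
  final case class Loc(executorId: String, host: String)

  val locs = Seq(Loc("exec-1", "host1"), Loc("exec-2", "host1"))

  // Host-level (what the diff returns via `_.host`): one preference per node.
  println(locs.map(_.host).distinct)                   // List(host1)

  // Executor-level: finer-grained, but shuffle files on the node's local disk
  // are equally readable by every executor there, so it buys nothing today.
  println(locs.map(l => s"${l.host}:${l.executorId}")) // List(host1:exec-1, host1:exec-2)
}
```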