mridulm commented on a change in pull request #35719:
URL: https://github.com/apache/spark/pull/35719#discussion_r818967642
##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -1022,101 +1022,105 @@ private[spark] class MapOutputTrackerMaster(
if (preferredLoc.nonEmpty) {
preferredLoc
} else {
- if (shuffleLocalityEnabled && dep.rdd.partitions.length <
SHUFFLE_PREF_MAP_THRESHOLD &&
- dep.partitioner.numPartitions < SHUFFLE_PREF_REDUCE_THRESHOLD) {
- val blockManagerIds = getLocationsWithLargestOutputs(dep.shuffleId,
partitionId,
- dep.partitioner.numPartitions, REDUCER_PREF_LOCS_FRACTION)
- if (blockManagerIds.nonEmpty) {
- blockManagerIds.get.map(_.host)
- } else {
- Nil
- }
- } else {
- Nil
- }
+ getLocationsWithLargestOutputs(
+ dep,
+ 0,
+ dep.rdd.getNumPartitions,
+ partitionId,
+ partitionId + 1,
+ REDUCER_PREF_LOCS_FRACTION).map(_.host)
}
} else {
Nil
}
}
/**
- * Return a list of locations that each have fraction of map output greater
than the specified
- * threshold.
+ * Return the preferred hosts on which to run the given map output partition
in a given shuffle,
+ * i.e. the nodes that have the most outputs for that partition.
*
- * @param shuffleId id of the shuffle
- * @param reducerId id of the reduce task
- * @param numReducers total number of reducers in the shuffle
- * @param fractionThreshold fraction of total map output size that a
location must have
- * for it to be considered large.
+ * @param dep shuffle dependency object
+ * @param startMapIndex the start map index (inclusive)
+ * @param endMapIndex the end map index (exclusive)
+ * @param startReducerIndex the start reducer index (inclusive)
+ * @param endReducerIndex the end reducer index (exclusive)
+ * @return a sequence of locations where the task runs.
*/
- def getLocationsWithLargestOutputs(
- shuffleId: Int,
- reducerId: Int,
- numReducers: Int,
- fractionThreshold: Double)
- : Option[Array[BlockManagerId]] = {
-
- val shuffleStatus = shuffleStatuses.get(shuffleId).orNull
- if (shuffleStatus != null) {
- shuffleStatus.withMapStatuses { statuses =>
- if (statuses.nonEmpty) {
- // HashMap to add up sizes of all blocks at the same location
- val locs = new HashMap[BlockManagerId, Long]
- var totalOutputSize = 0L
- var mapIdx = 0
- while (mapIdx < statuses.length) {
- val status = statuses(mapIdx)
- // status may be null here if we are called between
registerShuffle, which creates an
- // array with null entries for each output, and
registerMapOutputs, which populates it
- // with valid status entries. This is possible if one thread
schedules a job which
- // depends on an RDD which is currently being computed by another
thread.
- if (status != null) {
- val blockSize = status.getSizeForBlock(reducerId)
- if (blockSize > 0) {
- locs(status.location) = locs.getOrElse(status.location, 0L) +
blockSize
- totalOutputSize += blockSize
- }
- }
- mapIdx = mapIdx + 1
- }
- val topLocs = locs.filter { case (loc, size) =>
- size.toDouble / totalOutputSize >= fractionThreshold
- }
- // Return if we have any locations which satisfy the required
threshold
- if (topLocs.nonEmpty) {
- return Some(topLocs.keys.toArray)
- }
- }
- }
- }
- None
+ def getPreferredLocationsForShuffle(
+ dep: ShuffleDependency[_, _, _],
+ startMapIndex: Int,
+ endMapIndex: Int,
+ startReducerIndex: Int,
+ endReducerIndex: Int): Seq[String] = {
+ getLocationsWithLargestOutputs(
+ dep,
+ startMapIndex,
+ endMapIndex,
+ startReducerIndex,
+ endReducerIndex,
+ REDUCER_PREF_LOCS_FRACTION).map(_.host)
}
/**
- * Return the locations where the Mappers ran. The locations each includes
both a host and an
- * executor id on that host.
+ * If shuffle locality is enabled, return a list of locations that each have
fraction of
+ * map output greater than the specified threshold.
*
* @param dep shuffle dependency object
- * @param startMapIndex the start map index
+ * @param startMapIndex the start map index (inclusive)
* @param endMapIndex the end map index (exclusive)
- * @return a sequence of locations where task runs.
+ * @param startReducerIndex the start reducer index (inclusive)
+ * @param endReducerIndex the end reducer index (exclusive)
+ * @param fractionThreshold fraction of total map output size that a
location must have
+ * for it to be considered large.
*/
- def getMapLocation(
+ def getLocationsWithLargestOutputs(
dep: ShuffleDependency[_, _, _],
startMapIndex: Int,
- endMapIndex: Int): Seq[String] =
- {
- val shuffleStatus = shuffleStatuses.get(dep.shuffleId).orNull
- if (shuffleStatus != null) {
- shuffleStatus.withMapStatuses { statuses =>
- if (startMapIndex < endMapIndex &&
- (startMapIndex >= 0 && endMapIndex <= statuses.length)) {
- val statusesPicked = statuses.slice(startMapIndex,
endMapIndex).filter(_ != null)
- statusesPicked.map(_.location.host).toSeq
- } else {
- Nil
+ endMapIndex: Int,
+ startReducerIndex: Int,
+ endReducerIndex: Int,
+ fractionThreshold: Double): Seq[BlockManagerId] = {
+ if (shuffleLocalityEnabled && dep.rdd.getNumPartitions <
SHUFFLE_PREF_MAP_THRESHOLD &&
+ dep.partitioner.numPartitions < SHUFFLE_PREF_REDUCE_THRESHOLD) {
+ val shuffleStatus = shuffleStatuses.get(dep.shuffleId).orNull
+ if (shuffleStatus != null) {
+ shuffleStatus.withMapStatuses { statuses =>
+ if (statuses.nonEmpty && startMapIndex < endMapIndex &&
+ startMapIndex >= 0 && endMapIndex <= statuses.length) {
+ // HashMap to add up sizes of all blocks at the same location
+ val locs = new HashMap[BlockManagerId, Long]
+ var totalOutputSize = 0L
+ var mapIdx = startMapIndex
Review comment:
Hmm, we need to enforce `fractionThreshold` as well ... ok, this might
not work.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]