Github user kayousterhout commented on a diff in the pull request:
https://github.com/apache/spark/pull/6652#discussion_r31766904
--- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
@@ -284,6 +291,32 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
     cachedSerializedStatuses.contains(shuffleId) || mapStatuses.contains(shuffleId)
   }
+  // Return the list of locations and blockSizes for each reducer.
+  // The map is keyed by reducerId and for each reducer the value contains the array
+  // of (location, size) of map outputs.
+  //
+  // This method is not thread-safe
+  def getStatusByReducer(
+      shuffleId: Int,
+      numReducers: Int)
+    : Option[Map[Int, Array[(BlockManagerId, Long)]]] = {
+    if (!statusByReducer.contains(shuffleId) && mapStatuses.contains(shuffleId)) {
+      val statuses = mapStatuses(shuffleId)
+      if (statuses.length > 0) {
+        statusByReducer(shuffleId) = new HashMap[Int, Array[(BlockManagerId, Long)]]
--- End diff --
Also, is it possible to combine entries for the same block manager here? I think
that, right now, if multiple map outputs live on the same block manager, their
sizes won't be aggregated when deciding which block manager has the most output.
So if you have

BM 1: 2 bytes
BM 1: 2 bytes
BM 2: 3 bytes

then BM 2 will be considered the biggest, even though BM 1 actually holds 4 bytes
in total.
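
A minimal sketch of the kind of aggregation I have in mind, assuming the
per-reducer entries are available as an Array[(BlockManagerId, Long)] (the helper
name below is just for illustration, not part of this patch):

    // Hypothetical helper: sum map-output sizes per block manager before
    // picking the location with the most data for a reducer.
    def largestOutputLocation(
        entries: Array[(BlockManagerId, Long)]): Option[(BlockManagerId, Long)] = {
      if (entries.isEmpty) {
        None
      } else {
        // Group by block manager and total the sizes, so BM 1 with 2 + 2 bytes
        // beats BM 2 with 3 bytes.
        val totals = entries.groupBy(_._1).mapValues(_.map(_._2).sum)
        Some(totals.maxBy(_._2))
      }
    }

That way the per-block-manager totals, rather than individual map outputs, would
determine the preferred location.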