GitHub user kayousterhout commented on a diff in the pull request:

    https://github.com/apache/spark/pull/6652#discussion_r31766904
  
    --- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
    @@ -284,6 +291,32 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
         cachedSerializedStatuses.contains(shuffleId) || mapStatuses.contains(shuffleId)
       }
     
    +  // Return the list of locations and blockSizes for each reducer.
    +  // The map is keyed by reducerId and for each reducer the value contains the array
    +  // of (location, size) of map outputs.
    +  //
    +  // This method is not thread-safe
    +  def getStatusByReducer(
    +      shuffleId: Int,
    +      numReducers: Int)
    +    : Option[Map[Int, Array[(BlockManagerId, Long)]]] = {
    +    if (!statusByReducer.contains(shuffleId) && mapStatuses.contains(shuffleId)) {
    +      val statuses = mapStatuses(shuffleId)
    +      if (statuses.length > 0) {
    +        statusByReducer(shuffleId) = new HashMap[Int, Array[(BlockManagerId, Long)]]
    --- End diff --
    
    Also, is it possible to combine entries for the same block manager here? I
    think that, right now, if multiple map outputs are at the same block manager,
    they won't be aggregated when determining which block manager ID has the most
    output. So if you have
    
    BM 1: 2 bytes
    BM 1: 2 bytes
    BM 2: 3 bytes
    
    then right now BM 2 will be considered the biggest, even though BM 1 holds 4
    bytes in total.
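For illustration only, here is a minimal standalone sketch of summing the sizes per block manager before picking the largest; the `largestOutputLocation` helper and the `String` stand-in for `BlockManagerId` are hypothetical and not part of this PR:

```scala
object PreferredLocationSketch {
  // Stand-in for Spark's BlockManagerId, just to keep the sketch self-contained.
  type BlockManagerId = String

  // Sums the sizes reported for each block manager and returns the block
  // manager with the largest aggregated output, if any.
  def largestOutputLocation(
      statuses: Array[(BlockManagerId, Long)]): Option[BlockManagerId] = {
    if (statuses.isEmpty) {
      None
    } else {
      val totals = statuses
        .groupBy(_._1)
        .map { case (bm, entries) => (bm, entries.map(_._2).sum) }
      Some(totals.maxBy(_._2)._1)
    }
  }

  def main(args: Array[String]): Unit = {
    // The example from the comment above: two 2-byte entries for BM 1 and one
    // 3-byte entry for BM 2. Aggregated, BM 1 (4 bytes) beats BM 2 (3 bytes).
    val statuses = Array(("BM 1", 2L), ("BM 1", 2L), ("BM 2", 3L))
    println(largestOutputLocation(statuses)) // Some(BM 1)
  }
}
```

With the example input from the comment, the aggregated total for BM 1 (4 bytes) wins over BM 2 (3 bytes), which is the behavior the question is getting at.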


