Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/126#discussion_r10711505
  
    --- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
    @@ -181,15 +178,50 @@ private[spark] class MapOutputTracker(conf: SparkConf) extends Logging {
       }
     }
     
    +/**
    + * MapOutputTracker for the workers. This uses BoundedHashMap to keep track of
    + * a limited number of most recently used map output information.
    + */
    +private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTracker(conf) {
    +
    +  /**
    +   * Bounded HashMap for storing serialized statuses in the worker. This allows
    +   * the HashMap stay bounded in memory-usage. Things dropped from this HashMap will be
    +   * automatically repopulated by fetching them again from the driver. Its okay to
    +   * keep the cache size small as it unlikely that there will be a very large number of
    +   * stages active simultaneously in the worker.
    +   */
    +  protected val mapStatuses = new BoundedHashMap[Int, Array[MapStatus]](
    +    conf.getInt("spark.mapOutputTracker.cacheSize", 100), true
    +  )
    +}
    +
    +/**
    + * MapOutputTracker for the driver. This uses TimeStampedHashMap to keep track of map
    + * output information, which allows old output information based on a TTL.
    + */
     private[spark] class MapOutputTrackerMaster(conf: SparkConf)
       extends MapOutputTracker(conf) {
     
       // Cache a serialized version of the output statuses for each shuffle to send them out faster
       private var cacheEpoch = epoch
    -  private val cachedSerializedStatuses = new TimeStampedHashMap[Int, Array[Byte]]
    +
    +  /**
    +   * Timestamp based HashMap for storing mapStatuses and cached serialized statuses
    +   * in the master, so that statuses are dropped only by explicit deregistering or
    +   * by TTL-based cleaning (if set). Other than these two
    +   * scenarios, nothing should be dropped from this HashMap.
    +   */
    +
    +  protected val mapStatuses = new TimeStampedHashMap[Int, Array[MapStatus]]()
    +  private val cachedSerializedStatuses = new TimeStampedHashMap[Int, Array[Byte]]()
    +
    +  // For cleaning up TimeStampedHashMaps
    +  private val metadataCleaner =
    +    new MetadataCleaner(MetadataCleanerType.MAP_OUTPUT_TRACKER, this.cleanup, conf)
     
       def registerShuffle(shuffleId: Int, numMaps: Int) {
    -    if (mapStatuses.putIfAbsent(shuffleId, new Array[MapStatus](numMaps)).isDefined) {
    +    if (mapStatuses.put(shuffleId, new Array[MapStatus](numMaps)).isDefined) {
    --- End diff --
    
    I changed it so that we only use operations that are exposed by a Scala
    map, which makes it easier to replace TimeStampedHashMap with a Scala map
    later (when we have a better sense of how to clean up). In this case,
    since we throw an error if a previous value exists, the behavior of put
    and putIfAbsent is similar. Though I realize now that putIfAbsent is
    atomic, whereas it is not obvious whether Scala's Map.put is atomic.
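
To make the put versus putIfAbsent comparison concrete, here is a minimal sketch. It does not model TimeStampedHashMap; it assumes a plain `scala.collection.mutable.HashMap` and a `scala.collection.concurrent.TrieMap` as stand-ins, and the object and method names are hypothetical. Both calls return the previous value as an `Option`, so the duplicate check looks the same, but `put` has already replaced the old entry by the time the error is thrown, while `putIfAbsent` leaves it in place and does the check and the insert as a single atomic step.

```scala
import scala.collection.concurrent.TrieMap
import scala.collection.mutable

// Hypothetical stand-ins for registerShuffle; String replaces Array[MapStatus].
object PutVsPutIfAbsent {

  // put always stores the new value and returns the previous one, if any.
  def registerWithPut(statuses: mutable.Map[Int, String], shuffleId: Int, value: String): Unit = {
    if (statuses.put(shuffleId, value).isDefined) {
      throw new IllegalArgumentException(s"Shuffle ID $shuffleId registered twice")
    }
  }

  // putIfAbsent stores the new value only when the key is missing,
  // and on a concurrent map the check and the insert happen atomically.
  def registerWithPutIfAbsent(statuses: TrieMap[Int, String], shuffleId: Int, value: String): Unit = {
    if (statuses.putIfAbsent(shuffleId, value).isDefined) {
      throw new IllegalArgumentException(s"Shuffle ID $shuffleId registered twice")
    }
  }

  // Returns true when the call throws the duplicate-registration error.
  def failsAsDuplicate(body: => Unit): Boolean =
    try { body; false } catch { case _: IllegalArgumentException => true }

  def main(args: Array[String]): Unit = {
    val plain = mutable.HashMap.empty[Int, String]
    registerWithPut(plain, 0, "first")
    println(failsAsDuplicate(registerWithPut(plain, 0, "second")))
    println(plain(0)) // the first entry was overwritten before the error

    val concurrent = TrieMap.empty[Int, String]
    registerWithPutIfAbsent(concurrent, 0, "first")
    println(failsAsDuplicate(registerWithPutIfAbsent(concurrent, 0, "second")))
    println(concurrent(0)) // the first entry is still there
  }
}
```

Both variants signal the duplicate the same way, so callers that only look at the exception see no difference. The difference shows up in what the map holds afterwards and, with concurrent callers on a non-concurrent map, in whether two registrations can both see an empty slot.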


