GitHub user markhamstra commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1607#discussion_r15437121
  
    --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala ---
    @@ -110,42 +110,56 @@ class ExternalAppendOnlyMap[K, V, C](
     
       /**
        * Insert the given key and value into the map.
    +   */
    +  def insert(key: K, value: V) {
    +    insertAll(Iterator((key, value)))
    +  }
    +
    +  /**
    +   * Insert the given iterator of keys and values into the map.
        *
    -   * If the underlying map is about to grow, check if the global pool of shuffle memory has
    +   * When the underlying map needs to grow, check if the global pool of shuffle memory has
        * enough room for this to happen. If so, allocate the memory required to grow the map;
        * otherwise, spill the in-memory map to disk.
        *
        * The shuffle memory usage of the first trackMemoryThreshold entries is not tracked.
        */
    -  def insert(key: K, value: V) {
    +  def insertAll(entries: Iterator[Product2[K, V]]) {
    +    // An update function for the map that we reuse across entries to avoid allocating
    +    // a new closure each time
    +    var curEntry: Product2[K, V] = null
         val update: (Boolean, C) => C = (hadVal, oldVal) => {
    -      if (hadVal) mergeValue(oldVal, value) else createCombiner(value)
    +      if (hadVal) mergeValue(oldVal, curEntry._2) else createCombiner(curEntry._2)
         }
    -    if (numPairsInMemory > trackMemoryThreshold && currentMap.atGrowThreshold) {
    -      val mapSize = currentMap.estimateSize()
    -      var shouldSpill = false
    -      val shuffleMemoryMap = SparkEnv.get.shuffleMemoryMap
    -
    -      // Atomically check whether there is sufficient memory in the global pool for
    -      // this map to grow and, if possible, allocate the required amount
    -      shuffleMemoryMap.synchronized {
    -        val previouslyOccupiedMemory = shuffleMemoryMap.get(threadId)
    -        val availableMemory = maxMemoryThreshold -
    -          (shuffleMemoryMap.values.sum - previouslyOccupiedMemory.getOrElse(0L))
    -
    -        // Assume map growth factor is 2x
    -        shouldSpill = availableMemory < mapSize * 2
    -        if (!shouldSpill) {
    -          shuffleMemoryMap(threadId) = mapSize * 2
    +
    +    while (entries.hasNext) {
    +      curEntry = entries.next()
    +      if (numPairsInMemory > trackMemoryThreshold && currentMap.atGrowThreshold) {
    +        val mapSize = currentMap.estimateSize()
    +        var shouldSpill = false
    +        val shuffleMemoryMap = SparkEnv.get.shuffleMemoryMap
    +
    +        // Atomically check whether there is sufficient memory in the global pool for
    +        // this map to grow and, if possible, allocate the required amount
    +        shuffleMemoryMap.synchronized {
    +          val previouslyOccupiedMemory = shuffleMemoryMap.get(threadId)
    +          val availableMemory = maxMemoryThreshold -
    +            (shuffleMemoryMap.values.sum - previouslyOccupiedMemory.getOrElse(0L))
    +
    +          // Assume map growth factor is 2x
    +          shouldSpill = availableMemory < mapSize * 2
    +          if (!shouldSpill) {
    +            shuffleMemoryMap(threadId) = mapSize * 2
    +
    +          // Assume map growth factor is 2x
    +          shouldSpill = availableMemory < mapSize * 2
    +          if (!shouldSpill) {
    +            shuffleMemoryMap(threadId) = mapSize * 2
    +          }
    +        }
    +        // Do not synchronize spills
    +        if (shouldSpill) {
    +          spill(mapSize)
             }
           }
    -      // Do not synchronize spills
    -      if (shouldSpill) {
    -        spill(mapSize)
    -      }
    +      currentMap.changeValue(curEntry._1, update)
    +      numPairsInMemory += 1
         }
    -    currentMap.changeValue(key, update)
    -    numPairsInMemory += 1
       }
    --- End diff --
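    
    To spell out the pattern this hunk moves inside the loop: each thread atomically checks the shared shuffle-memory pool and either reserves enough for a 2x grow of its map or decides to spill, with the spill itself deliberately done outside the lock. A simplified, self-contained sketch of that pattern (not the real Spark code; the pool size and thread bookkeeping here are stand-ins):
    ```scala
    object ReserveOrSpillSketch {
      val maxMemoryThreshold = 1000L  // stand-in for the global shuffle memory pool size
      val shuffleMemoryMap = scala.collection.mutable.Map[Long, Long]()
    
      // Returns true if room for a 2x grow was reserved; false means the caller should spill.
      def tryReserve(threadId: Long, mapSize: Long): Boolean = shuffleMemoryMap.synchronized {
        val previouslyOccupied = shuffleMemoryMap.getOrElse(threadId, 0L)
        val availableMemory =
          maxMemoryThreshold - (shuffleMemoryMap.values.sum - previouslyOccupied)
        // Assume the map doubles in size when it grows
        if (availableMemory < mapSize * 2) {
          false  // spill instead; the spill itself happens outside this lock
        } else {
          shuffleMemoryMap(threadId) = mapSize * 2
          true
        }
      }
    
      def main(args: Array[String]): Unit = {
        println(tryReserve(threadId = 1L, mapSize = 300L))  // true: 600 of 1000 reserved
        println(tryReserve(threadId = 2L, mapSize = 300L))  // false: only 400 left, so spill
      }
    }
    ```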
    
    Does it make sense to also include a more collection-oriented interface, similar to scala.collection.mutable.Buffer#insertAll:
    ```scala
    def insertAll(coll: Iterable[Product2[K, V]]): Unit = insertAll(coll.iterator)
    ```
    ...so that you can do things like `externalAppendOnlyMap.insertAll(aMap)` instead of `externalAppendOnlyMap.insertAll(aMap.iterator)`?
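    
    For concreteness, a minimal, self-contained sketch of how those overloads would line up (`SimpleAppendOnlyMap` here is a made-up stand-in for illustration, not the real `ExternalAppendOnlyMap`):
    ```scala
    class SimpleAppendOnlyMap[K, V, C](createCombiner: V => C, mergeValue: (C, V) => C) {
      private val data = scala.collection.mutable.Map[K, C]()
    
      // The one "real" implementation, taking an Iterator
      def insertAll(entries: Iterator[Product2[K, V]]): Unit =
        entries.foreach { entry =>
          data(entry._1) = data.get(entry._1) match {
            case Some(oldVal) => mergeValue(oldVal, entry._2)
            case None => createCombiner(entry._2)
          }
        }
    
      // Thin convenience overloads that delegate to the Iterator version
      def insertAll(coll: Iterable[Product2[K, V]]): Unit = insertAll(coll.iterator)
      def insert(key: K, value: V): Unit = insertAll(Iterator((key, value)))
    
      def result: Map[K, C] = data.toMap
    }
    
    val m = new SimpleAppendOnlyMap[String, Int, Int](v => v, _ + _)
    m.insert("a", 1)
    m.insertAll(Map("a" -> 2, "b" -> 3))       // an Iterable such as a Map works directly
    m.insertAll(Iterator(("b", 4), ("c", 5)))  // and a raw Iterator still works
    println(m.result)                          // Map(a -> 3, b -> 7, c -> 5) (entry order may vary)
    ```
    The Iterable overload costs one line and saves every call site from writing `.iterator` by hand.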

