GitHub user mateiz commented on a diff in the pull request:
https://github.com/apache/spark/pull/1607#discussion_r15437397
--- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala ---
@@ -110,42 +110,56 @@ class ExternalAppendOnlyMap[K, V, C](
   /**
    * Insert the given key and value into the map.
+   */
+  def insert(key: K, value: V) {
+    insertAll(Iterator((key, value)))
+  }
+
+  /**
+   * Insert the given iterator of keys and values into the map.
    *
-   * If the underlying map is about to grow, check if the global pool of shuffle memory has
+   * When the underlying map needs to grow, check if the global pool of shuffle memory has
    * enough room for this to happen. If so, allocate the memory required to grow the map;
    * otherwise, spill the in-memory map to disk.
    *
    * The shuffle memory usage of the first trackMemoryThreshold entries is not tracked.
    */
-  def insert(key: K, value: V) {
+  def insertAll(entries: Iterator[Product2[K, V]]) {
+    // An update function for the map that we reuse across entries to avoid allocating
+    // a new closure each time
+    var curEntry: Product2[K, V] = null
     val update: (Boolean, C) => C = (hadVal, oldVal) => {
-      if (hadVal) mergeValue(oldVal, value) else createCombiner(value)
+      if (hadVal) mergeValue(oldVal, curEntry._2) else createCombiner(curEntry._2)
     }
-    if (numPairsInMemory > trackMemoryThreshold && currentMap.atGrowThreshold) {
-      val mapSize = currentMap.estimateSize()
-      var shouldSpill = false
-      val shuffleMemoryMap = SparkEnv.get.shuffleMemoryMap
-
-      // Atomically check whether there is sufficient memory in the global pool for
-      // this map to grow and, if possible, allocate the required amount
-      shuffleMemoryMap.synchronized {
-        val previouslyOccupiedMemory = shuffleMemoryMap.get(threadId)
-        val availableMemory = maxMemoryThreshold -
-          (shuffleMemoryMap.values.sum - previouslyOccupiedMemory.getOrElse(0L))
-
-        // Assume map growth factor is 2x
-        shouldSpill = availableMemory < mapSize * 2
-        if (!shouldSpill) {
-          shuffleMemoryMap(threadId) = mapSize * 2
+
+    while (entries.hasNext) {
+      curEntry = entries.next()
+      if (numPairsInMemory > trackMemoryThreshold && currentMap.atGrowThreshold) {
+        val mapSize = currentMap.estimateSize()
+        var shouldSpill = false
+        val shuffleMemoryMap = SparkEnv.get.shuffleMemoryMap
+
+        // Atomically check whether there is sufficient memory in the global pool for
+        // this map to grow and, if possible, allocate the required amount
+        shuffleMemoryMap.synchronized {
+          val previouslyOccupiedMemory = shuffleMemoryMap.get(threadId)
+          val availableMemory = maxMemoryThreshold -
+            (shuffleMemoryMap.values.sum - previouslyOccupiedMemory.getOrElse(0L))
+
+          // Assume map growth factor is 2x
+          shouldSpill = availableMemory < mapSize * 2
+          if (!shouldSpill) {
+            shuffleMemoryMap(threadId) = mapSize * 2
+          }
+        }
+        // Do not synchronize spills
+        if (shouldSpill) {
+          spill(mapSize)
        }
      }
-      // Do not synchronize spills
-      if (shouldSpill) {
-        spill(mapSize)
-      }
+      currentMap.changeValue(curEntry._1, update)
+      numPairsInMemory += 1
    }
-    currentMap.changeValue(key, update)
-    numPairsInMemory += 1
  }
--- End diff ---
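
As a standalone illustration of the closure-reuse pattern the diff introduces, here is a minimal sketch (hypothetical names, plain Scala, not Spark's actual code): one mutable current-value slot plus a single shared update function, so no new closure is allocated per record.

    import scala.collection.mutable

    object ClosureReuseSketch {
      def main(args: Array[String]): Unit = {
        val counts = mutable.HashMap.empty[String, Int]
        var curValue = 0  // mutable slot read by the shared closure
        // One update function reused for every entry; it reads curValue rather
        // than capturing a per-entry value, so the loop allocates no closures.
        val update: Option[Int] => Int = {
          case Some(old) => old + curValue
          case None      => curValue
        }
        val entries = Iterator(("a", 1), ("b", 2), ("a", 3))
        while (entries.hasNext) {
          val (k, v) = entries.next()
          curValue = v
          counts(k) = update(counts.get(k))
        }
        println(counts)  // HashMap(a -> 4, b -> 2)
      }
    }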
The problem is that much of the time we compute data in an Iterator, and an Iterator is not an Iterable AFAIK (you can't iterate over it more than once). We could add a second interface, though, if you'd like.
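
For illustration, a minimal sketch (plain Scala, nothing Spark-specific) of the single-pass behavior in question: an Iterator is consumed by one traversal, while an Iterable can hand out a fresh iterator for each traversal.

    val it: Iterator[Int] = Iterator(1, 2, 3)
    println(it.sum)  // 6: this traversal consumes the iterator
    println(it.sum)  // 0: the iterator is exhausted, so a second pass sees nothing

    val xs: Iterable[Int] = List(1, 2, 3)
    println(xs.sum)  // 6
    println(xs.sum)  // 6: each call obtains a fresh iterator from xs.iterator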