Github user pwendell commented on a diff in the pull request:
https://github.com/apache/spark/pull/1165#discussion_r14808850
--- Diff: core/src/main/scala/org/apache/spark/storage/MemoryStore.scala ---
@@ -141,6 +174,86 @@ private class MemoryStore(blockManager: BlockManager,
maxMemory: Long)
}
/**
+ * Unfold the given block in memory safely.
+ *
+ * The safety of this operation refers to avoiding potential OOM
exceptions caused by
+ * unfolding the entirety of the block in memory at once. This is
achieved by periodically
+ * checking whether the memory restrictions for unfolding blocks are
still satisfied,
+ * stopping immediately if not. This check is a safeguard against the
scenario in which
+ * there is not enough free memory to accommodate the entirety of a
single block.
+ *
+ * This method returns either a fully unfolded array or a partially
unfolded iterator.
+ */
+ def unfoldSafely(
+ blockId: BlockId,
+ values: Iterator[Any],
+ droppedBlocks: ArrayBuffer[(BlockId, BlockStatus)])
+ : Either[Array[Any], Iterator[Any]] = {
+
+ var count = 0 // The number of elements unfolded so
far
+ var enoughMemory = true // Whether there is enough memory to
unfold this block
+ var previousSize = 0L // Previous estimate of the size of
our buffer
+ val memoryRequestPeriod = 1000 // How frequently we request for more
memory for our buffer
+
+ val threadId = Thread.currentThread().getId
+ val cacheMemoryMap = SparkEnv.get.cacheMemoryMap
+ var buffer = new SizeTrackingAppendOnlyBuffer[Any]
+
+ try {
+ while (values.hasNext && enoughMemory) {
+ buffer += values.next()
+ count += 1
+ if (count % memoryRequestPeriod == 1) {
+ // Calculate the amount of memory to request from the global
memory pool
+ val currentSize = buffer.estimateSize()
+ val delta = if (previousSize > 0) math.max(currentSize -
previousSize, 0) else 0
+ val memoryToRequest = currentSize + delta
+ previousSize = currentSize
+
+ // Atomically check whether there is sufficient memory in the
global pool to continue
+ cacheMemoryMap.synchronized {
+ val previouslyOccupiedMemory =
cacheMemoryMap.get(threadId).getOrElse(0L)
+ val otherThreadsMemory = cacheMemoryMap.values.sum -
previouslyOccupiedMemory
+
+ // Request for memory for the local buffer, and return whether
request is granted
--- End diff --
Thing it's proper not to use a comma here
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---