Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/1165#discussion_r15270358 --- Diff: core/src/main/scala/org/apache/spark/storage/MemoryStore.scala --- @@ -141,6 +188,88 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) } /** + * Unroll the given block in memory safely. + * + * The safety of this operation refers to avoiding potential OOM exceptions caused by + * unrolling the entirety of the block in memory at once. This is achieved by periodically + * checking whether the memory restrictions for unrolling blocks are still satisfied, + * stopping immediately if not. This check is a safeguard against the scenario in which + * there is not enough free memory to accommodate the entirety of a single block. + * + * This method returns either an array with the contents of the entire block or an iterator + * containing the values of the block (if the array would have exceeded available memory). + */ + def unrollSafely( + blockId: BlockId, + values: Iterator[Any], + droppedBlocks: ArrayBuffer[(BlockId, BlockStatus)]) + : Either[Array[Any], Iterator[Any]] = { + + var count = 0 // The number of elements unrolled so far + var atMemoryLimit = false // Whether we are at the memory limit for unrolling blocks + var previousSize = 0L // Previous estimate of the size of our buffer + val memoryRequestPeriod = 1000 // How frequently we request more memory for our buffer + val memoryRequestThreshold = 100 // Before this is exceeded, request memory every round + + val threadId = Thread.currentThread().getId + val unrollMemoryMap = SparkEnv.get.unrollMemoryMap + var buffer = new SizeTrackingVector[Any] + + try { + while (values.hasNext && !atMemoryLimit) { + buffer += values.next() + count += 1 + if (count % memoryRequestPeriod == 1 || count < memoryRequestThreshold) { + // Calculate the amount of memory to request from the global memory pool + val currentSize = buffer.estimateSize() + val delta = if (previousSize > 0) math.max(currentSize - previousSize, 0) else 0 + val memoryToRequest = currentSize + delta + previousSize = currentSize + + // Atomically check whether there is sufficient memory in the global pool to continue + unrollMemoryMap.synchronized { + val previouslyOccupiedMemory = unrollMemoryMap.get(threadId).getOrElse(0L) + val otherThreadsMemory = unrollMemoryMap.values.sum - previouslyOccupiedMemory + + // Request memory for the local buffer and return whether request is granted + def requestMemory(): Boolean = { + val availableMemory = freeMemory - otherThreadsMemory + val granted = availableMemory > memoryToRequest + if (granted) { unrollMemoryMap(threadId) = memoryToRequest } + granted + } + + // If the first request is not granted, try again after ensuring free space + // If there is still not enough space, give up and drop the partition + if (!requestMemory()) { + val result = ensureFreeSpace(blockId, globalBufferMemory) + droppedBlocks ++= result.droppedBlocks + atMemoryLimit = !requestMemory() + } + } + } + } + + if (!atMemoryLimit) { + // We successfully unrolled the entirety of this block + Left(buffer.toArray) + } else { + // We ran out of space while unrolling the values for this block + Right(buffer.iterator ++ values) + } + + } finally { + // Unless we return an iterator that depends on the buffer, free up space for other threads + if (!atMemoryLimit) { + buffer = null + unrollMemoryMap.synchronized { + unrollMemoryMap(threadId) = 0 + } + } --- End diff -- It's kind of weird that the memory is given back to the pool here rather than when the task ends. Should we instead return an UnrollResult class that has a freeBuffer method that we call in finally blocks everywhere (or task cleanup)? Or is there some provision to account for this? Basically right now we're accounting for almost everything except the block store's memory to be usable for shuffles, so it seems this can still exceed our total memory.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---