Github user mridulm commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1165#discussion_r14324897

--- Diff: core/src/main/scala/org/apache/spark/CacheManager.scala ---
@@ -142,10 +151,76 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
        * to the BlockManager as an iterator and expect to read it back later. This is because
        * we may end up dropping a partition from memory store before getting it back, e.g.
        * when the entirety of the RDD does not fit in memory. */
-      val elements = new ArrayBuffer[Any]
-      elements ++= values
-      updatedBlocks ++= blockManager.put(key, elements, storageLevel, tellMaster = true)
-      elements.iterator.asInstanceOf[Iterator[T]]
+
+      var count = 0                   // The number of elements unrolled so far
+      var dropPartition = false       // Whether to drop the new partition from memory
+      var previousSize = 0L           // Previous estimate of the size of our buffer
+      val memoryRequestPeriod = 1000  // How frequently we request for more memory for our buffer
+
+      val threadId = Thread.currentThread().getId
+      val cacheMemoryMap = SparkEnv.get.cacheMemoryMap
+      var buffer = new SizeTrackingAppendOnlyBuffer[Any]
+
+      /* While adding values to the in-memory buffer, periodically check whether the memory
+       * restrictions for unrolling partitions are still satisfied. If not, stop immediately,
+       * and persist the partition to disk if specified by the storage level. This check is
+       * a safeguard against the scenario when a single partition does not fit in memory. */
+      while (values.hasNext && !dropPartition) {
+        buffer += values.next()
+        count += 1
+        if (count % memoryRequestPeriod == 1) {
+          // Calculate the amount of memory to request from the global memory pool
+          val currentSize = buffer.estimateSize()
+          val delta = math.max(currentSize - previousSize, 0)
+          val memoryToRequest = currentSize + delta
+          previousSize = currentSize
+
+          // Atomically check whether there is sufficient memory in the global pool to continue
+          cacheMemoryMap.synchronized {
+            val previouslyOccupiedMemory = cacheMemoryMap.get(threadId).getOrElse(0L)
+            val otherThreadsMemory = cacheMemoryMap.values.sum - previouslyOccupiedMemory
+
+            // Request for memory for the local buffer, and return whether request is granted
+            def requestForMemory(): Boolean = {
+              val availableMemory = blockManager.memoryStore.freeMemory - otherThreadsMemory
+              val granted = availableMemory > memoryToRequest
+              if (granted) { cacheMemoryMap(threadId) = memoryToRequest }
+              granted
+            }
+
+            // If the first request is not granted, try again after ensuring free space
+            // If there is still not enough space, give up and drop the partition
+            if (!requestForMemory()) {
+              val result = blockManager.memoryStore.ensureFreeSpace(key, globalBufferMemory)
+              updatedBlocks ++= result.droppedBlocks
+              dropPartition = !requestForMemory()
+            }
+          }
+        }
+      }
+
+      if (!dropPartition) {
+        // We have successfully unrolled the entire partition, so cache it in memory
+        updatedBlocks ++= blockManager.put(key, buffer.array, storageLevel, tellMaster = true)
+        buffer.iterator.asInstanceOf[Iterator[T]]
+      } else {
+        // We have exceeded our collective quota. This partition will not be cached in memory.
+        val persistToDisk = storageLevel.useDisk
+        logWarning(s"Failed to cache $key in memory! There is not enough space to unroll the " +
+          s"entire partition. " + (if (persistToDisk) "Persisting to disk instead." else ""))
+        var newValues = (buffer.iterator ++ values).asInstanceOf[Iterator[T]]
+        if (persistToDisk) {
+          val newLevel = StorageLevel(storageLevel.useDisk, useMemory = false,
+            storageLevel.useOffHeap, storageLevel.deserialized, storageLevel.replication)
--- End diff --

Since useMemory = false, might make sense to set deserialized to false.