Github user mridulm commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1165#discussion_r14324897
  
    --- Diff: core/src/main/scala/org/apache/spark/CacheManager.scala ---
    @@ -142,10 +151,76 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
            * to the BlockManager as an iterator and expect to read it back later. This is because
            * we may end up dropping a partition from memory store before getting it back, e.g.
            * when the entirety of the RDD does not fit in memory. */
    -      val elements = new ArrayBuffer[Any]
    -      elements ++= values
    -      updatedBlocks ++= blockManager.put(key, elements, storageLevel, tellMaster = true)
    -      elements.iterator.asInstanceOf[Iterator[T]]
    +
    +      var count = 0                   // The number of elements unrolled so far
    +      var dropPartition = false       // Whether to drop the new partition from memory
    +      var previousSize = 0L           // Previous estimate of the size of our buffer
    +      val memoryRequestPeriod = 1000  // How frequently we request for more memory for our buffer
    +
    +      val threadId = Thread.currentThread().getId
    +      val cacheMemoryMap = SparkEnv.get.cacheMemoryMap
    +      var buffer = new SizeTrackingAppendOnlyBuffer[Any]
    +
    +      /* While adding values to the in-memory buffer, periodically check whether the memory
    +       * restrictions for unrolling partitions are still satisfied. If not, stop immediately,
    +       * and persist the partition to disk if specified by the storage level. This check is
    +       * a safeguard against the scenario when a single partition does not fit in memory. */
    +      while (values.hasNext && !dropPartition) {
    +        buffer += values.next()
    +        count += 1
    +        if (count % memoryRequestPeriod == 1) {
    +          // Calculate the amount of memory to request from the global memory pool
    +          val currentSize = buffer.estimateSize()
    +          val delta = math.max(currentSize - previousSize, 0)
    +          val memoryToRequest = currentSize + delta
    +          previousSize = currentSize
    +
    +          // Atomically check whether there is sufficient memory in the global pool to continue
    +          cacheMemoryMap.synchronized {
    +            val previouslyOccupiedMemory = cacheMemoryMap.get(threadId).getOrElse(0L)
    +            val otherThreadsMemory = cacheMemoryMap.values.sum - previouslyOccupiedMemory
    +
    +            // Request for memory for the local buffer, and return whether request is granted
    +            def requestForMemory(): Boolean = {
    +              val availableMemory = blockManager.memoryStore.freeMemory - otherThreadsMemory
    +              val granted = availableMemory > memoryToRequest
    +              if (granted) { cacheMemoryMap(threadId) = memoryToRequest }
    +              granted
    +            }
    +
    +            // If the first request is not granted, try again after ensuring free space
    +            // If there is still not enough space, give up and drop the partition
    +            if (!requestForMemory()) {
    +              val result = blockManager.memoryStore.ensureFreeSpace(key, globalBufferMemory)
    +              updatedBlocks ++= result.droppedBlocks
    +              dropPartition = !requestForMemory()
    +            }
    +          }
    +        }
    +      }
    +
    +      if (!dropPartition) {
    +        // We have successfully unrolled the entire partition, so cache it in memory
    +        updatedBlocks ++= blockManager.put(key, buffer.array, storageLevel, tellMaster = true)
    +        buffer.iterator.asInstanceOf[Iterator[T]]
    +      } else {
    +        // We have exceeded our collective quota. This partition will not be cached in memory.
    +        val persistToDisk = storageLevel.useDisk
    +        logWarning(s"Failed to cache $key in memory! There is not enough space to unroll the " +
    +          s"entire partition. " + (if (persistToDisk) "Persisting to disk instead." else ""))
    +        var newValues = (buffer.iterator ++ values).asInstanceOf[Iterator[T]]
    +        if (persistToDisk) {
    +          val newLevel = StorageLevel(storageLevel.useDisk, useMemory = false,
    +            storageLevel.useOffHeap, storageLevel.deserialized, storageLevel.replication)
    --- End diff --
    
    Since useMemory = false, it might make sense to set deserialized to false as well.
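    
    For illustration only, a minimal sketch of what that change could look like, assuming the same five-argument StorageLevel(useDisk, useMemory, useOffHeap, deserialized, replication) factory already used in the diff:
    
        // Hypothetical variant of the lines above: with useMemory = false the block
        // will only ever be written to disk in serialized form, so clear deserialized too.
        val newLevel = StorageLevel(storageLevel.useDisk, useMemory = false,
          storageLevel.useOffHeap, deserialized = false, storageLevel.replication)
    
    Since the disk store always writes blocks serialized, the deserialized flag should only matter when useMemory is set, so clearing it keeps the downgraded storage level internally consistent.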

