Github user mateiz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1165#discussion_r15270358
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/MemoryStore.scala ---
    @@ -141,6 +188,88 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
       }
     
       /**
    +   * Unroll the given block in memory safely.
    +   *
    +   * The safety of this operation refers to avoiding potential OOM exceptions caused by
    +   * unrolling the entirety of the block in memory at once. This is achieved by periodically
    +   * checking whether the memory restrictions for unrolling blocks are still satisfied,
    +   * stopping immediately if not. This check is a safeguard against the scenario in which
    +   * there is not enough free memory to accommodate the entirety of a single block.
    +   *
    +   * This method returns either an array with the contents of the entire block or an iterator
    +   * containing the values of the block (if the array would have exceeded available memory).
    +   */
    +  def unrollSafely(
    +      blockId: BlockId,
    +      values: Iterator[Any],
    +      droppedBlocks: ArrayBuffer[(BlockId, BlockStatus)])
    +    : Either[Array[Any], Iterator[Any]] = {
    +
    +    var count = 0                     // The number of elements unrolled so far
    +    var atMemoryLimit = false         // Whether we are at the memory limit for unrolling blocks
    +    var previousSize = 0L             // Previous estimate of the size of our buffer
    +    val memoryRequestPeriod = 1000    // How frequently we request more memory for our buffer
    +    val memoryRequestThreshold = 100  // Before this is exceeded, request memory every round
    +
    +    val threadId = Thread.currentThread().getId
    +    val unrollMemoryMap = SparkEnv.get.unrollMemoryMap
    +    var buffer = new SizeTrackingVector[Any]
    +
    +    try {
    +      while (values.hasNext && !atMemoryLimit) {
    +        buffer += values.next()
    +        count += 1
    +        if (count % memoryRequestPeriod == 1 || count < memoryRequestThreshold) {
    +          // Calculate the amount of memory to request from the global memory pool
    +          val currentSize = buffer.estimateSize()
    +          val delta = if (previousSize > 0) math.max(currentSize - previousSize, 0) else 0
    +          val memoryToRequest = currentSize + delta
    +          previousSize = currentSize
    +
    +          // Atomically check whether there is sufficient memory in the global pool to continue
    +          unrollMemoryMap.synchronized {
    +            val previouslyOccupiedMemory = unrollMemoryMap.get(threadId).getOrElse(0L)
    +            val otherThreadsMemory = unrollMemoryMap.values.sum - previouslyOccupiedMemory
    +
    +            // Request memory for the local buffer and return whether request is granted
    +            def requestMemory(): Boolean = {
    +              val availableMemory = freeMemory - otherThreadsMemory
    +              val granted = availableMemory > memoryToRequest
    +              if (granted) { unrollMemoryMap(threadId) = memoryToRequest }
    +              granted
    +            }
    +
    +            // If the first request is not granted, try again after ensuring free space
    +            // If there is still not enough space, give up and drop the partition
    +            if (!requestMemory()) {
    +              val result = ensureFreeSpace(blockId, globalBufferMemory)
    +              droppedBlocks ++= result.droppedBlocks
    +              atMemoryLimit = !requestMemory()
    +            }
    +          }
    +        }
    +      }
    +
    +      if (!atMemoryLimit) {
    +        // We successfully unrolled the entirety of this block
    +        Left(buffer.toArray)
    +      } else {
    +        // We ran out of space while unrolling the values for this block
    +        Right(buffer.iterator ++ values)
    +      }
    +
    +    } finally {
    +      // Unless we return an iterator that depends on the buffer, free up space for other threads
    +      if (!atMemoryLimit) {
    +        buffer = null
    +        unrollMemoryMap.synchronized {
    +          unrollMemoryMap(threadId) = 0
    +        }
    +      }
    --- End diff ---
    
    It's kind of weird that the memory is given back to the pool here rather than
    when the task ends. Should we instead return an UnrollResult class that has a
    freeBuffer method, which we'd call in finally blocks everywhere (or in task
    cleanup)? Or is there some provision that already accounts for this?
    Basically, right now almost everything except the block store's memory is
    counted as usable for shuffles, so it seems this can still exceed our total
    memory.
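    
    For concreteness, here's a minimal sketch of what that could look like. Only
    the UnrollResult and freeBuffer names come from the suggestion above; the
    constructor shape, the data field, and the call site are illustrative
    assumptions, not part of this patch:
    
        import scala.collection.mutable
    
        // Hypothetical sketch: a result type that hands ownership of the unroll
        // memory back to the caller, who releases it explicitly.
        class UnrollResult(
            val data: Either[Array[Any], Iterator[Any]],
            threadId: Long,
            unrollMemoryMap: mutable.HashMap[Long, Long]) {
    
          // Give this thread's unroll memory back to the global pool. Callers
          // would invoke this from finally blocks (or a task-completion callback)
          // instead of unrollSafely releasing it internally.
          def freeBuffer(): Unit = unrollMemoryMap.synchronized {
            unrollMemoryMap(threadId) = 0L
          }
        }
    
        // Hypothetical call site:
        //   val result = unrollSafely(blockId, values, droppedBlocks)
        //   try {
        //     result.data match {
        //       case Left(array) => putArray(blockId, array, level)  // fully unrolled
        //       case Right(iter) => handleSpill(iter)                // illustrative fallback
        //     }
        //   } finally {
        //     result.freeBuffer()
        //   }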

