Github user pwendell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1165#discussion_r14808850
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/MemoryStore.scala ---
    @@ -141,6 +174,86 @@ private class MemoryStore(blockManager: BlockManager, 
maxMemory: Long)
       }
     
       /**
    +   * Unfold the given block in memory safely.
    +   *
    +   * The safety of this operation refers to avoiding potential OOM 
exceptions caused by
    +   * unfolding the entirety of the block in memory at once. This is 
achieved by periodically
    +   * checking whether the memory restrictions for unfolding blocks are 
still satisfied,
    +   * stopping immediately if not. This check is a safeguard against the 
scenario in which
    +   * there is not enough free memory to accommodate the entirety of a 
single block.
    +   *
    +   * This method returns either a fully unfolded array or a partially 
unfolded iterator.
    +   */
    +  def unfoldSafely(
    +      blockId: BlockId,
    +      values: Iterator[Any],
    +      droppedBlocks: ArrayBuffer[(BlockId, BlockStatus)])
    +    : Either[Array[Any], Iterator[Any]] = {
    +
    +    var count = 0                   // The number of elements unfolded so 
far
    +    var enoughMemory = true         // Whether there is enough memory to 
unfold this block
    +    var previousSize = 0L           // Previous estimate of the size of 
our buffer
    +    val memoryRequestPeriod = 1000  // How frequently we request for more 
memory for our buffer
    +
    +    val threadId = Thread.currentThread().getId
    +    val cacheMemoryMap = SparkEnv.get.cacheMemoryMap
    +    var buffer = new SizeTrackingAppendOnlyBuffer[Any]
    +
    +    try {
    +      while (values.hasNext && enoughMemory) {
    +        buffer += values.next()
    +        count += 1
    +        if (count % memoryRequestPeriod == 1) {
    +          // Calculate the amount of memory to request from the global 
memory pool
    +          val currentSize = buffer.estimateSize()
    +          val delta = if (previousSize > 0) math.max(currentSize - 
previousSize, 0) else 0
    +          val memoryToRequest = currentSize + delta
    +          previousSize = currentSize
    +
    +          // Atomically check whether there is sufficient memory in the 
global pool to continue
    +          cacheMemoryMap.synchronized {
    +            val previouslyOccupiedMemory = 
cacheMemoryMap.get(threadId).getOrElse(0L)
    +            val otherThreadsMemory = cacheMemoryMap.values.sum - 
previouslyOccupiedMemory
    +
    +            // Request for memory for the local buffer, and return whether 
request is granted
    --- End diff --
    
    Thing it's proper not to use a comma here


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to