Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11791#discussion_r56915182
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
    @@ -244,13 +244,113 @@ private[spark] class MemoryStore(
         }
       }
     
    +  /**
    +   * Attempt to put the given block in memory store as bytes.
    +   *
    +   * It's possible that the iterator is too large to materialize and store in memory. To avoid
    +   * OOM exceptions, this method will gradually unroll the iterator while periodically checking
    +   * whether there is enough free memory. If the block is successfully materialized, then the
    +   * temporary unroll memory used during the materialization is "transferred" to storage memory,
    +   * so we won't acquire more memory than is actually needed to store the block.
    +   *
    +   * @return in case of success, the estimated size of the stored data. In case of
    +   *         failure, return a handle which allows the caller to either finish the serialization
    +   *         by spilling to disk or to deserialize the partially-serialized block and reconstruct
    +   *         the original input iterator. The caller must either fully consume this result
    +   *         iterator or call `discard()` on it in order to free the storage memory consumed by the
    +   *         partially-unrolled block.
    +   */
    +  private[storage] def putIteratorAsBytes(
    +      blockId: BlockId,
    +      values: Iterator[Any]): Either[PartiallySerializedBlock, Long] = {
    +
    +    require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")
    +
    +    // Whether there is still enough memory for us to continue unrolling this block
    +    var keepUnrolling = true
    +    // Initial per-task memory to request for unrolling blocks (bytes).
    +    val initialMemoryThreshold = unrollMemoryThreshold
    +    // Keep track of unroll memory used by this particular block / putIterator() operation
    +    var unrollMemoryUsedByThisBlock = 0L
    +    // Underlying buffer for unrolling the block
    +    val redirectableStream = new RedirectableOutputStream
    +    val byteArrayChunkOutputStream = new ByteArrayChunkOutputStream(initialMemoryThreshold.toInt)
    +    redirectableStream.setOutputStream(byteArrayChunkOutputStream)
    +    val serializationStream: SerializationStream = {
    +      val ser = blockManager.defaultSerializer.newInstance()
    +      ser.serializeStream(blockManager.wrapForCompression(blockId, redirectableStream))
    +    }
    +
    +    // Request enough memory to begin unrolling
    +    keepUnrolling = reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold)
    +
    +    if (!keepUnrolling) {
    +      logWarning(s"Failed to reserve initial memory threshold of " +
    +        s"${Utils.bytesToString(initialMemoryThreshold)} for computing block $blockId in memory.")
    +    } else {
    +      unrollMemoryUsedByThisBlock += initialMemoryThreshold
    +    }
    +
    +    def reserveAdditionalMemoryIfNecessary(): Unit = {
    +      if (byteArrayChunkOutputStream.size > unrollMemoryUsedByThisBlock) {
    --- End diff --
    
    One important implicit assumption, which I will make explicit in a line comment: we assume that we'll always be able to get enough memory to unroll at least one element between size calculations. This is the same assumption that we have in the deserialized case, since we only periodically measure memory usage there.
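
The assumption described in the comment can be illustrated with a toy model. This is only a sketch and is not Spark's implementation: `ToyMemoryPool`, `UnrollSketch`, `checkPeriod`, and `growthFactor` are hypothetical names introduced here, and element sizes are passed in directly instead of being measured from a serialization stream. The point it shows is that bytes are written first and memory is reserved only at the next size check, so usage can briefly exceed the reservation.

```scala
// Hypothetical stand-in for a task's unroll memory pool (not Spark's MemoryManager).
final class ToyMemoryPool(val capacity: Long) {
  private var reserved = 0L
  // Grant the request only if it still fits within the pool's capacity.
  def tryReserve(bytes: Long): Boolean =
    if (reserved + bytes <= capacity) { reserved += bytes; true } else false
  def reservedBytes: Long = reserved
}

object UnrollSketch {
  // written: bytes buffered so far; reserved: bytes reserved for this block;
  // maxOvershoot: largest amount by which written exceeded reserved;
  // completed: true only if every element was unrolled.
  final case class Outcome(written: Long, reserved: Long, maxOvershoot: Long, completed: Boolean)

  def unroll(
      elementSizes: Iterator[Int],
      pool: ToyMemoryPool,
      initialThreshold: Long,
      checkPeriod: Int,       // assumption: size is checked every checkPeriod elements
      growthFactor: Double    // assumption: reserve up to growthFactor * current size
  ): Outcome = {
    var keepUnrolling = pool.tryReserve(initialThreshold)
    var reservedByBlock = if (keepUnrolling) initialThreshold else 0L
    var written = 0L
    var maxOvershoot = 0L
    var count = 0L
    while (keepUnrolling && elementSizes.hasNext) {
      // The element is written BEFORE we know whether memory is available for it.
      written += elementSizes.next()
      count += 1
      maxOvershoot = math.max(maxOvershoot, written - reservedByBlock)
      // Only at a size check do we try to catch the reservation up.
      if (count % checkPeriod == 0 && written > reservedByBlock) {
        val request = (written * growthFactor).toLong - reservedByBlock
        keepUnrolling = pool.tryReserve(request)
        if (keepUnrolling) reservedByBlock += request
      }
    }
    Outcome(written, reservedByBlock, maxOvershoot, keepUnrolling && !elementSizes.hasNext)
  }
}
```

For example, `UnrollSketch.unroll(Iterator.fill(10)(100), new ToyMemoryPool(10000L), 250L, 2, 1.5)` completes with 1000 bytes written, 1200 bytes reserved, and a peak overshoot of 200 bytes. That overshoot is the memory the implicit assumption says we can always obtain between size calculations.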

