Github user andrewor14 commented on a diff in the pull request:
https://github.com/apache/spark/pull/11791#discussion_r57401730
--- Diff:
core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -279,13 +275,117 @@ private[spark] class MemoryStore(
}
}
+ /**
+ * Attempt to put the given block in memory store as bytes.
+ *
+ * It's possible that the iterator is too large to materialize and store
in memory. To avoid
+ * OOM exceptions, this method will gradually unroll the iterator while
periodically checking
+ * whether there is enough free memory. If the block is successfully
materialized, then the
+ * temporary unroll memory used during the materialization is
"transferred" to storage memory,
+ * so we won't acquire more memory than is actually needed to store the
block.
+ *
+ * @return in case of success, the estimated size of the
stored data. In case of
+ * failure, return a handle which allows the caller to either
finish the serialization
+ * by spilling to disk or to deserialize the
partially-serialized block and reconstruct
+ * the original input iterator. The caller must either fully
consume this result
+ * iterator or call `discard()` on it in order to free the
storage memory consumed by the
+ * partially-unrolled block.
+ */
+ private[storage] def putIteratorAsBytes[T](
+ blockId: BlockId,
+ values: Iterator[T],
+ classTag: ClassTag[T]): Either[PartiallySerializedBlock[T], Long] = {
+
+ require(!contains(blockId), s"Block $blockId is already present in the
MemoryStore")
+
+ // Whether there is still enough memory for us to continue unrolling
this block
+ var keepUnrolling = true
+ // Initial per-task memory to request for unrolling blocks (bytes).
+ val initialMemoryThreshold = unrollMemoryThreshold
+ // Keep track of unroll memory used by this particular block /
putIteratorAsBytes() operation
+ var unrollMemoryUsedByThisBlock = 0L
+ // Underlying buffer for unrolling the block
+ val redirectableStream = new RedirectableOutputStream
+ val byteArrayChunkOutputStream = new
ByteArrayChunkOutputStream(initialMemoryThreshold.toInt)
+ redirectableStream.setOutputStream(byteArrayChunkOutputStream)
+ val serializationStream: SerializationStream = {
+ val ser = serializerManager.getSerializer(classTag).newInstance()
+ ser.serializeStream(serializerManager.wrapForCompression(blockId,
redirectableStream))
+ }
+
+ // Request enough memory to begin unrolling
+ keepUnrolling = reserveUnrollMemoryForThisTask(blockId,
initialMemoryThreshold)
+
+ if (!keepUnrolling) {
+ logWarning(s"Failed to reserve initial memory threshold of " +
+ s"${Utils.bytesToString(initialMemoryThreshold)} for computing
block $blockId in memory.")
+ } else {
+ unrollMemoryUsedByThisBlock += initialMemoryThreshold
+ }
+
+ def reserveAdditionalMemoryIfNecessary(): Unit = {
+ if (byteArrayChunkOutputStream.size > unrollMemoryUsedByThisBlock) {
+ val amountToRequest = byteArrayChunkOutputStream.size -
unrollMemoryUsedByThisBlock
+ keepUnrolling = reserveUnrollMemoryForThisTask(blockId,
amountToRequest)
+ if (keepUnrolling) {
+ unrollMemoryUsedByThisBlock += amountToRequest
+ }
+ }
+ }
+
+ // Unroll this block safely, checking whether we have exceeded our
threshold
+ while (values.hasNext && keepUnrolling) {
+ serializationStream.writeObject(values.next())(classTag)
+ reserveAdditionalMemoryIfNecessary()
+ }
+
+ // Make sure that we have enough memory to store the block. By this
point, it is possible that
+ // the block's actual memory usage has exceeded the unroll memory by a
small amount, so we
+ // perform one final call to attempt to allocate additional memory if
necessary.
+ if (keepUnrolling) {
+ serializationStream.close()
+ reserveAdditionalMemoryIfNecessary()
+ }
+
+ if (keepUnrolling) {
--- End diff --
I see...
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]