Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/19285#discussion_r162534339
--- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -261,37 +263,93 @@ private[spark] class MemoryStore(
// If this task attempt already owns more unroll memory than is necessary to store the
// block, then release the extra memory that will not be used.
val excessUnrollMemory = unrollMemoryUsedByThisBlock - size
- releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, excessUnrollMemory)
+ releaseUnrollMemoryForThisTask(memoryMode, excessUnrollMemory)
transferUnrollToStorage(size)
true
}
}
+
if (enoughStorageMemory) {
entries.synchronized {
- entries.put(blockId, entry)
+ entries.put(blockId, createMemoryEntry())
}
logInfo("Block %s stored as values in memory (estimated size %s,
free %s)".format(
blockId, Utils.bytesToString(size),
Utils.bytesToString(maxMemory - blocksMemoryUsed)))
Right(size)
} else {
assert(currentUnrollMemoryForThisTask >= unrollMemoryUsedByThisBlock,
"released too much unroll memory")
+ Left(unrollMemoryUsedByThisBlock)
+ }
+ } else {
+ Left(unrollMemoryUsedByThisBlock)
+ }
+ }
+
+ /**
+ * Attempt to put the given block in memory store as values.
+ *
+ * It's possible that the iterator is too large to materialize and store in memory. To avoid
+ * OOM exceptions, this method will gradually unroll the iterator while periodically checking
+ * whether there is enough free memory. If the block is successfully materialized, then the
+ * temporary unroll memory used during the materialization is "transferred" to storage memory,
+ * so we won't acquire more memory than is actually needed to store the block.
--- End diff --
let's not duplicate this doc comment
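
For readers following the thread, here is a minimal, self-contained sketch of the gradual-unroll pattern the quoted Scaladoc describes. The helpers reserveUnrollMemory and estimateSize, and the constants, are hypothetical stand-ins for illustration, not Spark's actual MemoryManager API; transferUnrollToStorage is named after the method in the diff but simplified here.

import scala.collection.mutable.ArrayBuffer

object UnrollSketch {
  // Check memory every `checkPeriod` elements rather than per element,
  // and reserve a small amount of unroll memory up front. Both values
  // are illustrative, not Spark's defaults.
  val checkPeriod = 16
  val initialReservation = 1024L // bytes

  // Hypothetical accounting; a real MemoryStore asks the MemoryManager
  // and the reservation can fail under memory pressure.
  var unrollMemory = 0L
  def reserveUnrollMemory(bytes: Long): Boolean = { unrollMemory += bytes; true }
  def transferUnrollToStorage(bytes: Long): Unit = { unrollMemory -= bytes }
  def estimateSize(buf: ArrayBuffer[Any]): Long = buf.length * 64L // crude estimate

  /** Returns Right(size) on success, Left(unroll memory held) on failure. */
  def putIteratorAsValues(values: Iterator[Any]): Either[Long, Long] = {
    val buf = new ArrayBuffer[Any]
    var reserved = initialReservation
    if (!reserveUnrollMemory(reserved)) return Left(0L)
    var count = 0L
    var keepUnrolling = true
    while (values.hasNext && keepUnrolling) {
      buf += values.next()
      count += 1
      if (count % checkPeriod == 0) {
        val currentSize = estimateSize(buf)
        if (currentSize >= reserved) {
          // Request more unroll memory; the growth factor amortizes
          // the number of requests as the block grows.
          val request = (currentSize * 1.5).toLong - reserved
          keepUnrolling = reserveUnrollMemory(request)
          if (keepUnrolling) reserved += request
        }
      }
    }
    if (keepUnrolling) {
      val size = estimateSize(buf)
      // Success: the unroll reservation becomes storage memory, so no
      // memory beyond the block's actual size stays reserved.
      transferUnrollToStorage(size)
      Right(size)
    } else {
      Left(reserved)
    }
  }
}

On failure the caller receives Left with the unroll memory still held, mirroring the Left(unrollMemoryUsedByThisBlock) branches in the diff, so it can release that memory or fall back to spilling the partially unrolled values to disk.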