Github user ConeyLiu commented on a diff in the pull request:
https://github.com/apache/spark/pull/19285#discussion_r163768689
--- Diff:
core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -232,78 +236,93 @@ private[spark] class MemoryStore(
elementsUnrolled += 1
}
+ val valuesBuilder = if (keepUnrolling) {
+ Some(valuesHolder.getBuilder())
+ } else {
+ None
+ }
+
+ // Make sure that we have enough memory to store the block. By this point, it is possible that
+ // the block's actual memory usage has exceeded the unroll memory by a small amount, so we
+ // perform one final call to attempt to allocate additional memory if necessary.
if (keepUnrolling) {
- // We successfully unrolled the entirety of this block
- val arrayValues = vector.toArray
- vector = null
- val entry =
- new DeserializedMemoryEntry[T](arrayValues,
SizeEstimator.estimate(arrayValues), classTag)
- val size = entry.size
- def transferUnrollToStorage(amount: Long): Unit = {
- // Synchronize so that transfer is atomic
- memoryManager.synchronized {
- releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, amount)
- val success = memoryManager.acquireStorageMemory(blockId,
amount, MemoryMode.ON_HEAP)
- assert(success, "transferring unroll memory to storage memory
failed")
+ val size = valuesBuilder.get.preciseSize
+ if (size > unrollMemoryUsedByThisBlock) {
+ val amountToRequest = size - unrollMemoryUsedByThisBlock
+ keepUnrolling = reserveUnrollMemoryForThisTask(blockId,
amountToRequest, memoryMode)
+ if (keepUnrolling) {
+ unrollMemoryUsedByThisBlock += amountToRequest
}
}
- // Acquire storage memory if necessary to store this block in memory.
- val enoughStorageMemory = {
- if (unrollMemoryUsedByThisBlock <= size) {
- val acquiredExtra =
- memoryManager.acquireStorageMemory(
- blockId, size - unrollMemoryUsedByThisBlock,
MemoryMode.ON_HEAP)
- if (acquiredExtra) {
- transferUnrollToStorage(unrollMemoryUsedByThisBlock)
- }
- acquiredExtra
- } else { // unrollMemoryUsedByThisBlock > size
- // If this task attempt already owns more unroll memory than is necessary to store the
- // block, then release the extra memory that will not be used.
- val excessUnrollMemory = unrollMemoryUsedByThisBlock - size
- releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP,
excessUnrollMemory)
- transferUnrollToStorage(size)
- true
- }
+ }
+
+ if (keepUnrolling) {
--- End diff --
updated
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]