srowen closed pull request #23426: [SPARK-26527][CORE] Let acquireUnrollMemory fail fast if required space exceeds memory limit
URL: https://github.com/apache/spark/pull/23426
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala
index a6f7db0600e60..fa323723557b5 100644
--- a/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala
@@ -79,16 +79,23 @@ private[spark] class StaticMemoryManager(
       memoryMode: MemoryMode): Boolean = synchronized {
     require(memoryMode != MemoryMode.OFF_HEAP,
       "StaticMemoryManager does not support off-heap unroll memory")
-    val currentUnrollMemory = 
onHeapStorageMemoryPool.memoryStore.currentUnrollMemory
-    val freeMemory = onHeapStorageMemoryPool.memoryFree
-    // When unrolling, we will use all of the existing free memory, and, if 
necessary,
-    // some extra space freed from evicting cached blocks. We must place a cap 
on the
-    // amount of memory to be evicted by unrolling, however, otherwise 
unrolling one
-    // big block can blow away the entire cache.
-    val maxNumBytesToFree = math.max(0, maxUnrollMemory - currentUnrollMemory 
- freeMemory)
-    // Keep it within the range 0 <= X <= maxNumBytesToFree
-    val numBytesToFree = math.max(0, math.min(maxNumBytesToFree, numBytes - 
freeMemory))
-    onHeapStorageMemoryPool.acquireMemory(blockId, numBytes, numBytesToFree)
+    if (numBytes > maxOnHeapStorageMemory) {
+      // Fail fast if the block simply won't fit
+      logInfo(s"Will not store $blockId as the required space ($numBytes 
bytes) exceeds our " +
+        s"memory limit ($maxOnHeapStorageMemory bytes)")
+      false
+    } else {
+      val currentUnrollMemory = 
onHeapStorageMemoryPool.memoryStore.currentUnrollMemory
+      val freeMemory = onHeapStorageMemoryPool.memoryFree
+      // When unrolling, we will use all of the existing free memory, and, if 
necessary,
+      // some extra space freed from evicting cached blocks. We must place a 
cap on the
+      // amount of memory to be evicted by unrolling, however, otherwise 
unrolling one
+      // big block can blow away the entire cache.
+      val maxNumBytesToFree = math.max(0, maxUnrollMemory - 
currentUnrollMemory - freeMemory)
+      // Keep it within the range 0 <= X <= maxNumBytesToFree
+      val numBytesToFree = math.max(0, math.min(maxNumBytesToFree, numBytes - 
freeMemory))
+      onHeapStorageMemoryPool.acquireMemory(blockId, numBytes, numBytesToFree)
+    }
   }
 
   private[memory]
diff --git a/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala b/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala
index 7274072e5049a..baff672f5fb8f 100644
--- a/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala
@@ -291,11 +291,11 @@ class MemoryStoreSuite
     blockInfoManager.removeBlock("b3")
     putIteratorAsBytes("b3", smallIterator, ClassTag.Any)
 
-    // Unroll huge block with not enough space. This should fail and kick out 
b2 in the process.
+    // Unroll huge block with not enough space. This should fail.
     val result4 = putIteratorAsBytes("b4", bigIterator, ClassTag.Any)
     assert(result4.isLeft) // unroll was unsuccessful
     assert(!memoryStore.contains("b1"))
-    assert(!memoryStore.contains("b2"))
+    assert(memoryStore.contains("b2"))
     assert(memoryStore.contains("b3"))
     assert(!memoryStore.contains("b4"))
     assert(memoryStore.currentUnrollMemoryForThisTask > 0) // we returned an 
iterator


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to