ankurdave commented on a change in pull request #32625:
URL: https://github.com/apache/spark/pull/32625#discussion_r638158490



##########
File path: core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
##########
@@ -240,6 +240,37 @@ private[memory] trait MemoryManagerSuite extends SparkFunSuite with BeforeAndAft
     assert(ThreadUtils.awaitResult(t2Result2, 200.millis) === 0L)
   }
 
+  test("SPARK-35486: memory freed by self-spilling is taken by another task") {
+    val memoryManager = createMemoryManager(1000L)
+    val t1MemManager = new TaskMemoryManager(memoryManager, 1)
+    val t2MemManager = new TaskMemoryManager(memoryManager, 2)
+    val c1 = new TestPartialSpillingMemoryConsumer(t1MemManager)
+    val c2 = new TestMemoryConsumer(t2MemManager)
+    val futureTimeout: Duration = 20.seconds
+
+    // t1 acquires 1000 bytes. This should succeed immediately.
+    val t1Result1 = Future { c1.acquireMemory(1000L) }
+    assert(ThreadUtils.awaitResult(t1Result1, futureTimeout) === 1000L)
+    assert(c1.getUsed() === 1000L)
+
+    // t2 attempts to acquire 500 bytes. This should block since there is no memory available.
+    val t2Result1 = Future { c2.acquireMemory(500L) }
+    Thread.sleep(300)
+    assert(!t2Result1.isCompleted)
+    assert(c2.getUsed() === 0L)
+
+    // t1 attempts to acquire 500 bytes, causing its existing reservation to spill partially. t2 is
+    // first in line for the freed memory, so t1 must try again, causing the rest of the reservation
+    // to spill.

Review comment:
   That's because of the memory sharing logic in [`ExecutionMemoryPool#acquireMemory()`](https://github.com/apache/spark/blob/d4fb98354a24e6343e8be66543c76cb445ec3a2c/core/src/main/scala/org/apache/spark/memory/ExecutionMemoryPool.scala#L168). After releasing 500 bytes, t1 still holds 500 bytes, which is exactly its fair share (the 1000-byte pool divided between the two active tasks), so it cannot acquire more memory. The freed memory therefore goes to t2 instead.
   
   My wording of "first in line" might be misleading since there's no ordered 
queue; it's all about fair shares. I'll update the comment to explain this.
   
   In detail:
   
   1. Initially, t1 has 1000 bytes, while t2 has 0 bytes. t2's call to `c2.acquireMemory(500)` is blocked inside [`ExecutionMemoryPool#acquireMemory()`](https://github.com/apache/spark/blob/d4fb98354a24e6343e8be66543c76cb445ec3a2c/core/src/main/scala/org/apache/spark/memory/ExecutionMemoryPool.scala#L140), waiting on the lock.
   2. t1 spills and releases 500 bytes, leaving it with 500 bytes remaining. It [notifies](https://github.com/apache/spark/blob/d4fb98354a24e6343e8be66543c76cb445ec3a2c/core/src/main/scala/org/apache/spark/memory/ExecutionMemoryPool.scala#L168) t2 via the lock.
   3. Both t1 and t2 attempt to acquire memory in `ExecutionMemoryPool#acquireMemory()`. t1 finds that it is already at the `maxMemoryPerTask` of 500 bytes, so it cannot acquire the released memory. t2 is able to acquire the memory instead.
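
   The arithmetic behind step 3 can be sketched roughly as follows. This is a simplified model for illustration, not the actual `ExecutionMemoryPool` code: `FairShareModel`, `grantable`, and their parameters are hypothetical names, the per-task cap is assumed to be the pool size divided by the number of active tasks, and the blocking and notification on the lock are left out entirely.

```scala
// Simplified model of the fair-share cap described in the steps above.
// NOT the Spark implementation: all names here are hypothetical, and the
// wait/notify behaviour of the real pool is intentionally omitted.
object FairShareModel {
  // Assumed upper bound on one task's share: the pool split evenly
  // among the currently active tasks.
  def maxMemoryPerTask(poolSize: Long, numActiveTasks: Int): Long =
    poolSize / numActiveTasks

  // Bytes a task could be granted right now, limited by its request,
  // by its remaining headroom under the cap, and by the free memory.
  def grantable(
      requested: Long,
      heldByTask: Long,
      freeInPool: Long,
      poolSize: Long,
      numActiveTasks: Int): Long = {
    val cap = maxMemoryPerTask(poolSize, numActiveTasks)
    val headroom = math.max(0L, cap - heldByTask)
    math.min(requested, math.min(headroom, freeInPool))
  }
}

object FairShareExample {
  // State after t1's partial spill: 1000-byte pool, 2 active tasks,
  // 500 bytes free, t1 holds 500 bytes, t2 holds 0 bytes.
  val t1Grant: Long = FairShareModel.grantable(500L, 500L, 500L, 1000L, 2)
  val t2Grant: Long = FairShareModel.grantable(500L, 0L, 500L, 1000L, 2)
}
```

   Under these assumptions t1 has no headroom left below the 500-byte cap, so `t1Grant` is 0, while t2 has the full 500 bytes of headroom, so `t2Grant` is 500. That matches the outcome the test expects: the memory freed by t1's own spill ends up with t2.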



