mridulm commented on a change in pull request #32625:
URL: https://github.com/apache/spark/pull/32625#discussion_r638111271
##########
File path: core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
##########
@@ -240,6 +240,37 @@ private[memory] trait MemoryManagerSuite extends
SparkFunSuite with BeforeAndAft
assert(ThreadUtils.awaitResult(t2Result2, 200.millis) === 0L)
}
+ test("SPARK-35486: memory freed by self-spilling is taken by another task") {
+ val memoryManager = createMemoryManager(1000L)
+ val t1MemManager = new TaskMemoryManager(memoryManager, 1)
+ val t2MemManager = new TaskMemoryManager(memoryManager, 2)
+ val c1 = new TestPartialSpillingMemoryConsumer(t1MemManager)
+ val c2 = new TestMemoryConsumer(t2MemManager)
+ val futureTimeout: Duration = 20.seconds
+
+ // t1 acquires 1000 bytes. This should succeed immediately.
+ val t1Result1 = Future { c1.acquireMemory(1000L) }
+ assert(ThreadUtils.awaitResult(t1Result1, futureTimeout) === 1000L)
+ assert(c1.getUsed() === 1000L)
+
+ // t2 attempts to acquire 500 bytes. This should block since there is no
memory available.
+ val t2Result1 = Future { c2.acquireMemory(500L) }
+ Thread.sleep(300)
+ assert(!t2Result1.isCompleted)
+ assert(c2.getUsed() === 0L)
+
+ // t1 attempts to acquire 500 bytes, causing its existing reservation to
spill partially. t2 is
+ // first in line for the freed memory, so t1 must try again, causing the
rest of the reservation
+ // to spill.
Review comment:
I am a bit unclear on this - how does t2 get pref over t1 for this
released memory ?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]