Ankur Dave created SPARK-35486:
----------------------------------

             Summary: MemoryConsumer reservations that trigger a partial self-spill can fail even if memory is available
                 Key: SPARK-35486
                 URL: https://issues.apache.org/jira/browse/SPARK-35486
             Project: Spark
          Issue Type: Bug
          Components: Spark Core
    Affects Versions: 3.1.1, 3.0.2, 2.4.8
            Reporter: Ankur Dave
            Assignee: Ankur Dave

When a memory reservation triggers a self-spill, ExecutionMemoryPool#releaseMemory() will immediately notify waiting tasks that memory has been freed. If there are any waiting tasks with less than 1/2N of the memory pool (where N is the number of active tasks), they may acquire the newly-freed memory before the current task has a chance to do so. This will cause the original memory reservation to fail. If the initial spill did not release all available memory, the reservation could have been satisfied by asking the consumer to spill again.

For example, the following test fails when added to MemoryManagerSuite:

{code:scala}
  test("SPARK-XXX: memory freed by self-spilling is taken by another task") {
    val memoryManager = createMemoryManager(1000L)
    val t1MemManager = new TaskMemoryManager(memoryManager, 1)
    val t2MemManager = new TaskMemoryManager(memoryManager, 2)
    val c1 = new TestPartialSpillingMemoryConsumer(t1MemManager)
    val c2 = new TestMemoryConsumer(t2MemManager)
    val futureTimeout: Duration = 20.seconds

    // t1 acquires 1000 bytes. This should succeed immediately.
    val t1Result1 = Future { c1.acquireMemory(1000L) }
    assert(ThreadUtils.awaitResult(t1Result1, futureTimeout) === 1000L)
    assert(c1.getUsed() === 1000L)

    // t2 attempts to acquire 500 bytes. This should block since there is no memory available.
    val t2Result1 = Future { c2.acquireMemory(500L) }
    Thread.sleep(300)
    assert(!t2Result1.isCompleted)
    assert(c2.getUsed() === 0L)

    // t1 attempts to acquire 500 bytes, causing its existing reservation to spill partially. t2 is
    // first in line for the freed memory.
    //
    // SPARK-XXX: This currently causes the reservation to fail. Instead, t1 should try again,
    // causing the rest of the reservation to spill.
    val t1Result2 = Future { c1.acquireMemory(500L) }

    // The spill should release enough memory for both t1's and t2's reservations to be satisfied.
    assert(ThreadUtils.awaitResult(t2Result1, futureTimeout) === 500L)
    // SPARK-XXX: This assertion fails: 0L != 500L.
    // assert(ThreadUtils.awaitResult(t1Result2, futureTimeout) === 500L)
  }
{code}

{code:java}
/**
 * A TestMemoryConsumer which, when asked to spill, releases only enough memory to satisfy the
 * request rather than releasing all its memory.
 */
public class TestPartialSpillingMemoryConsumer extends TestMemoryConsumer {
  public TestPartialSpillingMemoryConsumer(TaskMemoryManager memoryManager, MemoryMode mode) {
    super(memoryManager, mode);
  }

  public TestPartialSpillingMemoryConsumer(TaskMemoryManager memoryManager) {
    super(memoryManager);
  }

  @Override
  public long spill(long size, MemoryConsumer trigger) throws IOException {
    long used = getUsed();
    long released = Math.min(used, size);
    free(released);
    return released;
  }
}
{code}
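
The accounting behind the failing assertion can be followed in a toy, single-threaded model. This is only a sketch under stated assumptions: SelfSpillModel, its fields, and the maxSpills parameter are hypothetical names invented for illustration and are not Spark APIs. The real behaviour involves blocking threads in TaskMemoryManager and ExecutionMemoryPool, and the retry shown here is one possible reading of "t1 should try again", not necessarily the eventual fix.

```java
/**
 * Toy model (NOT Spark code): t1 owns the whole pool and spills only as much
 * as requested; t2 is already waiting and is served first whenever memory is freed.
 */
final class SelfSpillModel {
    private long free = 0;    // unreserved bytes in the hypothetical pool
    private long t1Used;      // bytes held by the self-spilling task (t1)
    private long t2Used = 0;  // bytes held by the waiting task (t2)
    private long t2Waiting;   // bytes t2 is still blocked on

    SelfSpillModel(long poolSize, long t2Request) {
        this.t1Used = poolSize;      // t1 starts out holding the entire pool
        this.t2Waiting = t2Request;
    }

    /** Partial spill: frees at most `size` bytes, then the waiter takes what it needs. */
    private long spill(long size) {
        long released = Math.min(t1Used, size);
        t1Used -= released;
        free += released;
        // Assumption: models the waiter being notified before t1 can re-acquire.
        long granted = Math.min(free, t2Waiting);
        free -= granted;
        t2Used += granted;
        t2Waiting -= granted;
        return released;
    }

    /** t1 takes whatever unreserved memory is left, up to `size`. */
    private long grab(long size) {
        long got = Math.min(free, size);
        free -= got;
        t1Used += got;
        return got;
    }

    /** t1 requests `size` more bytes, self-spilling at most `maxSpills` times. */
    long acquire(long size, int maxSpills) {
        long got = grab(size);
        for (int i = 0; i < maxSpills && got < size; i++) {
            if (spill(size - got) == 0) {
                break;               // nothing left to spill
            }
            got += grab(size - got);
        }
        return got;
    }

    long t1Used() { return t1Used; }
    long t2Used() { return t2Used; }

    public static void main(String[] args) {
        // One spill only: the freed 500 bytes go to t2, so t1 gets nothing.
        System.out.println(new SelfSpillModel(1000L, 500L).acquire(500L, 1));
        // Allowing a second spill: t2 is already satisfied, so t1 gets its 500 bytes.
        System.out.println(new SelfSpillModel(1000L, 500L).acquire(500L, 2));
    }
}
```

With a 1000-byte pool and a 500-byte waiter, the model prints 0 for the single-spill case and 500 when a second spill is allowed, which mirrors the 0L != 500L failure reported in the test above.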