JoshRosen commented on a change in pull request #34186:
URL: https://github.com/apache/spark/pull/34186#discussion_r730198470



##########
File path: core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
##########
@@ -135,109 +135,117 @@ public TaskMemoryManager(MemoryManager memoryManager, 
long taskAttemptId) {
    *
    * @return number of bytes successfully granted (<= N).
    */
-  public long acquireExecutionMemory(long required, MemoryConsumer consumer) {
+  public long acquireExecutionMemory(long required, MemoryConsumer 
requestingConsumer) {
     assert(required >= 0);
-    assert(consumer != null);
-    MemoryMode mode = consumer.getMode();
+    assert(requestingConsumer != null);
+    MemoryMode mode = requestingConsumer.getMode();
     // If we are allocating Tungsten pages off-heap and receive a request to 
allocate on-heap
     // memory here, then it may not make sense to spill since that would only 
end up freeing
     // off-heap memory. This is subject to change, though, so it may be risky 
to make this
     // optimization now in case we forget to undo it late when making changes.
     synchronized (this) {
+      consumers.add(requestingConsumer);
       long got = memoryManager.acquireExecutionMemory(required, taskAttemptId, 
mode);

Review comment:
       If I understand correctly, we're debating whether to add to the list of 
memory consumers either at the start of the method (here) or at the very end 
(right before returning `got`).
   
   As @timarmstrong's notes at 
https://github.com/apache/spark/pull/34186#discussion_r729523404, this only 
makes a difference when we have never previously called 
`acquireExecutionMemory` for this `requestingConsumer`.
   
   Adding the consumer at the beginning of the method means that we might try 
to self-spill a `requestingConsumer` which has never requested memory before 
and therefore is using no memory. I took at look at the existing 
`MemoryConsumer.spill()` implementations in Spark to see if there's any cases 
where this might be a problem:
   
   - RowBasedKeyValueBatch and HashedRelation's spill methods are always no-ops.
   - `RowQueue`, `Spillable` and `BytesToBytesMap` spill is always a no-op for 
self spills.
   - ShuffleExternalSorter and UnsafeExternalSorter's spill methods are no-ops 
if those consumers haven't acquired any memory.
   - `TestMemoryConsumer` and its subclasses look like they'll be fine. There, 
we have: 
https://github.com/apache/spark/blob/722ac1b8b7f86fdeedf20cc11c7f547e7038029c/core/src/test/java/org/apache/spark/memory/TestMemoryConsumer.java#L34-L39
      If it hasn't acquired any memory, we'll call `free(0)`, which will result 
in a series of calls that eventually flow to 
https://github.com/apache/spark/blob/722ac1b8b7f86fdeedf20cc11c7f547e7038029c/core/src/main/scala/org/apache/spark/memory/ExecutionMemoryPool.scala#L152
 where it looks like things will work correctly.
   
   Given all of that, I think having the call earlier in the method won't cause 
problems for the existing consumers defined in Spark. I suppose it could 
potentially impact custom consumers defined outside of Spark, though. 
   
   If we're trying to be as cautious and faithful to the old behavior as 
possible then maybe it's simpler to put it back at the end. I don't feel super 
strongly about this, though.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to