JoshRosen commented on a change in pull request #34186:
URL: https://github.com/apache/spark/pull/34186#discussion_r730191882
##########
File path: core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
##########
@@ -135,109 +135,117 @@ public TaskMemoryManager(MemoryManager memoryManager,
long taskAttemptId) {
*
* @return number of bytes successfully granted (<= N).
*/
- public long acquireExecutionMemory(long required, MemoryConsumer consumer) {
+ public long acquireExecutionMemory(long required, MemoryConsumer
requestingConsumer) {
assert(required >= 0);
- assert(consumer != null);
- MemoryMode mode = consumer.getMode();
+ assert(requestingConsumer != null);
+ MemoryMode mode = requestingConsumer.getMode();
// If we are allocating Tungsten pages off-heap and receive a request to
allocate on-heap
// memory here, then it may not make sense to spill since that would only
end up freeing
// off-heap memory. This is subject to change, though, so it may be risky
to make this
// optimization now in case we forget to undo it late when making changes.
synchronized (this) {
+ consumers.add(requestingConsumer);
long got = memoryManager.acquireExecutionMemory(required, taskAttemptId,
mode);
// Try to release memory from other consumers first, then we can reduce
the frequency of
// spilling, avoid to have too many spilled files.
if (got < required) {
- // Call spill() on other consumers to release memory
- // Sort the consumers according their memory usage. So we avoid
spilling the same consumer
- // which is just spilled in last few times and re-spilling on it will
produce many small
- // spill files.
+ logger.debug("Task {} need to spill {} for {}", taskAttemptId,
+ Utils.bytesToString(required - got), requestingConsumer);
+ // We need to call spill() on consumers to free up more memory. We
want to optimize for two
+ // things:
+ // * Minimize the number of spill calls, to reduce the number of spill
files and avoid small
+ // spill files.
+ // * Avoid spilling more data than necessary - if we only need a
little more memory, we may
+ // not want to spill as much data as possible. Many consumers spill
more than the
+ // requested amount, so we can take that into account in our
decisions.
+ // We use a heuristic that selects the smallest memory consumer with
at least `required`
+ // bytes of memory in an attempt to balance these factors. It may work
well if there are
+ // fewer larger requests, but can result in many small spills if there
are many smaller
+ // requests.
+
+ // Build a map of consumer in order of memory usage to prioritize
spilling. Assign current
+ // consumer (if present) a nominal memory usage of 0 so that it is
always last in priority
+ // order. The map will include all consumers that have previously
acquired memory.
TreeMap<Long, List<MemoryConsumer>> sortedConsumers = new TreeMap<>();
for (MemoryConsumer c: consumers) {
- if (c != consumer && c.getUsed() > 0 && c.getMode() == mode) {
- long key = c.getUsed();
+ if (c.getUsed() > 0 && c.getMode() == mode) {
+ long key = c == requestingConsumer ? 0 : c.getUsed();
List<MemoryConsumer> list =
sortedConsumers.computeIfAbsent(key, k -> new ArrayList<>(1));
list.add(c);
}
}
- while (!sortedConsumers.isEmpty()) {
+ // Iteratively spill consumers until we've freed enough memory or run
out of consumers.
+ while (got < required && !sortedConsumers.isEmpty()) {
// Get the consumer using the least memory more than the remaining
required memory.
Map.Entry<Long, List<MemoryConsumer>> currentEntry =
sortedConsumers.ceilingEntry(required - got);
- // No consumer has used memory more than the remaining required
memory.
- // Get the consumer of largest used memory.
+ // No consumer has enough memory on its own, start with spilling the
biggest consumer.
if (currentEntry == null) {
currentEntry = sortedConsumers.lastEntry();
}
List<MemoryConsumer> cList = currentEntry.getValue();
- MemoryConsumer c = cList.get(cList.size() - 1);
- try {
- long released = c.spill(required - got, consumer);
- if (released > 0) {
- logger.debug("Task {} released {} from {} for {}", taskAttemptId,
- Utils.bytesToString(released), c, consumer);
- got += memoryManager.acquireExecutionMemory(required - got,
taskAttemptId, mode);
- if (got >= required) {
- break;
- }
- } else {
- cList.remove(cList.size() - 1);
- if (cList.isEmpty()) {
- sortedConsumers.remove(currentEntry.getKey());
- }
- }
- } catch (ClosedByInterruptException e) {
- // This called by user to kill a task (e.g: speculative task).
- logger.error("error while calling spill() on " + c, e);
- throw new RuntimeException(e.getMessage());
- } catch (IOException e) {
- logger.error("error while calling spill() on " + c, e);
- // checkstyle.off: RegexpSinglelineJava
- throw new SparkOutOfMemoryError("error while calling spill() on "
+ c + " : "
- + e.getMessage());
- // checkstyle.on: RegexpSinglelineJava
+ got += trySpillAndAcquire(requestingConsumer, required - got, cList,
cList.size() - 1);
+ if (cList.isEmpty()) {
+ sortedConsumers.remove(currentEntry.getKey());
}
}
}
- // Attempt to free up memory by self-spilling.
- //
- // When our spill handler releases memory,
`ExecutionMemoryPool#releaseMemory()` will
- // immediately notify other tasks that memory has been freed, and they
may acquire the
- // newly-freed memory before we have a chance to do so (SPARK-35486). In
that case, we will
- // try again in the next loop iteration.
- while (got < required) {
- try {
- long released = consumer.spill(required - got, consumer);
- if (released > 0) {
- logger.debug("Task {} released {} from itself ({})", taskAttemptId,
- Utils.bytesToString(released), consumer);
- got += memoryManager.acquireExecutionMemory(required - got,
taskAttemptId, mode);
- } else {
- // Self-spilling could not free up any more memory.
- break;
- }
- } catch (ClosedByInterruptException e) {
- // This called by user to kill a task (e.g: speculative task).
- logger.error("error while calling spill() on " + consumer, e);
- throw new RuntimeException(e.getMessage());
- } catch (IOException e) {
- logger.error("error while calling spill() on " + consumer, e);
- // checkstyle.off: RegexpSinglelineJava
- throw new SparkOutOfMemoryError("error while calling spill() on " +
consumer + " : "
- + e.getMessage());
- // checkstyle.on: RegexpSinglelineJava
- }
- }
-
- consumers.add(consumer);
- logger.debug("Task {} acquired {} for {}", taskAttemptId,
Utils.bytesToString(got), consumer);
+ logger.debug("Task {} acquired {} for {}", taskAttemptId,
Utils.bytesToString(got),
+ requestingConsumer);
return got;
}
}
+ /**
+ * Try to acquire as much memory as possible from `cList[idx]`, up to
`requested` bytes by
+ * spilling and then acquiring the freed memory. If no more memory can be
spilled from
+ * `cList[idx]`, remove it from the list.
+ *
+ * @return number of bytes acquired (<= requested)
+ * @throws RuntimeException if task is interrupted
+ * @throws SparkOutOfMemoryError if an IOException occurs during spilling
+ */
+ private long trySpillAndAcquire(
+ MemoryConsumer requestingConsumer,
+ long requested, List<MemoryConsumer> cList,
Review comment:
```suggestion
long requested,
List<MemoryConsumer> cList,
```
##########
File path: core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
##########
@@ -135,109 +135,117 @@ public TaskMemoryManager(MemoryManager memoryManager,
long taskAttemptId) {
*
* @return number of bytes successfully granted (<= N).
*/
- public long acquireExecutionMemory(long required, MemoryConsumer consumer) {
+ public long acquireExecutionMemory(long required, MemoryConsumer
requestingConsumer) {
assert(required >= 0);
- assert(consumer != null);
- MemoryMode mode = consumer.getMode();
+ assert(requestingConsumer != null);
+ MemoryMode mode = requestingConsumer.getMode();
// If we are allocating Tungsten pages off-heap and receive a request to
allocate on-heap
// memory here, then it may not make sense to spill since that would only
end up freeing
// off-heap memory. This is subject to change, though, so it may be risky
to make this
// optimization now in case we forget to undo it late when making changes.
synchronized (this) {
+ consumers.add(requestingConsumer);
long got = memoryManager.acquireExecutionMemory(required, taskAttemptId,
mode);
Review comment:
If I understand correctly, we're debating whether to add the consumer to the
list of memory consumers at the start of the method (here) or at the very end
(right before returning `got`).
As @timarmstrong notes at
https://github.com/apache/spark/pull/34186#discussion_r729523404, this only
makes a difference when we have never previously called
`acquireExecutionMemory` for this `requestingConsumer`.
Adding the consumer at the beginning of the method means that we might try
to self-spill a `requestingConsumer` which has never requested memory before
and therefore is using no memory. I took a look at the existing
`MemoryConsumer.spill()` implementations in Spark to see if there are any cases
where this might be a problem:
- The `spill` methods of `RowBasedKeyValueBatch` and `HashedRelation` are
always no-ops.
- The `spill` methods of `RowQueue`, `Spillable` and `BytesToBytesMap` are
always no-ops for self-spills.
- The `spill` methods of `ShuffleExternalSorter` and `UnsafeExternalSorter`
are no-ops if those consumers haven't acquired any memory.
- `TestMemoryConsumer` and its subclasses look like they'll be fine. There,
we have:
https://github.com/apache/spark/blob/722ac1b8b7f86fdeedf20cc11c7f547e7038029c/core/src/test/java/org/apache/spark/memory/TestMemoryConsumer.java#L34-L39
If it hasn't acquired any memory, we'll call `free(0)`, which will result
in a series of calls that eventually flow to
https://github.com/apache/spark/blob/722ac1b8b7f86fdeedf20cc11c7f547e7038029c/core/src/main/scala/org/apache/spark/memory/ExecutionMemoryPool.scala#L152
where it looks like things will work correctly.
Given all of that, I think having the call earlier in the method won't cause
problems for the existing consumers defined in Spark. I suppose it could
potentially impact custom consumers defined outside of Spark, though.
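To illustrate the property the Spark consumers above rely on, here is a
minimal sketch of a consumer whose spill is a no-op when it holds no memory.
`SketchConsumer` and its methods are hypothetical stand-ins made up for this
example; this is not Spark's `MemoryConsumer` API, and a real custom consumer
would need to be checked against its own spill logic.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a memory consumer; NOT Spark's MemoryConsumer API. */
class SketchConsumer {
  // Bytes currently held by this consumer.
  private long used = 0;
  // Sizes of the (pretend) spill files written so far.
  private final List<Long> spills = new ArrayList<>();

  /** Record that this consumer now holds `bytes` more memory. */
  void acquire(long bytes) {
    used += bytes;
  }

  long getUsed() {
    return used;
  }

  int spillCount() {
    return spills.size();
  }

  /**
   * Returns the number of bytes freed. If no memory is held, this is a no-op:
   * it returns 0 and writes no spill file, so an early self-spill is harmless.
   */
  long spill(long requested) {
    if (used == 0) {
      return 0;
    }
    // Assumption for the sketch: spill everything held, not exactly `requested`.
    long freed = used;
    spills.add(freed);
    used = 0;
    return freed;
  }
}
```

A consumer written this way behaves the same whether it is added to the
consumer list before or after its first acquisition; one that assumes `spill`
is only ever called after it has acquired memory might not.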
If we're trying to be as cautious and faithful to the old behavior as
possible then maybe it's simpler to put it back at the end. I don't feel super
strongly about this, though.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]