Github user mridulm commented on a diff in the pull request:
https://github.com/apache/spark/pull/16603#discussion_r96298011
--- Diff: core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java ---
@@ -144,23 +170,31 @@ public long acquireExecutionMemory(long required, MemoryConsumer consumer) {
       // spilling, avoid having too many spilled files.
       if (got < required) {
         // Call spill() on other consumers to release memory
+        // Sort the consumers according to their memory usage, so we avoid spilling the
+        // same consumer that was just spilled recently; re-spilling it would produce
+        // many small spill files.
+        List<MemoryConsumer> sortedList = new ArrayList<>();
         for (MemoryConsumer c: consumers) {
           if (c != consumer && c.getUsed() > 0 && c.getMode() == mode) {
-            try {
-              long released = c.spill(required - got, consumer);
-              if (released > 0) {
-                logger.debug("Task {} released {} from {} for {}", taskAttemptId,
-                  Utils.bytesToString(released), c, consumer);
-                got += memoryManager.acquireExecutionMemory(required - got, taskAttemptId, mode);
-                if (got >= required) {
-                  break;
-                }
+            sortedList.add(c);
+          }
+        }
+        Collections.sort(sortedList, new ConsumerComparator());
+        for (MemoryConsumer c: sortedList) {
+          try {
+            long released = c.spill(required - got, consumer);
+            if (released > 0) {
+              logger.debug("Task {} released {} from {} for {}", taskAttemptId,
+                Utils.bytesToString(released), c, consumer);
+              got += memoryManager.acquireExecutionMemory(required - got, taskAttemptId, mode);
+              if (got >= required) {
+                break;
               }
-            } catch (IOException e) {
-              logger.error("error while calling spill() on " + c, e);
-              throw new OutOfMemoryError("error while calling spill() on " + c + " : "
-                + e.getMessage());
             }
+          } catch (IOException e) {
+            logger.error("error while calling spill() on " + c, e);
+            throw new OutOfMemoryError("error while calling spill() on " + c + " : "
+              + e.getMessage());
           }
--- End diff --
Instead of this, why not leverage a TreeMap or a TreeSet and fetch a consumer of the required size directly, instead of always going from largest to smallest? (If you need 100MB and the consumers hold 1GB, 500MB, 150MB, and 50MB, you will always spill the 1GB consumer, which is not required: spilling the 150MB one would be enough.)
If using a TreeMap, take a look at `TreeMap.lastEntry` for the former (largest consumer first) and `TreeMap.ceilingEntry` for the latter (smallest consumer that covers the request), or `floorEntry`, depending on your comparator; similar methods exist for TreeSet.
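For example, a minimal sketch of that lookup (the `SpillCandidates` wrapper and `pick` method are invented for illustration; it assumes `MemoryConsumer.getUsed` from the diff above and, for brevity, ignores consumers with identical usage):

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: nothing like this exists in TaskMemoryManager today.
class SpillCandidates {
  // Spillable consumers keyed by current memory usage, ascending.
  private final TreeMap<Long, MemoryConsumer> byUsage = new TreeMap<>();

  void add(MemoryConsumer c) {
    byUsage.put(c.getUsed(), c);
  }

  // Smallest consumer that can release at least `required` bytes,
  // falling back to the largest one if none is big enough.
  MemoryConsumer pick(long required) {
    Map.Entry<Long, MemoryConsumer> e = byUsage.ceilingEntry(required);
    if (e == null) {
      e = byUsage.lastEntry();
    }
    return e == null ? null : e.getValue();
  }
}
```

With that, picking a spill victim is an O(log n) lookup instead of sorting every candidate first.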
A more invasive change could be to replace `consumers` with a TreeSet instead of a HashSet. That becomes tricky, though, since any change in a consumer's memory footprint would require a remove + add to keep the TreeSet invariants intact, which might make the code fragile. But it would make spill more efficient, since we don't need to keep doing an O(n log n) sort for each spill when insufficient memory exists.
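To make the fragility concrete, here is a minimal sketch of the bookkeeping that would be needed (the `ConsumerSet` wrapper and `withUsageChange` hook are invented for illustration):

```java
import java.util.Comparator;
import java.util.TreeSet;

// Hypothetical sketch: ties on usage are broken by identity hash so that
// distinct consumers with equal usage can coexist in the set.
class ConsumerSet {
  private final TreeSet<MemoryConsumer> consumers = new TreeSet<>(
      new Comparator<MemoryConsumer>() {
        @Override
        public int compare(MemoryConsumer a, MemoryConsumer b) {
          int cmp = Long.compare(a.getUsed(), b.getUsed());
          return cmp != 0
              ? cmp
              : Integer.compare(System.identityHashCode(a), System.identityHashCode(b));
        }
      });

  // Every change to a consumer's footprint must be bracketed like this;
  // mutating getUsed() while c sits in the set silently breaks the
  // ordering invariant, which is what makes the approach fragile.
  void withUsageChange(MemoryConsumer c, Runnable mutation) {
    consumers.remove(c);
    mutation.run();
    consumers.add(c);
  }
}
```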