[GitHub] spark pull request #16603: [SPARK-19244][Core] Sort MemoryConsumers accordin...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/16603
[GitHub] spark pull request #16603: [SPARK-19244][Core] Sort MemoryConsumers accordin...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/16603#discussion_r98743557

Diff: core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java, in `acquireExecutionMemory` (excerpt):

```java
long key = c.getUsed();
List<MemoryConsumer> list = null;
```

Comment:

```
list = sorted.get(key);
if (list == null) {
  // instantiate, add to map
}
list.add(...);
```

This is slightly cheaper and shorter than your code.
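For reference, the same grouping collapses into a single call with Java 8's `Map.computeIfAbsent` - a minimal standalone sketch, assuming Java 8 is available (Spark still supported Java 7 at the time, so this may not have been an option), with a hypothetical `Consumer` interface standing in for `MemoryConsumer`:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

class GroupByUsage {
  // Hypothetical stand-in for MemoryConsumer; only getUsed() matters here.
  interface Consumer { long getUsed(); }

  static TreeMap<Long, List<Consumer>> group(Iterable<Consumer> consumers) {
    TreeMap<Long, List<Consumer>> sorted = new TreeMap<>();
    for (Consumer c : consumers) {
      // computeIfAbsent creates the bucket on first use, replacing the
      // explicit containsKey/get/put sequence from the diff.
      sorted.computeIfAbsent(c.getUsed(), k -> new ArrayList<>(1)).add(c);
    }
    return sorted;
  }
}
```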
[GitHub] spark pull request #16603: [SPARK-19244][Core] Sort MemoryConsumers accordin...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/16603#discussion_r98744044

Diff: core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java, in `acquireExecutionMemory` (excerpt):

```java
List<MemoryConsumer> cList = currentEntry.getValue();
MemoryConsumer c = cList.remove(cList.size() - 1);
if (cList.size() == 0) {
```

`cList.isEmpty()`
[GitHub] spark pull request #16603: [SPARK-19244][Core] Sort MemoryConsumers accordin...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/16603#discussion_r98743237

Diff: core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java, in `acquireExecutionMemory` (excerpt):

```java
TreeMap<Long, List<MemoryConsumer>> sortedConsumers =
  new TreeMap<Long, List<MemoryConsumer>>();
```

nit: you could just have used `new TreeMap<>()`.
[GitHub] spark pull request #16603: [SPARK-19244][Core] Sort MemoryConsumers accordin...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/16603#discussion_r98427063

Diff: core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java, in `acquireExecutionMemory` (excerpt):

```java
while (true) {
```

Use `while (!sortedConsumers.isEmpty())` - and remove the `if (currentEntry == null)` check below (line 179), since it can't fail.
[GitHub] spark pull request #16603: [SPARK-19244][Core] Sort MemoryConsumers accordin...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/16603#discussion_r98426729

Diff: core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java, in `acquireExecutionMemory` (excerpt):

```java
long key = c.getUsed();
List<MemoryConsumer> list = null;
if (sortedConsumers.containsKey(key)) {
  list = sortedConsumers.get(key);
} else {
  list = new ArrayList<MemoryConsumer>(1);
}
list.add(c);
sortedConsumers.put(key, list);
```

We need to call `sortedConsumers.put` only when the key is missing from `sortedConsumers`.
[GitHub] spark pull request #16603: [SPARK-19244][Core] Sort MemoryConsumers accordin...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/16603#discussion_r98427323

Diff: core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java, in `acquireExecutionMemory` (excerpt):

```java
List<MemoryConsumer> cList = currentEntry.getValue();
MemoryConsumer c = cList.remove(cList.size() - 1);
try {
  long released = c.spill(required - got, consumer);
  ...
  if (cList.size() == 0) {
```

Move this right below `MemoryConsumer c = cList.remove(cList.size() - 1);` (line 183), so that we always remove the consumer from the sorted map, irrespective of whether the spill fails or not.
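A minimal sketch of the loop with that reordering applied; it reuses the identifiers from the diff (`sortedConsumers`, `got`, `required`, `consumer`, `memoryManager`, `taskAttemptId`, `mode`) and elides logging and exception handling, so it is illustrative rather than drop-in code:

```java
while (!sortedConsumers.isEmpty() && got < required) {
  // Smallest consumer whose usage covers the remaining need, else the largest.
  Map.Entry<Long, List<MemoryConsumer>> entry =
      sortedConsumers.ceilingEntry(required - got);
  if (entry == null) {
    entry = sortedConsumers.lastEntry();
  }
  List<MemoryConsumer> cList = entry.getValue();
  MemoryConsumer c = cList.remove(cList.size() - 1);
  // Drop the emptied bucket *before* spilling, so a failed spill can never
  // leave a stale entry behind to be picked again.
  if (cList.isEmpty()) {
    sortedConsumers.remove(entry.getKey());
  }
  long released = c.spill(required - got, consumer);  // may throw IOException
  if (released > 0) {
    got += memoryManager.acquireExecutionMemory(required - got, taskAttemptId, mode);
  }
}
```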
[GitHub] spark pull request #16603: [SPARK-19244][Core] Sort MemoryConsumers accordin...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/16603#discussion_r97910353

Diff: core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java, in `acquireExecutionMemory` (excerpt):

```java
Long key = c.getUsed();
// If there is existing consumer using the same amount of memory,
// increasing the key.
while (sortedConsumers.containsKey(key)) {
  key += 1;
}
sortedConsumers.put(key, c);
```

Use a list of consumers as the value now.
[GitHub] spark pull request #16603: [SPARK-19244][Core] Sort MemoryConsumers accordin...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/16603#discussion_r97910287

Diff: core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java, in `acquireExecutionMemory` (excerpt):

```java
Long key = c.getUsed();
```

ok.
[GitHub] spark pull request #16603: [SPARK-19244][Core] Sort MemoryConsumers accordin...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/16603#discussion_r97910276

Diff: core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java, in `acquireExecutionMemory` (excerpt):

```java
while (currentEntry != null) {
```

Anyway, I changed the loop so there is no duplicated code accessing the TreeMap.
[GitHub] spark pull request #16603: [SPARK-19244][Core] Sort MemoryConsumers accordin...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/16603#discussion_r97909013

Diff: core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java, in `acquireExecutionMemory` (excerpt):

```java
sortedConsumers.clear();
```

Not sure which one is cheaper. Changed it to allocate a new object.
[GitHub] spark pull request #16603: [SPARK-19244][Core] Sort MemoryConsumers accordin...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/16603#discussion_r97908933

Diff: core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java, in `acquireExecutionMemory` (excerpt):

```java
while (sortedConsumers.containsKey(key)) {
  key += 1;
}
sortedConsumers.put(key, c);
```

ok.
[GitHub] spark pull request #16603: [SPARK-19244][Core] Sort MemoryConsumers accordin...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/16603#discussion_r97908207

Diff: core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java, in `acquireExecutionMemory` (excerpt):

```java
// Get the consumer using the least memory more than the remaining required memory.
Map.Entry<Long, MemoryConsumer> currentEntry = sortedConsumers.ceilingEntry(required - got);
if (currentEntry == null) {
  // No consumer has used memory more than the remaining required memory.
  // Get the consumer of largest used memory.
  currentEntry = sortedConsumers.lastEntry();
}
while (currentEntry != null) {
```

When entering this loop, `currentEntry` could be `null`.
[GitHub] spark pull request #16603: [SPARK-19244][Core] Sort MemoryConsumers accordin...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/16603#discussion_r97894091

Diff: core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java, in `acquireExecutionMemory` (excerpt):

```java
sortedConsumers.clear();
```

Is there a point in re-using the same map if you're just going to clear it? Performance-wise this is probably slower. Allocating a new object is probably cheaper than resetting the existing one.
[GitHub] spark pull request #16603: [SPARK-19244][Core] Sort MemoryConsumers accordin...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/16603#discussion_r97894531

Diff: core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java, in `acquireExecutionMemory` (excerpt):

```java
Long key = c.getUsed();
```

Use `long`; that results in fewer boxing operations than the current code. But maybe if you follow Mridul's suggestion you can get rid of this altogether.
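To spell out the boxing difference, an illustrative fragment reusing the diff's `c` and `sortedConsumers` (Java still autoboxes the primitive at the map boundary either way):

```java
// Current code: key is a boxed Long from the start.
Long boxedKey = c.getUsed();                   // boxes once here...
while (sortedConsumers.containsKey(boxedKey)) {
  boxedKey += 1;                               // ...then unboxes and re-boxes every iteration
}

// Suggested: keep the key primitive as long as possible.
long key = c.getUsed();                        // no boxing
while (sortedConsumers.containsKey(key)) {     // boxed only at each map lookup
  key += 1;                                    // plain primitive arithmetic
}
```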
[GitHub] spark pull request #16603: [SPARK-19244][Core] Sort MemoryConsumers accordin...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/16603#discussion_r97846848

Diff: core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java, in `acquireExecutionMemory` (excerpt):

```java
Long key = c.getUsed();
// If there is existing consumer using the same amount of memory,
// increasing the key.
while (sortedConsumers.containsKey(key)) {
  key += 1;
}
sortedConsumers.put(key, c);
```

Why are we fudging the key here? Either make the value a list of memory consumers, or use a composite key with a custom comparator; the former is preferred.
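For the composite-key alternative, a small self-contained sketch (the `UsageKey` class is hypothetical; as another comment in this thread suggests, ties between equal usages can be broken by identity hash code):

```java
// Hypothetical composite key: primary ordering by memory usage, with an
// identity-hash tiebreaker so two consumers with equal usage never collide.
final class UsageKey implements Comparable<UsageKey> {
  final long used;
  final int tieBreak;

  UsageKey(long used, Object owner) {
    this.used = used;
    this.tieBreak = System.identityHashCode(owner);
  }

  @Override
  public int compareTo(UsageKey other) {
    int cmp = Long.compare(used, other.used);
    return cmp != 0 ? cmp : Integer.compare(tieBreak, other.tieBreak);
  }
}
// Usage: new TreeMap<UsageKey, MemoryConsumer>() - no key fudging needed.
```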
[GitHub] spark pull request #16603: [SPARK-19244][Core] Sort MemoryConsumers accordin...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/16603#discussion_r97846569

Diff: core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java, in `acquireExecutionMemory` (excerpt):

```java
Map.Entry<Long, MemoryConsumer> currentEntry = sortedConsumers.ceilingEntry(required - got);
if (currentEntry == null) {
  currentEntry = sortedConsumers.lastEntry();
}
while (currentEntry != null) {
```

Use do-while instead of `var v = init(); while (isValid(v)) { ... v = init() }`.
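One way to read that suggestion - an illustrative restructuring against the diff's names, not the PR's actual code; the explicit break handles the null case noted earlier in this thread, where no eligible consumer exists at all:

```java
Map.Entry<Long, MemoryConsumer> currentEntry;
do {
  currentEntry = sortedConsumers.ceilingEntry(required - got);
  if (currentEntry == null) {
    currentEntry = sortedConsumers.lastEntry();
  }
  if (currentEntry == null) {
    break;  // map is empty: no consumer left to spill
  }
  // ... spill currentEntry.getValue(), update got, remove the entry ...
} while (got < required);
```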
[GitHub] spark pull request #16603: [SPARK-19244][Core] Sort MemoryConsumers accordin...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/16603#discussion_r97791322

Diff: core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java, in `acquireExecutionMemory` (excerpt):

```java
Collections.sort(sortedList, new ConsumerComparator());
for (int listIndex = 0; listIndex < sortedList.size(); listIndex++) {
  MemoryConsumer c = sortedList.get(listIndex);
  // Try to only spill on the consumer which has the required size of memory.
  // As the consumers are sorted in descending order, if the next consumer doesn't have
  // the required memory, then we need to spill the current consumer at least.
  boolean doSpill = (listIndex + 1) == sortedList.size() ||
      sortedList.get(listIndex + 1).getUsed() < (required - got);
  if (doSpill) {
```

@mridulm Ok, I see, thanks. This is a problem indeed. I tried to fix this with a TreeMap in the latest change. Could you take a look?
[GitHub] spark pull request #16603: [SPARK-19244][Core] Sort MemoryConsumers accordin...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/16603#discussion_r96714353

Diff: core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java, in `acquireExecutionMemory` (excerpt):

```java
boolean doSpill = (listIndex + 1) == sortedList.size() ||
    sortedList.get(listIndex + 1).getUsed() < (required - got);
if (doSpill) {
```

Let me elaborate with the earlier example. Suppose you have consumers of 1.5GB, 1GB, 500MB, 2MB, 1MB, and we need 500MB.

In the existing implementation, irrespective of the order of iteration, we always end up freeing >= 500MB. For example (since the iteration order is not known): 2MB (released = 1.8MB), 500MB (released 490MB), 1GB (released 0.99GB) - the numbers are illustrative only.

In the proposed implementation, we will ignore 1.5GB and 1GB, since 500MB >= required. When we get to 500MB, if it releases 490MB, we end up in a situation where the remaining blocks cannot satisfy the allocation request, and we end up freeing ourselves - all the while we actually do have blocks which could have been freed.

Let me know if I am missing something.
[GitHub] spark pull request #16603: [SPARK-19244][Core] Sort MemoryConsumers accordin...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/16603#discussion_r96589311

Diff: core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java, in `acquireExecutionMemory` (excerpt):

```java
boolean doSpill = (listIndex + 1) == sortedList.size() ||
    sortedList.get(listIndex + 1).getUsed() < (required - got);
if (doSpill) {
```

I am not sure how we keep iterating over the consumers in the existing implementation. As we loop over all consumers, no matter whether we spill one or how much memory was released, we won't go back to it or to earlier consumers, right?
[GitHub] spark pull request #16603: [SPARK-19244][Core] Sort MemoryConsumers accordin...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/16603#discussion_r96588590

Diff: core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java, in `acquireExecutionMemory` (excerpt):

```java
boolean doSpill = (listIndex + 1) == sortedList.size() ||
    sortedList.get(listIndex + 1).getUsed() < (required - got);
if (doSpill) {
```

In the existing implementation, we keep iterating until the constraint is satisfied, so it is not an issue; with this change, it will be.
[GitHub] spark pull request #16603: [SPARK-19244][Core] Sort MemoryConsumers accordin...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/16603#discussion_r96586900

Diff: core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java, in `acquireExecutionMemory` (excerpt):

```java
boolean doSpill = (listIndex + 1) == sortedList.size() ||
    sortedList.get(listIndex + 1).getUsed() < (required - got);
if (doSpill) {
```

Thanks for pointing that out. Yes, it is possible. However, it is an existing issue in the current implementation, before this change. Besides, we can't know how much memory will be released before calling `spill`. So I don't think we can have a satisfying solution to this under the current framework.
[GitHub] spark pull request #16603: [SPARK-19244][Core] Sort MemoryConsumers accordin...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/16603#discussion_r96581689

Diff: core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java, in `acquireExecutionMemory` (excerpt):

```java
boolean doSpill = (listIndex + 1) == sortedList.size() ||
    sortedList.get(listIndex + 1).getUsed() < (required - got);
if (doSpill) {
```

I like the fact that this implementation does not need to incur the cost of a remove in a TreeMap. Unfortunately, I don't think it is sufficient: the implementation assumes that `spill()` will always give back `getUsed()` - from the rest of the code in the method, this does not look like a valid assumption to make. It can result in spilling a large number of smaller blocks, and potentially the requester itself.

For example: required = 500MB, consumers = 1.5GB, 1GB, 500MB, 2MB, 1MB... If spilling 500MB resulted in (say) releasing 490MB, we might end up spilling a large number of blocks, also (potentially) end up spilling ourselves, and can end up returning less than requested while enough memory does exist to satisfy the request.
[GitHub] spark pull request #16603: [SPARK-19244][Core] Sort MemoryConsumers accordin...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/16603#discussion_r96574267

Diff: core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java, in `acquireExecutionMemory` (excerpt):

```java
Collections.sort(sortedList, new ConsumerComparator());
for (MemoryConsumer c: sortedList) {
  try {
    long released = c.spill(required - got, consumer);
    ...
  } catch (IOException e) {
    ...
  }
}
```

Actually, the newest update already satisfies the example you showed.
[GitHub] spark pull request #16603: [SPARK-19244][Core] Sort MemoryConsumers accordin...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/16603#discussion_r96572902

Diff: core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java, in `acquireExecutionMemory` (excerpt):

```java
Collections.sort(sortedList, new ConsumerComparator());
for (MemoryConsumer c: sortedList) {
  try {
    long released = c.spill(required - got, consumer);
    ...
  } catch (IOException e) {
    ...
  }
}
```

Use ceiling and not floor. Ensure that the requirements are satisfied: what I wrote was on the fly, to convey the idea, and not meant to be used literally - and apparently there were some errors. I have edited the examples so that there is no further confusion.

The basic idea is simple: instead of picking the largest or a random consumer, pick one which is sufficient to meet the memory requirement. If none exists, then evict the largest and retry until done.
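The ceiling-vs-floor distinction in plain `TreeMap` terms - a standalone toy example, not Spark code:

```java
import java.util.TreeMap;

public class CeilingVsFloor {
  public static void main(String[] args) {
    // Toy map: memory usage in MB -> consumer label.
    TreeMap<Long, String> byUsage = new TreeMap<>();
    byUsage.put(50L, "A");
    byUsage.put(500L, "B");
    byUsage.put(1024L, "C");

    long required = 400L;
    // ceilingEntry: the smallest consumer whose usage >= required.
    System.out.println(byUsage.ceilingEntry(required)); // 500=B, can satisfy alone
    // floorEntry: the largest consumer whose usage <= required.
    System.out.println(byUsage.floorEntry(required));   // 50=A, cannot satisfy alone
  }
}
```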
[GitHub] spark pull request #16603: [SPARK-19244][Core] Sort MemoryConsumers accordin...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/16603#discussion_r96565501

Diff: core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java, in `acquireExecutionMemory` (excerpt):

```java
Collections.sort(sortedList, new ConsumerComparator());
for (MemoryConsumer c: sortedList) {
  try {
    long released = c.spill(required - got, consumer);
    ...
  } catch (IOException e) {
    ...
  }
}
```

I don't think spilling many smaller consumers for a bigger memory request is the better strategy. Spilling small consumers too frequently can cause GC pressure, as reported in #16387. Actually, this PR was inspired by that: avoid the same consumer being spilled again and again even though it only uses a small amount of memory.

The newest update spills the ceiling entry (the least consumer that uses memory greater than or equal to the required amount). For the same example, consumers = 50MB, 100MB, 200MB, 250MB, 500MB, 1GB, it behaves like:

Required: 1.4GB. Evict: 1GB, 500MB
Required: 300MB. Evict: 500MB
Required: 400MB. Evict: 500MB
Required: 60MB. Evict: 100MB
Required: 200MB. Evict: 200MB

It is nearly the same as the result you showed. As I pointed out in a previous comment, the logic you showed actually can't produce that result.
[GitHub] spark pull request #16603: [SPARK-19244][Core] Sort MemoryConsumers accordin...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/16603#discussion_r96560544

Diff: core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java, in `acquireExecutionMemory` (excerpt):

```java
Collections.sort(sortedList, new ConsumerComparator());
for (MemoryConsumer c: sortedList) {
  try {
    long released = c.spill(required - got, consumer);
    ...
  } catch (IOException e) {
    ...
  }
}
```

I got your idea, but a few of the examples look wrong.

> Required: 400MB Evict: 500MB

The `floorEntry` should be 250MB, so it would evict 250MB, 200MB, 100MB.

> Required: 60 MB Evict 100 MB

The `floorEntry` should be 50MB, so it would evict 50MB, 1GB.

But with some modification to the logic, I think we can fix this.
[GitHub] spark pull request #16603: [SPARK-19244][Core] Sort MemoryConsumers accordin...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/16603#discussion_r96360466
--- Diff: core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java --- (quoting the same `acquireExecutionMemory` hunk reproduced in full above) --- End diff --
Regarding proposal two, changing to a TreeSet (or TreeMap, as elaborated above) instead of a HashSet: if the ordering depends on something which changes, then the ordering is not guaranteed. That is why, if you observe, I mentioned that each change in memory acquisition or release should remove and re-add the consumer to the set so that the invariant is re-established.

This might have other impacts (difficulty in ensuring it is consistently enforced, performance, etc.) which should be looked into before proceeding down that path. The benefit is eliminating the sort (in the current PR) or the creation of a TreeMap (as I elaborated) each time we need to spill. Depending on how (un)common and expensive the latter is, we should take a call.
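A sketch of that remove-and-re-add discipline, under the assumption of a stand-in `Consumer` class rather than Spark's `MemoryConsumer`:
```
import java.util.Comparator;
import java.util.TreeSet;

public class SortedSetInvariantSketch {
  static final class Consumer {
    long used;
    final int id;
    Consumer(int id, long used) { this.id = id; this.used = used; }
  }

  // Descending by usage, with the id breaking ties so equal usages can coexist.
  static final Comparator<Consumer> BY_USAGE_DESC =
      Comparator.<Consumer>comparingLong(c -> -c.used).thenComparingInt(c -> c.id);

  public static void main(String[] args) {
    TreeSet<Consumer> sorted = new TreeSet<>(BY_USAGE_DESC);
    Consumer a = new Consumer(1, 100);
    Consumer b = new Consumer(2, 50);
    sorted.add(a);
    sorted.add(b);
    // A TreeSet never re-sorts entries whose keys mutate in place, so every
    // usage change must remove and re-add the consumer to keep the invariant.
    sorted.remove(a);
    a.used = 10;
    sorted.add(a);
    System.out.println(sorted.first().id); // 2: b (50) is now the largest
  }
}
```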
[GitHub] spark pull request #16603: [SPARK-19244][Core] Sort MemoryConsumers accordin...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/16603#discussion_r96359966
--- Diff: core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java --- (quoting the same `acquireExecutionMemory` hunk reproduced in full above) --- End diff --
To elaborate so that there is no confusion:

A) If we are keeping the existing implementation: a TreeMap whose comparator enforces the natural ordering we need (it is trivial to break ties when the memory used by two consumers is the same: for example, make it a composite key where the relevant portion is the memory and the ignored portion is the identityHashCode, with the latter used to break ties in ordering and equality, or something better).

* Instead of inserting everything into a list and sorting, insert everything into a TreeSet; complexity and code remain the same.
* Instead of iterating over the sorted list trying to find the minimum subset to remove (the current implementation in the PR is not optimal, and only marginally better than what exists), leverage the TreeMap's ability to find a Consumer which optimally satisfies the request if one exists, else use the largest. Something like (illustrative only):
```
while (required > 0) {
  // MyKey is the composite (memory, tie-breaker) key sketched above.
  MyKey requiredKey = new MyKey(required, 0);
  Map.Entry<MyKey, MemoryConsumer> entry = map.floorEntry(requiredKey);
  MemoryConsumer consumer = (entry == null) ? map.lastEntry().getValue() : entry.getValue();
  // evict consumer ...
  required -= consumer.getUsed();
}
```
Note: I don't want to make this into a DP problem doing perfect binning; a good enough fast solution is sufficient.

Some examples: consumers = 50MB, 100MB, 200MB, 250MB, 500MB, 1GB

Required: 1.4GB -> Evict: 1GB, 500MB
Required: 300MB -> Evict: 250MB, 100MB
Required: 400MB -> Evict: 500MB
Required: 60MB -> Evict: 100MB
Required: 200MB -> Evict: 200MB

There are of course better solutions for the examples above; a first-cut implementation which does the above should be sufficiently good compared to what currently exists.
If you want to explore a more optimal solution, that is fine too, as long as time and space complexity are bounded.
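A runnable rendering of that illustrative loop, with plain sizes in place of `MyKey`/`MemoryConsumer` (all names here are hypothetical):
```
import java.util.Map;
import java.util.TreeMap;

public class FloorEvictionLoopDemo {
  public static void main(String[] args) {
    long mb = 1L << 20;
    // Consumers keyed by usage: 50MB, 100MB, 200MB, 250MB, 500MB, 1GB.
    TreeMap<Long, String> byUsage = new TreeMap<>();
    byUsage.put(50 * mb, "c1");
    byUsage.put(100 * mb, "c2");
    byUsage.put(200 * mb, "c3");
    byUsage.put(250 * mb, "c4");
    byUsage.put(500 * mb, "c5");
    byUsage.put(1024 * mb, "c6");
    long required = 300 * mb; // the 300MB example
    while (required > 0 && !byUsage.isEmpty()) {
      // Greatest usage <= remaining need; fall back to the largest consumer.
      Map.Entry<Long, String> entry = byUsage.floorEntry(required);
      if (entry == null) {
        entry = byUsage.lastEntry();
      }
      System.out.println("evict " + entry.getValue() + " (" + entry.getKey() / mb + " MB)");
      required -= entry.getKey();
      byUsage.remove(entry.getKey());
    }
    // Evicts c4 (250MB), then c1 (50MB).
  }
}
```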
[GitHub] spark pull request #16603: [SPARK-19244][Core] Sort MemoryConsumers accordin...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/16603#discussion_r96337114
--- Diff: core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java --- (quoting the same `acquireExecutionMemory` hunk reproduced in full above) --- End diff --
As the memory usage of a memory consumer changes over time, I am not sure that a TreeSet/TreeMap of consumers can give us a correctly sorted order. In other words, is the sorted order of a TreeSet/TreeMap still guaranteed if the elements are mutable and change after insertion? I think it is not. If we are going to do the sorting here anyway, a TreeMap/TreeSet might be overkill compared to a list like this.

Another concern is that the TreeMap/TreeSet API can let us find the tail set or ceiling element, but it requires an input element to compare against. Here we only have the required memory number, not a memory consumer to compare.

Another concern is that a TreeSet/TreeMap could return an empty set if all elements use less memory than the required size. In that case, we need to fall back to iterating over all elements in the set/map to spill, which seems to add complexity.

I totally agree that it is better to fetch the required size instead of always going from largest to smallest. With the current list-based approach, we can still achieve that.
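A quick standalone demonstration of that first concern (`Box` is a stand-in type, not Spark code):
```
import java.util.Comparator;
import java.util.TreeSet;

public class MutableKeyPitfall {
  static final class Box {
    long used;
    Box(long used) { this.used = used; }
  }

  public static void main(String[] args) {
    TreeSet<Box> set = new TreeSet<>(Comparator.comparingLong((Box b) -> b.used));
    Box small = new Box(10);
    Box big = new Box(100);
    set.add(small);
    set.add(big);
    big.used = 1; // mutate in place: the sort invariant is silently broken
    System.out.println(set.last().used);   // prints 1: still positioned by the stale key
    System.out.println(set.contains(big)); // false: lookups now miss the element
  }
}
```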
[GitHub] spark pull request #16603: [SPARK-19244][Core] Sort MemoryConsumers accordin...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/16603#discussion_r96296239
--- Diff: core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java ---
```
@@ -53,6 +57,28 @@
  */
 public class TaskMemoryManager {
 
+  /**
+   * An internal class used to sort MemoryConsumers based on their memory usage.
+   * Note: this sorts consumers in descending order, i.e., the consumers using more memory
+   * are sorted ahead of the consumers using less.
+   */
+  private static final class ConsumerComparator implements Comparator<MemoryConsumer> {
+    @Override
+    public int compare(MemoryConsumer consumer1, MemoryConsumer consumer2) {
+      // We can only compare the consumers which use the same mode.
+      assert (consumer1.getMode() == consumer2.getMode()) :
+        "Trying to compare two MemoryConsumers which are in different memory modes.";
+      if (consumer1.getUsed() < consumer2.getUsed()) {
+        return 1;
+      } else if (consumer1.getUsed() > consumer2.getUsed()) {
+        return -1;
+      } else {
+        return 0;
+      }
```
--- End diff --
Use `Long.compare(consumer2.getUsed(), consumer1.getUsed())` instead.
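Applied to the comparator above, the suggestion collapses the branches to a single call (a sketch of the same class, not the merged code):
```
private static final class ConsumerComparator implements Comparator<MemoryConsumer> {
  @Override
  public int compare(MemoryConsumer consumer1, MemoryConsumer consumer2) {
    // We can only compare the consumers which use the same mode.
    assert (consumer1.getMode() == consumer2.getMode()) :
      "Trying to compare two MemoryConsumers which are in different memory modes.";
    // Descending order: the arguments are swapped so larger usage sorts first.
    return Long.compare(consumer2.getUsed(), consumer1.getUsed());
  }
}
```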
[GitHub] spark pull request #16603: [SPARK-19244][Core] Sort MemoryConsumers accordin...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/16603#discussion_r96296478
--- Diff: core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java --- (quoting the same `acquireExecutionMemory` hunk reproduced in full above, pointing at the `List<MemoryConsumer> sortedList = new ArrayList<>();` line) --- End diff --
Set the initial size to `consumers.size()`.
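That is, something like this one-line change to the declaration quoted above (a sketch):
```
// Presize to the consumer count so the backing array never has to grow
// while the spill candidates are collected.
List<MemoryConsumer> sortedList = new ArrayList<>(consumers.size());
```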
[GitHub] spark pull request #16603: [SPARK-19244][Core] Sort MemoryConsumers accordin...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/16603#discussion_r96298011
--- Diff: core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java --- (quoting the same `acquireExecutionMemory` hunk reproduced in full above) --- End diff --
Instead of this, why not leverage a TreeMap or a TreeSet and fetch the required size directly, instead of always going from largest to smallest? (If you need 100MB and you have 1GB, 500MB, 150MB, 50MB, you will always spill the 1GB consumer, which is not required.) If using a TreeMap, take a look at `TreeMap.lastEntry` for the former and `TreeMap.ceilingEntry` for the latter (or `floorEntry`, depending on your comparator); similar methods exist for TreeSet.

A more invasive change would be to replace `consumers` with a TreeSet instead of a HashSet, though that becomes tricky since any change in a consumer's memory footprint will require a remove + add to ensure the TreeSet invariants are met, which might make the code fragile. But it would make spilling more efficient, since we wouldn't need to keep doing an n log n operation for each spill when insufficient memory exists.
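A tiny standalone illustration of those lookups, using the sizes from the example above (names hypothetical):
```
import java.util.TreeMap;

public class CeilingLookupDemo {
  public static void main(String[] args) {
    long mb = 1L << 20;
    TreeMap<Long, String> byUsage = new TreeMap<>();
    byUsage.put(1024 * mb, "c1");
    byUsage.put(500 * mb, "c2");
    byUsage.put(150 * mb, "c3");
    byUsage.put(50 * mb, "c4");
    // Need 100MB: the ceiling entry is the 150MB consumer, so there is no
    // reason to spill the 1GB one.
    System.out.println(byUsage.ceilingEntry(100 * mb).getValue()); // c3
    // If nothing is big enough, fall back to the largest consumer.
    System.out.println(byUsage.lastEntry().getValue());            // c1
  }
}
```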
[GitHub] spark pull request #16603: [SPARK-19244][Core] Sort MemoryConsumers accordin...
GitHub user viirya opened a pull request:

https://github.com/apache/spark/pull/16603

[SPARK-19244][Core] Sort MemoryConsumers according to their memory usage when spilling

## What changes were proposed in this pull request?

In `TaskMemoryManager`, when we acquire memory by calling `acquireExecutionMemory` and can't acquire the required amount, we try to spill other memory consumers. Currently, we simply iterate over the memory consumers in a hash set, so the consumers are normally visited in the same order each time.

The first issue is that we might spill more consumers than necessary. For example, if consumer 1 uses 10MB and consumer 2 uses 50MB, and consumer 3 then asks for 100MB but can only get 60MB, spilling is needed. We might spill both consumer 1 and consumer 2, but we actually only need to spill consumer 2 to satisfy the request.

The second issue is that if we spill consumer 1 in the first round of spilling, and after a while consumer 1 uses only 5MB, then when consumer 4 acquires some memory and spilling is needed again, we will spill consumer 1 once more because we iterate over the consumers in the same order. So consumer 1 ends up producing many small spill files.

This patch changes the way the memory consumers are iterated: it sorts them by their memory usage, so the consumer using the most memory spills first. Once a consumer is spilled, even if it acquires a little memory again, it will not be chosen for spilling the next time as long as other consumers are using more memory than it.

## How was this patch tested?

Jenkins tests.

Please review http://spark.apache.org/contributing.html before opening a pull request.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/viirya/spark-1 sort-memoryconsumer-when-spill

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/16603.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #16603

commit 4c2b7b02e809614993d25b21aee3e1d55355e482
Author: Liang-Chi Hsieh
Date: 2017-01-16T08:57:57Z

    Sort MemoryConsumers according to their memory usage when spilling.
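A condensed, runnable sketch of the described approach: collect candidates, sort them in descending order of usage, and stop as soon as enough memory has been released (stand-in `Consumer` type; the real patch operates on Spark's `MemoryConsumer` set):
```
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class SortBeforeSpillSketch {
  static final class Consumer {
    final String name;
    final long used;
    Consumer(String name, long used) { this.name = name; this.used = used; }
  }

  public static void main(String[] args) {
    // Issue 1 from the description: consumer 1 uses 10MB, consumer 2 uses 50MB,
    // and a third consumer still needs 40MB after getting 60MB of the 100MB it asked for.
    List<Consumer> consumers = new ArrayList<>();
    consumers.add(new Consumer("consumer 1", 10L << 20));
    consumers.add(new Consumer("consumer 2", 50L << 20));
    long required = 40L << 20;
    long got = 0;
    // Sort descending by usage so the biggest consumer spills first.
    consumers.sort(Comparator.comparingLong((Consumer c) -> -c.used));
    for (Consumer c : consumers) {
      if (got >= required) {
        break;
      }
      System.out.println("spill " + c.name);
      got += c.used; // pretend the whole consumer was spilled
    }
    // Prints only "spill consumer 2": consumer 1's small buffers stay in memory.
  }
}
```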