[GitHub] spark pull request #16603: [SPARK-19244][Core] Sort MemoryConsumers accordin...

2017-02-03 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/16603


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16603: [SPARK-19244][Core] Sort MemoryConsumers accordin...

2017-01-31 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/16603#discussion_r98743557
  
--- Diff: core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java ---
@@ -144,23 +148,53 @@ public long acquireExecutionMemory(long required, MemoryConsumer consumer) {
       // spilling, avoid to have too many spilled files.
       if (got < required) {
         // Call spill() on other consumers to release memory
+        // Sort the consumers according their memory usage. So we avoid spilling the same consumer
+        // which is just spilled in last few times and re-spilling on it will produce many small
+        // spill files.
+        TreeMap<Long, List<MemoryConsumer>> sortedConsumers =
+          new TreeMap<Long, List<MemoryConsumer>>();
         for (MemoryConsumer c: consumers) {
           if (c != consumer && c.getUsed() > 0 && c.getMode() == mode) {
-            try {
-              long released = c.spill(required - got, consumer);
-              if (released > 0) {
-                logger.debug("Task {} released {} from {} for {}", taskAttemptId,
-                  Utils.bytesToString(released), c, consumer);
-                got += memoryManager.acquireExecutionMemory(required - got, taskAttemptId, mode);
-                if (got >= required) {
-                  break;
-                }
+            long key = c.getUsed();
+            List<MemoryConsumer> list = null;
--- End diff --

```
list = sorted.get(key);
if (list == null) {
  // instantiate, add to map
}
list.add(...);
```

This is slightly cheaper and shorter than your code.
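
The pattern suggested above could be filled in roughly as follows. This is only a sketch under stated assumptions, not the PR's code: `Consumer` is a hypothetical stand-in for Spark's `MemoryConsumer`, and `groupByUsage` is an illustrative helper name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

class GroupByUsageSketch {
  // Hypothetical stand-in for MemoryConsumer; only the used-bytes value matters here.
  static final class Consumer {
    final String name;
    final long used;
    Consumer(String name, long used) { this.name = name; this.used = used; }
    long getUsed() { return used; }
  }

  // Groups consumers by memory usage with a single map lookup per consumer.
  static TreeMap<Long, List<Consumer>> groupByUsage(List<Consumer> consumers) {
    TreeMap<Long, List<Consumer>> sorted = new TreeMap<>();
    for (Consumer c : consumers) {
      long key = c.getUsed();
      List<Consumer> list = sorted.get(key);
      if (list == null) {
        // First consumer with this usage: instantiate the list and add it to the map.
        list = new ArrayList<>(1);
        sorted.put(key, list);
      }
      list.add(c);
    }
    return sorted;
  }

  public static void main(String[] args) {
    List<Consumer> consumers = new ArrayList<>();
    consumers.add(new Consumer("a", 100L));
    consumers.add(new Consumer("b", 100L));
    consumers.add(new Consumer("c", 50L));
    TreeMap<Long, List<Consumer>> sorted = groupByUsage(consumers);
    System.out.println(sorted.keySet());         // prints [50, 100]
    System.out.println(sorted.get(100L).size()); // prints 2
  }
}
```

On Java 8 and later, `sorted.computeIfAbsent(key, k -> new ArrayList<>(1)).add(c)` expresses the same get-or-create step in one statement.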





[GitHub] spark pull request #16603: [SPARK-19244][Core] Sort MemoryConsumers accordin...

2017-01-31 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/16603#discussion_r98744044
  
--- Diff: core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java ---
@@ -144,23 +148,53 @@ public long acquireExecutionMemory(long required, MemoryConsumer consumer) {
       // spilling, avoid to have too many spilled files.
       if (got < required) {
         // Call spill() on other consumers to release memory
+        // Sort the consumers according their memory usage. So we avoid spilling the same consumer
+        // which is just spilled in last few times and re-spilling on it will produce many small
+        // spill files.
+        TreeMap<Long, List<MemoryConsumer>> sortedConsumers =
+          new TreeMap<Long, List<MemoryConsumer>>();
         for (MemoryConsumer c: consumers) {
           if (c != consumer && c.getUsed() > 0 && c.getMode() == mode) {
-            try {
-              long released = c.spill(required - got, consumer);
-              if (released > 0) {
-                logger.debug("Task {} released {} from {} for {}", taskAttemptId,
-                  Utils.bytesToString(released), c, consumer);
-                got += memoryManager.acquireExecutionMemory(required - got, taskAttemptId, mode);
-                if (got >= required) {
-                  break;
-                }
+            long key = c.getUsed();
+            List<MemoryConsumer> list = null;
+            if (sortedConsumers.containsKey(key)) {
+              list = sortedConsumers.get(key);
+              list.add(c);
+            } else {
+              list = new ArrayList<MemoryConsumer>(1);
+              list.add(c);
+              sortedConsumers.put(key, list);
+            }
+          }
+        }
+        while (!sortedConsumers.isEmpty()) {
+          // Get the consumer using the least memory more than the remaining required memory.
+          Map.Entry<Long, List<MemoryConsumer>> currentEntry =
+            sortedConsumers.ceilingEntry(required - got);
+          // No consumer has used memory more than the remaining required memory.
+          // Get the consumer of largest used memory.
+          if (currentEntry == null) {
+            currentEntry = sortedConsumers.lastEntry();
+          }
+          List<MemoryConsumer> cList = currentEntry.getValue();
+          MemoryConsumer c = cList.remove(cList.size() - 1);
+          if (cList.size() == 0) {
--- End diff --

`cList.isEmpty()`





[GitHub] spark pull request #16603: [SPARK-19244][Core] Sort MemoryConsumers accordin...

2017-01-31 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/16603#discussion_r98743237
  
--- Diff: core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java ---
@@ -144,23 +148,53 @@ public long acquireExecutionMemory(long required, MemoryConsumer consumer) {
       // spilling, avoid to have too many spilled files.
       if (got < required) {
         // Call spill() on other consumers to release memory
+        // Sort the consumers according their memory usage. So we avoid spilling the same consumer
+        // which is just spilled in last few times and re-spilling on it will produce many small
+        // spill files.
+        TreeMap<Long, List<MemoryConsumer>> sortedConsumers =
+          new TreeMap<Long, List<MemoryConsumer>>();
--- End diff --

nit: you could just have used `new TreeMap<>()`.
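
For readers unfamiliar with the nit, the two spellings below build the same kind of map; the diamond form lets the compiler infer the type arguments from the declaration. This is a small sketch in which `String` stands in for `MemoryConsumer` as an assumption, and the class and method names are illustrative only.

```java
import java.util.List;
import java.util.TreeMap;

class DiamondSketch {
  // Explicit type arguments repeated on the right-hand side.
  static TreeMap<Long, List<String>> explicit() {
    TreeMap<Long, List<String>> sortedConsumers = new TreeMap<Long, List<String>>();
    return sortedConsumers;
  }

  // Diamond operator (Java 7+): type arguments are inferred from the left-hand side.
  static TreeMap<Long, List<String>> diamond() {
    TreeMap<Long, List<String>> sortedConsumers = new TreeMap<>();
    return sortedConsumers;
  }
}
```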





[GitHub] spark pull request #16603: [SPARK-19244][Core] Sort MemoryConsumers accordin...

2017-01-30 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/16603#discussion_r98427063
  
--- Diff: core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java 
---
@@ -144,23 +148,56 @@ public long acquireExecutionMemory(long required, 
MemoryConsumer consumer) {
   // spilling, avoid to have too many spilled files.
   if (got < required) {
 // Call spill() on other consumers to release memory
+// Sort the consumers according their memory usage. So we avoid 
spilling the same consumer
+// which is just spilled in last few times and re-spilling on it 
will produce many small
+// spill files.
+TreeMap> sortedConsumers =
+  new TreeMap>();
 for (MemoryConsumer c: consumers) {
   if (c != consumer && c.getUsed() > 0 && c.getMode() == mode) {
-try {
-  long released = c.spill(required - got, consumer);
-  if (released > 0) {
-logger.debug("Task {} released {} from {} for {}", 
taskAttemptId,
-  Utils.bytesToString(released), c, consumer);
-got += memoryManager.acquireExecutionMemory(required - 
got, taskAttemptId, mode);
-if (got >= required) {
-  break;
-}
+long key = c.getUsed();
+List list = null;
+if (sortedConsumers.containsKey(key)) {
+  list = sortedConsumers.get(key);
+} else {
+  list = new ArrayList(1);
+}
+list.add(c);
+sortedConsumers.put(key, list);
+  }
+}
+while (true) {
--- End diff --

Use `while (!sortedConsumers.isEmpty())`, and remove the `if (currentEntry == 
null)` check below (line 179), since the lookup can't fail on a non-empty map.
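
A rough sketch of the loop shape being suggested, under simplifying assumptions: consumer names are plain `String`s rather than `MemoryConsumer` objects, and each visited consumer is assumed (hypothetically) to free exactly the amount recorded as its key. `visitOrder` is an illustrative name, not something from the PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class SpillOrderSketch {
  // Returns the usage keys in the order the loop visits them.
  static List<Long> visitOrder(TreeMap<Long, List<String>> sortedConsumers, long required) {
    List<Long> order = new ArrayList<>();
    long got = 0;
    while (!sortedConsumers.isEmpty()) {
      // Smallest usage that is >= the memory still needed, if there is one.
      Map.Entry<Long, List<String>> entry = sortedConsumers.ceilingEntry(required - got);
      if (entry == null) {
        // Nobody is big enough alone: take the largest. This is non-null
        // because the map is known to be non-empty inside the loop.
        entry = sortedConsumers.lastEntry();
      }
      List<String> names = entry.getValue();
      names.remove(names.size() - 1);
      if (names.isEmpty()) {
        sortedConsumers.remove(entry.getKey());
      }
      order.add(entry.getKey());
      got += entry.getKey(); // assumption: the consumer frees all of its memory
      if (got >= required) {
        break;
      }
    }
    return order;
  }
}
```

With usages 10, 40 and 100, a request for 30 visits only the 40-byte consumer, while a request for 150 visits 100, then 40, then 10.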





[GitHub] spark pull request #16603: [SPARK-19244][Core] Sort MemoryConsumers accordin...

2017-01-30 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/16603#discussion_r98426729
  
--- Diff: core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java 
---
@@ -144,23 +148,56 @@ public long acquireExecutionMemory(long required, 
MemoryConsumer consumer) {
   // spilling, avoid to have too many spilled files.
   if (got < required) {
 // Call spill() on other consumers to release memory
+// Sort the consumers according their memory usage. So we avoid 
spilling the same consumer
+// which is just spilled in last few times and re-spilling on it 
will produce many small
+// spill files.
+TreeMap> sortedConsumers =
+  new TreeMap>();
 for (MemoryConsumer c: consumers) {
   if (c != consumer && c.getUsed() > 0 && c.getMode() == mode) {
-try {
-  long released = c.spill(required - got, consumer);
-  if (released > 0) {
-logger.debug("Task {} released {} from {} for {}", 
taskAttemptId,
-  Utils.bytesToString(released), c, consumer);
-got += memoryManager.acquireExecutionMemory(required - 
got, taskAttemptId, mode);
-if (got >= required) {
-  break;
-}
+long key = c.getUsed();
+List list = null;
+if (sortedConsumers.containsKey(key)) {
+  list = sortedConsumers.get(key);
+} else {
+  list = new ArrayList(1);
+}
+list.add(c);
+sortedConsumers.put(key, list);
--- End diff --

We need to call `sortedConsumers.put` only when the key is missing from 
`sortedConsumers`.





[GitHub] spark pull request #16603: [SPARK-19244][Core] Sort MemoryConsumers accordin...

2017-01-30 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/16603#discussion_r98427323
  
--- Diff: core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java 
---
@@ -144,23 +148,56 @@ public long acquireExecutionMemory(long required, 
MemoryConsumer consumer) {
   // spilling, avoid to have too many spilled files.
   if (got < required) {
 // Call spill() on other consumers to release memory
+// Sort the consumers according their memory usage. So we avoid 
spilling the same consumer
+// which is just spilled in last few times and re-spilling on it 
will produce many small
+// spill files.
+TreeMap> sortedConsumers =
+  new TreeMap>();
 for (MemoryConsumer c: consumers) {
   if (c != consumer && c.getUsed() > 0 && c.getMode() == mode) {
-try {
-  long released = c.spill(required - got, consumer);
-  if (released > 0) {
-logger.debug("Task {} released {} from {} for {}", 
taskAttemptId,
-  Utils.bytesToString(released), c, consumer);
-got += memoryManager.acquireExecutionMemory(required - 
got, taskAttemptId, mode);
-if (got >= required) {
-  break;
-}
+long key = c.getUsed();
+List list = null;
+if (sortedConsumers.containsKey(key)) {
+  list = sortedConsumers.get(key);
+} else {
+  list = new ArrayList(1);
+}
+list.add(c);
+sortedConsumers.put(key, list);
+  }
+}
+while (true) {
+  // Get the consumer using the least memory more than the 
remaining required memory.
+  Map.Entry> currentEntry =
+sortedConsumers.ceilingEntry(required - got);
+  // No consumer has used memory more than the remaining required 
memory.
+  // Get the consumer of largest used memory.
+  if (currentEntry == null) {
+currentEntry = sortedConsumers.lastEntry();
+  }
+  // No available consumer.
+  if (currentEntry == null) {
+break;
+  }
+  List cList = currentEntry.getValue();
+  MemoryConsumer c = cList.remove(cList.size() - 1);
+  try {
+long released = c.spill(required - got, consumer);
+if (released > 0) {
+  logger.debug("Task {} released {} from {} for {}", 
taskAttemptId,
+Utils.bytesToString(released), c, consumer);
+  got += memoryManager.acquireExecutionMemory(required - got, 
taskAttemptId, mode);
+  if (got >= required) {
+break;
   }
-} catch (IOException e) {
-  logger.error("error while calling spill() on " + c, e);
-  throw new OutOfMemoryError("error while calling spill() on " 
+ c + " : "
-+ e.getMessage());
 }
+if (cList.size() == 0) {
--- End diff --

Move this check right below `MemoryConsumer c = cList.remove(cList.size() - 1);` 
(line 183), so that the cleanup of the consumer list always happens, 
regardless of whether the spill fails.
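
The ordering concern can be illustrated with a small sketch. It rests on assumptions: `String` stands in for `MemoryConsumer`, and `take` and `failingSpill` are hypothetical helpers written only for this example. The point is that the empty-bucket cleanup runs before anything that can throw.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.TreeMap;

class RemoveBeforeSpillSketch {
  // Removes one consumer from the bucket at `key` and drops the bucket if it
  // became empty, before any spill is attempted.
  static String take(TreeMap<Long, List<String>> sortedConsumers, long key) {
    List<String> cList = sortedConsumers.get(key);
    String c = cList.remove(cList.size() - 1);
    if (cList.isEmpty()) {
      sortedConsumers.remove(key);
    }
    return c;
  }

  // Hypothetical spill that always fails, to exercise the failure path.
  static long failingSpill(String consumer) throws IOException {
    throw new IOException("simulated spill failure for " + consumer);
  }

  static boolean mapIsCleanAfterFailure() {
    TreeMap<Long, List<String>> sortedConsumers = new TreeMap<>();
    sortedConsumers.put(64L, new ArrayList<String>(Arrays.asList("sorter")));
    String c = take(sortedConsumers, 64L);
    try {
      failingSpill(c);
    } catch (IOException e) {
      // The bucket was already removed, so no empty list is left behind.
    }
    return sortedConsumers.isEmpty();
  }
}
```

If an empty list were left in the map, a later `cList.remove(cList.size() - 1)` on it would throw an `IndexOutOfBoundsException`.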





[GitHub] spark pull request #16603: [SPARK-19244][Core] Sort MemoryConsumers accordin...

2017-01-25 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/16603#discussion_r97910353
  
--- Diff: core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java 
---
@@ -144,23 +152,49 @@ public long acquireExecutionMemory(long required, 
MemoryConsumer consumer) {
   // spilling, avoid to have too many spilled files.
   if (got < required) {
 // Call spill() on other consumers to release memory
+// Sort the consumers according their memory usage. So we avoid 
spilling the same consumer
+// which is just spilled in last few times and re-spilling on it 
will produce many small
+// spill files.
+sortedConsumers.clear();
 for (MemoryConsumer c: consumers) {
   if (c != consumer && c.getUsed() > 0 && c.getMode() == mode) {
-try {
-  long released = c.spill(required - got, consumer);
-  if (released > 0) {
-logger.debug("Task {} released {} from {} for {}", 
taskAttemptId,
-  Utils.bytesToString(released), c, consumer);
-got += memoryManager.acquireExecutionMemory(required - 
got, taskAttemptId, mode);
-if (got >= required) {
-  break;
-}
+Long key = c.getUsed();
+// If there is existing consumer using the same amount of 
memory,
+// increasing the key.
+while (sortedConsumers.containsKey(key)) {
+  key += 1;
+}
+sortedConsumers.put(key, c);
--- End diff --

Changed to use a list of consumers as the value.





[GitHub] spark pull request #16603: [SPARK-19244][Core] Sort MemoryConsumers accordin...

2017-01-25 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/16603#discussion_r97910287
  
--- Diff: core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java 
---
@@ -144,23 +152,49 @@ public long acquireExecutionMemory(long required, 
MemoryConsumer consumer) {
   // spilling, avoid to have too many spilled files.
   if (got < required) {
 // Call spill() on other consumers to release memory
+// Sort the consumers according their memory usage. So we avoid 
spilling the same consumer
+// which is just spilled in last few times and re-spilling on it 
will produce many small
+// spill files.
+sortedConsumers.clear();
 for (MemoryConsumer c: consumers) {
   if (c != consumer && c.getUsed() > 0 && c.getMode() == mode) {
-try {
-  long released = c.spill(required - got, consumer);
-  if (released > 0) {
-logger.debug("Task {} released {} from {} for {}", 
taskAttemptId,
-  Utils.bytesToString(released), c, consumer);
-got += memoryManager.acquireExecutionMemory(required - 
got, taskAttemptId, mode);
-if (got >= required) {
-  break;
-}
+Long key = c.getUsed();
--- End diff --

ok.





[GitHub] spark pull request #16603: [SPARK-19244][Core] Sort MemoryConsumers accordin...

2017-01-25 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/16603#discussion_r97910276
  
--- Diff: core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java 
---
@@ -144,23 +152,49 @@ public long acquireExecutionMemory(long required, 
MemoryConsumer consumer) {
   // spilling, avoid to have too many spilled files.
   if (got < required) {
 // Call spill() on other consumers to release memory
+// Sort the consumers according their memory usage. So we avoid 
spilling the same consumer
+// which is just spilled in last few times and re-spilling on it 
will produce many small
+// spill files.
+sortedConsumers.clear();
 for (MemoryConsumer c: consumers) {
   if (c != consumer && c.getUsed() > 0 && c.getMode() == mode) {
-try {
-  long released = c.spill(required - got, consumer);
-  if (released > 0) {
-logger.debug("Task {} released {} from {} for {}", 
taskAttemptId,
-  Utils.bytesToString(released), c, consumer);
-got += memoryManager.acquireExecutionMemory(required - 
got, taskAttemptId, mode);
-if (got >= required) {
-  break;
-}
+Long key = c.getUsed();
+// If there is existing consumer using the same amount of 
memory,
+// increasing the key.
+while (sortedConsumers.containsKey(key)) {
+  key += 1;
+}
+sortedConsumers.put(key, c);
+  }
+}
+// Get the consumer using the least memory more than the remaining 
required memory.
+Map.Entry currentEntry = 
sortedConsumers.ceilingEntry(required - got);
+if (currentEntry == null) {
+  // No consumer has used memory more than the remaining required 
memory.
+  // Get the consumer of largest used memory.
+  currentEntry = sortedConsumers.lastEntry();
+}
+while (currentEntry != null) {
--- End diff --

Anyway, I changed the loop so that there is no duplicated code for accessing the TreeMap.





[GitHub] spark pull request #16603: [SPARK-19244][Core] Sort MemoryConsumers accordin...

2017-01-25 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/16603#discussion_r97909013
  
--- Diff: core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java 
---
@@ -144,23 +152,49 @@ public long acquireExecutionMemory(long required, 
MemoryConsumer consumer) {
   // spilling, avoid to have too many spilled files.
   if (got < required) {
 // Call spill() on other consumers to release memory
+// Sort the consumers according their memory usage. So we avoid 
spilling the same consumer
+// which is just spilled in last few times and re-spilling on it 
will produce many small
+// spill files.
+sortedConsumers.clear();
--- End diff --

Not sure which is cheaper. Changed to allocate a new object.





[GitHub] spark pull request #16603: [SPARK-19244][Core] Sort MemoryConsumers accordin...

2017-01-25 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/16603#discussion_r97908933
  
--- Diff: core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java 
---
@@ -144,23 +152,49 @@ public long acquireExecutionMemory(long required, 
MemoryConsumer consumer) {
   // spilling, avoid to have too many spilled files.
   if (got < required) {
 // Call spill() on other consumers to release memory
+// Sort the consumers according their memory usage. So we avoid 
spilling the same consumer
+// which is just spilled in last few times and re-spilling on it 
will produce many small
+// spill files.
+sortedConsumers.clear();
 for (MemoryConsumer c: consumers) {
   if (c != consumer && c.getUsed() > 0 && c.getMode() == mode) {
-try {
-  long released = c.spill(required - got, consumer);
-  if (released > 0) {
-logger.debug("Task {} released {} from {} for {}", 
taskAttemptId,
-  Utils.bytesToString(released), c, consumer);
-got += memoryManager.acquireExecutionMemory(required - 
got, taskAttemptId, mode);
-if (got >= required) {
-  break;
-}
+Long key = c.getUsed();
+// If there is existing consumer using the same amount of 
memory,
+// increasing the key.
+while (sortedConsumers.containsKey(key)) {
+  key += 1;
+}
+sortedConsumers.put(key, c);
--- End diff --

ok.





[GitHub] spark pull request #16603: [SPARK-19244][Core] Sort MemoryConsumers accordin...

2017-01-25 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/16603#discussion_r97908207
  
--- Diff: core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java 
---
@@ -144,23 +152,49 @@ public long acquireExecutionMemory(long required, 
MemoryConsumer consumer) {
   // spilling, avoid to have too many spilled files.
   if (got < required) {
 // Call spill() on other consumers to release memory
+// Sort the consumers according their memory usage. So we avoid 
spilling the same consumer
+// which is just spilled in last few times and re-spilling on it 
will produce many small
+// spill files.
+sortedConsumers.clear();
 for (MemoryConsumer c: consumers) {
   if (c != consumer && c.getUsed() > 0 && c.getMode() == mode) {
-try {
-  long released = c.spill(required - got, consumer);
-  if (released > 0) {
-logger.debug("Task {} released {} from {} for {}", 
taskAttemptId,
-  Utils.bytesToString(released), c, consumer);
-got += memoryManager.acquireExecutionMemory(required - 
got, taskAttemptId, mode);
-if (got >= required) {
-  break;
-}
+Long key = c.getUsed();
+// If there is existing consumer using the same amount of 
memory,
+// increasing the key.
+while (sortedConsumers.containsKey(key)) {
+  key += 1;
+}
+sortedConsumers.put(key, c);
+  }
+}
+// Get the consumer using the least memory more than the remaining 
required memory.
+Map.Entry currentEntry = 
sortedConsumers.ceilingEntry(required - got);
+if (currentEntry == null) {
+  // No consumer has used memory more than the remaining required 
memory.
+  // Get the consumer of largest used memory.
+  currentEntry = sortedConsumers.lastEntry();
+}
+while (currentEntry != null) {
--- End diff --

On entry to this loop, `currentEntry` could be `null`.





[GitHub] spark pull request #16603: [SPARK-19244][Core] Sort MemoryConsumers accordin...

2017-01-25 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/16603#discussion_r97894091
  
--- Diff: core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java 
---
@@ -144,23 +152,49 @@ public long acquireExecutionMemory(long required, 
MemoryConsumer consumer) {
   // spilling, avoid to have too many spilled files.
   if (got < required) {
 // Call spill() on other consumers to release memory
+// Sort the consumers according their memory usage. So we avoid 
spilling the same consumer
+// which is just spilled in last few times and re-spilling on it 
will produce many small
+// spill files.
+sortedConsumers.clear();
--- End diff --

Is there a point in re-using the same map if you're just going to clear it?

Performance-wise this is probably slower. Allocating a new object is 
probably cheaper than resetting the existing one.





[GitHub] spark pull request #16603: [SPARK-19244][Core] Sort MemoryConsumers accordin...

2017-01-25 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/16603#discussion_r97894531
  
--- Diff: core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java 
---
@@ -144,23 +152,49 @@ public long acquireExecutionMemory(long required, 
MemoryConsumer consumer) {
   // spilling, avoid to have too many spilled files.
   if (got < required) {
 // Call spill() on other consumers to release memory
+// Sort the consumers according their memory usage. So we avoid 
spilling the same consumer
+// which is just spilled in last few times and re-spilling on it 
will produce many small
+// spill files.
+sortedConsumers.clear();
 for (MemoryConsumer c: consumers) {
   if (c != consumer && c.getUsed() > 0 && c.getMode() == mode) {
-try {
-  long released = c.spill(required - got, consumer);
-  if (released > 0) {
-logger.debug("Task {} released {} from {} for {}", 
taskAttemptId,
-  Utils.bytesToString(released), c, consumer);
-got += memoryManager.acquireExecutionMemory(required - 
got, taskAttemptId, mode);
-if (got >= required) {
-  break;
-}
+Long key = c.getUsed();
--- End diff --

Use `long`; that results in fewer boxing operations than the current code. 
But if you follow Mridul's suggestion, you may be able to get rid of this altogether.
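
To make the boxing remark concrete, here is a minimal sketch comparing arithmetic on a boxed `Long` with arithmetic on a primitive `long`. The class and method names are illustrative assumptions. How much this matters in practice also depends on how often the key is then passed to map methods, which box a primitive again.

```java
class BoxingSketch {
  // Boxed variant: each `key += 1` unboxes, adds, and re-boxes into a Long.
  static Long bumpBoxed(Long key, int times) {
    for (int i = 0; i < times; i++) {
      key += 1;
    }
    return key;
  }

  // Primitive variant: the arithmetic stays on a primitive long; boxing happens
  // only where a Long is actually required (for example, as a map key).
  static long bumpPrimitive(long key, int times) {
    for (int i = 0; i < times; i++) {
      key += 1;
    }
    return key;
  }
}
```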





[GitHub] spark pull request #16603: [SPARK-19244][Core] Sort MemoryConsumers accordin...

2017-01-25 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/16603#discussion_r97846848
  
--- Diff: core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java 
---
@@ -144,23 +152,49 @@ public long acquireExecutionMemory(long required, 
MemoryConsumer consumer) {
   // spilling, avoid to have too many spilled files.
   if (got < required) {
 // Call spill() on other consumers to release memory
+// Sort the consumers according their memory usage. So we avoid 
spilling the same consumer
+// which is just spilled in last few times and re-spilling on it 
will produce many small
+// spill files.
+sortedConsumers.clear();
 for (MemoryConsumer c: consumers) {
   if (c != consumer && c.getUsed() > 0 && c.getMode() == mode) {
-try {
-  long released = c.spill(required - got, consumer);
-  if (released > 0) {
-logger.debug("Task {} released {} from {} for {}", 
taskAttemptId,
-  Utils.bytesToString(released), c, consumer);
-got += memoryManager.acquireExecutionMemory(required - 
got, taskAttemptId, mode);
-if (got >= required) {
-  break;
-}
+Long key = c.getUsed();
+// If there is existing consumer using the same amount of 
memory,
+// increasing the key.
+while (sortedConsumers.containsKey(key)) {
+  key += 1;
+}
+sortedConsumers.put(key, c);
--- End diff --

Why are we fudging the key here? 
Either make the value a list of memory consumers, or use a composite key 
with a custom comparator; the former is preferred.
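
The second alternative mentioned above, a composite key with a custom comparator, might look roughly like this. It is a sketch under assumptions: `UsageKey`, its `id` tie-breaker, and the `String` values standing in for `MemoryConsumer` are all hypothetical and do not come from the PR.

```java
import java.util.Comparator;
import java.util.TreeMap;

class CompositeKeySketch {
  // Hypothetical composite key: memory used, then a unique id to break ties.
  static final class UsageKey {
    final long used;
    final int id;
    UsageKey(long used, int id) { this.used = used; this.id = id; }
  }

  // Orders by usage first; equal usages stay distinct because their ids differ.
  static final Comparator<UsageKey> BY_USAGE_THEN_ID = new Comparator<UsageKey>() {
    @Override
    public int compare(UsageKey a, UsageKey b) {
      int byUsed = Long.compare(a.used, b.used);
      return byUsed != 0 ? byUsed : Integer.compare(a.id, b.id);
    }
  };

  static TreeMap<UsageKey, String> build() {
    TreeMap<UsageKey, String> sorted = new TreeMap<>(BY_USAGE_THEN_ID);
    sorted.put(new UsageKey(100L, 1), "first");
    sorted.put(new UsageKey(100L, 2), "second"); // same usage, no key adjustment needed
    sorted.put(new UsageKey(101L, 3), "third");
    return sorted;
  }
}
```

Unlike incrementing the key on collision, this keeps every key's `used` value equal to the real usage. The trade-off is that lookups such as `ceilingKey` need a probe key object, which is one reason a list-valued map can be simpler.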





[GitHub] spark pull request #16603: [SPARK-19244][Core] Sort MemoryConsumers accordin...

2017-01-25 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/16603#discussion_r97846569
  
--- Diff: core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java 
---
@@ -144,23 +152,49 @@ public long acquireExecutionMemory(long required, 
MemoryConsumer consumer) {
   // spilling, avoid to have too many spilled files.
   if (got < required) {
 // Call spill() on other consumers to release memory
+// Sort the consumers according their memory usage. So we avoid 
spilling the same consumer
+// which is just spilled in last few times and re-spilling on it 
will produce many small
+// spill files.
+sortedConsumers.clear();
 for (MemoryConsumer c: consumers) {
   if (c != consumer && c.getUsed() > 0 && c.getMode() == mode) {
-try {
-  long released = c.spill(required - got, consumer);
-  if (released > 0) {
-logger.debug("Task {} released {} from {} for {}", 
taskAttemptId,
-  Utils.bytesToString(released), c, consumer);
-got += memoryManager.acquireExecutionMemory(required - 
got, taskAttemptId, mode);
-if (got >= required) {
-  break;
-}
+Long key = c.getUsed();
+// If there is existing consumer using the same amount of 
memory,
+// increasing the key.
+while (sortedConsumers.containsKey(key)) {
+  key += 1;
+}
+sortedConsumers.put(key, c);
+  }
+}
+// Get the consumer using the least memory more than the remaining 
required memory.
+Map.Entry<Long, MemoryConsumer> currentEntry = sortedConsumers.ceilingEntry(required - got);
+if (currentEntry == null) {
+  // No consumer has used memory more than the remaining required 
memory.
+  // Get the consumer of largest used memory.
+  currentEntry = sortedConsumers.lastEntry();
+}
+while (currentEntry != null) {
--- End diff --

Use do-while instead of `var v = init(); while (isValid(v)) { ... v = init() }`





[GitHub] spark pull request #16603: [SPARK-19244][Core] Sort MemoryConsumers accordin...

2017-01-25 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/16603#discussion_r97791322
  
--- Diff: core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java 
---
@@ -144,8 +164,24 @@ public long acquireExecutionMemory(long required, 
MemoryConsumer consumer) {
   // spilling, avoid to have too many spilled files.
   if (got < required) {
 // Call spill() on other consumers to release memory
+// Sort the consumers according their memory usage. So we avoid 
spilling the same consumer
+// which is just spilled in last few times and re-spilling on it 
will produce many small
+// spill files.
+List<MemoryConsumer> sortedList = new ArrayList<>(consumers.size());
 for (MemoryConsumer c: consumers) {
   if (c != consumer && c.getUsed() > 0 && c.getMode() == mode) {
+sortedList.add(c);
+  }
+}
+Collections.sort(sortedList, new ConsumerComparator());
+for (int listIndex = 0; listIndex < sortedList.size(); 
listIndex++) {
+  MemoryConsumer c = sortedList.get(listIndex);
+  // Try to only spill on the consumer which has the required size 
of memory.
+  // As the consumers are sorted in descending order, if the next 
consumer doesn't have
+  // the required memory, then we need to spill the current 
consumer at least.
+  boolean doSpill = (listIndex + 1) == sortedList.size() ||
+sortedList.get(listIndex + 1).getUsed() < (required - got);
+  if (doSpill) {
--- End diff --

@mridulm OK, I see. Thanks. This is indeed a problem. I have tried to fix it
with a TreeMap in the latest change. Could you take a look?





[GitHub] spark pull request #16603: [SPARK-19244][Core] Sort MemoryConsumers accordin...

2017-01-18 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/16603#discussion_r96714353
  
--- Diff: core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java 
---
@@ -144,8 +164,24 @@ public long acquireExecutionMemory(long required, 
MemoryConsumer consumer) {
   // spilling, avoid to have too many spilled files.
   if (got < required) {
 // Call spill() on other consumers to release memory
+// Sort the consumers according their memory usage. So we avoid 
spilling the same consumer
+// which is just spilled in last few times and re-spilling on it 
will produce many small
+// spill files.
+List<MemoryConsumer> sortedList = new ArrayList<>(consumers.size());
 for (MemoryConsumer c: consumers) {
   if (c != consumer && c.getUsed() > 0 && c.getMode() == mode) {
+sortedList.add(c);
+  }
+}
+Collections.sort(sortedList, new ConsumerComparator());
+for (int listIndex = 0; listIndex < sortedList.size(); 
listIndex++) {
+  MemoryConsumer c = sortedList.get(listIndex);
+  // Try to only spill on the consumer which has the required size 
of memory.
+  // As the consumers are sorted in descending order, if the next 
consumer doesn't have
+  // the required memory, then we need to spill the current 
consumer at least.
+  boolean doSpill = (listIndex + 1) == sortedList.size() ||
+sortedList.get(listIndex + 1).getUsed() < (required - got);
+  if (doSpill) {
--- End diff --

Let me elaborate with the earlier example.
Suppose you have consumers of 1.5GB, 1GB, 500MB, 2MB and 1MB, and we need 500MB.

In the existing implementation, irrespective of the order of iteration, we will
always end up freeing >= 500MB. For example (since the iteration order is not
known): 2MB (released 1.8MB), 500MB (released 490MB), 1GB (released 0.99GB).
The numbers are illustrative only.


In the proposed implementation, we will ignore 1.5GB and 1GB, since 500MB >=
required.
When we get to 500MB, if it releases only 490MB, we end up in a situation
where the remaining blocks cannot satisfy the allocation request, and we end up
freeing self, all the while having blocks which could have been freed.



Let me know if I am missing something.
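
The scenario above can be traced with a small simulation. This is only a sketch
under stated assumptions: the class `SpillLookaheadDemo`, its methods, and the
98% release ratio are hypothetical, sizes are in MB, and the real
`memoryManager` accounting is left out.

```java
import java.util.ArrayList;
import java.util.List;

class SpillLookaheadDemo {
  // Assumption for illustration: a spill frees 98% of the reported usage.
  static long release(long usedMb) {
    return usedMb * 98 / 100;
  }

  // Lookahead rule from the diff: spill consumer i only if it is the last one,
  // or if the next (smaller) consumer uses less than what is still required.
  static long lookahead(long[] usedDescMb, long requiredMb, List<Long> spilled) {
    long got = 0;
    for (int i = 0; i < usedDescMb.length && got < requiredMb; i++) {
      boolean doSpill = (i + 1) == usedDescMb.length
          || usedDescMb[i + 1] < (requiredMb - got);
      if (doSpill) {
        got += release(usedDescMb[i]);
        spilled.add(usedDescMb[i]);
      }
    }
    return got;
  }

  // Pre-change behaviour: spill consumers in iteration order until satisfied.
  static long iterateAll(long[] usedMb, long requiredMb, List<Long> spilled) {
    long got = 0;
    for (long used : usedMb) {
      if (got >= requiredMb) {
        break;
      }
      got += release(used);
      spilled.add(used);
    }
    return got;
  }

  public static void main(String[] args) {
    List<Long> a = new ArrayList<>();
    long gotA = lookahead(new long[] {1536, 1024, 500, 2, 1}, 500, a);
    System.out.println("lookahead: got " + gotA + " MB, spilled " + a);

    List<Long> b = new ArrayList<>();
    long gotB = iterateAll(new long[] {2, 500, 1024, 1536, 1}, 500, b);
    System.out.println("iterate all: got " + gotB + " MB, spilled " + b);
  }
}
```

Under these assumptions the lookahead variant spills 500MB, 2MB and 1MB and
still falls short of 500MB, while the 1.5GB and 1GB consumers are never touched.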





[GitHub] spark pull request #16603: [SPARK-19244][Core] Sort MemoryConsumers accordin...

2017-01-18 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/16603#discussion_r96589311
  
--- Diff: core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java 
---
@@ -144,8 +164,24 @@ public long acquireExecutionMemory(long required, 
MemoryConsumer consumer) {
   // spilling, avoid to have too many spilled files.
   if (got < required) {
 // Call spill() on other consumers to release memory
+// Sort the consumers according their memory usage. So we avoid 
spilling the same consumer
+// which is just spilled in last few times and re-spilling on it 
will produce many small
+// spill files.
+List<MemoryConsumer> sortedList = new ArrayList<>(consumers.size());
 for (MemoryConsumer c: consumers) {
   if (c != consumer && c.getUsed() > 0 && c.getMode() == mode) {
+sortedList.add(c);
+  }
+}
+Collections.sort(sortedList, new ConsumerComparator());
+for (int listIndex = 0; listIndex < sortedList.size(); 
listIndex++) {
+  MemoryConsumer c = sortedList.get(listIndex);
+  // Try to only spill on the consumer which has the required size 
of memory.
+  // As the consumers are sorted in descending order, if the next 
consumer doesn't have
+  // the required memory, then we need to spill the current 
consumer at least.
+  boolean doSpill = (listIndex + 1) == sortedList.size() ||
+sortedList.get(listIndex + 1).getUsed() < (required - got);
+  if (doSpill) {
--- End diff --

I am not sure how we keep iterating over the consumers in the existing impl. As
we loop over all consumers, whether or not we spill one and however much memory
we release, we won't go back to it or to previous consumers, right?





[GitHub] spark pull request #16603: [SPARK-19244][Core] Sort MemoryConsumers accordin...

2017-01-18 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/16603#discussion_r96588590
  
--- Diff: core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java 
---
@@ -144,8 +164,24 @@ public long acquireExecutionMemory(long required, 
MemoryConsumer consumer) {
   // spilling, avoid to have too many spilled files.
   if (got < required) {
 // Call spill() on other consumers to release memory
+// Sort the consumers according their memory usage. So we avoid 
spilling the same consumer
+// which is just spilled in last few times and re-spilling on it 
will produce many small
+// spill files.
+List<MemoryConsumer> sortedList = new ArrayList<>(consumers.size());
 for (MemoryConsumer c: consumers) {
   if (c != consumer && c.getUsed() > 0 && c.getMode() == mode) {
+sortedList.add(c);
+  }
+}
+Collections.sort(sortedList, new ConsumerComparator());
+for (int listIndex = 0; listIndex < sortedList.size(); 
listIndex++) {
+  MemoryConsumer c = sortedList.get(listIndex);
+  // Try to only spill on the consumer which has the required size 
of memory.
+  // As the consumers are sorted in descending order, if the next 
consumer doesn't have
+  // the required memory, then we need to spill the current 
consumer at least.
+  boolean doSpill = (listIndex + 1) == sortedList.size() ||
+sortedList.get(listIndex + 1).getUsed() < (required - got);
+  if (doSpill) {
--- End diff --

In the existing impl, we keep iterating until the constraint is satisfied,
so it is not an issue; with this change, it will be.





[GitHub] spark pull request #16603: [SPARK-19244][Core] Sort MemoryConsumers accordin...

2017-01-18 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/16603#discussion_r96586900
  
--- Diff: core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java 
---
@@ -144,8 +164,24 @@ public long acquireExecutionMemory(long required, 
MemoryConsumer consumer) {
   // spilling, avoid to have too many spilled files.
   if (got < required) {
 // Call spill() on other consumers to release memory
+// Sort the consumers according their memory usage. So we avoid 
spilling the same consumer
+// which is just spilled in last few times and re-spilling on it 
will produce many small
+// spill files.
+List<MemoryConsumer> sortedList = new ArrayList<>(consumers.size());
 for (MemoryConsumer c: consumers) {
   if (c != consumer && c.getUsed() > 0 && c.getMode() == mode) {
+sortedList.add(c);
+  }
+}
+Collections.sort(sortedList, new ConsumerComparator());
+for (int listIndex = 0; listIndex < sortedList.size(); 
listIndex++) {
+  MemoryConsumer c = sortedList.get(listIndex);
+  // Try to only spill on the consumer which has the required size 
of memory.
+  // As the consumers are sorted in descending order, if the next 
consumer doesn't have
+  // the required memory, then we need to spill the current 
consumer at least.
+  boolean doSpill = (listIndex + 1) == sortedList.size() ||
+sortedList.get(listIndex + 1).getUsed() < (required - got);
+  if (doSpill) {
--- End diff --

Thanks for pointing that out.

Yes, it is possible. However, it is an existing issue in the current
implementation, before this change. Besides, we can't know how much memory a
consumer will release before calling `spill`. So I don't think we will have a
satisfying solution to this issue under the current framework.










[GitHub] spark pull request #16603: [SPARK-19244][Core] Sort MemoryConsumers accordin...

2017-01-17 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/16603#discussion_r96581689
  
--- Diff: core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java 
---
@@ -144,8 +164,24 @@ public long acquireExecutionMemory(long required, 
MemoryConsumer consumer) {
   // spilling, avoid to have too many spilled files.
   if (got < required) {
 // Call spill() on other consumers to release memory
+// Sort the consumers according their memory usage. So we avoid 
spilling the same consumer
+// which is just spilled in last few times and re-spilling on it 
will produce many small
+// spill files.
+List<MemoryConsumer> sortedList = new ArrayList<>(consumers.size());
 for (MemoryConsumer c: consumers) {
   if (c != consumer && c.getUsed() > 0 && c.getMode() == mode) {
+sortedList.add(c);
+  }
+}
+Collections.sort(sortedList, new ConsumerComparator());
+for (int listIndex = 0; listIndex < sortedList.size(); 
listIndex++) {
+  MemoryConsumer c = sortedList.get(listIndex);
+  // Try to only spill on the consumer which has the required size 
of memory.
+  // As the consumers are sorted in descending order, if the next 
consumer doesn't have
+  // the required memory, then we need to spill the current 
consumer at least.
+  boolean doSpill = (listIndex + 1) == sortedList.size() ||
+sortedList.get(listIndex + 1).getUsed() < (required - got);
+  if (doSpill) {
--- End diff --


I like the fact that this implementation does not need to incur the cost of
remove in a TreeMap.
Unfortunately, I don't think it is sufficient: the impl assumes that
spill() will always give you back getUsed. From the rest of the code
in the method, this does not look like a valid assumption to make.

This can result in spilling a large number of smaller blocks, and
potentially itself.

For example: required = 500MB, consumers = 1.5GB, 1GB, 500MB, 2MB, 1MB, ...
If spilling 500MB resulted in (say) releasing 490MB, we might end up
spilling a large number of blocks and also (potentially) end up spilling itself.
We can also end up returning less than requested while enough memory does exist
to satisfy the request.





[GitHub] spark pull request #16603: [SPARK-19244][Core] Sort MemoryConsumers accordin...

2017-01-17 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/16603#discussion_r96574267
  
--- Diff: core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java 
---
@@ -144,23 +170,31 @@ public long acquireExecutionMemory(long required, 
MemoryConsumer consumer) {
   // spilling, avoid to have too many spilled files.
   if (got < required) {
 // Call spill() on other consumers to release memory
+// Sort the consumers according their memory usage. So we avoid 
spilling the same consumer
+// which is just spilled in last few times and re-spilling on it 
will produce many small
+// spill files.
+List<MemoryConsumer> sortedList = new ArrayList<>();
 for (MemoryConsumer c: consumers) {
   if (c != consumer && c.getUsed() > 0 && c.getMode() == mode) {
-try {
-  long released = c.spill(required - got, consumer);
-  if (released > 0) {
-logger.debug("Task {} released {} from {} for {}", 
taskAttemptId,
-  Utils.bytesToString(released), c, consumer);
-got += memoryManager.acquireExecutionMemory(required - 
got, taskAttemptId, mode);
-if (got >= required) {
-  break;
-}
+sortedList.add(c);
+  }
+}
+Collections.sort(sortedList, new ConsumerComparator());
+for (MemoryConsumer c: sortedList) {
+  try {
+long released = c.spill(required - got, consumer);
+if (released > 0) {
+  logger.debug("Task {} released {} from {} for {}", 
taskAttemptId,
+Utils.bytesToString(released), c, consumer);
+  got += memoryManager.acquireExecutionMemory(required - got, 
taskAttemptId, mode);
+  if (got >= required) {
+break;
   }
-} catch (IOException e) {
-  logger.error("error while calling spill() on " + c, e);
-  throw new OutOfMemoryError("error while calling spill() on " 
+ c + " : "
-+ e.getMessage());
 }
+  } catch (IOException e) {
+logger.error("error while calling spill() on " + c, e);
+throw new OutOfMemoryError("error while calling spill() on " + 
c + " : "
+  + e.getMessage());
   }
--- End diff --

Actually, the newest update already satisfies the example you showed.





[GitHub] spark pull request #16603: [SPARK-19244][Core] Sort MemoryConsumers accordin...

2017-01-17 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/16603#discussion_r96572902
  
--- Diff: core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java 
---
@@ -144,23 +170,31 @@ public long acquireExecutionMemory(long required, 
MemoryConsumer consumer) {
   // spilling, avoid to have too many spilled files.
   if (got < required) {
 // Call spill() on other consumers to release memory
+// Sort the consumers according their memory usage. So we avoid 
spilling the same consumer
+// which is just spilled in last few times and re-spilling on it 
will produce many small
+// spill files.
+List<MemoryConsumer> sortedList = new ArrayList<>();
 for (MemoryConsumer c: consumers) {
   if (c != consumer && c.getUsed() > 0 && c.getMode() == mode) {
-try {
-  long released = c.spill(required - got, consumer);
-  if (released > 0) {
-logger.debug("Task {} released {} from {} for {}", 
taskAttemptId,
-  Utils.bytesToString(released), c, consumer);
-got += memoryManager.acquireExecutionMemory(required - 
got, taskAttemptId, mode);
-if (got >= required) {
-  break;
-}
+sortedList.add(c);
+  }
+}
+Collections.sort(sortedList, new ConsumerComparator());
+for (MemoryConsumer c: sortedList) {
+  try {
+long released = c.spill(required - got, consumer);
+if (released > 0) {
+  logger.debug("Task {} released {} from {} for {}", 
taskAttemptId,
+Utils.bytesToString(released), c, consumer);
+  got += memoryManager.acquireExecutionMemory(required - got, 
taskAttemptId, mode);
+  if (got >= required) {
+break;
   }
-} catch (IOException e) {
-  logger.error("error while calling spill() on " + c, e);
-  throw new OutOfMemoryError("error while calling spill() on " 
+ c + " : "
-+ e.getMessage());
 }
+  } catch (IOException e) {
+logger.error("error while calling spill() on " + c, e);
+throw new OutOfMemoryError("error while calling spill() on " + 
c + " : "
+  + e.getMessage());
   }
--- End diff --


Use ceiling and not floor.
Ensure that the requirements are satisfied: what I wrote was written on the fly
to convey the idea and not meant to be used literally, and apparently there were
some errors.
I have edited the examples so that there is no further confusion.

The basic idea is simple: instead of picking the largest or a random consumer,
pick one which is sufficient to meet the memory requirements. If none exists,
then evict the largest and retry until done.







[GitHub] spark pull request #16603: [SPARK-19244][Core] Sort MemoryConsumers accordin...

2017-01-17 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/16603#discussion_r96565501
  
--- Diff: core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java 
---
@@ -144,23 +170,31 @@ public long acquireExecutionMemory(long required, 
MemoryConsumer consumer) {
   // spilling, avoid to have too many spilled files.
   if (got < required) {
 // Call spill() on other consumers to release memory
+// Sort the consumers according their memory usage. So we avoid 
spilling the same consumer
+// which is just spilled in last few times and re-spilling on it 
will produce many small
+// spill files.
+List<MemoryConsumer> sortedList = new ArrayList<>();
 for (MemoryConsumer c: consumers) {
   if (c != consumer && c.getUsed() > 0 && c.getMode() == mode) {
-try {
-  long released = c.spill(required - got, consumer);
-  if (released > 0) {
-logger.debug("Task {} released {} from {} for {}", 
taskAttemptId,
-  Utils.bytesToString(released), c, consumer);
-got += memoryManager.acquireExecutionMemory(required - 
got, taskAttemptId, mode);
-if (got >= required) {
-  break;
-}
+sortedList.add(c);
+  }
+}
+Collections.sort(sortedList, new ConsumerComparator());
+for (MemoryConsumer c: sortedList) {
+  try {
+long released = c.spill(required - got, consumer);
+if (released > 0) {
+  logger.debug("Task {} released {} from {} for {}", 
taskAttemptId,
+Utils.bytesToString(released), c, consumer);
+  got += memoryManager.acquireExecutionMemory(required - got, 
taskAttemptId, mode);
+  if (got >= required) {
+break;
   }
-} catch (IOException e) {
-  logger.error("error while calling spill() on " + c, e);
-  throw new OutOfMemoryError("error while calling spill() on " 
+ c + " : "
-+ e.getMessage());
 }
+  } catch (IOException e) {
+logger.error("error while calling spill() on " + c, e);
+throw new OutOfMemoryError("error while calling spill() on " + 
c + " : "
+  + e.getMessage());
   }
--- End diff --

I don't think the best, or even a better, strategy is to spill many smaller
consumers for a bigger memory request.

Spilling small consumers too frequently can cause GC pressure, as reported
in #16387. This PR is actually inspired by that: it avoids spilling the same
consumer again and again even when it only uses a small amount of memory.

The newest update spills the ceiling entry (the smallest consumer that uses
at least the required memory).

For the same example, consumers = 50 MB, 100MB, 200 MB, 250MB, 500 MB, 1GB, 
it behaves like:

Required: 1.4 GB
Evict: 1GB, 500MB

Required: 300 MB
Evict: 500MB

Required: 400MB
Evict: 500MB

Required: 60 MB
Evict 100 MB

Required: 200 MB
Evict 200 MB

It is nearly the same as the result you showed. As I pointed out in a previous
comment, the logic you showed can't actually produce that result.
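
The ceiling-based selection described in this comment can be sketched as
follows. This is a minimal illustration, not the PR's code: the class
`CeilingEvictionDemo` and its `evict` method are hypothetical, sizes are in MB
(1GB = 1024, 1.4GB taken as 1434), every consumer is assumed to have a distinct
usage, and a spill is assumed to release the consumer's full usage.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class CeilingEvictionDemo {
  // Returns the usages (in MB) of the consumers chosen for eviction, in order.
  static List<Long> evict(long[] usedMb, long requiredMb) {
    TreeMap<Long, String> sorted = new TreeMap<>();
    for (long used : usedMb) {
      sorted.put(used, "consumer-" + used);
    }
    List<Long> evicted = new ArrayList<>();
    long got = 0;
    while (got < requiredMb && !sorted.isEmpty()) {
      // Smallest consumer that covers what is still missing, if any.
      Map.Entry<Long, String> entry = sorted.ceilingEntry(requiredMb - got);
      if (entry == null) {
        // No single consumer is big enough: take the largest and retry.
        entry = sorted.lastEntry();
      }
      sorted.remove(entry.getKey());
      got += entry.getKey();  // assumption: the spill frees everything
      evicted.add(entry.getKey());
    }
    return evicted;
  }

  public static void main(String[] args) {
    long[] consumers = {50, 100, 200, 250, 500, 1024};
    for (long required : new long[] {1434, 300, 400, 60, 200}) {
      System.out.println("Required " + required + " MB -> evict "
          + evict(consumers, required));
    }
  }
}
```

With these inputs the sketch selects the same consumers as the list above.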





[GitHub] spark pull request #16603: [SPARK-19244][Core] Sort MemoryConsumers accordin...

2017-01-17 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/16603#discussion_r96560544
  
--- Diff: core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java 
---
@@ -144,23 +170,31 @@ public long acquireExecutionMemory(long required, 
MemoryConsumer consumer) {
   // spilling, avoid to have too many spilled files.
   if (got < required) {
 // Call spill() on other consumers to release memory
+// Sort the consumers according their memory usage. So we avoid 
spilling the same consumer
+// which is just spilled in last few times and re-spilling on it 
will produce many small
+// spill files.
+List<MemoryConsumer> sortedList = new ArrayList<>();
 for (MemoryConsumer c: consumers) {
   if (c != consumer && c.getUsed() > 0 && c.getMode() == mode) {
-try {
-  long released = c.spill(required - got, consumer);
-  if (released > 0) {
-logger.debug("Task {} released {} from {} for {}", 
taskAttemptId,
-  Utils.bytesToString(released), c, consumer);
-got += memoryManager.acquireExecutionMemory(required - 
got, taskAttemptId, mode);
-if (got >= required) {
-  break;
-}
+sortedList.add(c);
+  }
+}
+Collections.sort(sortedList, new ConsumerComparator());
+for (MemoryConsumer c: sortedList) {
+  try {
+long released = c.spill(required - got, consumer);
+if (released > 0) {
+  logger.debug("Task {} released {} from {} for {}", 
taskAttemptId,
+Utils.bytesToString(released), c, consumer);
+  got += memoryManager.acquireExecutionMemory(required - got, 
taskAttemptId, mode);
+  if (got >= required) {
+break;
   }
-} catch (IOException e) {
-  logger.error("error while calling spill() on " + c, e);
-  throw new OutOfMemoryError("error while calling spill() on " 
+ c + " : "
-+ e.getMessage());
 }
+  } catch (IOException e) {
+logger.error("error while calling spill() on " + c, e);
+throw new OutOfMemoryError("error while calling spill() on " + 
c + " : "
+  + e.getMessage());
   }
--- End diff --

I get your idea, but a few of the examples look wrong.

> Required: 400MB
Evict: 500MB

The `floorEntry` should be 250MB. So it would evict 250MB, 200MB, 100MB.

> Required: 60 MB
Evict 100 MB

The `floorEntry` should be 50MB. So it would evict 50MB, 1GB.

But with some modification to the logic, I think we can fix this.
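
The 60 MB case can be traced with a literal Java reading of the floor-based
pseudocode. This is a sketch under assumptions: the class `FloorEvictionDemo`
is hypothetical, sizes are in MB (1GB = 1024), an evicted consumer is removed
from the map, and it releases exactly its usage. Other readings of the
pseudocode (for example, one that does not remove evicted entries) would give
different sequences.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class FloorEvictionDemo {
  // Returns the usages (in MB) of the consumers chosen for eviction, in order.
  static List<Long> evict(long[] usedMb, long requiredMb) {
    TreeMap<Long, String> sorted = new TreeMap<>();
    for (long used : usedMb) {
      sorted.put(used, "consumer-" + used);
    }
    List<Long> evicted = new ArrayList<>();
    long remaining = requiredMb;
    while (remaining > 0 && !sorted.isEmpty()) {
      // Largest consumer that uses no more than what is still required.
      Map.Entry<Long, String> entry = sorted.floorEntry(remaining);
      if (entry == null) {
        // Nothing that small exists: fall back to the largest consumer.
        entry = sorted.lastEntry();
      }
      sorted.remove(entry.getKey());
      remaining -= entry.getKey();
      evicted.add(entry.getKey());
    }
    return evicted;
  }

  public static void main(String[] args) {
    long[] consumers = {50, 100, 200, 250, 500, 1024};
    System.out.println("Required 60 MB -> evict " + evict(consumers, 60));
  }
}
```

For 60 MB the floor lookup first picks the 50 MB consumer, then finds nothing
at or below the remaining 10 MB and falls back to the largest one, so two
consumers are spilled where the 100 MB consumer alone would have been enough.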


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16603: [SPARK-19244][Core] Sort MemoryConsumers accordin...

2017-01-17 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/16603#discussion_r96360466
  
--- Diff: core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java 
---
@@ -144,23 +170,31 @@ public long acquireExecutionMemory(long required, 
MemoryConsumer consumer) {
   // spilling, avoid to have too many spilled files.
   if (got < required) {
 // Call spill() on other consumers to release memory
+// Sort the consumers according their memory usage. So we avoid 
spilling the same consumer
+// which is just spilled in last few times and re-spilling on it 
will produce many small
+// spill files.
+List<MemoryConsumer> sortedList = new ArrayList<>();
 for (MemoryConsumer c: consumers) {
   if (c != consumer && c.getUsed() > 0 && c.getMode() == mode) {
-try {
-  long released = c.spill(required - got, consumer);
-  if (released > 0) {
-logger.debug("Task {} released {} from {} for {}", 
taskAttemptId,
-  Utils.bytesToString(released), c, consumer);
-got += memoryManager.acquireExecutionMemory(required - 
got, taskAttemptId, mode);
-if (got >= required) {
-  break;
-}
+sortedList.add(c);
+  }
+}
+Collections.sort(sortedList, new ConsumerComparator());
+for (MemoryConsumer c: sortedList) {
+  try {
+long released = c.spill(required - got, consumer);
+if (released > 0) {
+  logger.debug("Task {} released {} from {} for {}", 
taskAttemptId,
+Utils.bytesToString(released), c, consumer);
+  got += memoryManager.acquireExecutionMemory(required - got, 
taskAttemptId, mode);
+  if (got >= required) {
+break;
   }
-} catch (IOException e) {
-  logger.error("error while calling spill() on " + c, e);
-  throw new OutOfMemoryError("error while calling spill() on " 
+ c + " : "
-+ e.getMessage());
 }
+  } catch (IOException e) {
+logger.error("error while calling spill() on " + c, e);
+throw new OutOfMemoryError("error while calling spill() on " + 
c + " : "
+  + e.getMessage());
   }
--- End diff --

Regarding proposal two, to change to TreeSet (or TreeMap, as elaborated
above) instead of HashSet:
If the ordering depends on something which changes, then the ordering is
not guaranteed. That is why I mentioned that each change in
memory acquisition or release should remove and re-add the consumer to the set
so that the invariant is re-established.
This might have other impacts (difficulty of ensuring this is consistently
enforced, performance, etc.), which should be looked into before proceeding
down that path.

The benefit is the elimination of sorting (in current PR) or creation of 
TreeMap (I elaborated) each time we need to spill.

Depending on how (un)common and expensive the latter is, we should take a 
call.
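
The remove and re-add requirement can be illustrated with a small sketch. The
names here (`ReorderDemo`, its nested `Consumer`, `updateUsed`) are
hypothetical stand-ins and not Spark classes; the point is only that a sorted
set keyed on a mutable value must have the element removed before the value
changes and added back afterwards.

```java
import java.util.Comparator;
import java.util.TreeSet;

class ReorderDemo {
  // Hypothetical stand-in for a consumer whose memory usage changes over time.
  static final class Consumer {
    final String name;
    long used;

    Consumer(String name, long used) {
      this.name = name;
      this.used = used;
    }
  }

  // Order by usage; the name breaks ties so equal usages stay distinct.
  static final Comparator<Consumer> BY_USED =
      Comparator.comparingLong((Consumer c) -> c.used)
          .thenComparing((Consumer c) -> c.name);

  // Re-establish the ordering invariant around a usage change.
  static void updateUsed(TreeSet<Consumer> set, Consumer c, long newUsed) {
    set.remove(c);     // must run while c.used still matches its position
    c.used = newUsed;
    set.add(c);        // re-inserted at the position for the new usage
  }

  public static void main(String[] args) {
    TreeSet<Consumer> set = new TreeSet<>(BY_USED);
    Consumer a = new Consumer("a", 100);
    set.add(a);
    set.add(new Consumer("b", 200));
    set.add(new Consumer("c", 300));
    updateUsed(set, a, 400);
    System.out.println("largest is now " + set.last().name);
  }
}
```

Every code path that acquires or releases memory would have to go through
something like `updateUsed`, which is the enforcement cost mentioned above.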


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16603: [SPARK-19244][Core] Sort MemoryConsumers accordin...

2017-01-17 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/16603#discussion_r96359966
  
--- Diff: core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java 
---
@@ -144,23 +170,31 @@ public long acquireExecutionMemory(long required, 
MemoryConsumer consumer) {
   // spilling, avoid to have too many spilled files.
   if (got < required) {
 // Call spill() on other consumers to release memory
+// Sort the consumers according their memory usage. So we avoid 
spilling the same consumer
+// which is just spilled in last few times and re-spilling on it 
will produce many small
+// spill files.
+List<MemoryConsumer> sortedList = new ArrayList<>();
 for (MemoryConsumer c: consumers) {
   if (c != consumer && c.getUsed() > 0 && c.getMode() == mode) {
-try {
-  long released = c.spill(required - got, consumer);
-  if (released > 0) {
-logger.debug("Task {} released {} from {} for {}", 
taskAttemptId,
-  Utils.bytesToString(released), c, consumer);
-got += memoryManager.acquireExecutionMemory(required - 
got, taskAttemptId, mode);
-if (got >= required) {
-  break;
-}
+sortedList.add(c);
+  }
+}
+Collections.sort(sortedList, new ConsumerComparator());
+for (MemoryConsumer c: sortedList) {
+  try {
+long released = c.spill(required - got, consumer);
+if (released > 0) {
+  logger.debug("Task {} released {} from {} for {}", 
taskAttemptId,
+Utils.bytesToString(released), c, consumer);
+  got += memoryManager.acquireExecutionMemory(required - got, 
taskAttemptId, mode);
+  if (got >= required) {
+break;
   }
-} catch (IOException e) {
-  logger.error("error while calling spill() on " + c, e);
-  throw new OutOfMemoryError("error while calling spill() on " 
+ c + " : "
-+ e.getMessage());
 }
+  } catch (IOException e) {
+logger.error("error while calling spill() on " + c, e);
+throw new OutOfMemoryError("error while calling spill() on " + 
c + " : "
+  + e.getMessage());
   }
--- End diff --


To elaborate so that there is no confusion.
A) If we are keeping the existing implementation:

Use a TreeMap whose comparator enforces the ordering we need (it is trivial to
break ties when the memory in two consumers is the same: for example, make it a
composite key where the relevant portion is the memory and the ignored portion
is the identityHashCode, with the latter used only to break ties in ordering
and equality, or something better).

* Instead of inserting everything into a list and sorting, insert everything
into a TreeSet/TreeMap; complexity and code remain the same.

* Instead of iterating over the sorted list trying to find a minimum subset to
remove (the current implementation in the PR is not optimal, and only
marginally better than what exists), leverage TreeMap's ability to find the
appropriate consumer, if one exists, that optimally satisfies the request;
otherwise use the largest.

Something like (illustrative only) :
```
while (required > 0) {
  val requiredKey = MyKey(required, 0)
  var entry = map.floorEntry(requiredKey)
  val consumer = if (null == entry) map.lastEntry.getValue else entry.getValue
  // evict consumer
  ...
  required -= consumer.getUsed
}

```

Note: I don't want to turn this into a DP problem to do perfect bin packing; a
good-enough, fast solution is sufficient.
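
For reference, here is a minimal Java sketch of one reading of the greedy loop above: take the smallest consumer that covers the remaining requirement on its own, otherwise the largest. It assumes natural (ascending) key ordering, so it uses `ceilingEntry` where the pseudocode uses `floorEntry`. `SpillPlanner` and `plan` are hypothetical names, sizes are plain MB values (1 GB is treated as 1000 MB), and a spill is assumed to free a consumer's whole usage. It is not the PR's code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: consumers are represented only by their size in MB.
final class SpillPlanner {

  /**
   * Returns the sizes chosen for eviction until `required` MB is covered.
   * `bySize` maps a size to the number of consumers currently using that much.
   */
  static List<Long> plan(TreeMap<Long, Integer> bySize, long required) {
    // Work on a copy so the caller's map is left untouched.
    TreeMap<Long, Integer> remaining = new TreeMap<>(bySize);
    List<Long> evicted = new ArrayList<>();
    while (required > 0 && !remaining.isEmpty()) {
      // Smallest consumer that covers the remaining requirement, if any...
      Map.Entry<Long, Integer> entry = remaining.ceilingEntry(required);
      if (entry == null) {
        // ...otherwise fall back to the largest consumer.
        entry = remaining.lastEntry();
      }
      long size = entry.getKey();
      int count = entry.getValue();
      if (count == 1) {
        remaining.remove(size);
      } else {
        remaining.put(size, count - 1);
      }
      evicted.add(size);
      required -= size;
    }
    return evicted;
  }

  public static void main(String[] args) {
    TreeMap<Long, Integer> consumers = new TreeMap<>();
    for (long mb : new long[] {50, 100, 200, 250, 500, 1000}) {
      consumers.merge(mb, 1, Integer::sum);
    }
    System.out.println(plan(consumers, 1400)); // largest first, then the smallest that covers the rest
    System.out.println(plan(consumers, 60));
  }
}
```

Keying the map by size with a count is only a shortcut for the sketch; a real version would need to map each size to the consumers themselves.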

--
Some examples:

consumers = 50 MB, 100MB, 200 MB, 250MB, 500 MB, 1GB

Required: 1.4 GB
Evict: 1GB, 500MB

Required:  300 MB
Evict: 250 MB, 100MB

Required: 400MB
Evict: 500MB

Required: 60 MB
Evict: 100 MB

Required: 200 MB
Evict: 200 MB

--

There are of course better solutions for the examples above, but a first-cut
implementation that does the above should be good enough compared to what
currently exists.
If you want to explore a more optimal solution, that is fine too, as long as
time and space complexity are bounded.



[GitHub] spark pull request #16603: [SPARK-19244][Core] Sort MemoryConsumers accordin...

2017-01-16 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/16603#discussion_r96337114
  
--- Diff: core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java ---
@@ -144,23 +170,31 @@ public long acquireExecutionMemory(long required, MemoryConsumer consumer) {
       // spilling, avoid to have too many spilled files.
       if (got < required) {
         // Call spill() on other consumers to release memory
+        // Sort the consumers according their memory usage. So we avoid spilling the same consumer
+        // which is just spilled in last few times and re-spilling on it will produce many small
+        // spill files.
+        List<MemoryConsumer> sortedList = new ArrayList<>();
         for (MemoryConsumer c: consumers) {
           if (c != consumer && c.getUsed() > 0 && c.getMode() == mode) {
-            try {
-              long released = c.spill(required - got, consumer);
-              if (released > 0) {
-                logger.debug("Task {} released {} from {} for {}", taskAttemptId,
-                  Utils.bytesToString(released), c, consumer);
-                got += memoryManager.acquireExecutionMemory(required - got, taskAttemptId, mode);
-                if (got >= required) {
-                  break;
-                }
+            sortedList.add(c);
+          }
+        }
+        Collections.sort(sortedList, new ConsumerComparator());
+        for (MemoryConsumer c: sortedList) {
+          try {
+            long released = c.spill(required - got, consumer);
+            if (released > 0) {
+              logger.debug("Task {} released {} from {} for {}", taskAttemptId,
+                Utils.bytesToString(released), c, consumer);
+              got += memoryManager.acquireExecutionMemory(required - got, taskAttemptId, mode);
+              if (got >= required) {
+                break;
               }
-            } catch (IOException e) {
-              logger.error("error while calling spill() on " + c, e);
-              throw new OutOfMemoryError("error while calling spill() on " + c + " : "
-                + e.getMessage());
             }
+          } catch (IOException e) {
+            logger.error("error while calling spill() on " + c, e);
+            throw new OutOfMemoryError("error while calling spill() on " + c + " : "
+              + e.getMessage());
           }
--- End diff --

Because the memory usage of a memory consumer changes over time, I am not sure
that a TreeSet/TreeMap of consumers would stay correctly sorted. In other
words, is the sorted order of a TreeSet/TreeMap still guaranteed if its
elements are mutable and change after insertion? I think it is not.
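
A small sketch of that concern, using a hypothetical `Consumer` class with a mutable `used` field in place of `MemoryConsumer`: once an element's sort key changes after insertion, the set keeps it at its old position.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.TreeSet;

final class StaleOrderDemo {
  // Hypothetical stand-in for a consumer whose usage changes over time.
  static final class Consumer {
    long used;
    Consumer(long used) { this.used = used; }
  }

  static TreeSet<Consumer> setOf(Consumer... consumers) {
    TreeSet<Consumer> set =
        new TreeSet<>(Comparator.comparingLong((Consumer c) -> c.used));
    for (Consumer c : consumers) {
      set.add(c);
    }
    return set;
  }

  static List<Long> usagesInIterationOrder(TreeSet<Consumer> set) {
    List<Long> out = new ArrayList<>();
    for (Consumer c : set) {
      out.add(c.used);
    }
    return out;
  }

  public static void main(String[] args) {
    Consumer a = new Consumer(10);
    Consumer b = new Consumer(20);
    Consumer c = new Consumer(30);
    TreeSet<Consumer> set = setOf(a, b, c);
    a.used = 40; // mutated after insertion; the tree is not re-sorted
    System.out.println(usagesInIterationOrder(set)); // prints [40, 20, 30]
  }
}
```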

If we are going to sort here anyway, a TreeMap/TreeSet might be overkill
compared with a list like that. Another concern is that the TreeMap/TreeSet
API lets us find the tail set or ceiling element, but it requires an input
element to compare against, and we only have the required memory size, not a
memory consumer to compare.

Another concern is that TreeSet/TreeMap could return an empty set if all
elements use less memory than the required size. In that case, we need to fall
back to iterating over all elements in the set/map to spill, which seems to add
more complexity.

Totally agreed that it is better to fetch the required size instead of always
going from largest to smallest. With the current list-based approach, we can
still achieve that.
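
A rough sketch of how a list-based version might do that, assuming the list is already sorted by descending usage: walk from the front and keep the last entry that still covers the request, defaulting to the largest. `ListBasedPick` and `pickIndex` are hypothetical names and sizes are plain MB values.

```java
import java.util.Arrays;
import java.util.List;

final class ListBasedPick {

  /**
   * Given usages sorted in descending order, returns the index of the consumer
   * to spill first: the smallest one that covers `required`, else the largest.
   * Returns -1 for an empty list.
   */
  static int pickIndex(List<Long> sortedDesc, long required) {
    if (sortedDesc.isEmpty()) {
      return -1;
    }
    int pick = 0; // default: the largest consumer
    for (int i = 0; i < sortedDesc.size(); i++) {
      if (sortedDesc.get(i) >= required) {
        pick = i; // a smaller consumer that still covers the request
      } else {
        break; // everything after this is smaller still
      }
    }
    return pick;
  }

  public static void main(String[] args) {
    List<Long> usages = Arrays.asList(1024L, 500L, 150L, 50L);
    System.out.println(usages.get(pickIndex(usages, 100)));  // prints 150
    System.out.println(usages.get(pickIndex(usages, 2000))); // prints 1024
  }
}
```

The scan is linear; a binary search over the sorted list would also work if the list were large.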








[GitHub] spark pull request #16603: [SPARK-19244][Core] Sort MemoryConsumers accordin...

2017-01-16 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/16603#discussion_r96296239
  
--- Diff: core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java ---
@@ -53,6 +57,28 @@
  */
 public class TaskMemoryManager {
 
+  /**
+   * A internal class used to sort MemoryConsumer based on their memory usage.
+   * Note: This sorts consumers by descending order, i.e., the consumers using more memory
+   *   are sorted ahead of the consumers using less.
+   */
+  private static final class ConsumerComparator implements Comparator<MemoryConsumer> {
+    @Override
+    public int compare(MemoryConsumer consumer1, MemoryConsumer consumer2) {
+      // We can only compare the consumers which use the same mode.
+      assert (consumer1.getMode() == consumer2.getMode()) :
+        "Try to compare two MemoryConsumers which are in different memory mode.";
+      if (consumer1.getUsed() < consumer2.getUsed()) {
+        return 1;
+      } else if (consumer1.getUsed() > consumer2.getUsed()) {
+        return -1;
+      } else {
+        return 0;
+      }
--- End diff --

Use `Long.compare(consumer2.getUsed(), consumer1.getUsed())` instead.
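
A self-contained sketch of that suggestion. `Consumer` is a hypothetical stand-in for `MemoryConsumer`, and the mode assertion from the diff is left out for brevity.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

final class DescendingByUsage {
  // Hypothetical stand-in for MemoryConsumer; only getUsed() matters here.
  static final class Consumer {
    private final long used;
    Consumer(long used) { this.used = used; }
    long getUsed() { return used; }
  }

  // Arguments are swapped so that larger usage sorts first.
  static final Comparator<Consumer> BY_USAGE_DESC =
      (c1, c2) -> Long.compare(c2.getUsed(), c1.getUsed());

  static List<Long> sortedUsages(long... usages) {
    List<Consumer> list = new ArrayList<>();
    for (long u : usages) {
      list.add(new Consumer(u));
    }
    list.sort(BY_USAGE_DESC);
    List<Long> out = new ArrayList<>();
    for (Consumer c : list) {
      out.add(c.getUsed());
    }
    return out;
  }

  public static void main(String[] args) {
    System.out.println(sortedUsages(50, 500, 10)); // prints [500, 50, 10]
  }
}
```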





[GitHub] spark pull request #16603: [SPARK-19244][Core] Sort MemoryConsumers accordin...

2017-01-16 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/16603#discussion_r96296478
  
--- Diff: core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java ---
@@ -144,23 +170,31 @@ public long acquireExecutionMemory(long required, MemoryConsumer consumer) {
       // spilling, avoid to have too many spilled files.
       if (got < required) {
         // Call spill() on other consumers to release memory
+        // Sort the consumers according their memory usage. So we avoid spilling the same consumer
+        // which is just spilled in last few times and re-spilling on it will produce many small
+        // spill files.
+        List<MemoryConsumer> sortedList = new ArrayList<>();
--- End diff --

Set the initial capacity to `consumers.size()`.





[GitHub] spark pull request #16603: [SPARK-19244][Core] Sort MemoryConsumers accordin...

2017-01-16 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/16603#discussion_r96298011
  
--- Diff: core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java ---
@@ -144,23 +170,31 @@ public long acquireExecutionMemory(long required, MemoryConsumer consumer) {
       // spilling, avoid to have too many spilled files.
       if (got < required) {
         // Call spill() on other consumers to release memory
+        // Sort the consumers according their memory usage. So we avoid spilling the same consumer
+        // which is just spilled in last few times and re-spilling on it will produce many small
+        // spill files.
+        List<MemoryConsumer> sortedList = new ArrayList<>();
         for (MemoryConsumer c: consumers) {
           if (c != consumer && c.getUsed() > 0 && c.getMode() == mode) {
-            try {
-              long released = c.spill(required - got, consumer);
-              if (released > 0) {
-                logger.debug("Task {} released {} from {} for {}", taskAttemptId,
-                  Utils.bytesToString(released), c, consumer);
-                got += memoryManager.acquireExecutionMemory(required - got, taskAttemptId, mode);
-                if (got >= required) {
-                  break;
-                }
+            sortedList.add(c);
+          }
+        }
+        Collections.sort(sortedList, new ConsumerComparator());
+        for (MemoryConsumer c: sortedList) {
+          try {
+            long released = c.spill(required - got, consumer);
+            if (released > 0) {
+              logger.debug("Task {} released {} from {} for {}", taskAttemptId,
+                Utils.bytesToString(released), c, consumer);
+              got += memoryManager.acquireExecutionMemory(required - got, taskAttemptId, mode);
+              if (got >= required) {
+                break;
               }
-            } catch (IOException e) {
-              logger.error("error while calling spill() on " + c, e);
-              throw new OutOfMemoryError("error while calling spill() on " + c + " : "
-                + e.getMessage());
             }
+          } catch (IOException e) {
+            logger.error("error while calling spill() on " + c, e);
+            throw new OutOfMemoryError("error while calling spill() on " + c + " : "
+              + e.getMessage());
           }
--- End diff --

Instead of this, why not leverage a TreeMap or a TreeSet and fetch the
required size directly, instead of always going from largest to smallest? (If
you need 100MB and you have 1GB, 500MB, 150MB, 50MB, you will always spill 1GB,
which is not required.)

If using TreeMap, take a look at `TreeMap.lastEntry` to get the largest
consumer and `TreeMap.ceilingEntry` to fetch the required size (or
`floorEntry`, depending on your comparator); similar methods exist for TreeSet.
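
To illustrate the "depending on your comparator" remark, here is a small sketch with usage in MB as the key (1GB taken as 1024) and a label as the value. With natural ordering, `ceilingEntry` finds the smallest entry that covers the request; with a reversed comparator, `floorEntry` finds the same entry. `CeilingVsFloor` and its methods are hypothetical names.

```java
import java.util.Comparator;
import java.util.Map;
import java.util.TreeMap;

final class CeilingVsFloor {

  private static void fill(TreeMap<Long, String> map) {
    map.put(1024L, "1GB");
    map.put(500L, "500MB");
    map.put(150L, "150MB");
    map.put(50L, "50MB");
  }

  // Natural (ascending) order: ceilingEntry, falling back to lastEntry (the largest).
  static String pickAscending(long required) {
    TreeMap<Long, String> asc = new TreeMap<>();
    fill(asc);
    Map.Entry<Long, String> e = asc.ceilingEntry(required);
    return (e != null ? e : asc.lastEntry()).getValue();
  }

  // Reversed (descending) order: floorEntry, falling back to firstEntry (the largest).
  static String pickDescending(long required) {
    TreeMap<Long, String> desc = new TreeMap<>(Comparator.<Long>reverseOrder());
    fill(desc);
    Map.Entry<Long, String> e = desc.floorEntry(required);
    return (e != null ? e : desc.firstEntry()).getValue();
  }

  public static void main(String[] args) {
    System.out.println(pickAscending(100));   // prints 150MB
    System.out.println(pickDescending(100));  // prints 150MB
    System.out.println(pickAscending(2000));  // prints 1GB
    System.out.println(pickDescending(2000)); // prints 1GB
  }
}
```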


A more invasive change could be to change `consumers` from a HashSet to a
TreeSet, though it becomes tricky since any change in a consumer's memory
footprint will require a remove + add to ensure TreeSet invariants are met,
which might make the code fragile. But it would make spilling more efficient,
since we would not need to keep doing an n log n operation for each spill when
insufficient memory exists.
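
A minimal sketch of the remove + add pattern, with a hypothetical `Consumer` class and an `id` tie-breaker so that two consumers with equal usage can coexist in the set. The removal has to happen while the old key is still in place, which is the fragile part mentioned above.

```java
import java.util.Comparator;
import java.util.TreeSet;

final class ReindexOnUpdate {
  // Hypothetical stand-in for a consumer with a changing memory footprint.
  static final class Consumer {
    final int id;
    long used;
    Consumer(int id, long used) { this.id = id; this.used = used; }
  }

  // Order by usage, then by id so equal usages do not collide.
  static final Comparator<Consumer> ORDER =
      Comparator.comparingLong((Consumer c) -> c.used).thenComparingInt(c -> c.id);

  // Removes the consumer under its old key, updates it, then re-inserts it.
  static void updateUsage(TreeSet<Consumer> set, Consumer c, long newUsed) {
    boolean present = set.remove(c); // lookup still uses the old `used` value
    c.used = newUsed;
    if (present) {
      set.add(c);
    }
  }

  public static void main(String[] args) {
    Consumer a = new Consumer(1, 10);
    Consumer b = new Consumer(2, 20);
    Consumer c = new Consumer(3, 30);
    TreeSet<Consumer> set = new TreeSet<>(ORDER);
    set.add(a);
    set.add(b);
    set.add(c);
    updateUsage(set, a, 40);
    System.out.println(set.first().id + " " + set.last().id); // prints 2 1
  }
}
```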





[GitHub] spark pull request #16603: [SPARK-19244][Core] Sort MemoryConsumers accordin...

2017-01-16 Thread viirya
GitHub user viirya opened a pull request:

https://github.com/apache/spark/pull/16603

[SPARK-19244][Core] Sort MemoryConsumers according to their memory usage 
when spilling

## What changes were proposed in this pull request?

In `TaskMemoryManager`, when we acquire memory by calling
`acquireExecutionMemory` and cannot acquire the required memory, we try to
spill other memory consumers.

Currently, we simply iterate over the memory consumers in a hash set. Normally
the consumers are iterated in the same order each time.

The first issue is that we might spill additional consumers. For example,
suppose consumer 1 uses 10MB and consumer 2 uses 50MB, then consumer 3 requests
100MB but we can only get 60MB, so spilling is needed. We might spill both
consumer 1 and consumer 2, but we actually only need to spill consumer 2 to get
the required 100MB.

The second issue is that, if we spill consumer 1 the first time spilling
happens, after a while consumer 1 may use only 5MB. Then consumer 4 may acquire
some memory and spilling is needed again. Because we iterate over the memory
consumers in the same order, we will spill consumer 1 again. So for consumer 1,
we will produce many small spill files.

This patch changes the way we iterate over the memory consumers. It sorts the
memory consumers by their memory usage, so the consumer using more memory is
spilled first. Once a consumer is spilled, even if it acquires a little memory
again, it will not be the one spilled the next time spilling happens if other
consumers are using more memory than it.
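
As a worked version of the first issue, the sketch below counts how many consumers get spilled for the 10MB/50MB example when 40MB is still missing (100MB requested, 60MB obtained). It assumes a spill frees a consumer's entire usage and that the hash set happens to yield consumer 1 first; `SpillOrderDemo` and `spillsNeeded` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

final class SpillOrderDemo {

  /** Spills consumers in the given order until `needed` MB is freed; returns the spill count. */
  static int spillsNeeded(List<Long> usagesInOrder, long needed) {
    int spills = 0;
    long freed = 0;
    for (long used : usagesInOrder) {
      if (freed >= needed) {
        break;
      }
      freed += used; // assumption: a spill releases everything the consumer holds
      spills++;
    }
    return spills;
  }

  public static void main(String[] args) {
    List<Long> hashOrder = Arrays.asList(10L, 50L); // consumer 1, consumer 2
    List<Long> sortedDesc = new ArrayList<>(hashOrder);
    sortedDesc.sort(Comparator.reverseOrder());
    System.out.println(spillsNeeded(hashOrder, 40));  // prints 2
    System.out.println(spillsNeeded(sortedDesc, 40)); // prints 1
  }
}
```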

## How was this patch tested?

Jenkins tests.

Please review http://spark.apache.org/contributing.html before opening a 
pull request.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/viirya/spark-1 sort-memoryconsumer-when-spill

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/16603.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #16603


commit 4c2b7b02e809614993d25b21aee3e1d55355e482
Author: Liang-Chi Hsieh 
Date:   2017-01-16T08:57:57Z

Sort MemoryConsumers according to their memory usage when spilling.



