GitHub user JoshRosen commented on a diff in the pull request:
https://github.com/apache/spark/pull/9241#discussion_r42953325
--- Diff: core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala ---
@@ -90,33 +101,55 @@ class ShuffleMemoryManager protected (
// How much we can grant this task; don't let it grow to more than 1 / numActiveTasks;
// don't let it be negative
- val maxToGrant = math.min(numBytes, math.max(0, (maxMemory / numActiveTasks) - curMem))
- // Only give it as much memory as is free, which might be none if it reached 1 / numTasks
- val toGrant = math.min(maxToGrant, freeMemory)
-
- if (curMem < maxMemory / (2 * numActiveTasks)) {
- // We want to let each task get at least 1 / (2 * numActiveTasks) before blocking;
- // if we can't give it this much now, wait for other tasks to free up memory
- // (this happens if older tasks allocated lots of memory before N grew)
- if (freeMemory >= math.min(maxToGrant, maxMemory / (2 * numActiveTasks) - curMem)) {
- return acquire(toGrant)
+ // Only give it as much memory as is 1/4 of free, which might be none
+ // if it reached 1 / numTasks
+ val available = math.min(freeMemory, math.max(0, (maxMemory / numActiveTasks) - curMem))
+ if (available >= numBytes) {
+ return acquire(consumer, numBytes)
+ }
+ val needed = numBytes - available
+ if (consumer != null && consumer.release(needed) >= needed) {
+ return acquire(consumer, numBytes)
+ }
+ if (!released && taskConsumers.contains(taskAttemptId)) {
+ // try to release more to make sure that we will got enough memory in next loop
+ tryRelease(numBytes - available)
+ released = true
+ } else {
+ if (curMem < maxMemory / (2 * numActiveTasks)) {
+ return acquire(consumer, available)
} else {
- logInfo(
- s"TID $taskAttemptId waiting for at least 1/2N of shuffle memory pool to be free")
+ // in case of memory is not balanced, try to protected the ones already have more memory
+ // wait for other tasks to finished or fail (release memory)
memoryManager.wait()
}
- } else {
- return acquire(toGrant)
}
}
0L // Never reached
}
+ private def tryRelease(numBytes: Long): Long = {
+ val taskAttemptId = currentTaskAttemptId()
+ var released = 0L
+ taskConsumers(taskAttemptId).foreach { case (consumer, used) =>
+ if (consumer != null && used > 0) {
+ released += consumer.release(numBytes - released)
+ if (released >= numBytes) {
+ return released
+ }
+ }
+ }
+ released
+ }
+
/**
* Acquire N bytes of execution memory from the memory manager for the current task.
* @return number of bytes actually acquired (<= N).
*/
- private def acquire(numBytes: Long): Long = memoryManager.synchronized {
+ private def acquire(consumer: MemoryConsumer, numBytes: Long): Long = memoryManager.synchronized {
+ if (numBytes <= 0) {
--- End diff --
Should we add an assert and guard against requesting a negative amount of
memory, since that probably indicates a bug elsewhere?
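
To make the suggestion concrete, here is a minimal sketch of the kind of guard I have in mind. It is not the actual patch: `AcquireGuardSketch`, `poolSize`, and `used` are hypothetical stand-ins for the memory manager's state, and the real `acquire` also takes a `consumer` argument that is left out here.

```scala
// Hypothetical stand-in for the memory pool; not Spark's ShuffleMemoryManager.
object AcquireGuardSketch {
  private val poolSize: Long = 1024L // assumed pool size in bytes
  private var used: Long = 0L        // bytes handed out so far

  /** Grants up to numBytes from the pool and returns the number of bytes granted (<= numBytes). */
  def acquire(numBytes: Long): Long = synchronized {
    // A negative request most likely indicates a bug in the caller, so fail fast
    // instead of silently returning 0.
    assert(numBytes >= 0, s"invalid number of bytes requested: $numBytes")
    val granted = math.min(numBytes, poolSize - used)
    used += granted
    granted
  }
}
```

A zero-byte request still returns 0 without error; only a negative one trips the assertion. Note that Scala's `assert` can be elided at compile time, so `require` may be the better choice if the check should always run.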