Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9241#discussion_r42953325
  
    --- Diff: core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala ---
    @@ -90,33 +101,55 @@ class ShuffleMemoryManager protected (
     
           // How much we can grant this task; don't let it grow to more than 1 
/ numActiveTasks;
           // don't let it be negative
    -      val maxToGrant = math.min(numBytes, math.max(0, (maxMemory / 
numActiveTasks) - curMem))
    -      // Only give it as much memory as is free, which might be none if it 
reached 1 / numTasks
    -      val toGrant = math.min(maxToGrant, freeMemory)
    -
    -      if (curMem < maxMemory / (2 * numActiveTasks)) {
    -        // We want to let each task get at least 1 / (2 * numActiveTasks) 
before blocking;
    -        // if we can't give it this much now, wait for other tasks to free 
up memory
    -        // (this happens if older tasks allocated lots of memory before N 
grew)
    -        if (freeMemory >= math.min(maxToGrant, maxMemory / (2 * 
numActiveTasks) - curMem)) {
    -          return acquire(toGrant)
    +      // Only give it as much memory as is 1/4 of free, which might be none
    +      // if it reached 1 / numTasks
    +      val available = math.min(freeMemory, math.max(0, (maxMemory / 
numActiveTasks) - curMem))
    +      if (available >= numBytes) {
    +        return acquire(consumer, numBytes)
    +      }
    +      val needed = numBytes - available
    +      if (consumer != null && consumer.release(needed) >= needed) {
    +        return acquire(consumer, numBytes)
    +      }
    +      if (!released && taskConsumers.contains(taskAttemptId)) {
    +        // try to release more to make sure that we will got enough memory 
in next loop
    +        tryRelease(numBytes - available)
    +        released = true
    +      } else {
    +        if (curMem < maxMemory / (2 * numActiveTasks)) {
    +          return acquire(consumer, available)
             } else {
    -          logInfo(
    -            s"TID $taskAttemptId waiting for at least 1/2N of shuffle 
memory pool to be free")
    +          // in case of memory is not balanced, try to protected the ones 
already have more memory
    +          // wait for other tasks to finished or fail (release memory)
               memoryManager.wait()
             }
    -      } else {
    -        return acquire(toGrant)
           }
         }
         0L  // Never reached
       }
     
    +  private def tryRelease(numBytes: Long): Long = {
    +    val taskAttemptId = currentTaskAttemptId()
    +    var released = 0L
    +    taskConsumers(taskAttemptId).foreach { case (consumer, used) =>
    +      if (consumer != null && used > 0) {
    +        released += consumer.release(numBytes - released)
    +        if (released >= numBytes) {
    +          return released
    +        }
    +      }
    +    }
    +    released
    +  }
    +
       /**
        * Acquire N bytes of execution memory from the memory manager for the 
current task.
        * @return number of bytes actually acquired (<= N).
        */
    -  private def acquire(numBytes: Long): Long = memoryManager.synchronized {
    +  private def acquire(consumer: MemoryConsumer, numBytes: Long): Long = 
memoryManager.synchronized {
    +    if (numBytes <= 0) {
    --- End diff --
    
    Should we add an assert and guard against requesting a negative amount of 
memory, since that probably indicates a bug elsewhere?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it enabled, or if the feature is enabled but not
working, please contact infrastructure at [email protected] or file a JIRA
ticket with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to