GitHub user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10240#discussion_r47275498
  
    --- Diff: core/src/main/scala/org/apache/spark/memory/ExecutionMemoryPool.scala ---
    @@ -91,23 +108,34 @@ private[memory] class ExecutionMemoryPool(
           val numActiveTasks = memoryForTask.keys.size
           val curMem = memoryForTask(taskAttemptId)
     
    -      // How much we can grant this task; don't let it grow to more than 1 / numActiveTasks;
    -      // don't let it be negative
    -      val maxToGrant =
    -        math.min(numBytes, math.max(0, (poolSize / numActiveTasks) - curMem))
    +      // In every iteration of this loop, we should first try to reclaim any borrowed execution
    +      // space from storage. This is necessary because of the potential race condition where new
    +      // storage blocks may steal the free execution memory that this task was waiting for.
    +      maybeGrowPool(numBytes - memoryFree)
    +
    +      // Maximum size the pool would have after potentially growing the pool.
    +      // This is used to compute the upper bound of how much memory each task can occupy. This
    +      // must take into account potential free memory as well as the amount this pool currently
    +      // occupies. Otherwise, we may run into SPARK-12155 where, in unified memory management,
    +      // we did not take into account space that could have been freed by evicting cached blocks.
    +      val maxPoolSize = computeMaxPoolSize()
    +      val maxMemoryPerTask = maxPoolSize / numActiveTasks
    +      val minMemoryPerTask = poolSize / (2 * numActiveTasks)
    +
    +      // How much we can grant this task; keep its share within 0 <= X <= 1 / numActiveTasks
    +      val maxToGrant = math.min(numBytes, math.max(0, maxMemoryPerTask - curMem))
           // Only give it as much memory as is free, which might be none if it reached 1 / numTasks
           val toGrant = math.min(maxToGrant, memoryFree)
     
    -      if (curMem < poolSize / (2 * numActiveTasks)) {
    +      if (curMem < minMemoryPerTask) {
    --- End diff --
    
    I was able to prove this myself. I summarized my thoughts in this gist: https://gist.github.com/andrewor14/aea58796dd25d2ec9f20
    
    That said, I would still prefer to do this separately since this PR is already passing tests. :)
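
For reference, the grant arithmetic in the diff above can be sketched as a standalone function. This is only an illustration under stated assumptions: `GrantSketch`, `toGrant`, and `belowMinShare` are hypothetical names, `maxPoolSize` and `memoryFree` are passed in as plain values rather than computed by the pool, and none of the locking or waiting logic from `ExecutionMemoryPool` is modeled.

```scala
// Hypothetical standalone model of the arithmetic in the diff; not Spark's actual class.
object GrantSketch {

  /** Bytes to grant, mirroring `maxToGrant` and `toGrant` in the diff. */
  def toGrant(
      numBytes: Long,
      curMem: Long,
      maxPoolSize: Long,   // assumed to be what computeMaxPoolSize() returned
      memoryFree: Long,
      numActiveTasks: Int): Long = {
    // Upper bound per task: 1 / numActiveTasks of the largest size the pool could reach.
    val maxMemoryPerTask = maxPoolSize / numActiveTasks
    // Never grant a negative amount, and never more than the task asked for.
    val maxToGrant = math.min(numBytes, math.max(0L, maxMemoryPerTask - curMem))
    // Only hand out memory that is actually free right now.
    math.min(maxToGrant, memoryFree)
  }

  /** True if the task holds less than `poolSize / (2 * numActiveTasks)`, as in the diff's final check. */
  def belowMinShare(curMem: Long, poolSize: Long, numActiveTasks: Int): Boolean =
    curMem < poolSize / (2 * numActiveTasks)
}
```

With two active tasks, a 1000-byte maximum pool, and 300 bytes free, a task holding 100 bytes that asks for 600 is capped first at 400 by its per-task bound and then at 300 by the free memory.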

