Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9127#discussion_r42805528
  
    --- Diff: core/src/main/scala/org/apache/spark/memory/MemoryManager.scala ---
    @@ -155,4 +164,217 @@ private[spark] abstract class MemoryManager extends Logging {
         _storageMemoryUsed
       }
     
    +  // -- The code formerly known as ShuffleMemoryManager --------------------------------------------
    +
    +  /*
    +   * Allocates a pool of memory to tasks for use in shuffle operations. Each disk-spilling
    +   * collection (ExternalAppendOnlyMap or ExternalSorter) used by these tasks can acquire memory
    +   * from this pool and release it as it spills data out. When a task ends, all its memory will be
    +   * released by the Executor.
    +   *
    +   * This class tries to ensure that each task gets a reasonable share of memory, instead of some
    +   * task ramping up to a large amount first and then causing others to spill to disk repeatedly.
    +   * If there are N tasks, it ensures that each task can acquire at least 1 / 2N of the memory
    +   * before it has to spill, and at most 1 / N. Because N varies dynamically, we keep track of the
    +   * set of active tasks and redo the calculations of 1 / 2N and 1 / N in waiting tasks whenever
    +   * this set changes. This is all done by synchronizing access to `memoryManager` to mutate state
    +   * and using wait() and notifyAll() to signal changes.
    +   */
    +
    +  /**
    +   * Sets the page size, in bytes.
    +   *
    +   * If the user didn't explicitly set "spark.buffer.pageSize", we figure out the default value
    +   * by looking at the number of cores available to the process and the total amount of memory,
    +   * and then dividing that by a safety factor.
    +   */
    +  val pageSizeBytes: Long = {
    +    val minPageSize = 1L * 1024 * 1024   // 1MB
    +    val maxPageSize = 64L * minPageSize  // 64MB
    +    val cores = if (numCores > 0) numCores else Runtime.getRuntime.availableProcessors()
    +    // Because of rounding to the next power of 2, the effective safety factor may be as low as 8
    +    val safetyFactor = 16
    +    val size = ByteArrayMethods.nextPowerOf2(maxExecutionMemory / cores / safetyFactor)
    +    val default = math.min(maxPageSize, math.max(minPageSize, size))
    +    conf.getSizeAsBytes("spark.buffer.pageSize", default)
    +  }
    +
    +  private val taskMemory = new mutable.HashMap[Long, Long]()  // taskAttemptId -> memory bytes
    +
    +  /**
    +   * Try to acquire up to numBytes of memory for the current task, and return the number of bytes
    +   * obtained, or 0 if none can be allocated. In some situations, this call may block until there
    +   * is enough free memory, to make sure each task has a chance to ramp up to at least 1 / 2N of
    +   * the total memory pool (where N is the # of active tasks) before it is forced to spill. This
    +   * can happen if the number of tasks increases but an older task already held a lot of memory.
    +   */
    +  def tryToAcquire(numBytes: Long, taskAttemptId: Long): Long = synchronized {
    --- End diff ---
    
    It's unclear what kind of memory this is trying to acquire. I think we should rename this to something like `acquireExecutionMemoryForThread`.
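
    For context, the 1 / 2N .. 1 / N policy that the doc comment describes boils down to a loop like the sketch below. This is a minimal illustration rather than the actual implementation; `SimpleExecutionMemoryPool`, `acquireExecutionMemoryForTask`, and `releaseExecutionMemoryForTask` are hypothetical names:

    ```scala
    import scala.collection.mutable

    // Minimal sketch of the 1 / 2N .. 1 / N policy; names are hypothetical,
    // not the actual Spark API.
    class SimpleExecutionMemoryPool(maxMemory: Long) {
      // taskAttemptId -> bytes currently held by that task
      private val taskMemory = new mutable.HashMap[Long, Long]()

      def acquireExecutionMemoryForTask(numBytes: Long, taskAttemptId: Long): Long = synchronized {
        require(numBytes > 0, s"invalid number of bytes requested: $numBytes")
        // First request from this task: register it and wake any waiters so
        // they recompute 1 / 2N and 1 / N with the new task count.
        if (!taskMemory.contains(taskAttemptId)) {
          taskMemory(taskAttemptId) = 0L
          notifyAll()
        }
        while (true) {
          val numActiveTasks = taskMemory.size
          val curMem = taskMemory(taskAttemptId)
          val freeMemory = maxMemory - taskMemory.values.sum
          // Cap each task at 1 / N of the pool, then grant what fits right now.
          val maxToGrant = math.min(numBytes, math.max(0L, maxMemory / numActiveTasks - curMem))
          val toGrant = math.min(maxToGrant, freeMemory)
          // If the task can't get its full request yet and would still hold
          // less than 1 / 2N, block until another task releases memory.
          if (toGrant < numBytes && curMem + toGrant < maxMemory / (2 * numActiveTasks)) {
            wait()
          } else {
            taskMemory(taskAttemptId) += toGrant
            return toGrant
          }
        }
        0L // never reached
      }

      def releaseExecutionMemoryForTask(numBytes: Long, taskAttemptId: Long): Unit = synchronized {
        val curMem = taskMemory.getOrElse(taskAttemptId, 0L)
        taskMemory(taskAttemptId) = math.max(0L, curMem - numBytes)
        notifyAll() // waiting tasks may now be able to make progress
      }
    }
    ```

    The key detail is that `notifyAll()` runs both when a new task registers and when memory is released, so blocked tasks wake up and recompute 1 / 2N and 1 / N against the current task count.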


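    On the `pageSizeBytes` default above, a quick worked example may help. The numbers here (a 16GB execution pool, 8 cores) are illustrative assumptions, and `nextPowerOf2` is a local stand-in for `ByteArrayMethods.nextPowerOf2`:

    ```scala
    object PageSizeExample extends App {
      // Round n up to the next power of 2 (local stand-in for
      // ByteArrayMethods.nextPowerOf2).
      def nextPowerOf2(n: Long): Long = {
        val highBit = java.lang.Long.highestOneBit(n)
        if (highBit == n) n else highBit << 1
      }

      val minPageSize = 1L * 1024 * 1024                  // 1MB
      val maxPageSize = 64L * minPageSize                 // 64MB
      val maxExecutionMemory = 16L * 1024 * 1024 * 1024   // assume a 16GB pool
      val cores = 8                                       // assume 8 cores
      val safetyFactor = 16

      // 16GB / 8 / 16 = 128MB, which is already a power of 2, so rounding
      // leaves it unchanged; the clamp into [1MB, 64MB] then caps it at 64MB.
      val size = nextPowerOf2(maxExecutionMemory / cores / safetyFactor)
      val default = math.min(maxPageSize, math.max(minPageSize, size))
      println(s"default page size = ${default / (1024 * 1024)}MB") // 64MB
    }
    ```

    On small pools the opposite bound kicks in: the 1MB floor keeps pages from getting degenerately small.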