GitHub user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9344#discussion_r44171102
  
    --- Diff: core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala ---
    @@ -41,98 +40,106 @@ import org.apache.spark.storage.{BlockStatus, BlockId}
      * The implication is that attempts to cache blocks may fail if execution has already eaten
      * up most of the storage space, in which case the new blocks will be evicted immediately
      * according to their respective storage levels.
    + *
    + * @param storageRegionSize Size of the storage region, in bytes.
    + *                          This region is not statically reserved; execution can borrow from
    + *                          it if necessary. Cached blocks can be evicted only if actual
    + *                          storage memory usage exceeds this region.
      */
    -private[spark] class UnifiedMemoryManager(
    +private[spark] class UnifiedMemoryManager private[memory] (
         conf: SparkConf,
         maxMemory: Long,
    +    private val storageRegionSize: Long,
         numCores: Int)
    -  extends MemoryManager(conf, numCores) {
    -
    -  def this(conf: SparkConf, numCores: Int) {
    -    this(conf, UnifiedMemoryManager.getMaxMemory(conf), numCores)
    -  }
    -
    -  /**
    -   * Size of the storage region, in bytes.
    -   *
    -   * This region is not statically reserved; execution can borrow from it if necessary.
    -   * Cached blocks can be evicted only if actual storage memory usage exceeds this region.
    -   */
    -  private val storageRegionSize: Long = {
    -    (maxMemory * conf.getDouble("spark.memory.storageFraction", 0.5)).toLong
    -  }
    -
    -  /**
    -   * Total amount of memory, in bytes, not currently occupied by either execution or storage.
    -   */
    -  private def totalFreeMemory: Long = synchronized {
    -    assert(_executionMemoryUsed <= maxMemory)
    -    assert(_storageMemoryUsed <= maxMemory)
    -    assert(_executionMemoryUsed + _storageMemoryUsed <= maxMemory)
    -    maxMemory - _executionMemoryUsed - _storageMemoryUsed
    -  }
    +  extends MemoryManager(
    +    conf,
    +    numCores,
    +    // TODO(josh): it is confusing how this interacts with page size calculations:
    +    maxOnHeapExecutionMemory = maxMemory - storageRegionSize) {
     
    -  /**
    -   * Total available memory for execution, in bytes.
    -   * In this model, this is equivalent to the amount of memory not occupied by storage.
    -   */
    -  override def maxExecutionMemory: Long = synchronized {
    -    maxMemory - _storageMemoryUsed
    -  }
    +  // We always maintain the invariant that
    +  //    onHeapExecutionMemoryPool.poolSize + storageMemoryPool.poolSize == maxMemory
    +  // At first, all memory is allocated towards execution:
    +  onHeapExecutionMemoryPool.incrementPoolSize(maxMemory)
    --- End diff --
    
    Since we can always shrink the storage memory pool, should we start with the
    same initial pool sizes as StaticMemoryManager (and put that initialization in
    MemoryManager)? That would be easier to understand.
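
To make the suggestion concrete, here is a minimal sketch of one possible reading of it: size both pools up front instead of giving everything to execution, and shrink storage later when execution needs more. `Pool`, `InitialSizing`, `initialPools`, and `borrowFromStorage` are hypothetical names for illustration, not Spark's actual classes, and the sketch ignores how much of the storage pool is in use (a real manager would have to evict blocks first).

```scala
// Hypothetical, simplified stand-in for a memory pool; not Spark's real API.
final class Pool(val name: String) {
  private var _poolSize: Long = 0L

  def poolSize: Long = _poolSize

  def incrementPoolSize(delta: Long): Unit = {
    require(delta >= 0, "delta must be non-negative")
    _poolSize += delta
  }

  def decrementPoolSize(delta: Long): Unit = {
    require(delta >= 0 && delta <= _poolSize, "cannot shrink below zero")
    _poolSize -= delta
  }
}

object InitialSizing {
  // Split maxMemory between execution and storage at construction time,
  // so execution.poolSize + storage.poolSize == maxMemory from the start.
  def initialPools(maxMemory: Long, storageRegionSize: Long): (Pool, Pool) = {
    require(storageRegionSize >= 0 && storageRegionSize <= maxMemory)
    val execution = new Pool("execution")
    val storage = new Pool("storage")
    execution.incrementPoolSize(maxMemory - storageRegionSize)
    storage.incrementPoolSize(storageRegionSize)
    (execution, storage)
  }

  // Move up to `wanted` bytes from storage to execution and return the
  // number of bytes moved. The sum of the two pool sizes is unchanged.
  def borrowFromStorage(execution: Pool, storage: Pool, wanted: Long): Long = {
    val moved = math.max(0L, math.min(wanted, storage.poolSize))
    storage.decrementPoolSize(moved)
    execution.incrementPoolSize(moved)
    moved
  }
}
```

With this layout the invariant from the diff still holds after every transfer, and the starting state mirrors a static split, which may be easier to reason about than starting with an empty storage pool.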

