GitHub user zhangwei72 commented on the issue:

    https://github.com/apache/spark/pull/17549
  
    
`spark-1.6.2\core\src\main\scala\org\apache\spark\memory\UnifiedMemoryManager`
    override private[memory] def acquireExecutionMemory(
          numBytes: Long,
          taskAttemptId: Long,
          memoryMode: MemoryMode): Long = synchronized {
        assertInvariant()
        assert(numBytes >= 0)
        memoryMode match {
          case MemoryMode.ON_HEAP =>
    
            /**
             * Grow the execution pool by evicting cached blocks, thereby shrinking the storage pool.
             *
             * When acquiring memory for a task, the execution pool may need to make multiple
             * attempts. Each attempt must be able to evict storage in case another task jumps in
             * and caches a large block between the attempts. This is called once per attempt.
             */
            def maybeGrowExecutionPool(extraMemoryNeeded: Long): Unit = {
              if (extraMemoryNeeded > 0) {
                // There is not enough free memory in the execution pool, so try to reclaim memory from
                // storage. We can reclaim any free memory from the storage pool. If the storage pool
                // has grown to become larger than `storageRegionSize`, we can evict blocks and reclaim
                // the memory that storage has borrowed from execution.
                val memoryReclaimableFromStorage =
                +  storageMemoryPool.poolSize - storageRegionSize
                 - math.max(storageMemoryPool.memoryFree,
    
     storageMemoryPool.poolSize - storageRegionSize)
                if (memoryReclaimableFromStorage > 0) {
                  // Only reclaim as much space as is necessary and available:
                  val spaceToReclaim = storageMemoryPool.freeSpaceToShrinkPool(
                    math.min(extraMemoryNeeded, memoryReclaimableFromStorage))
              +     logInfo("storageMemoryPool.memoryFree 
%f".format(storageMemoryPool.memoryFree/1024.0/1024.0))
               +    logInfo("onHeapExecutionMemoryPool.memoryFree 
%f".format(onHeapExecutionMemoryPool.memoryFree/1024.0/1024.0))
    
               +    logInfo("storageMemoryPool.memoryUsed %f".format( 
storageMemoryPool.memoryUsed/1024.0/1024.0))
              +     logInfo("onHeapExecutionMemoryPool.memoryUsed 
%f".format(onHeapExecutionMemoryPool.memoryUsed/1024.0/1024.0))
    
               +    logInfo("storageMemoryPool.poolSize %f".format( 
storageMemoryPool.poolSize/1024.0/1024.0))
              +     logInfo("onHeapExecutionMemoryPool.poolSize 
%f".format(onHeapExecutionMemoryPool.poolSize/1024.0/1024.0))
                  
                  storageMemoryPool.decrementPoolSize(spaceToReclaim)
                  onHeapExecutionMemoryPool.incrementPoolSize(spaceToReclaim)
                }
              }
            }


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to