Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9127#discussion_r42805822
  
    --- Diff: core/src/main/scala/org/apache/spark/memory/MemoryManager.scala ---
    @@ -155,4 +164,217 @@ private[spark] abstract class MemoryManager extends Logging {
         _storageMemoryUsed
       }
     
    +  // -- The code formerly known as ShuffleMemoryManager --------------------------------------------
    +
    +  /*
    +   * Allocates a pool of memory to tasks for use in shuffle operations. Each disk-spilling
    +   * collection (ExternalAppendOnlyMap or ExternalSorter) used by these tasks can acquire memory
    +   * from this pool and release it as it spills data out. When a task ends, all its memory will be
    +   * released by the Executor.
    +   *
    +   * This class tries to ensure that each task gets a reasonable share of memory, instead of some
    +   * task ramping up to a large amount first and then causing others to spill to disk repeatedly.
    +   * If there are N tasks, it ensures that each task can acquire at least 1 / 2N of the memory
    +   * before it has to spill, and at most 1 / N. Because N varies dynamically, we keep track of the
    +   * set of active tasks and redo the calculations of 1 / 2N and 1 / N in waiting tasks whenever
    +   * this set changes. This is all done by synchronizing access to `memoryManager` to mutate state
    +   * and using wait() and notifyAll() to signal changes.
    +   */
    +
    +  /**
    +   * Sets the page size, in bytes.
    +   *
    +   * If the user didn't explicitly set "spark.buffer.pageSize", we figure out the default value
    +   * by looking at the number of cores available to the process and the total amount of memory,
    +   * and then dividing by a safety factor.
    +   */
    +  val pageSizeBytes: Long = {
    +    val minPageSize = 1L * 1024 * 1024   // 1MB
    +    val maxPageSize = 64L * minPageSize  // 64MB
    +    val cores = if (numCores > 0) numCores else Runtime.getRuntime.availableProcessors()
    +    // Because of rounding to next power of 2, we may have safetyFactor as 8 in worst case
    +    val safetyFactor = 16
    +    val size = ByteArrayMethods.nextPowerOf2(maxExecutionMemory / cores / safetyFactor)
    +    val default = math.min(maxPageSize, math.max(minPageSize, size))
    +    conf.getSizeAsBytes("spark.buffer.pageSize", default)
    +  }
    +
    +
    +  private val taskMemory = new mutable.HashMap[Long, Long]()  // taskAttemptId -> memory bytes
    +
    +  /**
    +   * Try to acquire up to numBytes memory for the current task, and return the number of bytes
    +   * obtained, or 0 if none can be allocated. This call may block until there is enough free memory
    +   * in some situations, to make sure each task has a chance to ramp up to at least 1 / 2N of the
    +   * total memory pool (where N is the # of active tasks) before it is forced to spill. This can
    +   * happen if the number of tasks increases but an older task had a lot of memory already.
    +   */
    +  def tryToAcquire(numBytes: Long, taskAttemptId: Long): Long = synchronized {
    +    assert(numBytes > 0, "invalid number of bytes requested: " + numBytes)
    +
    +    // Add this task to the taskMemory map just so we can keep an accurate count of the number
    +    // of active tasks, to let other tasks ramp down their memory in calls to tryToAcquire
    +    if (!taskMemory.contains(taskAttemptId)) {
    +      taskMemory(taskAttemptId) = 0L
    +      // This will later cause waiting tasks to wake up and check numTasks again
    +      notifyAll()
    +    }
    +
    +    // Keep looping until we're either sure that we don't want to grant this request (because this
    +    // task would have more than 1 / numActiveTasks of the memory) or we have enough free
    +    // memory to give it (we always let each task get at least 1 / (2 * numActiveTasks)).
    +    // TODO: simplify this to limit each task to its own slot
    +    while (true) {
    +      val numActiveTasks = taskMemory.keys.size
    +      val curMem = taskMemory(taskAttemptId)
    +      val freeMemory = maxExecutionMemory - taskMemory.values.sum
    +
    +      // How much we can grant this task; don't let it grow to more than 1 / numActiveTasks;
    +      // don't let it be negative
    +      val maxToGrant =
    +        math.min(numBytes, math.max(0, (maxExecutionMemory / numActiveTasks) - curMem))
    +      // Only give it as much memory as is free, which might be none if it reached 1 / numTasks
    +      val toGrant = math.min(maxToGrant, freeMemory)
    +
    +      if (curMem < maxExecutionMemory / (2 * numActiveTasks)) {
    +        // We want to let each task get at least 1 / (2 * numActiveTasks) before blocking;
    +        // if we can't give it this much now, wait for other tasks to free up memory
    +        // (this happens if older tasks allocated lots of memory before N grew)
    +        if (
    +          freeMemory >= math.min(maxToGrant, maxExecutionMemory / (2 * numActiveTasks) - curMem)) {
    +          return acquire(toGrant, taskAttemptId)
    +        } else {
    +          logInfo(
    +            s"TID $taskAttemptId waiting for at least 1/2N of shuffle 
memory pool to be free")
    +          wait()
    +        }
    +      } else {
    +        return acquire(toGrant, taskAttemptId)
    +      }
    +    }
    +    0L  // Never reached
    +  }
    +
    +  /**
    +   * Acquire N bytes of execution memory from the memory manager for the current task.
    +   * @return number of bytes actually acquired (<= N).
    +   */
    +  private def acquire(numBytes: Long, taskAttemptId: Long): Long = synchronized {
    +    val evictedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
    +    val acquired = acquireExecutionMemory(numBytes, evictedBlocks)
    +    // Register evicted blocks, if any, with the active task metrics
    +    // TODO: just do this in `acquireExecutionMemory` (SPARK-10985)
    +    Option(TaskContext.get()).foreach { tc =>
    +      val metrics = tc.taskMetrics()
    +      val lastUpdatedBlocks = metrics.updatedBlocks.getOrElse(Seq[(BlockId, BlockStatus)]())
    +      metrics.updatedBlocks = Some(lastUpdatedBlocks ++ evictedBlocks.toSeq)
    +    }
    +    taskMemory(taskAttemptId) += acquired
    +    acquired
    +  }
    +
    +  /** Release numBytes bytes for the current task. */
    +  def release(numBytes: Long, taskAttemptId: Long): Unit = synchronized {
    +    val curMem = taskMemory.getOrElse(taskAttemptId, 0L)
    +    if (curMem < numBytes) {
    +      throw new SparkException(
    +        s"Internal error: release called on $numBytes bytes but task only 
has $curMem")
    +    }
    +    if (taskMemory.contains(taskAttemptId)) {
    +      taskMemory(taskAttemptId) -= numBytes
    +      releaseExecutionMemory(numBytes)
    +    }
    +    notifyAll() // Notify waiters in tryToAcquire that memory has been freed
    +  }
    +
    +  /** Release all memory for the current task and mark it as inactive (e.g. when a task ends). */
    +  private[memory] def releaseMemoryForTask(taskAttemptId: Long): Unit = synchronized {
    +    taskMemory.remove(taskAttemptId).foreach { numBytes =>
    +      releaseExecutionMemory(numBytes)
    +    }
    +    notifyAll() // Notify waiters in tryToAcquire that memory has been freed
    +  }
    +
    +  /** Returns the memory consumption, in bytes, for the current task */
    +  private[memory] def getMemoryConsumptionForTask(taskAttemptId: Long): Long = synchronized {
    +    taskMemory.getOrElse(taskAttemptId, 0L)
    +  }
    +
    +  // -- Methods related to Tungsten managed memory -------------------------------------------------
    +
    +  /**
    +   * Tracks whether Tungsten memory will be allocated on the JVM heap or off-heap using
    +   * sun.misc.Unsafe.
    +   */
    +  final val tungstenMemoryIsAllocatedInHeap: Boolean =
    +    !conf.getBoolean("spark.unsafe.offHeap", false)
    +
    +  /**
    +   * Allocates memory for use by Unsafe/Tungsten code. Exposed to enable untracked allocations of
    +   * temporary data structures.
    +   */
    +  final val tungstenMemoryAllocator: MemoryAllocator =
    +    if (tungstenMemoryIsAllocatedInHeap) MemoryAllocator.HEAP else MemoryAllocator.UNSAFE
    +
    +  private val POOLING_THRESHOLD_BYTES: Int = 1024 * 1024
    +
    +  /**
    +   * Returns true if allocations of the given size should go through the pooling mechanism and
    +   * false otherwise.
    +   */
    +  private def shouldPool(size: Long): Boolean = {
    +    // Very small allocations are less likely to benefit from pooling.
    +    // At some point, we should explore supporting pooling for off-heap memory, but for now we'll
    +    // ignore that case in the interest of simplicity.
    +    size >= POOLING_THRESHOLD_BYTES && tungstenMemoryIsAllocatedInHeap
    +  }
    +
    +  @GuardedBy("this")
    +  private val bufferPoolsBySize: util.Map[Long, util.LinkedList[WeakReference[MemoryBlock]]] =
    +    new util.HashMap[Long, util.LinkedList[WeakReference[MemoryBlock]]]
    +
    +  /**
    +   * Allocates a contiguous block of memory. Note that the allocated memory is not guaranteed
    +   * to be zeroed out (call `zero()` on the result if this is necessary).
    +   */
    +  @throws(classOf[OutOfMemoryError])
    +  final def allocateMemoryBlock(size: Long): MemoryBlock = {
    +    // TODO(josh): Integrate with execution memory management
    +    if (shouldPool(size)) {
    +      this synchronized {
    +        val pool: util.LinkedList[WeakReference[MemoryBlock]] = bufferPoolsBySize.get(size)
    +        if (pool != null) {
    +          while (!pool.isEmpty) {
    +            val blockReference: WeakReference[MemoryBlock] = pool.pop
    +            val memory: MemoryBlock = blockReference.get
    +            if (memory != null) {
    +              assert(memory.size == size)
    +              return memory
    +            }
    +          }
    +          bufferPoolsBySize.remove(size)
    +        }
    +      }
    +      tungstenMemoryAllocator.allocate(size)
    +    } else {
    +      tungstenMemoryAllocator.allocate(size)
    +    }
    +  }
    +
    +  final def freeMemoryBlock(memory: MemoryBlock) {
    --- End diff --
    
    add a java doc?
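
    For example, a minimal sketch (the wording is only a suggestion, and the pooling
    behavior it describes assumes `freeMemoryBlock` mirrors the `shouldPool` check
    used in `allocateMemoryBlock` above):

        /**
         * Frees a memory block previously returned by `allocateMemoryBlock`. On-heap blocks of at
         * least POOLING_THRESHOLD_BYTES bytes may be returned to a buffer pool for reuse by later
         * allocations instead of being released to the allocator right away.
         */
        final def freeMemoryBlock(memory: MemoryBlock) {
          // ... existing pooling / release logic ...
        }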

