Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1679#discussion_r15726421
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/StorageUtils.scala 
---
    @@ -20,122 +20,263 @@ package org.apache.spark.storage
     import scala.collection.Map
     import scala.collection.mutable
     
    -import org.apache.spark.SparkContext
    +import org.apache.spark.SparkException
     import org.apache.spark.annotation.DeveloperApi
     
     /**
      * :: DeveloperApi ::
      * Storage information for each BlockManager.
    + *
    + * This class assumes BlockId and BlockStatus are immutable, such that the 
consumers of this
    + * class cannot mutate the source of the information. Accesses are not 
thread-safe.
      */
     @DeveloperApi
    -class StorageStatus(
    -    val blockManagerId: BlockManagerId,
    -    val maxMem: Long,
    -    val blocks: mutable.Map[BlockId, BlockStatus] = mutable.Map.empty) {
    +class StorageStatus(val blockManagerId: BlockManagerId, val maxMem: Long) {
     
    -  def memUsed = blocks.values.map(_.memSize).reduceOption(_ + 
_).getOrElse(0L)
    +  /**
    +   * Internal representation of the blocks stored in this block manager.
    +   *
    +   * A common consumption pattern is to access only the blocks that belong 
to a specific RDD.
    +   * For this use case, we should avoid linearly scanning through all the 
blocks, which could
    +   * be expensive if there are thousands of blocks on each block manager. 
Thus, we need to store
    +   * RDD blocks and non-RDD blocks separately. In particular, we store RDD 
blocks in a map
    +   * indexed by RDD IDs, so we can filter out the blocks of interest 
quickly.
     
    -  def memUsedByRDD(rddId: Int) =
    -    rddBlocks.filterKeys(_.rddId == 
rddId).values.map(_.memSize).reduceOption(_ + _).getOrElse(0L)
    +   * These collections should only be mutated through the 
add/update/removeBlock methods.
    +   */
    +  private val _rddBlocks = new mutable.HashMap[Int, mutable.Map[BlockId, 
BlockStatus]]
    +  private val _nonRddBlocks = new mutable.HashMap[BlockId, BlockStatus]
     
    -  def diskUsed = blocks.values.map(_.diskSize).reduceOption(_ + 
_).getOrElse(0L)
    +  /**
    +   * A map of storage information associated with each RDD.
    +   *
    +   * The key is the ID of the RDD, and the value is a 4-tuple of the 
following:
    +   *   (size in memory, size on disk, size in tachyon, storage level)
    +   *
    +   * This is updated incrementally on each block added, updated or 
removed, so as to avoid
    +   * linearly scanning through all the blocks within an RDD if we're only 
interested in a
    +   * given RDD's storage information.
    +   */
    +  private val _rddStorageInfo = new mutable.HashMap[Int, (Long, Long, 
Long, StorageLevel)]
     
    -  def diskUsedByRDD(rddId: Int) =
    -    rddBlocks.filterKeys(_.rddId == 
rddId).values.map(_.diskSize).reduceOption(_ + _).getOrElse(0L)
    +  /**
    +   * Instantiate a StorageStatus with the given initial blocks. This 
essentially makes a copy of
    +   * the original blocks map such that the fate of this storage status is 
not tied to the source.
    +   */
    +  def this(bmid: BlockManagerId, maxMem: Long, initialBlocks: Map[BlockId, 
BlockStatus]) {
    +    this(bmid, maxMem)
    +    initialBlocks.foreach { case (bid, bstatus) => addBlock(bid, bstatus) }
    +  }
     
    -  def memRemaining: Long = maxMem - memUsed
    +  /**
    +   * Return the blocks stored in this block manager.
    +   *
    +   * Note that this is somewhat expensive, as it involves cloning the 
underlying maps and then
    +   * concatenating them together. Much faster alternatives exist for 
common operations such as
    +   * contains, get, and size.
    +   */
    +  def blocks: Map[BlockId, BlockStatus] = _nonRddBlocks ++ rddBlocks
     
    -  def rddBlocks = blocks.collect { case (rdd: RDDBlockId, status) => (rdd, 
status) }
    -}
    +  /**
    +   * Return the RDD blocks stored in this block manager.
    +   *
    +   * Note that this is somewhat expensive, as it involves cloning the 
underlying maps and then
    +   * concatenating them together. Much faster alternatives exist for 
common operations such as
    +   * getting the memory, disk, and off-heap memory sizes occupied by this 
RDD.
    +   */
    +  def rddBlocks: Map[BlockId, BlockStatus] = _rddBlocks.flatMap { case (_, 
blocks) => blocks }
     
    -/** Helper methods for storage-related objects. */
    -private[spark] object StorageUtils {
    +  /** Return the blocks that belong to the given RDD stored in this block 
manager. */
    +  def rddBlocksById(rddId: Int): Map[BlockId, BlockStatus] = {
    +    _rddBlocks.get(rddId).getOrElse(Map.empty)
    +  }
    +
    +  /** Add the given block to this storage status. If it already exists, 
overwrite it. */
    +  def addBlock(blockId: BlockId, blockStatus: BlockStatus): Unit = {
    +    blockId match {
    +      case RDDBlockId(rddId, _) =>
    +        // Update the storage info of the RDD, keeping track of any 
existing status for this block
    +        val oldBlockStatus = getBlock(blockId).getOrElse(BlockStatus.empty)
    +        val changeInMem = blockStatus.memSize - oldBlockStatus.memSize
    +        val changeInDisk = blockStatus.diskSize - oldBlockStatus.diskSize
    +        val changeInTachyon = blockStatus.tachyonSize - 
oldBlockStatus.tachyonSize
    +        val level = blockStatus.storageLevel
    +        updateRddStorageInfo(rddId, changeInMem, changeInDisk, 
changeInTachyon, level)
    +        // Actually add the block itself
    +        _rddBlocks.getOrElseUpdate(rddId, new mutable.HashMap)(blockId) = 
blockStatus
    +      case _ =>
    +        _nonRddBlocks(blockId) = blockStatus
    +    }
    +  }
    +
    +  /** Update the given block in this storage status. If it doesn't already 
exist, add it. */
    +  def updateBlock(blockId: BlockId, blockStatus: BlockStatus): Unit = {
    +    addBlock(blockId, blockStatus)
    +  }
    +
    +  /** Remove the given block from this storage status. */
    +  def removeBlock(blockId: BlockId): Option[BlockStatus] = {
    +    blockId match {
    +      case RDDBlockId(rddId, _) =>
    +        // Update the storage info of the RDD if the block to remove exists
    +        getBlock(blockId).foreach { s =>
    +          updateRddStorageInfo(rddId, -s.memSize, -s.diskSize, 
-s.tachyonSize, StorageLevel.NONE)
    +        }
    +        // Actually remove the block, if it exists
    +        if (_rddBlocks.contains(rddId)) {
    --- End diff --
    
    Not really, just that it doesn't throw some key exception if we misuse this


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

Reply via email to