GitHub user pwendell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1679#discussion_r15726507
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/StorageUtils.scala ---
    @@ -20,122 +20,258 @@ package org.apache.spark.storage
     import scala.collection.Map
     import scala.collection.mutable
     
    -import org.apache.spark.SparkContext
     import org.apache.spark.annotation.DeveloperApi
     
     /**
      * :: DeveloperApi ::
      * Storage information for each BlockManager.
    + *
    + * This class assumes BlockId and BlockStatus are immutable, such that the consumers of this
    + * class cannot mutate the source of the information. Accesses are not thread-safe.
      */
     @DeveloperApi
    -class StorageStatus(
    -    val blockManagerId: BlockManagerId,
    -    val maxMem: Long,
    -    val blocks: mutable.Map[BlockId, BlockStatus] = mutable.Map.empty) {
    +class StorageStatus(val blockManagerId: BlockManagerId, val maxMem: Long) {
     
    -  def memUsed = blocks.values.map(_.memSize).reduceOption(_ + _).getOrElse(0L)
    +  /**
    +   * Internal representation of the blocks stored in this block manager.
    +   *
    +   * We store RDD blocks and non-RDD blocks separately to allow quick retrievals of RDD blocks.
    +   * These collections should only be mutated through the add/update/removeBlock methods.
    +   */
    +  private val _rddBlocks = new mutable.HashMap[Int, mutable.Map[BlockId, BlockStatus]]
    +  private val _nonRddBlocks = new mutable.HashMap[BlockId, BlockStatus]
     
    -  def memUsedByRDD(rddId: Int) =
    -    rddBlocks.filterKeys(_.rddId == rddId).values.map(_.memSize).reduceOption(_ + _).getOrElse(0L)
    +  /**
    +   * Storage information of the blocks that entails memory, disk, and off-heap memory usage.
    +   *
    +   * As with the block maps, we store the storage information separately for RDD blocks and
    +   * non-RDD blocks for the same reason. In particular, RDD storage information is stored
    +   * in a map indexed by the RDD ID to the following 4-tuple:
    +   *
    +   *   (memory size, disk size, off-heap size, storage level)
    +   *
    +   * We assume that all the blocks that belong to the same RDD have the same storage level.
    +   * This field is not relevant to non-RDD blocks, however, so the storage information for
    +   * non-RDD blocks contains only the first 3 fields (in the same order).
    +   */
    +  private val _rddStorageInfo = new mutable.HashMap[Int, (Long, Long, Long, StorageLevel)]
    +  private var _nonRddStorageInfo: (Long, Long, Long) = (0L, 0L, 0L)
     
    -  def diskUsed = blocks.values.map(_.diskSize).reduceOption(_ + _).getOrElse(0L)
    +  /** Create a storage status with an initial set of blocks, leaving the source unmodified. */
    +  def this(bmid: BlockManagerId, maxMem: Long, initialBlocks: Map[BlockId, BlockStatus]) {
    +    this(bmid, maxMem)
    +    initialBlocks.foreach { case (bid, bstatus) => addBlock(bid, bstatus) }
    +  }
     
    -  def diskUsedByRDD(rddId: Int) =
    -    rddBlocks.filterKeys(_.rddId == rddId).values.map(_.diskSize).reduceOption(_ + _).getOrElse(0L)
    +  /**
    +   * Return the blocks stored in this block manager.
    +   *
    +   * Note that this is somewhat expensive, as it involves cloning the underlying maps and then
    +   * concatenating them together. Much faster alternatives exist for common operations such as
    +   * contains, get, and size.
    +   */
    +  def blocks: Map[BlockId, BlockStatus] = _nonRddBlocks ++ rddBlocks
     
    -  def memRemaining: Long = maxMem - memUsed
    +  /**
    +   * Return the RDD blocks stored in this block manager.
    +   *
    +   * Note that this is somewhat expensive, as it involves cloning the underlying maps and then
    +   * concatenating them together. Much faster alternatives exist for common operations such as
    +   * getting the memory, disk, and off-heap memory sizes occupied by this RDD.
    +   */
    +  def rddBlocks: Map[BlockId, BlockStatus] = _rddBlocks.flatMap { case (_, blocks) => blocks }
     
    -  def rddBlocks = blocks.collect { case (rdd: RDDBlockId, status) => (rdd, status) }
    -}
    +  /** Return the blocks that belong to the given RDD stored in this block manager. */
    +  def rddBlocksById(rddId: Int): Map[BlockId, BlockStatus] = {
    +    _rddBlocks.get(rddId).getOrElse(Map.empty)
    +  }
     
    -/** Helper methods for storage-related objects. */
    -private[spark] object StorageUtils {
    +  /** Add the given block to this storage status. If it already exists, overwrite it. */
    +  private[spark] def addBlock(blockId: BlockId, blockStatus: BlockStatus): Unit = {
    +    updateStorageInfo(blockId, blockStatus)
    +    blockId match {
    +      case RDDBlockId(rddId, _) =>
    +        _rddBlocks.getOrElseUpdate(rddId, new mutable.HashMap)(blockId) = blockStatus
    +      case _ =>
    +        _nonRddBlocks(blockId) = blockStatus
    +    }
    +  }
    +
    +  /** Update the given block in this storage status. If it doesn't already exist, add it. */
    +  private[spark] def updateBlock(blockId: BlockId, blockStatus: BlockStatus): Unit = {
    +    addBlock(blockId, blockStatus)
    +  }
    +
    +  /** Remove the given block from this storage status. */
    +  private[spark] def removeBlock(blockId: BlockId): Option[BlockStatus] = {
    +    updateStorageInfo(blockId, BlockStatus.empty)
    +    blockId match {
    +      case RDDBlockId(rddId, _) =>
    +        // Actually remove the block, if it exists
    +        if (_rddBlocks.contains(rddId)) {
    +          val removed = _rddBlocks(rddId).remove(blockId)
    +          // If the given RDD has no more blocks left, remove the RDD
    +          if (_rddBlocks(rddId).isEmpty) {
    +            _rddBlocks.remove(rddId)
    +          }
    +          removed
    +        } else {
    +          None
    +        }
    +      case _ =>
    +        _nonRddBlocks.remove(blockId)
    +    }
    +  }
     
       /**
    -   * Returns basic information of all RDDs persisted in the given SparkContext. This does not
    -   * include storage information.
    +   * Return whether the given block is stored in this block manager in O(1) time.
    +   * Note that this is much faster than `this.blocks.contains`, which is O(blocks) time.
        */
    -  def rddInfoFromSparkContext(sc: SparkContext): Array[RDDInfo] = {
    -    sc.persistentRdds.values.map { rdd =>
    -      val rddName = Option(rdd.name).getOrElse(rdd.id.toString)
    -      val rddNumPartitions = rdd.partitions.size
    -      val rddStorageLevel = rdd.getStorageLevel
    -      val rddInfo = new RDDInfo(rdd.id, rddName, rddNumPartitions, rddStorageLevel)
    -      rddInfo
    -    }.toArray
    +  def containsBlock(blockId: BlockId): Boolean = {
    +    blockId match {
    +      case RDDBlockId(rddId, _) =>
    +        _rddBlocks.get(rddId).exists(_.contains(blockId))
    +      case _ =>
    +        _nonRddBlocks.contains(blockId)
    +    }
       }
     
    -  /** Returns storage information of all RDDs persisted in the given SparkContext. */
    -  def rddInfoFromStorageStatus(
    -      storageStatuses: Seq[StorageStatus],
    -      sc: SparkContext): Array[RDDInfo] = {
    -    rddInfoFromStorageStatus(storageStatuses, rddInfoFromSparkContext(sc))
    +  /**
    +   * Return the given block stored in this block manager in O(1) time.
    +   * Note that this is much faster than `this.blocks.get`, which is O(blocks) time.
    +   */
    +  def getBlock(blockId: BlockId): Option[BlockStatus] = {
    +    blockId match {
    +      case RDDBlockId(rddId, _) =>
    +        _rddBlocks.get(rddId).map(_.get(blockId)).flatten
    +      case _ =>
    +        _nonRddBlocks.get(blockId)
    +    }
       }
     
    -  /** Returns storage information of all RDDs in the given list. */
    -  def rddInfoFromStorageStatus(
    -      storageStatuses: Seq[StorageStatus],
    -      rddInfos: Seq[RDDInfo],
    -      updatedBlocks: Seq[(BlockId, BlockStatus)] = Seq.empty): Array[RDDInfo] = {
    -
    -    // Mapping from a block ID -> its status
    -    val blockMap = mutable.Map(storageStatuses.flatMap(_.rddBlocks): _*)
    -
    -    // Record updated blocks, if any
    -    updatedBlocks
    -      .collect { case (id: RDDBlockId, status) => (id, status) }
    -      .foreach { case (id, status) => blockMap(id) = status }
    -
    -    // Mapping from RDD ID -> an array of associated BlockStatuses
    -    val rddBlockMap = blockMap
    -      .groupBy { case (k, _) => k.rddId }
    -      .mapValues(_.values.toArray)
    -
    -    // Mapping from RDD ID -> the associated RDDInfo (with potentially outdated storage information)
    -    val rddInfoMap = rddInfos.map { info => (info.id, info) }.toMap
    -
    -    val rddStorageInfos = rddBlockMap.flatMap { case (rddId, blocks) =>
    -      // Add up memory, disk and Tachyon sizes
    -      val persistedBlocks =
    -        blocks.filter { status => status.memSize + status.diskSize + status.tachyonSize > 0 }
    -      val _storageLevel =
    -        if (persistedBlocks.length > 0) persistedBlocks(0).storageLevel else StorageLevel.NONE
    -      val memSize = persistedBlocks.map(_.memSize).reduceOption(_ + _).getOrElse(0L)
    -      val diskSize = persistedBlocks.map(_.diskSize).reduceOption(_ + _).getOrElse(0L)
    -      val tachyonSize = persistedBlocks.map(_.tachyonSize).reduceOption(_ + _).getOrElse(0L)
    -      rddInfoMap.get(rddId).map { rddInfo =>
    -        rddInfo.storageLevel = _storageLevel
    -        rddInfo.numCachedPartitions = persistedBlocks.length
    -        rddInfo.memSize = memSize
    -        rddInfo.diskSize = diskSize
    -        rddInfo.tachyonSize = tachyonSize
    -        rddInfo
    -      }
    -    }.toArray
    +  /**
    +   * Return the number of blocks stored in this block manager in O(RDDs) time.
    +   * Note that this is much faster than `this.blocks.size`, which is O(blocks) time.
    +   */
    +  def numBlocks: Int = _nonRddBlocks.size + numRddBlocks
    +
    +  /**
    +   * Return the number of RDD blocks stored in this block manager in O(RDDs) time.
    +   * Note that this is much faster than `this.rddBlocks.size`, which is O(RDD blocks) time.
    +   */
    +  def numRddBlocks: Int = _rddBlocks.values.map(_.size).fold(0)(_ + _)
     
    -    scala.util.Sorting.quickSort(rddStorageInfos)
    -    rddStorageInfos
    +  /**
    +   * Return the number of blocks that belong to the given RDD in O(1) time.
    +   * Note that this is much faster than `this.rddBlocksById(rddId).size`, which is
    +   * O(blocks in this RDD) time.
    +   */
    +  def numRddBlocksById(rddId: Int): Int = _rddBlocks.get(rddId).map(_.size).getOrElse(0)
    +
    +  /** Return the memory remaining in this block manager. */
    +  def memRemaining: Long = maxMem - memUsed
    +
    +  /** Return the memory used by this block manager. */
    +  def memUsed: Long =
    +    _nonRddStorageInfo._1 + _rddBlocks.keys.toSeq.map(memUsedByRdd).fold(0L)(_ + _)
    --- End diff ---
    
    We can replace all of these aggregations with `sum`; on an empty collection `sum` returns zero, so the explicit `getOrElse(0L)` / `fold(0L)` fallbacks become unnecessary.
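
    As an illustration (a standalone sketch, not part of the patch; `sizes` and `empty` are made-up inputs): `sum` on a `Seq[Long]` is backed by `Numeric[Long]`, whose zero already covers the empty-collection case, so it subsumes both the `reduceOption(_ + _).getOrElse(0L)` and the `fold(0L)(_ + _)` patterns above.

    ```scala
    // Standalone sketch of the equivalence between the patch's aggregation
    // patterns and `sum`; not Spark code.
    object SumEquivalence extends App {
      val sizes: Seq[Long] = Seq(10L, 20L, 30L)
      val empty: Seq[Long] = Seq.empty[Long]

      // The two aggregation patterns used in the patch:
      val viaReduce = sizes.reduceOption(_ + _).getOrElse(0L) // 60
      val viaFold   = sizes.fold(0L)(_ + _)                   // 60

      // `sum` is equivalent, including on empty collections:
      assert(viaReduce == sizes.sum)
      assert(viaFold == sizes.sum)
      assert(empty.reduceOption(_ + _).getOrElse(0L) == empty.sum) // both 0L
    }
    ```

    For example, `memUsed` above could then read `_nonRddStorageInfo._1 + _rddBlocks.keys.toSeq.map(memUsedByRdd).sum`.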

