Github user pwendell commented on a diff in the pull request:
https://github.com/apache/spark/pull/1679#discussion_r15726507
--- Diff: core/src/main/scala/org/apache/spark/storage/StorageUtils.scala ---
@@ -20,122 +20,258 @@ package org.apache.spark.storage
import scala.collection.Map
import scala.collection.mutable
-import org.apache.spark.SparkContext
import org.apache.spark.annotation.DeveloperApi
/**
 * :: DeveloperApi ::
 * Storage information for each BlockManager.
+ *
+ * This class assumes BlockId and BlockStatus are immutable, such that the consumers of this
+ * class cannot mutate the source of the information. Accesses are not thread-safe.
 */
@DeveloperApi
-class StorageStatus(
-    val blockManagerId: BlockManagerId,
-    val maxMem: Long,
-    val blocks: mutable.Map[BlockId, BlockStatus] = mutable.Map.empty) {
+class StorageStatus(val blockManagerId: BlockManagerId, val maxMem: Long) {

-  def memUsed = blocks.values.map(_.memSize).reduceOption(_ + _).getOrElse(0L)
+  /**
+   * Internal representation of the blocks stored in this block manager.
+   *
+   * We store RDD blocks and non-RDD blocks separately to allow quick retrievals of RDD blocks.
+   * These collections should only be mutated through the add/update/removeBlock methods.
+   */
+  private val _rddBlocks = new mutable.HashMap[Int, mutable.Map[BlockId, BlockStatus]]
+  private val _nonRddBlocks = new mutable.HashMap[BlockId, BlockStatus]

-  def memUsedByRDD(rddId: Int) =
-    rddBlocks.filterKeys(_.rddId == rddId).values.map(_.memSize).reduceOption(_ + _).getOrElse(0L)
+  /**
+   * Storage information of the blocks that entails memory, disk, and off-heap memory usage.
+   *
+   * As with the block maps, we store the storage information separately for RDD blocks and
+   * non-RDD blocks for the same reason. In particular, RDD storage information is stored
+   * in a map indexed by the RDD ID to the following 4-tuple:
+   *
+   *   (memory size, disk size, off-heap size, storage level)
+   *
+   * We assume that all the blocks that belong to the same RDD have the same storage level.
+   * This field is not relevant to non-RDD blocks, however, so the storage information for
+   * non-RDD blocks contains only the first 3 fields (in the same order).
+   */
+  private val _rddStorageInfo = new mutable.HashMap[Int, (Long, Long, Long, StorageLevel)]
+  private var _nonRddStorageInfo: (Long, Long, Long) = (0L, 0L, 0L)

-  def diskUsed = blocks.values.map(_.diskSize).reduceOption(_ + _).getOrElse(0L)
+  /** Create a storage status with an initial set of blocks, leaving the source unmodified. */
+  def this(bmid: BlockManagerId, maxMem: Long, initialBlocks: Map[BlockId, BlockStatus]) {
+    this(bmid, maxMem)
+    initialBlocks.foreach { case (bid, bstatus) => addBlock(bid, bstatus) }
+  }

-  def diskUsedByRDD(rddId: Int) =
-    rddBlocks.filterKeys(_.rddId == rddId).values.map(_.diskSize).reduceOption(_ + _).getOrElse(0L)
+  /**
+   * Return the blocks stored in this block manager.
+   *
+   * Note that this is somewhat expensive, as it involves cloning the underlying maps and then
+   * concatenating them together. Much faster alternatives exist for common operations such as
+   * contains, get, and size.
+   */
+  def blocks: Map[BlockId, BlockStatus] = _nonRddBlocks ++ rddBlocks

-  def memRemaining: Long = maxMem - memUsed
+  /**
+   * Return the RDD blocks stored in this block manager.
+   *
+   * Note that this is somewhat expensive, as it involves cloning the underlying maps and then
+   * concatenating them together. Much faster alternatives exist for common operations such as
+   * getting the memory, disk, and off-heap memory sizes occupied by this RDD.
+   */
+  def rddBlocks: Map[BlockId, BlockStatus] = _rddBlocks.flatMap { case (_, blocks) => blocks }

-  def rddBlocks = blocks.collect { case (rdd: RDDBlockId, status) => (rdd, status) }
-}
+  /** Return the blocks that belong to the given RDD stored in this block manager. */
+  def rddBlocksById(rddId: Int): Map[BlockId, BlockStatus] = {
+    _rddBlocks.get(rddId).getOrElse(Map.empty)
+  }

-/** Helper methods for storage-related objects. */
-private[spark] object StorageUtils {
+  /** Add the given block to this storage status. If it already exists, overwrite it. */
+  private[spark] def addBlock(blockId: BlockId, blockStatus: BlockStatus): Unit = {
+    updateStorageInfo(blockId, blockStatus)
+    blockId match {
+      case RDDBlockId(rddId, _) =>
+        _rddBlocks.getOrElseUpdate(rddId, new mutable.HashMap)(blockId) = blockStatus
+      case _ =>
+        _nonRddBlocks(blockId) = blockStatus
+    }
+  }
+
+  /** Update the given block in this storage status. If it doesn't already exist, add it. */
+  private[spark] def updateBlock(blockId: BlockId, blockStatus: BlockStatus): Unit = {
+    addBlock(blockId, blockStatus)
+  }
+
+  /** Remove the given block from this storage status. */
+  private[spark] def removeBlock(blockId: BlockId): Option[BlockStatus] = {
+    updateStorageInfo(blockId, BlockStatus.empty)
+    blockId match {
+      case RDDBlockId(rddId, _) =>
+        // Actually remove the block, if it exists
+        if (_rddBlocks.contains(rddId)) {
+          val removed = _rddBlocks(rddId).remove(blockId)
+          // If the given RDD has no more blocks left, remove the RDD
+          if (_rddBlocks(rddId).isEmpty) {
+            _rddBlocks.remove(rddId)
+          }
+          removed
+        } else {
+          None
+        }
+      case _ =>
+        _nonRddBlocks.remove(blockId)
+    }
+  }
  /**
-   * Returns basic information of all RDDs persisted in the given SparkContext. This does not
-   * include storage information.
+   * Return whether the given block is stored in this block manager in O(1) time.
+   * Note that this is much faster than `this.blocks.contains`, which is O(blocks) time.
   */
-  def rddInfoFromSparkContext(sc: SparkContext): Array[RDDInfo] = {
-    sc.persistentRdds.values.map { rdd =>
-      val rddName = Option(rdd.name).getOrElse(rdd.id.toString)
-      val rddNumPartitions = rdd.partitions.size
-      val rddStorageLevel = rdd.getStorageLevel
-      val rddInfo = new RDDInfo(rdd.id, rddName, rddNumPartitions, rddStorageLevel)
-      rddInfo
-    }.toArray
+  def containsBlock(blockId: BlockId): Boolean = {
+    blockId match {
+      case RDDBlockId(rddId, _) =>
+        _rddBlocks.get(rddId).exists(_.contains(blockId))
+      case _ =>
+        _nonRddBlocks.contains(blockId)
+    }
  }
-  /** Returns storage information of all RDDs persisted in the given SparkContext. */
-  def rddInfoFromStorageStatus(
-      storageStatuses: Seq[StorageStatus],
-      sc: SparkContext): Array[RDDInfo] = {
-    rddInfoFromStorageStatus(storageStatuses, rddInfoFromSparkContext(sc))
+  /**
+   * Return the given block stored in this block manager in O(1) time.
+   * Note that this is much faster than `this.blocks.get`, which is O(blocks) time.
+   */
+  def getBlock(blockId: BlockId): Option[BlockStatus] = {
+    blockId match {
+      case RDDBlockId(rddId, _) =>
+        _rddBlocks.get(rddId).map(_.get(blockId)).flatten
+      case _ =>
+        _nonRddBlocks.get(blockId)
+    }
  }
-  /** Returns storage information of all RDDs in the given list. */
-  def rddInfoFromStorageStatus(
-      storageStatuses: Seq[StorageStatus],
-      rddInfos: Seq[RDDInfo],
-      updatedBlocks: Seq[(BlockId, BlockStatus)] = Seq.empty): Array[RDDInfo] = {
-
-    // Mapping from a block ID -> its status
-    val blockMap = mutable.Map(storageStatuses.flatMap(_.rddBlocks): _*)
-
-    // Record updated blocks, if any
-    updatedBlocks
-      .collect { case (id: RDDBlockId, status) => (id, status) }
-      .foreach { case (id, status) => blockMap(id) = status }
-
-    // Mapping from RDD ID -> an array of associated BlockStatuses
-    val rddBlockMap = blockMap
-      .groupBy { case (k, _) => k.rddId }
-      .mapValues(_.values.toArray)
-
-    // Mapping from RDD ID -> the associated RDDInfo (with potentially outdated storage information)
-    val rddInfoMap = rddInfos.map { info => (info.id, info) }.toMap
-
-    val rddStorageInfos = rddBlockMap.flatMap { case (rddId, blocks) =>
-      // Add up memory, disk and Tachyon sizes
-      val persistedBlocks =
-        blocks.filter { status => status.memSize + status.diskSize + status.tachyonSize > 0 }
-      val _storageLevel =
-        if (persistedBlocks.length > 0) persistedBlocks(0).storageLevel else StorageLevel.NONE
-      val memSize = persistedBlocks.map(_.memSize).reduceOption(_ + _).getOrElse(0L)
-      val diskSize = persistedBlocks.map(_.diskSize).reduceOption(_ + _).getOrElse(0L)
-      val tachyonSize = persistedBlocks.map(_.tachyonSize).reduceOption(_ + _).getOrElse(0L)
-      rddInfoMap.get(rddId).map { rddInfo =>
-        rddInfo.storageLevel = _storageLevel
-        rddInfo.numCachedPartitions = persistedBlocks.length
-        rddInfo.memSize = memSize
-        rddInfo.diskSize = diskSize
-        rddInfo.tachyonSize = tachyonSize
-        rddInfo
-      }
-    }.toArray
+  /**
+   * Return the number of blocks stored in this block manager in O(RDDs) time.
+   * Note that this is much faster than `this.blocks.size`, which is O(blocks) time.
+   */
+  def numBlocks: Int = _nonRddBlocks.size + numRddBlocks
+
+  /**
+   * Return the number of RDD blocks stored in this block manager in O(RDDs) time.
+   * Note that this is much faster than `this.rddBlocks.size`, which is O(RDD blocks) time.
+   */
+  def numRddBlocks: Int = _rddBlocks.values.map(_.size).fold(0)(_ + _)
-    scala.util.Sorting.quickSort(rddStorageInfos)
-    rddStorageInfos

+  /**
+   * Return the number of blocks that belong to the given RDD in O(1) time.
+   * Note that this is much faster than `this.rddBlocksById(rddId).size`, which is
+   * O(blocks in this RDD) time.
+   */
+  def numRddBlocksById(rddId: Int): Int = _rddBlocks.get(rddId).map(_.size).getOrElse(0)
+
+  /** Return the memory remaining in this block manager. */
+  def memRemaining: Long = maxMem - memUsed
+
+  /** Return the memory used by this block manager. */
+  def memUsed: Long =
+    _nonRddStorageInfo._1 + _rddBlocks.keys.toSeq.map(memUsedByRdd).fold(0L)(_ + _)
--- End diff ---
We can replace all of these aggregations with `sum`
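
For illustration (a sketch outside the diff, using hypothetical sample values): `sum` on a collection of `Long` uses the implicit `Numeric[Long]` and returns `0L` for an empty collection, so it preserves the behavior of both the `reduceOption(_ + _).getOrElse(0L)` and `fold(0L)(_ + _)` patterns in this file:

```scala
// Sketch only, not part of the diff. Shows that `sum` is behavior-preserving
// for these aggregations, including the empty-collection case.
object SumEquivalence extends App {
  val sizes: Seq[Long] = Seq(10L, 20L, 30L)  // hypothetical block sizes

  // The pattern used throughout StorageStatus:
  val viaReduce = sizes.reduceOption(_ + _).getOrElse(0L)
  // The suggested replacement:
  val viaSum = sizes.sum
  assert(viaReduce == viaSum)  // both 60L

  // `sum` of an empty collection is the Numeric zero, i.e. 0L:
  assert(Seq.empty[Long].reduceOption(_ + _).getOrElse(0L) == Seq.empty[Long].sum)

  // So, for example, memUsed above could become:
  //   _nonRddStorageInfo._1 + _rddBlocks.keys.toSeq.map(memUsedByRdd).sum
}
```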