Repository: spark
Updated Branches:
  refs/heads/branch-1.0 27a2afed6 -> cf1d46e46


[SPARK-2307][Reprise] Correctly report RDD blocks on SparkUI

**Problem.** The existing code in `ExecutorsPage.scala` requires a linear scan 
through all the blocks just to filter out the uncached ones. Every refresh of 
the page is therefore potentially expensive when there are many blocks and 
many executors.
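
For reference, this is the per-refresh filtering being removed (excerpted from 
the `ExecutorsPage.scala` hunk below):

```scala
// Old approach: scan every block the executor reports just to count the
// cached ones -- O(number of blocks) on each page refresh.
val rddBlocks = status.blocks.count { case (_, blockStatus) =>
  blockStatus.storageLevel != StorageLevel.NONE
}
```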

**Solution.** The proper semantics should be the following: 
`StorageStatusListener` should contain only block statuses that are cached. 
This means that as soon as a block is unpersisted by any means, its status 
should be removed. This is reflected in the changes made in 
`StorageStatusListener.scala`.
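
The key change, excerpted from the `StorageStatusListener.scala` hunk below: a 
block whose updated storage level is `StorageLevel.NONE` is removed from the 
map outright, rather than kept around with an empty status (the RDD-unpersist 
path removes its blocks the same way):

```scala
updatedBlocks.foreach { case (blockId, updatedStatus) =>
  if (updatedStatus.storageLevel == StorageLevel.NONE) {
    // No longer cached anywhere: drop the entry entirely.
    storageStatus.blocks.remove(blockId)
  } else {
    storageStatus.blocks(blockId) = updatedStatus
  }
}
```

With this invariant, the executors page can simply report `status.blocks.size`.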

Further, the `StorageTab` must stop relying on the `StorageStatusListener` 
changing a dropped block's status to `StorageLevel.NONE` (which no longer 
happens). This is reflected in the changes made in `StorageTab.scala` and 
`StorageUtils.scala`.
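
Concretely, `StorageListener.onTaskEnd` now forwards the task's 
`updatedBlocks` to `StorageUtils.rddInfoFromStorageStatus`, which overlays 
them on the block statuses gathered from the executors (excerpted from the 
`StorageUtils.scala` hunk below), so dropped blocks no longer need to be 
inferred from lingering `StorageLevel.NONE` entries:

```scala
// Start from each executor's view of its RDD blocks...
val blockMap = mutable.Map(storageStatuses.flatMap(_.rddBlocks): _*)

// ...then overlay the statuses reported by the task that just ended.
updatedBlocks
  .collect { case (id: RDDBlockId, status) => (id, status) }
  .foreach { case (id, status) => blockMap(id) = status }
```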

----------

If you have been following this chain of PRs like pwendell, you will quickly 
notice that this reverts the changes in #1249, which reverts the changes in 
#1080. In other words, we are adding back the changes from #1080, and fixing 
SPARK-2307 on top of those changes. Please ask questions if you are confused.

Author: Andrew Or <andrewo...@gmail.com>

Closes #1255 from andrewor14/storage-ui-fix-reprise and squashes the following 
commits:

45416fa [Andrew Or] Merge branch 'master' of github.com:apache/spark into storage-ui-fix-reprise
a82ea25 [Andrew Or] Add tests for StorageStatusListener
8773b01 [Andrew Or] Update comment / minor changes
3afde3f [Andrew Or] Correctly report the number of blocks on SparkUI
(cherry picked from commit 3894a49be9b532cc026d908a0f49bca850504498)

Signed-off-by: Patrick Wendell <pwend...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cf1d46e4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cf1d46e4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cf1d46e4

Branch: refs/heads/branch-1.0
Commit: cf1d46e46518c818d20f07cdaabbd8069d877ca8
Parents: 27a2afe
Author: Andrew Or <andrewo...@gmail.com>
Authored: Thu Jul 3 22:48:23 2014 -0700
Committer: Patrick Wendell <pwend...@gmail.com>
Committed: Thu Jul 3 22:48:33 2014 -0700

----------------------------------------------------------------------
 .../spark/storage/StorageStatusListener.scala   |  17 ++-
 .../org/apache/spark/storage/StorageUtils.scala |  15 +-
 .../apache/spark/ui/exec/ExecutorsPage.scala    |   4 +-
 .../org/apache/spark/ui/exec/ExecutorsTab.scala |   4 +-
 .../apache/spark/ui/storage/StorageTab.scala    |  15 +-
 .../storage/StorageStatusListenerSuite.scala    | 152 +++++++++++++++++++
 6 files changed, 184 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/cf1d46e4/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala b/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
index a6e6627..41c960c 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
@@ -28,26 +28,31 @@ import org.apache.spark.scheduler._
  */
 @DeveloperApi
 class StorageStatusListener extends SparkListener {
-  private val executorIdToStorageStatus = mutable.Map[String, StorageStatus]()
+  // This maintains only blocks that are cached (i.e. storage level is not StorageLevel.NONE)
+  private[storage] val executorIdToStorageStatus = mutable.Map[String, StorageStatus]()
 
   def storageStatusList = executorIdToStorageStatus.values.toSeq
 
   /** Update storage status list to reflect updated block statuses */
-  def updateStorageStatus(execId: String, updatedBlocks: Seq[(BlockId, BlockStatus)]) {
-    val filteredStatus = storageStatusList.find(_.blockManagerId.executorId == execId)
+  private def updateStorageStatus(execId: String, updatedBlocks: Seq[(BlockId, BlockStatus)]) {
+    val filteredStatus = executorIdToStorageStatus.get(execId)
     filteredStatus.foreach { storageStatus =>
       updatedBlocks.foreach { case (blockId, updatedStatus) =>
-        storageStatus.blocks(blockId) = updatedStatus
+        if (updatedStatus.storageLevel == StorageLevel.NONE) {
+          storageStatus.blocks.remove(blockId)
+        } else {
+          storageStatus.blocks(blockId) = updatedStatus
+        }
       }
     }
   }
 
   /** Update storage status list to reflect the removal of an RDD from the cache */
-  def updateStorageStatus(unpersistedRDDId: Int) {
+  private def updateStorageStatus(unpersistedRDDId: Int) {
     storageStatusList.foreach { storageStatus =>
      val unpersistedBlocksIds = storageStatus.rddBlocks.keys.filter(_.rddId == unpersistedRDDId)
       unpersistedBlocksIds.foreach { blockId =>
-        storageStatus.blocks(blockId) = BlockStatus(StorageLevel.NONE, 0L, 0L, 0L)
+        storageStatus.blocks.remove(blockId)
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/cf1d46e4/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
index f3bde1d..177281f 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
@@ -75,17 +75,26 @@ private[spark] object StorageUtils {
   /** Returns storage information of all RDDs in the given list. */
   def rddInfoFromStorageStatus(
       storageStatuses: Seq[StorageStatus],
-      rddInfos: Seq[RDDInfo]): Array[RDDInfo] = {
+      rddInfos: Seq[RDDInfo],
+      updatedBlocks: Seq[(BlockId, BlockStatus)] = Seq.empty): Array[RDDInfo] = {
+
+    // Mapping from a block ID -> its status
+    val blockMap = mutable.Map(storageStatuses.flatMap(_.rddBlocks): _*)
+
+    // Record updated blocks, if any
+    updatedBlocks
+      .collect { case (id: RDDBlockId, status) => (id, status) }
+      .foreach { case (id, status) => blockMap(id) = status }
 
     // Mapping from RDD ID -> an array of associated BlockStatuses
-    val blockStatusMap = storageStatuses.flatMap(_.rddBlocks).toMap
+    val rddBlockMap = blockMap
       .groupBy { case (k, _) => k.rddId }
       .mapValues(_.values.toArray)
 
    // Mapping from RDD ID -> the associated RDDInfo (with potentially outdated storage information)
     val rddInfoMap = rddInfos.map { info => (info.id, info) }.toMap
 
-    val rddStorageInfos = blockStatusMap.flatMap { case (rddId, blocks) =>
+    val rddStorageInfos = rddBlockMap.flatMap { case (rddId, blocks) =>
       // Add up memory, disk and Tachyon sizes
       val persistedBlocks =
        blocks.filter { status => status.memSize + status.diskSize + status.tachyonSize > 0 }

http://git-wip-us.apache.org/repos/asf/spark/blob/cf1d46e4/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
index 6cfc46c..9b490b7 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
@@ -108,9 +108,7 @@ private[ui] class ExecutorsPage(parent: ExecutorsTab) extends WebUIPage("") {
     val status = listener.storageStatusList(statusId)
     val execId = status.blockManagerId.executorId
     val hostPort = status.blockManagerId.hostPort
-    val rddBlocks = status.blocks.count { case (_, blockStatus) =>
-      blockStatus.storageLevel != StorageLevel.NONE
-    }
+    val rddBlocks = status.blocks.size
     val memUsed = status.memUsed
     val maxMem = status.maxMem
     val diskUsed = status.diskUsed

http://git-wip-us.apache.org/repos/asf/spark/blob/cf1d46e4/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
index 91d37b8..38eda41 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
@@ -39,9 +39,7 @@ private[ui] class ExecutorsTab(parent: SparkUI) extends WebUITab(parent, "execut
  * A SparkListener that prepares information to be displayed on the ExecutorsTab
  */
 @DeveloperApi
-class ExecutorsListener(storageStatusListener: StorageStatusListener)
-  extends SparkListener {
-
+class ExecutorsListener(storageStatusListener: StorageStatusListener) extends SparkListener {
   val executorToTasksActive = HashMap[String, Int]()
   val executorToTasksComplete = HashMap[String, Int]()
   val executorToTasksFailed = HashMap[String, Int]()

http://git-wip-us.apache.org/repos/asf/spark/blob/cf1d46e4/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala b/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala
index c4bb7aa..0cc0cf3 100644
--- a/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala
@@ -22,7 +22,7 @@ import scala.collection.mutable
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.ui._
 import org.apache.spark.scheduler._
-import org.apache.spark.storage.{RDDInfo, StorageStatusListener, StorageUtils}
+import org.apache.spark.storage._
 
 /** Web UI showing storage status of all RDD's in the given SparkContext. */
 private[ui] class StorageTab(parent: SparkUI) extends WebUITab(parent, "storage") {
@@ -40,9 +40,7 @@ private[ui] class StorageTab(parent: SparkUI) extends WebUITab(parent, "storage"
  * A SparkListener that prepares information to be displayed on the BlockManagerUI.
  */
 @DeveloperApi
-class StorageListener(storageStatusListener: StorageStatusListener)
-  extends SparkListener {
-
+class StorageListener(storageStatusListener: StorageStatusListener) extends SparkListener {
   private val _rddInfoMap = mutable.Map[Int, RDDInfo]()
 
   def storageStatusList = storageStatusListener.storageStatusList
@@ -51,9 +49,10 @@ class StorageListener(storageStatusListener: StorageStatusListener)
   def rddInfoList = _rddInfoMap.values.filter(_.numCachedPartitions > 0).toSeq
 
   /** Update each RDD's info to reflect any updates to the RDD's storage status */
-  private def updateRDDInfo() {
+  private def updateRDDInfo(updatedBlocks: Seq[(BlockId, BlockStatus)] = Seq.empty) {
     val rddInfos = _rddInfoMap.values.toSeq
-    val updatedRddInfos = StorageUtils.rddInfoFromStorageStatus(storageStatusList, rddInfos)
+    val updatedRddInfos =
+      StorageUtils.rddInfoFromStorageStatus(storageStatusList, rddInfos, updatedBlocks)
     updatedRddInfos.foreach { info => _rddInfoMap(info.id) = info }
   }
 
@@ -64,7 +63,7 @@ class StorageListener(storageStatusListener: StorageStatusListener)
   override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = synchronized {
     val metrics = taskEnd.taskMetrics
     if (metrics != null && metrics.updatedBlocks.isDefined) {
-      updateRDDInfo()
+      updateRDDInfo(metrics.updatedBlocks.get)
     }
   }
 
@@ -79,6 +78,6 @@ class StorageListener(storageStatusListener: StorageStatusListener)
   }
 
   override def onUnpersistRDD(unpersistRDD: SparkListenerUnpersistRDD) = synchronized {
-    updateRDDInfo()
+    _rddInfoMap.remove(unpersistRDD.rddId)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/cf1d46e4/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala
new file mode 100644
index 0000000..2179c6d
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import org.scalatest.FunSuite
+import org.apache.spark.Success
+import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.scheduler._
+
+/**
+ * Test the behavior of StorageStatusListener in response to all relevant events.
+ */
+class StorageStatusListenerSuite extends FunSuite {
+  private val bm1 = BlockManagerId("big", "dog", 1, 1)
+  private val bm2 = BlockManagerId("fat", "duck", 2, 2)
+  private val taskInfo1 = new TaskInfo(0, 0, 0, 0, "big", "dog", TaskLocality.ANY, false)
+  private val taskInfo2 = new TaskInfo(0, 0, 0, 0, "fat", "duck", TaskLocality.ANY, false)
+
+  test("block manager added/removed") {
+    val listener = new StorageStatusListener
+
+    // Block manager add
+    assert(listener.executorIdToStorageStatus.size === 0)
+    listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm1, 1000L))
+    assert(listener.executorIdToStorageStatus.size === 1)
+    assert(listener.executorIdToStorageStatus.get("big").isDefined)
+    assert(listener.executorIdToStorageStatus("big").blockManagerId === bm1)
+    assert(listener.executorIdToStorageStatus("big").maxMem === 1000L)
+    assert(listener.executorIdToStorageStatus("big").blocks.isEmpty)
+    listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm2, 2000L))
+    assert(listener.executorIdToStorageStatus.size === 2)
+    assert(listener.executorIdToStorageStatus.get("fat").isDefined)
+    assert(listener.executorIdToStorageStatus("fat").blockManagerId === bm2)
+    assert(listener.executorIdToStorageStatus("fat").maxMem === 2000L)
+    assert(listener.executorIdToStorageStatus("fat").blocks.isEmpty)
+
+    // Block manager remove
+    listener.onBlockManagerRemoved(SparkListenerBlockManagerRemoved(bm1))
+    assert(listener.executorIdToStorageStatus.size === 1)
+    assert(!listener.executorIdToStorageStatus.get("big").isDefined)
+    assert(listener.executorIdToStorageStatus.get("fat").isDefined)
+    listener.onBlockManagerRemoved(SparkListenerBlockManagerRemoved(bm2))
+    assert(listener.executorIdToStorageStatus.size === 0)
+    assert(!listener.executorIdToStorageStatus.get("big").isDefined)
+    assert(!listener.executorIdToStorageStatus.get("fat").isDefined)
+  }
+
+  test("task end without updated blocks") {
+    val listener = new StorageStatusListener
+    listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm1, 1000L))
+    listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm2, 2000L))
+    val taskMetrics = new TaskMetrics
+
+    // Task end with no updated blocks
+    assert(listener.executorIdToStorageStatus("big").blocks.isEmpty)
+    assert(listener.executorIdToStorageStatus("fat").blocks.isEmpty)
+    listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo1, taskMetrics))
+    assert(listener.executorIdToStorageStatus("big").blocks.isEmpty)
+    assert(listener.executorIdToStorageStatus("fat").blocks.isEmpty)
+    listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo2, taskMetrics))
+    assert(listener.executorIdToStorageStatus("big").blocks.isEmpty)
+    assert(listener.executorIdToStorageStatus("fat").blocks.isEmpty)
+  }
+
+  test("task end with updated blocks") {
+    val listener = new StorageStatusListener
+    listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm1, 1000L))
+    listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm2, 2000L))
+    val taskMetrics1 = new TaskMetrics
+    val taskMetrics2 = new TaskMetrics
+    val block1 = (RDDBlockId(1, 1), BlockStatus(StorageLevel.DISK_ONLY, 0L, 100L, 0L))
+    val block2 = (RDDBlockId(1, 2), BlockStatus(StorageLevel.DISK_ONLY, 0L, 200L, 0L))
+    val block3 = (RDDBlockId(4, 0), BlockStatus(StorageLevel.DISK_ONLY, 0L, 300L, 0L))
+    taskMetrics1.updatedBlocks = Some(Seq(block1, block2))
+    taskMetrics2.updatedBlocks = Some(Seq(block3))
+
+    // Task end with new blocks
+    assert(listener.executorIdToStorageStatus("big").blocks.isEmpty)
+    assert(listener.executorIdToStorageStatus("fat").blocks.isEmpty)
+    listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo1, taskMetrics1))
+    assert(listener.executorIdToStorageStatus("big").blocks.size === 2)
+    assert(listener.executorIdToStorageStatus("fat").blocks.size === 0)
+    assert(listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 1)))
+    assert(listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 2)))
+    assert(listener.executorIdToStorageStatus("fat").blocks.isEmpty)
+    listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo2, taskMetrics2))
+    assert(listener.executorIdToStorageStatus("big").blocks.size === 2)
+    assert(listener.executorIdToStorageStatus("fat").blocks.size === 1)
+    assert(listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 1)))
+    assert(listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 2)))
+    assert(listener.executorIdToStorageStatus("fat").blocks.contains(RDDBlockId(4, 0)))
+
+    // Task end with dropped blocks
+    val droppedBlock1 = (RDDBlockId(1, 1), BlockStatus(StorageLevel.NONE, 0L, 0L, 0L))
+    val droppedBlock2 = (RDDBlockId(1, 2), BlockStatus(StorageLevel.NONE, 0L, 0L, 0L))
+    val droppedBlock3 = (RDDBlockId(4, 0), BlockStatus(StorageLevel.NONE, 0L, 0L, 0L))
+    taskMetrics1.updatedBlocks = Some(Seq(droppedBlock1, droppedBlock3))
+    taskMetrics2.updatedBlocks = Some(Seq(droppedBlock2, droppedBlock3))
+    listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo1, taskMetrics1))
+    assert(listener.executorIdToStorageStatus("big").blocks.size === 1)
+    assert(listener.executorIdToStorageStatus("fat").blocks.size === 1)
+    assert(!listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 1)))
+    assert(listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 2)))
+    assert(listener.executorIdToStorageStatus("fat").blocks.contains(RDDBlockId(4, 0)))
+    listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo2, taskMetrics2))
+    assert(listener.executorIdToStorageStatus("big").blocks.size === 1)
+    assert(listener.executorIdToStorageStatus("fat").blocks.size === 0)
+    assert(!listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 1)))
+    assert(listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 2)))
+    assert(listener.executorIdToStorageStatus("fat").blocks.isEmpty)
+  }
+
+  test("unpersist RDD") {
+    val listener = new StorageStatusListener
+    listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm1, 1000L))
+    val taskMetrics1 = new TaskMetrics
+    val taskMetrics2 = new TaskMetrics
+    val block1 = (RDDBlockId(1, 1), BlockStatus(StorageLevel.DISK_ONLY, 0L, 100L, 0L))
+    val block2 = (RDDBlockId(1, 2), BlockStatus(StorageLevel.DISK_ONLY, 0L, 200L, 0L))
+    val block3 = (RDDBlockId(4, 0), BlockStatus(StorageLevel.DISK_ONLY, 0L, 300L, 0L))
+    taskMetrics1.updatedBlocks = Some(Seq(block1, block2))
+    taskMetrics2.updatedBlocks = Some(Seq(block3))
+    listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo1, taskMetrics1))
+    listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo1, taskMetrics2))
+    assert(listener.executorIdToStorageStatus("big").blocks.size === 3)
+
+    // Unpersist RDD
+    listener.onUnpersistRDD(SparkListenerUnpersistRDD(9090))
+    assert(listener.executorIdToStorageStatus("big").blocks.size === 3)
+    listener.onUnpersistRDD(SparkListenerUnpersistRDD(4))
+    assert(listener.executorIdToStorageStatus("big").blocks.size === 2)
+    assert(listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 1)))
+    assert(listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 2)))
+    listener.onUnpersistRDD(SparkListenerUnpersistRDD(1))
+    assert(listener.executorIdToStorageStatus("big").blocks.isEmpty)
+  }
+}
