[1/2] spark git commit: [SPARK-20657][CORE] Speed up rendering of the stages page.

2018-01-11 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/branch-2.3 d9a973d65 -> b78130123


http://git-wip-us.apache.org/repos/asf/spark/blob/b7813012/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
--
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index 11a6a34..7c6e06c 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -19,6 +19,7 @@ package org.apache.spark.ui.jobs
 
 import java.net.URLEncoder
 import java.util.Date
+import java.util.concurrent.TimeUnit
 import javax.servlet.http.HttpServletRequest
 
 import scala.collection.mutable.{HashMap, HashSet}
@@ -29,15 +30,14 @@ import org.apache.commons.lang3.StringEscapeUtils
 import org.apache.spark.SparkConf
 import org.apache.spark.internal.config._
 import org.apache.spark.scheduler.TaskLocality
-import org.apache.spark.status.AppStatusStore
+import org.apache.spark.status._
 import org.apache.spark.status.api.v1._
 import org.apache.spark.ui._
-import org.apache.spark.util.{Distribution, Utils}
+import org.apache.spark.util.Utils
 
 /** Page showing statistics and task list for a given stage */
 private[ui] class StagePage(parent: StagesTab, store: AppStatusStore) extends WebUIPage("stage") {
   import ApiHelper._
-  import StagePage._
 
   private val TIMELINE_LEGEND = {
 
@@ -67,17 +67,17 @@ private[ui] class StagePage(parent: StagesTab, store: AppStatusStore) extends We
   // if we find that it's okay.
   private val MAX_TIMELINE_TASKS = parent.conf.getInt("spark.ui.timeline.tasks.maximum", 1000)
 
-  private def getLocalitySummaryString(stageData: StageData, taskList: Seq[TaskData]): String = {
-val localities = taskList.map(_.taskLocality)
-val localityCounts = localities.groupBy(identity).mapValues(_.size)
+  private def getLocalitySummaryString(localitySummary: Map[String, Long]): String = {
 val names = Map(
   TaskLocality.PROCESS_LOCAL.toString() -> "Process local",
   TaskLocality.NODE_LOCAL.toString() -> "Node local",
   TaskLocality.RACK_LOCAL.toString() -> "Rack local",
   TaskLocality.ANY.toString() -> "Any")
-val localityNamesAndCounts = localityCounts.toSeq.map { case (locality, count) =>
-  s"${names(locality)}: $count"
-}
+val localityNamesAndCounts = names.flatMap { case (key, name) =>
+  localitySummary.get(key).map { count =>
+s"$name: $count"
+  }
+}.toSeq
 localityNamesAndCounts.sorted.mkString("; ")
   }
 
@@ -108,7 +108,7 @@ private[ui] class StagePage(parent: StagesTab, store: AppStatusStore) extends We
 
 val stageHeader = s"Details for Stage $stageId (Attempt $stageAttemptId)"
 val stageData = parent.store
-  .asOption(parent.store.stageAttempt(stageId, stageAttemptId, details = true))
+  .asOption(parent.store.stageAttempt(stageId, stageAttemptId, details = false))
   .getOrElse {
 val content =
   
@@ -117,8 +117,11 @@ private[ui] class StagePage(parent: StagesTab, store: AppStatusStore) extends We
 return UIUtils.headerSparkPage(stageHeader, content, parent)
   }
 
-val tasks = stageData.tasks.getOrElse(Map.empty).values.toSeq
-if (tasks.isEmpty) {
+val localitySummary = store.localitySummary(stageData.stageId, stageData.attemptId)
+
+val totalTasks = stageData.numActiveTasks + stageData.numCompleteTasks +
+  stageData.numFailedTasks + stageData.numKilledTasks
+if (totalTasks == 0) {
   val content =
 
   Summary Metrics No tasks have started yet
@@ -127,18 +130,14 @@ private[ui] class StagePage(parent: StagesTab, store: AppStatusStore) extends We
   return UIUtils.headerSparkPage(stageHeader, content, parent)
 }
 
+val storedTasks = store.taskCount(stageData.stageId, stageData.attemptId)
 val numCompleted = stageData.numCompleteTasks
-val totalTasks = stageData.numActiveTasks + stageData.numCompleteTasks +
-  stageData.numFailedTasks + stageData.numKilledTasks
-val totalTasksNumStr = if (totalTasks == tasks.size) {
+val totalTasksNumStr = if (totalTasks == storedTasks) {
   s"$totalTasks"
 } else {
-  s"$totalTasks, showing ${tasks.size}"
+  s"$totalTasks, showing ${storedTasks}"
 }
 
-val externalAccumulables = stageData.accumulatorUpdates
-val hasAccumulators = externalAccumulables.size > 0
-
 val summary =
   
 
@@ -148,7 +147,7 @@ private[ui] class StagePage(parent: StagesTab, store: AppStatusStore) extends We
   
   
 Locality Level Summary: 
-{getLocalitySummaryString(stageData, tasks)}
+{getLocalitySummaryString(localitySummary)}
   
   {if (hasInput(stageData)) {
 
@@ -266,7 +265,7 @@ private[ui] class StagePage(parent: StagesTab, st

[1/2] spark git commit: [SPARK-20657][CORE] Speed up rendering of the stages page.

2018-01-11 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/master 87c98de8b -> 1c70da3bf


http://git-wip-us.apache.org/repos/asf/spark/blob/1c70da3b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
--
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index 11a6a34..7c6e06c 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -19,6 +19,7 @@ package org.apache.spark.ui.jobs
 
 import java.net.URLEncoder
 import java.util.Date
+import java.util.concurrent.TimeUnit
 import javax.servlet.http.HttpServletRequest
 
 import scala.collection.mutable.{HashMap, HashSet}
@@ -29,15 +30,14 @@ import org.apache.commons.lang3.StringEscapeUtils
 import org.apache.spark.SparkConf
 import org.apache.spark.internal.config._
 import org.apache.spark.scheduler.TaskLocality
-import org.apache.spark.status.AppStatusStore
+import org.apache.spark.status._
 import org.apache.spark.status.api.v1._
 import org.apache.spark.ui._
-import org.apache.spark.util.{Distribution, Utils}
+import org.apache.spark.util.Utils
 
 /** Page showing statistics and task list for a given stage */
 private[ui] class StagePage(parent: StagesTab, store: AppStatusStore) extends WebUIPage("stage") {
   import ApiHelper._
-  import StagePage._
 
   private val TIMELINE_LEGEND = {
 
@@ -67,17 +67,17 @@ private[ui] class StagePage(parent: StagesTab, store: AppStatusStore) extends We
   // if we find that it's okay.
   private val MAX_TIMELINE_TASKS = parent.conf.getInt("spark.ui.timeline.tasks.maximum", 1000)
 
-  private def getLocalitySummaryString(stageData: StageData, taskList: Seq[TaskData]): String = {
-val localities = taskList.map(_.taskLocality)
-val localityCounts = localities.groupBy(identity).mapValues(_.size)
+  private def getLocalitySummaryString(localitySummary: Map[String, Long]): String = {
 val names = Map(
   TaskLocality.PROCESS_LOCAL.toString() -> "Process local",
   TaskLocality.NODE_LOCAL.toString() -> "Node local",
   TaskLocality.RACK_LOCAL.toString() -> "Rack local",
   TaskLocality.ANY.toString() -> "Any")
-val localityNamesAndCounts = localityCounts.toSeq.map { case (locality, count) =>
-  s"${names(locality)}: $count"
-}
+val localityNamesAndCounts = names.flatMap { case (key, name) =>
+  localitySummary.get(key).map { count =>
+s"$name: $count"
+  }
+}.toSeq
 localityNamesAndCounts.sorted.mkString("; ")
   }
 
@@ -108,7 +108,7 @@ private[ui] class StagePage(parent: StagesTab, store: AppStatusStore) extends We
 
 val stageHeader = s"Details for Stage $stageId (Attempt $stageAttemptId)"
 val stageData = parent.store
-  .asOption(parent.store.stageAttempt(stageId, stageAttemptId, details = true))
+  .asOption(parent.store.stageAttempt(stageId, stageAttemptId, details = false))
   .getOrElse {
 val content =
   
@@ -117,8 +117,11 @@ private[ui] class StagePage(parent: StagesTab, store: AppStatusStore) extends We
 return UIUtils.headerSparkPage(stageHeader, content, parent)
   }
 
-val tasks = stageData.tasks.getOrElse(Map.empty).values.toSeq
-if (tasks.isEmpty) {
+val localitySummary = store.localitySummary(stageData.stageId, stageData.attemptId)
+
+val totalTasks = stageData.numActiveTasks + stageData.numCompleteTasks +
+  stageData.numFailedTasks + stageData.numKilledTasks
+if (totalTasks == 0) {
   val content =
 
   Summary Metrics No tasks have started yet
@@ -127,18 +130,14 @@ private[ui] class StagePage(parent: StagesTab, store: AppStatusStore) extends We
   return UIUtils.headerSparkPage(stageHeader, content, parent)
 }
 
+val storedTasks = store.taskCount(stageData.stageId, stageData.attemptId)
 val numCompleted = stageData.numCompleteTasks
-val totalTasks = stageData.numActiveTasks + stageData.numCompleteTasks +
-  stageData.numFailedTasks + stageData.numKilledTasks
-val totalTasksNumStr = if (totalTasks == tasks.size) {
+val totalTasksNumStr = if (totalTasks == storedTasks) {
   s"$totalTasks"
 } else {
-  s"$totalTasks, showing ${tasks.size}"
+  s"$totalTasks, showing ${storedTasks}"
 }
 
-val externalAccumulables = stageData.accumulatorUpdates
-val hasAccumulators = externalAccumulables.size > 0
-
 val summary =
   
 
@@ -148,7 +147,7 @@ private[ui] class StagePage(parent: StagesTab, store: AppStatusStore) extends We
   
   
 Locality Level Summary: 
-{getLocalitySummaryString(stageData, tasks)}
+{getLocalitySummaryString(localitySummary)}
   
   {if (hasInput(stageData)) {
 
@@ -266,7 +265,7 @@ private[ui] class StagePage(parent: StagesTab, store: