Repository: spark Updated Branches: refs/heads/stage-clean-up [created] 64c593ed0
SPARK-1337: Application web UI garbage collects newest stages instead old ones Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/64c593ed Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/64c593ed Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/64c593ed Branch: refs/heads/stage-clean-up Commit: 64c593ed0026809d116c99a6965cc6783d42d709 Parents: 33e6361 Author: Patrick Wendell <pwend...@gmail.com> Authored: Thu Apr 3 17:51:11 2014 -0700 Committer: Patrick Wendell <pwend...@gmail.com> Committed: Thu Apr 3 20:58:26 2014 -0700 ---------------------------------------------------------------------- .../spark/ui/jobs/JobProgressListener.scala | 6 ++-- .../ui/jobs/JobProgressListenerSuite.scala | 33 ++++++++++++++++++-- 2 files changed, 34 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/64c593ed/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala index d10aa12..34cb45e 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala @@ -81,8 +81,8 @@ private[ui] class JobProgressListener(conf: SparkConf) extends SparkListener { /** If stages is too large, remove and garbage collect old stages */ private def trimIfNecessary(stages: ListBuffer[StageInfo]) = synchronized { if (stages.size > retainedStages) { - val toRemove = retainedStages / 10 - stages.takeRight(toRemove).foreach( s => { + val toRemove = math.max(retainedStages / 10, 1) + stages.take(toRemove).foreach( s => { stageIdToTaskData.remove(s.stageId) stageIdToTime.remove(s.stageId) stageIdToShuffleRead.remove(s.stageId) @@ -95,7 +95,7 @@ private[ui] class JobProgressListener(conf: SparkConf) extends SparkListener { stageIdToPool.remove(s.stageId) if (stageIdToDescription.contains(s.stageId)) {stageIdToDescription.remove(s.stageId)} }) - stages.trimEnd(toRemove) + stages.trimStart(toRemove) } } http://git-wip-us.apache.org/repos/asf/spark/blob/64c593ed/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala index d8a3e85..67ceee5 100644 --- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala @@ -18,13 +18,42 @@ package org.apache.spark.ui.jobs import org.scalatest.FunSuite +import org.scalatest.matchers.ShouldMatchers -import org.apache.spark.{LocalSparkContext, SparkContext, Success} +import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, Success} import org.apache.spark.executor.{ShuffleReadMetrics, TaskMetrics} import org.apache.spark.scheduler._ import org.apache.spark.util.Utils -class JobProgressListenerSuite extends FunSuite with LocalSparkContext { +class JobProgressListenerSuite extends FunSuite with LocalSparkContext with ShouldMatchers { + test("test LRU eviction of stages") { + val conf = new SparkConf() + conf.set("spark.ui.retainedStages", 5.toString) + val listener = new JobProgressListener(conf) + + def createStageStartEvent(stageId: Int) = { + val stageInfo = new StageInfo(stageId, stageId.toString, 0, null) + SparkListenerStageSubmitted(stageInfo) + } + + def createStageEndEvent(stageId: Int) = { + val stageInfo = new StageInfo(stageId, stageId.toString, 0, null) + SparkListenerStageCompleted(stageInfo) + } + + for (i <- 1 to 50) { + listener.onStageSubmitted(createStageStartEvent(i)) + listener.onStageCompleted(createStageEndEvent(i)) + } + + listener.completedStages.size should be (5) + listener.completedStages.filter(_.stageId == 50).size should be (1) + listener.completedStages.filter(_.stageId == 49).size should be (1) + listener.completedStages.filter(_.stageId == 48).size should be (1) + listener.completedStages.filter(_.stageId == 47).size should be (1) + listener.completedStages.filter(_.stageId == 46).size should be (1) + } + test("test executor id to summary") { val sc = new SparkContext("local", "test") val listener = new JobProgressListener(sc.conf)