spark git commit: [SPARK-23481][WEBUI] lastStageAttempt should fail when a stage doesn't exist

2018-02-21 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/branch-2.3 373ac642f -> 23ba4416e


[SPARK-23481][WEBUI] lastStageAttempt should fail when a stage doesn't exist

## What changes were proposed in this pull request?

The issue here is that `AppStatusStore.lastStageAttempt` will return the next
available stage in the store when the requested stage doesn't exist.

This PR adds `last(stageId)` to the store view so that the lookup only returns a `StageData` for the requested stage.

## How was this patch tested?

The new unit test.

Author: Shixiong Zhu 

Closes #20654 from zsxwing/SPARK-23481.

(cherry picked from commit 744d5af652ee8cece361cbca31e5201134e0fb42)
Signed-off-by: Shixiong Zhu 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/23ba4416
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/23ba4416
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/23ba4416

Branch: refs/heads/branch-2.3
Commit: 23ba4416e1bbbaa818876d7a837f7a5e260aa048
Parents: 373ac64
Author: Shixiong Zhu 
Authored: Wed Feb 21 15:37:28 2018 -0800
Committer: Shixiong Zhu 
Committed: Wed Feb 21 15:37:36 2018 -0800

--
 .../apache/spark/status/AppStatusStore.scala|  6 +++-
 .../spark/status/AppStatusListenerSuite.scala   | 33 
 2 files changed, 38 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/23ba4416/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
--
diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
index efc2853..688f25a 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
@@ -95,7 +95,11 @@ private[spark] class AppStatusStore(
   }
 
   def lastStageAttempt(stageId: Int): v1.StageData = {
-val it = 
store.view(classOf[StageDataWrapper]).index("stageId").reverse().first(stageId)
+val it = store.view(classOf[StageDataWrapper])
+  .index("stageId")
+  .reverse()
+  .first(stageId)
+  .last(stageId)
   .closeableIterator()
 try {
   if (it.hasNext()) {

http://git-wip-us.apache.org/repos/asf/spark/blob/23ba4416/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
--
diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
index 01d76a2..f3fa4c9 100644
--- a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
@@ -1057,6 +1057,39 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
 }
   }
 
+  test("lastStageAttempt should fail when the stage doesn't exist") {
+val testConf = conf.clone().set(MAX_RETAINED_STAGES, 1)
+val listener = new AppStatusListener(store, testConf, true)
+val appStore = new AppStatusStore(store)
+
+val stage1 = new StageInfo(1, 0, "stage1", 4, Nil, Nil, "details1")
+val stage2 = new StageInfo(2, 0, "stage2", 4, Nil, Nil, "details2")
+val stage3 = new StageInfo(3, 0, "stage3", 4, Nil, Nil, "details3")
+
+time += 1
+stage1.submissionTime = Some(time)
+listener.onStageSubmitted(SparkListenerStageSubmitted(stage1, new 
Properties()))
+stage1.completionTime = Some(time)
+listener.onStageCompleted(SparkListenerStageCompleted(stage1))
+
+// Make stage 3 complete before stage 2 so that stage 3 will be evicted
+time += 1
+stage3.submissionTime = Some(time)
+listener.onStageSubmitted(SparkListenerStageSubmitted(stage3, new 
Properties()))
+stage3.completionTime = Some(time)
+listener.onStageCompleted(SparkListenerStageCompleted(stage3))
+
+time += 1
+stage2.submissionTime = Some(time)
+listener.onStageSubmitted(SparkListenerStageSubmitted(stage2, new 
Properties()))
+stage2.completionTime = Some(time)
+listener.onStageCompleted(SparkListenerStageCompleted(stage2))
+
+assert(appStore.asOption(appStore.lastStageAttempt(1)) === None)
+assert(appStore.asOption(appStore.lastStageAttempt(2)).map(_.stageId) === 
Some(2))
+assert(appStore.asOption(appStore.lastStageAttempt(3)) === None)
+  }
+
   test("driver logs") {
 val listener = new AppStatusListener(store, conf, true)
 


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-23481][WEBUI] lastStageAttempt should fail when a stage doesn't exist

2018-02-21 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/master 3fd0ccb13 -> 744d5af65


[SPARK-23481][WEBUI] lastStageAttempt should fail when a stage doesn't exist

## What changes were proposed in this pull request?

The issue here is that `AppStatusStore.lastStageAttempt` will return the next
available stage in the store when the requested stage doesn't exist.

This PR adds `last(stageId)` to the store view so that the lookup only returns a `StageData` for the requested stage.

## How was this patch tested?

The new unit test.

Author: Shixiong Zhu 

Closes #20654 from zsxwing/SPARK-23481.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/744d5af6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/744d5af6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/744d5af6

Branch: refs/heads/master
Commit: 744d5af652ee8cece361cbca31e5201134e0fb42
Parents: 3fd0ccb
Author: Shixiong Zhu 
Authored: Wed Feb 21 15:37:28 2018 -0800
Committer: Shixiong Zhu 
Committed: Wed Feb 21 15:37:28 2018 -0800

--
 .../apache/spark/status/AppStatusStore.scala|  6 +++-
 .../spark/status/AppStatusListenerSuite.scala   | 33 
 2 files changed, 38 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/744d5af6/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
--
diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
index efc2853..688f25a 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
@@ -95,7 +95,11 @@ private[spark] class AppStatusStore(
   }
 
   def lastStageAttempt(stageId: Int): v1.StageData = {
-val it = 
store.view(classOf[StageDataWrapper]).index("stageId").reverse().first(stageId)
+val it = store.view(classOf[StageDataWrapper])
+  .index("stageId")
+  .reverse()
+  .first(stageId)
+  .last(stageId)
   .closeableIterator()
 try {
   if (it.hasNext()) {

http://git-wip-us.apache.org/repos/asf/spark/blob/744d5af6/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
--
diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
index 7495027..673d191 100644
--- a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
@@ -1121,6 +1121,39 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
 }
   }
 
+  test("lastStageAttempt should fail when the stage doesn't exist") {
+val testConf = conf.clone().set(MAX_RETAINED_STAGES, 1)
+val listener = new AppStatusListener(store, testConf, true)
+val appStore = new AppStatusStore(store)
+
+val stage1 = new StageInfo(1, 0, "stage1", 4, Nil, Nil, "details1")
+val stage2 = new StageInfo(2, 0, "stage2", 4, Nil, Nil, "details2")
+val stage3 = new StageInfo(3, 0, "stage3", 4, Nil, Nil, "details3")
+
+time += 1
+stage1.submissionTime = Some(time)
+listener.onStageSubmitted(SparkListenerStageSubmitted(stage1, new 
Properties()))
+stage1.completionTime = Some(time)
+listener.onStageCompleted(SparkListenerStageCompleted(stage1))
+
+// Make stage 3 complete before stage 2 so that stage 3 will be evicted
+time += 1
+stage3.submissionTime = Some(time)
+listener.onStageSubmitted(SparkListenerStageSubmitted(stage3, new 
Properties()))
+stage3.completionTime = Some(time)
+listener.onStageCompleted(SparkListenerStageCompleted(stage3))
+
+time += 1
+stage2.submissionTime = Some(time)
+listener.onStageSubmitted(SparkListenerStageSubmitted(stage2, new 
Properties()))
+stage2.completionTime = Some(time)
+listener.onStageCompleted(SparkListenerStageCompleted(stage2))
+
+assert(appStore.asOption(appStore.lastStageAttempt(1)) === None)
+assert(appStore.asOption(appStore.lastStageAttempt(2)).map(_.stageId) === 
Some(2))
+assert(appStore.asOption(appStore.lastStageAttempt(3)) === None)
+  }
+
   test("driver logs") {
 val listener = new AppStatusListener(store, conf, true)
 


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org