spark git commit: [SPARK-23481][WEBUI] lastStageAttempt should fail when a stage doesn't exist
Repository: spark Updated Branches: refs/heads/branch-2.3 373ac642f -> 23ba4416e [SPARK-23481][WEBUI] lastStageAttempt should fail when a stage doesn't exist ## What changes were proposed in this pull request? The issue here is `AppStatusStore.lastStageAttempt` will return the next available stage in the store when a stage doesn't exist. This PR adds `last(stageId)` to ensure it returns a correct `StageData` ## How was this patch tested? The new unit test. Author: Shixiong ZhuCloses #20654 from zsxwing/SPARK-23481. (cherry picked from commit 744d5af652ee8cece361cbca31e5201134e0fb42) Signed-off-by: Shixiong Zhu Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/23ba4416 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/23ba4416 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/23ba4416 Branch: refs/heads/branch-2.3 Commit: 23ba4416e1bbbaa818876d7a837f7a5e260aa048 Parents: 373ac64 Author: Shixiong Zhu Authored: Wed Feb 21 15:37:28 2018 -0800 Committer: Shixiong Zhu Committed: Wed Feb 21 15:37:36 2018 -0800 -- .../apache/spark/status/AppStatusStore.scala| 6 +++- .../spark/status/AppStatusListenerSuite.scala | 33 2 files changed, 38 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/23ba4416/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala -- diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala index efc2853..688f25a 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala @@ -95,7 +95,11 @@ private[spark] class AppStatusStore( } def lastStageAttempt(stageId: Int): v1.StageData = { -val it = store.view(classOf[StageDataWrapper]).index("stageId").reverse().first(stageId) +val it = store.view(classOf[StageDataWrapper]) + .index("stageId") + .reverse() + .first(stageId) + .last(stageId) .closeableIterator() try { if (it.hasNext()) { http://git-wip-us.apache.org/repos/asf/spark/blob/23ba4416/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala -- diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala index 01d76a2..f3fa4c9 100644 --- a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala @@ -1057,6 +1057,39 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter { } } + test("lastStageAttempt should fail when the stage doesn't exist") { +val testConf = conf.clone().set(MAX_RETAINED_STAGES, 1) +val listener = new AppStatusListener(store, testConf, true) +val appStore = new AppStatusStore(store) + +val stage1 = new StageInfo(1, 0, "stage1", 4, Nil, Nil, "details1") +val stage2 = new StageInfo(2, 0, "stage2", 4, Nil, Nil, "details2") +val stage3 = new StageInfo(3, 0, "stage3", 4, Nil, Nil, "details3") + +time += 1 +stage1.submissionTime = Some(time) +listener.onStageSubmitted(SparkListenerStageSubmitted(stage1, new Properties())) +stage1.completionTime = Some(time) +listener.onStageCompleted(SparkListenerStageCompleted(stage1)) + +// Make stage 3 complete before stage 2 so that stage 3 will be evicted +time += 1 +stage3.submissionTime = Some(time) +listener.onStageSubmitted(SparkListenerStageSubmitted(stage3, new Properties())) +stage3.completionTime = Some(time) +listener.onStageCompleted(SparkListenerStageCompleted(stage3)) + +time += 1 +stage2.submissionTime = Some(time) +listener.onStageSubmitted(SparkListenerStageSubmitted(stage2, new Properties())) +stage2.completionTime = Some(time) +listener.onStageCompleted(SparkListenerStageCompleted(stage2)) + +assert(appStore.asOption(appStore.lastStageAttempt(1)) === None) +assert(appStore.asOption(appStore.lastStageAttempt(2)).map(_.stageId) === Some(2)) +assert(appStore.asOption(appStore.lastStageAttempt(3)) === None) + } + test("driver logs") { val listener = new AppStatusListener(store, conf, true) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-23481][WEBUI] lastStageAttempt should fail when a stage doesn't exist
Repository: spark Updated Branches: refs/heads/master 3fd0ccb13 -> 744d5af65 [SPARK-23481][WEBUI] lastStageAttempt should fail when a stage doesn't exist ## What changes were proposed in this pull request? The issue here is `AppStatusStore.lastStageAttempt` will return the next available stage in the store when a stage doesn't exist. This PR adds `last(stageId)` to ensure it returns a correct `StageData` ## How was this patch tested? The new unit test. Author: Shixiong ZhuCloses #20654 from zsxwing/SPARK-23481. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/744d5af6 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/744d5af6 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/744d5af6 Branch: refs/heads/master Commit: 744d5af652ee8cece361cbca31e5201134e0fb42 Parents: 3fd0ccb Author: Shixiong Zhu Authored: Wed Feb 21 15:37:28 2018 -0800 Committer: Shixiong Zhu Committed: Wed Feb 21 15:37:28 2018 -0800 -- .../apache/spark/status/AppStatusStore.scala| 6 +++- .../spark/status/AppStatusListenerSuite.scala | 33 2 files changed, 38 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/744d5af6/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala -- diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala index efc2853..688f25a 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala @@ -95,7 +95,11 @@ private[spark] class AppStatusStore( } def lastStageAttempt(stageId: Int): v1.StageData = { -val it = store.view(classOf[StageDataWrapper]).index("stageId").reverse().first(stageId) +val it = store.view(classOf[StageDataWrapper]) + .index("stageId") + .reverse() + .first(stageId) + .last(stageId) .closeableIterator() try { if (it.hasNext()) { http://git-wip-us.apache.org/repos/asf/spark/blob/744d5af6/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala -- diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala index 7495027..673d191 100644 --- a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala @@ -1121,6 +1121,39 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter { } } + test("lastStageAttempt should fail when the stage doesn't exist") { +val testConf = conf.clone().set(MAX_RETAINED_STAGES, 1) +val listener = new AppStatusListener(store, testConf, true) +val appStore = new AppStatusStore(store) + +val stage1 = new StageInfo(1, 0, "stage1", 4, Nil, Nil, "details1") +val stage2 = new StageInfo(2, 0, "stage2", 4, Nil, Nil, "details2") +val stage3 = new StageInfo(3, 0, "stage3", 4, Nil, Nil, "details3") + +time += 1 +stage1.submissionTime = Some(time) +listener.onStageSubmitted(SparkListenerStageSubmitted(stage1, new Properties())) +stage1.completionTime = Some(time) +listener.onStageCompleted(SparkListenerStageCompleted(stage1)) + +// Make stage 3 complete before stage 2 so that stage 3 will be evicted +time += 1 +stage3.submissionTime = Some(time) +listener.onStageSubmitted(SparkListenerStageSubmitted(stage3, new Properties())) +stage3.completionTime = Some(time) +listener.onStageCompleted(SparkListenerStageCompleted(stage3)) + +time += 1 +stage2.submissionTime = Some(time) +listener.onStageSubmitted(SparkListenerStageSubmitted(stage2, new Properties())) +stage2.completionTime = Some(time) +listener.onStageCompleted(SparkListenerStageCompleted(stage2)) + +assert(appStore.asOption(appStore.lastStageAttempt(1)) === None) +assert(appStore.asOption(appStore.lastStageAttempt(2)).map(_.stageId) === Some(2)) +assert(appStore.asOption(appStore.lastStageAttempt(3)) === None) + } + test("driver logs") { val listener = new AppStatusListener(store, conf, true) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org