This is an automated email from the ASF dual-hosted git repository.

vanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 62fd133  [SPARK-27019][SQL][WEBUI] onJobStart happens after 
onExecutionEnd shouldn't overwrite kvstore
62fd133 is described below

commit 62fd133f744ab2d1aa3c409165914b5940e4d328
Author: Shahid <shahidk...@gmail.com>
AuthorDate: Wed Mar 6 14:02:30 2019 -0800

    [SPARK-27019][SQL][WEBUI] onJobStart happens after onExecutionEnd shouldn't 
overwrite kvstore
    
    ## What changes were proposed in this pull request?
    Currently, when event reordering happens, especially when an onJobStart 
event comes after the onExecutionEnd event, the SQL page in the UI displays 
incorrectly (e.g., the test mentioned in the JIRA; this issue also occurs 
randomly when a TPCDS query fails due to a broadcast timeout, etc.).
    
    The reason is that, in SQLAppStatusListener, we remove the 
liveExecutions entry once the execution ends. So, if a jobStart event comes 
after that, we create a new liveExecutions entry corresponding to the 
execId. Eventually this overwrites the kvstore, and the UI displays confusing 
entries.
    
    ## How was this patch tested?
    
    Added a unit test. Also manually tested with the event log of the failed 
query, which is provided in the JIRA.
    
    Before fix:
    ![screenshot from 2019-03-03 
03-05-52](https://user-images.githubusercontent.com/23054875/53687929-53e2b800-3d61-11e9-9dca-620fa41e605c.png)
    
    After fix:
    ![screenshot from 2019-03-03 
02-40-18](https://user-images.githubusercontent.com/23054875/53687928-4f1e0400-3d61-11e9-86aa-584646ac68f9.png)
    
    Closes #23939 from shahidki31/SPARK-27019.
    
    Authored-by: Shahid <shahidk...@gmail.com>
    Signed-off-by: Marcelo Vanzin <van...@cloudera.com>
---
 .../sql/execution/ui/SQLAppStatusListener.scala    | 45 +++++++++++++++++-----
 .../execution/ui/SQLAppStatusListenerSuite.scala   | 30 +++++++++++++++
 2 files changed, 66 insertions(+), 9 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
index 45954f2..daf7f2c 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.spark.sql.execution.ui
 
-import java.util.Date
+import java.util.{Date, NoSuchElementException}
 import java.util.concurrent.ConcurrentHashMap
 import java.util.function.Function
 
@@ -77,7 +77,29 @@ class SQLAppStatusListener(
 
     val executionId = executionIdString.toLong
     val jobId = event.jobId
-    val exec = getOrCreateExecution(executionId)
+    val exec = Option(liveExecutions.get(executionId))
+      .orElse {
+        try {
+          // Should not overwrite the kvstore with new entry, if it already 
has the SQLExecution
+          // data corresponding to the execId.
+          val sqlStoreData = kvstore.read(classOf[SQLExecutionUIData], 
executionId)
+          val executionData = new LiveExecutionData(executionId)
+          executionData.description = sqlStoreData.description
+          executionData.details = sqlStoreData.details
+          executionData.physicalPlanDescription = 
sqlStoreData.physicalPlanDescription
+          executionData.metrics = sqlStoreData.metrics
+          executionData.submissionTime = sqlStoreData.submissionTime
+          executionData.completionTime = sqlStoreData.completionTime
+          executionData.jobs = sqlStoreData.jobs
+          executionData.stages = sqlStoreData.stages
+          executionData.metricsValues = sqlStoreData.metricValues
+          executionData.endEvents = sqlStoreData.jobs.size + 1
+          liveExecutions.put(executionId, executionData)
+          Some(executionData)
+        } catch {
+          case _: NoSuchElementException => None
+        }
+      }.getOrElse(getOrCreateExecution(executionId))
 
     // Record the accumulator IDs for the stages of this job, so that the code 
that keeps
     // track of the metrics knows which accumulators to look at.
@@ -275,16 +297,20 @@ class SQLAppStatusListener(
       exec.endEvents += 1
       update(exec)
 
-      // Remove stale LiveStageMetrics objects for stages that are not active 
anymore.
-      val activeStages = liveExecutions.values().asScala.flatMap { other =>
-        if (other != exec) other.stages else Nil
-      }.toSet
-      stageMetrics.keySet().asScala
-        .filter(!activeStages.contains(_))
-        .foreach(stageMetrics.remove)
+      removeStaleMetricsData(exec)
     }
   }
 
+  private def removeStaleMetricsData(exec: LiveExecutionData): Unit = {
+    // Remove stale LiveStageMetrics objects for stages that are not active 
anymore.
+    val activeStages = liveExecutions.values().asScala.flatMap { other =>
+      if (other != exec) other.stages else Nil
+    }.toSet
+    stageMetrics.keySet().asScala
+      .filter(!activeStages.contains(_))
+      .foreach(stageMetrics.remove)
+  }
+
   private def onDriverAccumUpdates(event: SparkListenerDriverAccumUpdates): 
Unit = {
     val SparkListenerDriverAccumUpdates(executionId, accumUpdates) = event
     Option(liveExecutions.get(executionId)).foreach { exec =>
@@ -311,6 +337,7 @@ class SQLAppStatusListener(
     val now = System.nanoTime()
     if (exec.endEvents >= exec.jobs.size + 1) {
       exec.write(kvstore, now)
+      removeStaleMetricsData(exec)
       liveExecutions.remove(exec.executionId)
     } else if (force) {
       exec.write(kvstore, now)
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
index c8d862c..f19bf5f 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
@@ -384,6 +384,36 @@ class SQLAppStatusListenerSuite extends SparkFunSuite with 
SharedSQLContext with
     assertJobs(statusStore.execution(executionId), failed = Seq(0))
   }
 
+  test("onJobStart happens after onExecutionEnd shouldn't overwrite kvstore") {
+    val statusStore = createStatusStore()
+    val listener = statusStore.listener.get
+
+    val executionId = 0
+    val df = createTestDataFrame
+    listener.onOtherEvent(SparkListenerSQLExecutionStart(
+      executionId,
+      "test",
+      "test",
+      df.queryExecution.toString,
+      SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
+      System.currentTimeMillis()))
+    listener.onOtherEvent(SparkListenerSQLExecutionEnd(
+      executionId, System.currentTimeMillis()))
+    listener.onJobStart(SparkListenerJobStart(
+      jobId = 0,
+      time = System.currentTimeMillis(),
+      stageInfos = Seq(createStageInfo(0, 0)),
+      createProperties(executionId)))
+    listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(0, 
0)))
+    listener.onJobEnd(SparkListenerJobEnd(
+      jobId = 0,
+      time = System.currentTimeMillis(),
+      JobFailed(new RuntimeException("Oops"))))
+
+    assert(listener.noLiveData())
+    assert(statusStore.execution(executionId).get.completionTime.nonEmpty)
+  }
+
   test("handle one execution with multiple jobs") {
     val statusStore = createStatusStore()
     val listener = statusStore.listener.get


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to