[GitHub] spark pull request #19681: [SPARK-20652][sql] Store SQL UI data in the new a...

2017-12-21 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/19681#discussion_r158326122
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala ---
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.ui
+
+import java.util.Date
+import java.util.concurrent.ConcurrentHashMap
+import java.util.function.Function
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.{JobExecutionStatus, SparkConf}
+import org.apache.spark.internal.Logging
+import org.apache.spark.scheduler._
+import org.apache.spark.sql.execution.SQLExecution
+import org.apache.spark.sql.execution.metric._
+import org.apache.spark.status.LiveEntity
+import org.apache.spark.status.config._
+import org.apache.spark.ui.SparkUI
+import org.apache.spark.util.kvstore.KVStore
+
+private[sql] class SQLAppStatusListener(
+conf: SparkConf,
+kvstore: KVStore,
+live: Boolean,
+ui: Option[SparkUI] = None)
+  extends SparkListener with Logging {
+
+  // How often to flush intermediate state of a live execution to the store. When replaying logs,
+  // never flush (only do the very last write).
+  private val liveUpdatePeriodNs = if (live) conf.get(LIVE_ENTITY_UPDATE_PERIOD) else -1L
+
+  // Live tracked data is needed by the SQL status store to calculate metrics for in-flight
+  // executions; that means arbitrary threads may be querying these maps, so they need to be
+  // thread-safe.
+  private val liveExecutions = new ConcurrentHashMap[Long, LiveExecutionData]()
+  private val stageMetrics = new ConcurrentHashMap[Int, LiveStageMetrics]()
+
+  private var uiInitialized = false
+
+  override def onJobStart(event: SparkListenerJobStart): Unit = {
+val executionIdString = event.properties.getProperty(SQLExecution.EXECUTION_ID_KEY)
+if (executionIdString == null) {
+  // This is not a job created by SQL
+  return
+}
+
+val executionId = executionIdString.toLong
+val jobId = event.jobId
+val exec = getOrCreateExecution(executionId)
+
+// Record the accumulator IDs for the stages of this job, so that the code that keeps
+// track of the metrics knows which accumulators to look at.
+val accumIds = exec.metrics.map(_.accumulatorId).sorted.toList
+event.stageIds.foreach { id =>
+  stageMetrics.put(id, new LiveStageMetrics(id, 0, accumIds.toArray, new ConcurrentHashMap()))
+}
+
+exec.jobs = exec.jobs + (jobId -> JobExecutionStatus.RUNNING)
+exec.stages = event.stageIds.toSet
--- End diff --

Oh, good catch. I can submit a fix for this.


---




[GitHub] spark pull request #19681: [SPARK-20652][sql] Store SQL UI data in the new a...

2017-12-21 Thread carsonwang
Github user carsonwang commented on a diff in the pull request:

https://github.com/apache/spark/pull/19681#discussion_r158226032
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala ---
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.ui
+
+import java.util.Date
+import java.util.concurrent.ConcurrentHashMap
+import java.util.function.Function
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.{JobExecutionStatus, SparkConf}
+import org.apache.spark.internal.Logging
+import org.apache.spark.scheduler._
+import org.apache.spark.sql.execution.SQLExecution
+import org.apache.spark.sql.execution.metric._
+import org.apache.spark.status.LiveEntity
+import org.apache.spark.status.config._
+import org.apache.spark.ui.SparkUI
+import org.apache.spark.util.kvstore.KVStore
+
+private[sql] class SQLAppStatusListener(
+conf: SparkConf,
+kvstore: KVStore,
+live: Boolean,
+ui: Option[SparkUI] = None)
+  extends SparkListener with Logging {
+
+  // How often to flush intermediate state of a live execution to the store. When replaying logs,
+  // never flush (only do the very last write).
+  private val liveUpdatePeriodNs = if (live) conf.get(LIVE_ENTITY_UPDATE_PERIOD) else -1L
+
+  // Live tracked data is needed by the SQL status store to calculate metrics for in-flight
+  // executions; that means arbitrary threads may be querying these maps, so they need to be
+  // thread-safe.
+  private val liveExecutions = new ConcurrentHashMap[Long, LiveExecutionData]()
+  private val stageMetrics = new ConcurrentHashMap[Int, LiveStageMetrics]()
+
+  private var uiInitialized = false
+
+  override def onJobStart(event: SparkListenerJobStart): Unit = {
+val executionIdString = event.properties.getProperty(SQLExecution.EXECUTION_ID_KEY)
+if (executionIdString == null) {
+  // This is not a job created by SQL
+  return
+}
+
+val executionId = executionIdString.toLong
+val jobId = event.jobId
+val exec = getOrCreateExecution(executionId)
+
+// Record the accumulator IDs for the stages of this job, so that the code that keeps
+// track of the metrics knows which accumulators to look at.
+val accumIds = exec.metrics.map(_.accumulatorId).sorted.toList
+event.stageIds.foreach { id =>
+  stageMetrics.put(id, new LiveStageMetrics(id, 0, accumIds.toArray, new ConcurrentHashMap()))
+}
+
+exec.jobs = exec.jobs + (jobId -> JobExecutionStatus.RUNNING)
+exec.stages = event.stageIds.toSet
--- End diff --

@vanzin, shall we add the stage IDs to the existing set instead of replacing it? Otherwise we lose the stage IDs from the previous jobs of this execution.
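
For reference, a small self-contained sketch of the change being suggested here: accumulate stage IDs across jobs instead of overwriting them. The names below are illustrative stand-ins for `LiveExecutionData.stages` and `onJobStart`, not the actual patch that was later submitted.

```scala
// Illustrative only: ExecData stands in for LiveExecutionData, and each call
// simulates one onJobStart event for the same SQL execution.
object StageTracking {
  final case class ExecData(var stages: Set[Int] = Set.empty)

  // Behavior in the quoted diff: the latest job's stages replace the set.
  def onJobStartReplacing(exec: ExecData, stageIds: Seq[Int]): Unit =
    exec.stages = stageIds.toSet

  // Suggested behavior: union with the stages recorded for earlier jobs.
  def onJobStartAccumulating(exec: ExecData, stageIds: Seq[Int]): Unit =
    exec.stages = exec.stages ++ stageIds

  def main(args: Array[String]): Unit = {
    val replaced = ExecData()
    val accumulated = ExecData()
    val jobs = Seq(Seq(0, 1), Seq(2))   // two jobs belonging to one execution
    jobs.foreach(onJobStartReplacing(replaced, _))
    jobs.foreach(onJobStartAccumulating(accumulated, _))
    println(s"replacing:    ${replaced.stages}")    // Set(2): job 0's stages are lost
    println(s"accumulating: ${accumulated.stages}") // Set(0, 1, 2)
  }
}
```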


---




[GitHub] spark pull request #19681: [SPARK-20652][sql] Store SQL UI data in the new a...

2017-12-15 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19681#discussion_r157331270
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala ---
@@ -36,13 +36,14 @@ import org.apache.spark.sql.catalyst.util.quietly
 import org.apache.spark.sql.execution.{LeafExecNode, QueryExecution, SparkPlanInfo, SQLExecution}
 import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 import org.apache.spark.sql.test.SharedSQLContext
-import org.apache.spark.ui.SparkUI
+import org.apache.spark.status.config._
 import org.apache.spark.util.{AccumulatorMetadata, JsonProtocol, LongAccumulator}
-
+import org.apache.spark.util.kvstore.InMemoryStore
 
 class SQLListenerSuite extends SparkFunSuite with SharedSQLContext with JsonTestUtils {
   import testImplicits._
-  import org.apache.spark.AccumulatorSuite.makeInfo
+
+  override protected def sparkConf = super.sparkConf.set(LIVE_ENTITY_UPDATE_PERIOD, 0L)
--- End diff --

Ah, you are right; it's only shared in the hive tests.


---




[GitHub] spark pull request #19681: [SPARK-20652][sql] Store SQL UI data in the new a...

2017-12-15 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19681#discussion_r157279789
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
 ---
@@ -118,309 +142,286 @@ class SQLListenerSuite extends SparkFunSuite with 
SharedSQLContext with JsonTest
   (id, accumulatorValue)
 }.toMap
 
-listener.onOtherEvent(SparkListenerSQLExecutionStart(
+bus.postToAll(SparkListenerSQLExecutionStart(
   executionId,
   "test",
   "test",
   df.queryExecution.toString,
   SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
   System.currentTimeMillis()))
 
-val executionUIData = listener.executionIdToData(0)
-
-listener.onJobStart(SparkListenerJobStart(
+bus.postToAll(SparkListenerJobStart(
   jobId = 0,
   time = System.currentTimeMillis(),
   stageInfos = Seq(
 createStageInfo(0, 0),
 createStageInfo(1, 0)
   ),
   createProperties(executionId)))
-
listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(0, 0)))
+bus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 0)))
 
-assert(listener.getExecutionMetrics(0).isEmpty)
+assert(store.executionMetrics(0).isEmpty)
 
-
listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq(
+bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq(
   // (task id, stage id, stage attempt, accum updates)
-  (0L, 0, 0, 
createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo)),
-  (1L, 0, 0, 
createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo))
+  (0L, 0, 0, createAccumulatorInfos(accumulatorUpdates)),
+  (1L, 0, 0, createAccumulatorInfos(accumulatorUpdates))
 )))
 
-checkAnswer(listener.getExecutionMetrics(0), 
accumulatorUpdates.mapValues(_ * 2))
+checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ 
* 2))
 
 // Driver accumulator updates don't belong to this execution should be 
filtered and no
 // exception will be thrown.
-listener.onOtherEvent(SparkListenerDriverAccumUpdates(0, Seq((999L, 
2L
-checkAnswer(listener.getExecutionMetrics(0), 
accumulatorUpdates.mapValues(_ * 2))
+bus.postToAll(SparkListenerDriverAccumUpdates(0, Seq((999L, 2L
 
-
listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq(
+checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ 
* 2))
+
+bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq(
   // (task id, stage id, stage attempt, accum updates)
-  (0L, 0, 0, 
createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo)),
-  (1L, 0, 0,
-createTaskMetrics(accumulatorUpdates.mapValues(_ * 
2)).accumulators().map(makeInfo))
+  (0L, 0, 0, createAccumulatorInfos(accumulatorUpdates)),
+  (1L, 0, 0, createAccumulatorInfos(accumulatorUpdates.mapValues(_ * 
2)))
 )))
 
-checkAnswer(listener.getExecutionMetrics(0), 
accumulatorUpdates.mapValues(_ * 3))
+checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ 
* 3))
 
 // Retrying a stage should reset the metrics
-
listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(0, 1)))
+bus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 1)))
 
-
listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq(
+bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq(
   // (task id, stage id, stage attempt, accum updates)
-  (0L, 0, 1, 
createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo)),
-  (1L, 0, 1, 
createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo))
+  (0L, 0, 1, createAccumulatorInfos(accumulatorUpdates)),
+  (1L, 0, 1, createAccumulatorInfos(accumulatorUpdates))
 )))
 
-checkAnswer(listener.getExecutionMetrics(0), 
accumulatorUpdates.mapValues(_ * 2))
+checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ 
* 2))
 
 // Ignore the task end for the first attempt
-listener.onTaskEnd(SparkListenerTaskEnd(
+bus.postToAll(SparkListenerTaskEnd(
   stageId = 0,
   stageAttemptId = 0,
   taskType = "",
   reason = null,
-  createTaskInfo(0, 0),
-  createTaskMetrics(accumulatorUpdates.mapValues(_ * 100
+  createTaskInfo(0, 0, accums = accumulatorUpdates.mapValues(_ * 100)),
+  null))
 
-checkAnswer(listener.getExecutionMetrics(0), 
accumulatorUpdates.mapValues(_ * 2))
+checkAnswer(store.executionMetrics(0), 

[GitHub] spark pull request #19681: [SPARK-20652][sql] Store SQL UI data in the new a...

2017-12-15 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19681#discussion_r157279430
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
 ---
@@ -118,309 +142,286 @@ class SQLListenerSuite extends SparkFunSuite with 
SharedSQLContext with JsonTest
   (id, accumulatorValue)
 }.toMap
 
-listener.onOtherEvent(SparkListenerSQLExecutionStart(
+bus.postToAll(SparkListenerSQLExecutionStart(
   executionId,
   "test",
   "test",
   df.queryExecution.toString,
   SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
   System.currentTimeMillis()))
 
-val executionUIData = listener.executionIdToData(0)
-
-listener.onJobStart(SparkListenerJobStart(
+bus.postToAll(SparkListenerJobStart(
   jobId = 0,
   time = System.currentTimeMillis(),
   stageInfos = Seq(
 createStageInfo(0, 0),
 createStageInfo(1, 0)
   ),
   createProperties(executionId)))
-
listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(0, 0)))
+bus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 0)))
 
-assert(listener.getExecutionMetrics(0).isEmpty)
+assert(store.executionMetrics(0).isEmpty)
 
-
listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq(
+bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq(
   // (task id, stage id, stage attempt, accum updates)
-  (0L, 0, 0, 
createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo)),
-  (1L, 0, 0, 
createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo))
+  (0L, 0, 0, createAccumulatorInfos(accumulatorUpdates)),
+  (1L, 0, 0, createAccumulatorInfos(accumulatorUpdates))
 )))
 
-checkAnswer(listener.getExecutionMetrics(0), 
accumulatorUpdates.mapValues(_ * 2))
+checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ 
* 2))
 
 // Driver accumulator updates don't belong to this execution should be 
filtered and no
 // exception will be thrown.
-listener.onOtherEvent(SparkListenerDriverAccumUpdates(0, Seq((999L, 
2L
-checkAnswer(listener.getExecutionMetrics(0), 
accumulatorUpdates.mapValues(_ * 2))
+bus.postToAll(SparkListenerDriverAccumUpdates(0, Seq((999L, 2L
 
-
listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq(
+checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ 
* 2))
+
+bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq(
   // (task id, stage id, stage attempt, accum updates)
-  (0L, 0, 0, 
createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo)),
-  (1L, 0, 0,
-createTaskMetrics(accumulatorUpdates.mapValues(_ * 
2)).accumulators().map(makeInfo))
+  (0L, 0, 0, createAccumulatorInfos(accumulatorUpdates)),
+  (1L, 0, 0, createAccumulatorInfos(accumulatorUpdates.mapValues(_ * 
2)))
 )))
 
-checkAnswer(listener.getExecutionMetrics(0), 
accumulatorUpdates.mapValues(_ * 3))
+checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ 
* 3))
 
 // Retrying a stage should reset the metrics
-
listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(0, 1)))
+bus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 1)))
 
-
listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq(
+bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq(
   // (task id, stage id, stage attempt, accum updates)
-  (0L, 0, 1, 
createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo)),
-  (1L, 0, 1, 
createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo))
+  (0L, 0, 1, createAccumulatorInfos(accumulatorUpdates)),
+  (1L, 0, 1, createAccumulatorInfos(accumulatorUpdates))
 )))
 
-checkAnswer(listener.getExecutionMetrics(0), 
accumulatorUpdates.mapValues(_ * 2))
+checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ 
* 2))
 
 // Ignore the task end for the first attempt
-listener.onTaskEnd(SparkListenerTaskEnd(
+bus.postToAll(SparkListenerTaskEnd(
   stageId = 0,
   stageAttemptId = 0,
   taskType = "",
   reason = null,
-  createTaskInfo(0, 0),
-  createTaskMetrics(accumulatorUpdates.mapValues(_ * 100
+  createTaskInfo(0, 0, accums = accumulatorUpdates.mapValues(_ * 100)),
+  null))
 
-checkAnswer(listener.getExecutionMetrics(0), 
accumulatorUpdates.mapValues(_ * 2))
+checkAnswer(store.executionMetrics(0), 

[GitHub] spark pull request #19681: [SPARK-20652][sql] Store SQL UI data in the new a...

2017-12-15 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19681#discussion_r157278467
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala ---
@@ -36,13 +36,14 @@ import org.apache.spark.sql.catalyst.util.quietly
 import org.apache.spark.sql.execution.{LeafExecNode, QueryExecution, SparkPlanInfo, SQLExecution}
 import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 import org.apache.spark.sql.test.SharedSQLContext
-import org.apache.spark.ui.SparkUI
+import org.apache.spark.status.config._
 import org.apache.spark.util.{AccumulatorMetadata, JsonProtocol, LongAccumulator}
-
+import org.apache.spark.util.kvstore.InMemoryStore
 
 class SQLListenerSuite extends SparkFunSuite with SharedSQLContext with JsonTestUtils {
   import testImplicits._
-  import org.apache.spark.AccumulatorSuite.makeInfo
+
+  override protected def sparkConf = super.sparkConf.set(LIVE_ENTITY_UPDATE_PERIOD, 0L)
--- End diff --

Is that true? The suite extends `SharedSQLContext` (which extends `SharedSparkSession`) and `SQLTestUtils`, all of which are traits, not objects. (Unlike `TestHive`, which does force sessions to be shared across suites for hive tests.)

There are also other suites that modify the conf (such as `HDFSMetadataLogSuite`).
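
For context, a minimal sketch of the per-suite override pattern being discussed; `SQLListenerSuiteSketch` is an illustrative name, and this assumes (as described above) that the `SharedSparkSession`-style trait builds each suite's session from its own `sparkConf`:

```scala
import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.status.config._

// Hypothetical suite: only the SparkSession created for this suite sees the
// changed value; suites that do not override sparkConf keep the default.
class SQLListenerSuiteSketch extends SparkFunSuite with SharedSQLContext {
  override protected def sparkConf: SparkConf =
    super.sparkConf.set(LIVE_ENTITY_UPDATE_PERIOD, 0L)
}
```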


---




[GitHub] spark pull request #19681: [SPARK-20652][sql] Store SQL UI data in the new a...

2017-12-14 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19681#discussion_r157130281
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala ---
@@ -36,13 +36,14 @@ import org.apache.spark.sql.catalyst.util.quietly
 import org.apache.spark.sql.execution.{LeafExecNode, QueryExecution, SparkPlanInfo, SQLExecution}
 import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 import org.apache.spark.sql.test.SharedSQLContext
-import org.apache.spark.ui.SparkUI
+import org.apache.spark.status.config._
 import org.apache.spark.util.{AccumulatorMetadata, JsonProtocol, LongAccumulator}
-
+import org.apache.spark.util.kvstore.InMemoryStore
 
 class SQLListenerSuite extends SparkFunSuite with SharedSQLContext with JsonTestUtils {
   import testImplicits._
-  import org.apache.spark.AccumulatorSuite.makeInfo
+
+  override protected def sparkConf = super.sparkConf.set(LIVE_ENTITY_UPDATE_PERIOD, 0L)
--- End diff --

The SparkContext is shared across all test suites; we should only set this conf to 0 in this suite.


---




[GitHub] spark pull request #19681: [SPARK-20652][sql] Store SQL UI data in the new a...

2017-12-14 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19681#discussion_r157129866
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
 ---
@@ -118,309 +142,286 @@ class SQLListenerSuite extends SparkFunSuite with 
SharedSQLContext with JsonTest
   (id, accumulatorValue)
 }.toMap
 
-listener.onOtherEvent(SparkListenerSQLExecutionStart(
+bus.postToAll(SparkListenerSQLExecutionStart(
   executionId,
   "test",
   "test",
   df.queryExecution.toString,
   SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
   System.currentTimeMillis()))
 
-val executionUIData = listener.executionIdToData(0)
-
-listener.onJobStart(SparkListenerJobStart(
+bus.postToAll(SparkListenerJobStart(
   jobId = 0,
   time = System.currentTimeMillis(),
   stageInfos = Seq(
 createStageInfo(0, 0),
 createStageInfo(1, 0)
   ),
   createProperties(executionId)))
-
listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(0, 0)))
+bus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 0)))
 
-assert(listener.getExecutionMetrics(0).isEmpty)
+assert(store.executionMetrics(0).isEmpty)
 
-
listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq(
+bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq(
   // (task id, stage id, stage attempt, accum updates)
-  (0L, 0, 0, 
createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo)),
-  (1L, 0, 0, 
createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo))
+  (0L, 0, 0, createAccumulatorInfos(accumulatorUpdates)),
+  (1L, 0, 0, createAccumulatorInfos(accumulatorUpdates))
 )))
 
-checkAnswer(listener.getExecutionMetrics(0), 
accumulatorUpdates.mapValues(_ * 2))
+checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ 
* 2))
 
 // Driver accumulator updates don't belong to this execution should be 
filtered and no
 // exception will be thrown.
-listener.onOtherEvent(SparkListenerDriverAccumUpdates(0, Seq((999L, 
2L
-checkAnswer(listener.getExecutionMetrics(0), 
accumulatorUpdates.mapValues(_ * 2))
+bus.postToAll(SparkListenerDriverAccumUpdates(0, Seq((999L, 2L
 
-
listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq(
+checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ 
* 2))
+
+bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq(
   // (task id, stage id, stage attempt, accum updates)
-  (0L, 0, 0, 
createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo)),
-  (1L, 0, 0,
-createTaskMetrics(accumulatorUpdates.mapValues(_ * 
2)).accumulators().map(makeInfo))
+  (0L, 0, 0, createAccumulatorInfos(accumulatorUpdates)),
+  (1L, 0, 0, createAccumulatorInfos(accumulatorUpdates.mapValues(_ * 
2)))
 )))
 
-checkAnswer(listener.getExecutionMetrics(0), 
accumulatorUpdates.mapValues(_ * 3))
+checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ 
* 3))
 
 // Retrying a stage should reset the metrics
-
listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(0, 1)))
+bus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 1)))
 
-
listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq(
+bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq(
   // (task id, stage id, stage attempt, accum updates)
-  (0L, 0, 1, 
createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo)),
-  (1L, 0, 1, 
createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo))
+  (0L, 0, 1, createAccumulatorInfos(accumulatorUpdates)),
+  (1L, 0, 1, createAccumulatorInfos(accumulatorUpdates))
 )))
 
-checkAnswer(listener.getExecutionMetrics(0), 
accumulatorUpdates.mapValues(_ * 2))
+checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ 
* 2))
 
 // Ignore the task end for the first attempt
-listener.onTaskEnd(SparkListenerTaskEnd(
+bus.postToAll(SparkListenerTaskEnd(
   stageId = 0,
   stageAttemptId = 0,
   taskType = "",
   reason = null,
-  createTaskInfo(0, 0),
-  createTaskMetrics(accumulatorUpdates.mapValues(_ * 100
+  createTaskInfo(0, 0, accums = accumulatorUpdates.mapValues(_ * 100)),
+  null))
 
-checkAnswer(listener.getExecutionMetrics(0), 
accumulatorUpdates.mapValues(_ * 2))
+checkAnswer(store.executionMetrics(0), 

[GitHub] spark pull request #19681: [SPARK-20652][sql] Store SQL UI data in the new a...

2017-12-14 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19681#discussion_r157129912
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
 ---
@@ -118,309 +142,286 @@ class SQLListenerSuite extends SparkFunSuite with 
SharedSQLContext with JsonTest
   (id, accumulatorValue)
 }.toMap
 
-listener.onOtherEvent(SparkListenerSQLExecutionStart(
+bus.postToAll(SparkListenerSQLExecutionStart(
   executionId,
   "test",
   "test",
   df.queryExecution.toString,
   SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
   System.currentTimeMillis()))
 
-val executionUIData = listener.executionIdToData(0)
-
-listener.onJobStart(SparkListenerJobStart(
+bus.postToAll(SparkListenerJobStart(
   jobId = 0,
   time = System.currentTimeMillis(),
   stageInfos = Seq(
 createStageInfo(0, 0),
 createStageInfo(1, 0)
   ),
   createProperties(executionId)))
-
listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(0, 0)))
+bus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 0)))
 
-assert(listener.getExecutionMetrics(0).isEmpty)
+assert(store.executionMetrics(0).isEmpty)
 
-
listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq(
+bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq(
   // (task id, stage id, stage attempt, accum updates)
-  (0L, 0, 0, 
createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo)),
-  (1L, 0, 0, 
createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo))
+  (0L, 0, 0, createAccumulatorInfos(accumulatorUpdates)),
+  (1L, 0, 0, createAccumulatorInfos(accumulatorUpdates))
 )))
 
-checkAnswer(listener.getExecutionMetrics(0), 
accumulatorUpdates.mapValues(_ * 2))
+checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ 
* 2))
 
 // Driver accumulator updates don't belong to this execution should be 
filtered and no
 // exception will be thrown.
-listener.onOtherEvent(SparkListenerDriverAccumUpdates(0, Seq((999L, 
2L
-checkAnswer(listener.getExecutionMetrics(0), 
accumulatorUpdates.mapValues(_ * 2))
+bus.postToAll(SparkListenerDriverAccumUpdates(0, Seq((999L, 2L
 
-
listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq(
+checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ 
* 2))
+
+bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq(
   // (task id, stage id, stage attempt, accum updates)
-  (0L, 0, 0, 
createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo)),
-  (1L, 0, 0,
-createTaskMetrics(accumulatorUpdates.mapValues(_ * 
2)).accumulators().map(makeInfo))
+  (0L, 0, 0, createAccumulatorInfos(accumulatorUpdates)),
+  (1L, 0, 0, createAccumulatorInfos(accumulatorUpdates.mapValues(_ * 
2)))
 )))
 
-checkAnswer(listener.getExecutionMetrics(0), 
accumulatorUpdates.mapValues(_ * 3))
+checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ 
* 3))
 
 // Retrying a stage should reset the metrics
-
listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(0, 1)))
+bus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 1)))
 
-
listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq(
+bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq(
   // (task id, stage id, stage attempt, accum updates)
-  (0L, 0, 1, 
createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo)),
-  (1L, 0, 1, 
createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo))
+  (0L, 0, 1, createAccumulatorInfos(accumulatorUpdates)),
+  (1L, 0, 1, createAccumulatorInfos(accumulatorUpdates))
 )))
 
-checkAnswer(listener.getExecutionMetrics(0), 
accumulatorUpdates.mapValues(_ * 2))
+checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ 
* 2))
 
 // Ignore the task end for the first attempt
-listener.onTaskEnd(SparkListenerTaskEnd(
+bus.postToAll(SparkListenerTaskEnd(
   stageId = 0,
   stageAttemptId = 0,
   taskType = "",
   reason = null,
-  createTaskInfo(0, 0),
-  createTaskMetrics(accumulatorUpdates.mapValues(_ * 100
+  createTaskInfo(0, 0, accums = accumulatorUpdates.mapValues(_ * 100)),
+  null))
 
-checkAnswer(listener.getExecutionMetrics(0), 
accumulatorUpdates.mapValues(_ * 2))
+checkAnswer(store.executionMetrics(0), 

[GitHub] spark pull request #19681: [SPARK-20652][sql] Store SQL UI data in the new a...

2017-12-14 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19681#discussion_r157129757
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
 ---
@@ -118,309 +142,286 @@ class SQLListenerSuite extends SparkFunSuite with 
SharedSQLContext with JsonTest
   (id, accumulatorValue)
 }.toMap
 
-listener.onOtherEvent(SparkListenerSQLExecutionStart(
+bus.postToAll(SparkListenerSQLExecutionStart(
   executionId,
   "test",
   "test",
   df.queryExecution.toString,
   SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
   System.currentTimeMillis()))
 
-val executionUIData = listener.executionIdToData(0)
-
-listener.onJobStart(SparkListenerJobStart(
+bus.postToAll(SparkListenerJobStart(
   jobId = 0,
   time = System.currentTimeMillis(),
   stageInfos = Seq(
 createStageInfo(0, 0),
 createStageInfo(1, 0)
   ),
   createProperties(executionId)))
-
listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(0, 0)))
+bus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 0)))
 
-assert(listener.getExecutionMetrics(0).isEmpty)
+assert(store.executionMetrics(0).isEmpty)
 
-
listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq(
+bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq(
   // (task id, stage id, stage attempt, accum updates)
-  (0L, 0, 0, 
createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo)),
-  (1L, 0, 0, 
createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo))
+  (0L, 0, 0, createAccumulatorInfos(accumulatorUpdates)),
+  (1L, 0, 0, createAccumulatorInfos(accumulatorUpdates))
 )))
 
-checkAnswer(listener.getExecutionMetrics(0), 
accumulatorUpdates.mapValues(_ * 2))
+checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ 
* 2))
 
 // Driver accumulator updates don't belong to this execution should be 
filtered and no
 // exception will be thrown.
-listener.onOtherEvent(SparkListenerDriverAccumUpdates(0, Seq((999L, 
2L
-checkAnswer(listener.getExecutionMetrics(0), 
accumulatorUpdates.mapValues(_ * 2))
+bus.postToAll(SparkListenerDriverAccumUpdates(0, Seq((999L, 2L
 
-
listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq(
+checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ 
* 2))
+
+bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq(
   // (task id, stage id, stage attempt, accum updates)
-  (0L, 0, 0, 
createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo)),
-  (1L, 0, 0,
-createTaskMetrics(accumulatorUpdates.mapValues(_ * 
2)).accumulators().map(makeInfo))
+  (0L, 0, 0, createAccumulatorInfos(accumulatorUpdates)),
+  (1L, 0, 0, createAccumulatorInfos(accumulatorUpdates.mapValues(_ * 
2)))
 )))
 
-checkAnswer(listener.getExecutionMetrics(0), 
accumulatorUpdates.mapValues(_ * 3))
+checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ 
* 3))
 
 // Retrying a stage should reset the metrics
-
listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(0, 1)))
+bus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 1)))
 
-
listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq(
+bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq(
   // (task id, stage id, stage attempt, accum updates)
-  (0L, 0, 1, 
createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo)),
-  (1L, 0, 1, 
createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo))
+  (0L, 0, 1, createAccumulatorInfos(accumulatorUpdates)),
+  (1L, 0, 1, createAccumulatorInfos(accumulatorUpdates))
 )))
 
-checkAnswer(listener.getExecutionMetrics(0), 
accumulatorUpdates.mapValues(_ * 2))
+checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ 
* 2))
 
 // Ignore the task end for the first attempt
-listener.onTaskEnd(SparkListenerTaskEnd(
+bus.postToAll(SparkListenerTaskEnd(
   stageId = 0,
   stageAttemptId = 0,
   taskType = "",
   reason = null,
-  createTaskInfo(0, 0),
-  createTaskMetrics(accumulatorUpdates.mapValues(_ * 100
+  createTaskInfo(0, 0, accums = accumulatorUpdates.mapValues(_ * 100)),
+  null))
 
-checkAnswer(listener.getExecutionMetrics(0), 
accumulatorUpdates.mapValues(_ * 2))
+checkAnswer(store.executionMetrics(0), 

[GitHub] spark pull request #19681: [SPARK-20652][sql] Store SQL UI data in the new a...

2017-12-14 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19681#discussion_r157016366
  
--- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
@@ -455,9 +457,14 @@ class SparkContext(config: SparkConf) extends Logging {
 // For tests, do not enable the UI
 None
   }
-// Bind the UI before starting the task scheduler to communicate
-// the bound port to the cluster manager properly
-_ui.foreach(_.bind())
+_ui.foreach { ui =>
+  // Load any plugins that might want to modify the UI.
+  AppStatusPlugin.loadPlugins().foreach(_.setupUI(ui))
--- End diff --

Let's continue the discussion on the other PR.


---




[GitHub] spark pull request #19681: [SPARK-20652][sql] Store SQL UI data in the new a...

2017-12-14 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19681#discussion_r157015987
  
--- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
@@ -455,9 +457,14 @@ class SparkContext(config: SparkConf) extends Logging {
 // For tests, do not enable the UI
 None
   }
-// Bind the UI before starting the task scheduler to communicate
-// the bound port to the cluster manager properly
-_ui.foreach(_.bind())
+_ui.foreach { ui =>
+  // Load any plugins that might want to modify the UI.
+  AppStatusPlugin.loadPlugins().foreach(_.setupUI(ui))
--- End diff --

@vanzin, the live UI doesn't need a 2-step process to set up the UI, while the history server does. That's why I think they should not share one plugin interface.


---




[GitHub] spark pull request #19681: [SPARK-20652][sql] Store SQL UI data in the new a...

2017-12-14 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19681#discussion_r157013164
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala ---
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.ui
+
+import java.lang.{Long => JLong}
+import java.util.Date
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize
+
+import org.apache.spark.{JobExecutionStatus, SparkConf}
+import org.apache.spark.scheduler.SparkListener
+import org.apache.spark.status.AppStatusPlugin
+import org.apache.spark.status.KVUtils.KVIndexParam
+import org.apache.spark.ui.SparkUI
+import org.apache.spark.util.Utils
+import org.apache.spark.util.kvstore.KVStore
+
+/**
+ * Provides a view of a KVStore with methods that make it easy to query SQL-specific state. There's
+ * no state kept in this class, so it's ok to have multiple instances of it in an application.
+ */
+private[sql] class SQLAppStatusStore(
+store: KVStore,
+listener: Option[SQLAppStatusListener] = None) {
+
+  def executionsList(): Seq[SQLExecutionUIData] = {
+store.view(classOf[SQLExecutionUIData]).asScala.toSeq
+  }
+
+  def execution(executionId: Long): Option[SQLExecutionUIData] = {
+try {
+  Some(store.read(classOf[SQLExecutionUIData], executionId))
+} catch {
+  case _: NoSuchElementException => None
+}
+  }
+
+  def executionsCount(): Long = {
+store.count(classOf[SQLExecutionUIData])
+  }
+
+  def executionMetrics(executionId: Long): Map[Long, String] = {
+def metricsFromStore(): Option[Map[Long, String]] = {
+  val exec = store.read(classOf[SQLExecutionUIData], executionId)
+  Option(exec.metricValues)
+}
+
+metricsFromStore()
+  .orElse(listener.flatMap(_.liveExecutionMetrics(executionId)))
+  // Try a second time in case the execution finished while this method is trying to
+  // get the metrics.
+  .orElse(metricsFromStore())
+  .getOrElse(Map())
+  }
+
+  def planGraph(executionId: Long): SparkPlanGraph = {
+store.read(classOf[SparkPlanGraphWrapper], executionId).toSparkPlanGraph()
+  }
+
+}
+
+/**
+ * An AppStatusPlugin for handling the SQL UI and listeners.
+ */
+private[sql] class SQLAppStatusPlugin extends AppStatusPlugin {
+
+  override def setupListeners(
+  conf: SparkConf,
+  store: KVStore,
+  addListenerFn: SparkListener => Unit,
+  live: Boolean): Unit = {
+// For live applications, the listener is installed in [[setupUI]]. This also avoids adding
+// the listener when the UI is disabled. Force installation during testing, though.
+if (!live || Utils.isTesting) {
+  val listener = new SQLAppStatusListener(conf, store, live, None)
+  addListenerFn(listener)
+}
+  }
+
+  override def setupUI(ui: SparkUI): Unit = {
--- End diff --

The calls are made in specific cases (`setupListeners` when setting up a listener bus, `setupUI` when setting up the UI), and they always happen in those cases. But this implementation has to be a little awkward because we don't want the SQL UI if SQL hasn't been initialized, if we're to maintain the old behavior.

I don't think the listener is installed twice here: in `setupListeners` it's only installed for non-live applications (= SHS), and in `setupUI` it's only installed if there's a SparkContext (= live application).

If we're OK with modifying the existing behavior and always having the SQL tab, this can be simplified a lot.
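
To make that flow concrete, here is a condensed, self-contained sketch of the registration rule described above; the names and boolean flags are illustrative, not the actual Spark code:

```scala
// Sketch: the SQL listener is added by exactly one of the two plugin hooks
// (the quoted code additionally forces installation when Utils.isTesting).
// "addListener" stands in for however the caller wires listeners into its bus.
object PluginFlowSketch {
  // Invoked whenever a listener bus is set up (live apps and history server).
  def setupListeners(live: Boolean, addListener: () => Unit): Unit = {
    // Only the history server path (live == false) adds the listener here,
    // while the event log is being replayed.
    if (!live) addListener()
  }

  // Invoked whenever a UI is set up.
  def setupUI(hasSparkContext: Boolean, addListener: () => Unit): Unit = {
    // Only a live application (which has a SparkContext) adds it here; this
    // also means nothing is added when the live UI is disabled. The SQL tab
    // itself is still attached lazily, on the first SQL event, to keep the
    // old "no SQL tab until SQL is used" behavior.
    if (hasSparkContext) addListener()
  }
}
```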


---




[GitHub] spark pull request #19681: [SPARK-20652][sql] Store SQL UI data in the new a...

2017-12-14 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19681#discussion_r156987695
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala ---
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.ui
+
+import java.lang.{Long => JLong}
+import java.util.Date
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize
+
+import org.apache.spark.{JobExecutionStatus, SparkConf}
+import org.apache.spark.scheduler.SparkListener
+import org.apache.spark.status.AppStatusPlugin
+import org.apache.spark.status.KVUtils.KVIndexParam
+import org.apache.spark.ui.SparkUI
+import org.apache.spark.util.Utils
+import org.apache.spark.util.kvstore.KVStore
+
+/**
+ * Provides a view of a KVStore with methods that make it easy to query SQL-specific state. There's
+ * no state kept in this class, so it's ok to have multiple instances of it in an application.
+ */
+private[sql] class SQLAppStatusStore(
+store: KVStore,
+listener: Option[SQLAppStatusListener] = None) {
+
+  def executionsList(): Seq[SQLExecutionUIData] = {
+store.view(classOf[SQLExecutionUIData]).asScala.toSeq
+  }
+
+  def execution(executionId: Long): Option[SQLExecutionUIData] = {
+try {
+  Some(store.read(classOf[SQLExecutionUIData], executionId))
+} catch {
+  case _: NoSuchElementException => None
+}
+  }
+
+  def executionsCount(): Long = {
+store.count(classOf[SQLExecutionUIData])
+  }
+
+  def executionMetrics(executionId: Long): Map[Long, String] = {
+def metricsFromStore(): Option[Map[Long, String]] = {
+  val exec = store.read(classOf[SQLExecutionUIData], executionId)
+  Option(exec.metricValues)
+}
+
+metricsFromStore()
+  .orElse(listener.flatMap(_.liveExecutionMetrics(executionId)))
+  // Try a second time in case the execution finished while this method is trying to
+  // get the metrics.
+  .orElse(metricsFromStore())
+  .getOrElse(Map())
+  }
+
+  def planGraph(executionId: Long): SparkPlanGraph = {
+store.read(classOf[SparkPlanGraphWrapper], executionId).toSparkPlanGraph()
+  }
+
+}
+
+/**
+ * An AppStatusPlugin for handling the SQL UI and listeners.
+ */
+private[sql] class SQLAppStatusPlugin extends AppStatusPlugin {
+
+  override def setupListeners(
+  conf: SparkConf,
+  store: KVStore,
+  addListenerFn: SparkListener => Unit,
+  live: Boolean): Unit = {
+// For live applications, the listener is installed in [[setupUI]]. This also avoids adding
+// the listener when the UI is disabled. Force installation during testing, though.
+if (!live || Utils.isTesting) {
+  val listener = new SQLAppStatusListener(conf, store, live, None)
+  addListenerFn(listener)
+}
+  }
+
+  override def setupUI(ui: SparkUI): Unit = {
--- End diff --

Do we have a clear rule about when `setupListeners` is called and when `setupUI` is called?

Here we register `SQLAppStatusListener` in both `setupListeners` and `setupUI`; will we register it twice?


---




[GitHub] spark pull request #19681: [SPARK-20652][sql] Store SQL UI data in the new a...

2017-11-14 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/19681


---




[GitHub] spark pull request #19681: [SPARK-20652][sql] Store SQL UI data in the new a...

2017-11-09 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/19681#discussion_r150094936
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
 ---
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.ui
+
+import java.util.Date
+import java.util.concurrent.ConcurrentHashMap
+import java.util.function.Function
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.{JobExecutionStatus, SparkConf}
+import org.apache.spark.internal.Logging
+import org.apache.spark.scheduler._
+import org.apache.spark.sql.execution.SQLExecution
+import org.apache.spark.sql.execution.metric._
+import org.apache.spark.status.LiveEntity
+import org.apache.spark.status.config._
+import org.apache.spark.ui.SparkUI
+import org.apache.spark.util.kvstore.KVStore
+
+private[sql] class SQLAppStatusListener(
+conf: SparkConf,
+kvstore: KVStore,
+live: Boolean,
+ui: Option[SparkUI] = None)
+  extends SparkListener with Logging {
+
+  // How often to flush intermediate state of a live execution to the 
store. When replaying logs,
+  // never flush (only do the very last write).
+  private val liveUpdatePeriodNs = if (live) 
conf.get(LIVE_ENTITY_UPDATE_PERIOD) else -1L
+
+  // Live tracked data is needed by the SQL status store to calculate 
metrics for in-flight
+  // executions; that means arbitrary threads may be querying these maps, 
so they need to be
+  // thread-safe.
+  private val liveExecutions = new ConcurrentHashMap[Long, 
LiveExecutionData]()
+  private val stageMetrics = new ConcurrentHashMap[Int, LiveStageMetrics]()
+
+  private var uiInitialized = false
+
+  override def onJobStart(event: SparkListenerJobStart): Unit = {
+val executionIdString = 
event.properties.getProperty(SQLExecution.EXECUTION_ID_KEY)
+if (executionIdString == null) {
+  // This is not a job created by SQL
+  return
+}
+
+val executionId = executionIdString.toLong
+val jobId = event.jobId
+val exec = getOrCreateExecution(executionId)
+
+// Record the accumulator IDs for the stages of this job, so that the 
code that keeps
+// track of the metrics knows which accumulators to look at.
+val accumIds = exec.metrics.map(_.accumulatorId).sorted.toList
+event.stageIds.foreach { id =>
+  stageMetrics.put(id, new LiveStageMetrics(id, 0, accumIds.toArray, 
new ConcurrentHashMap()))
+}
+
+exec.jobs = exec.jobs + (jobId -> JobExecutionStatus.RUNNING)
+exec.stages = event.stageIds.toSet
+update(exec)
+  }
+
+  override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit 
= {
+if (!isSQLStage(event.stageInfo.stageId)) {
+  return
+}
+
+// Reset the metrics tracking object for the new attempt.
+Option(stageMetrics.get(event.stageInfo.stageId)).foreach { metrics =>
+  metrics.taskMetrics.clear()
+  metrics.attemptId = event.stageInfo.attemptId
+}
+  }
+
+  override def onJobEnd(event: SparkListenerJobEnd): Unit = {
+liveExecutions.values().asScala.foreach { exec =>
+  if (exec.jobs.contains(event.jobId)) {
+val result = event.jobResult match {
+  case JobSucceeded => JobExecutionStatus.SUCCEEDED
+  case _ => JobExecutionStatus.FAILED
+}
+exec.jobs = exec.jobs + (event.jobId -> result)
+exec.endEvents += 1
+update(exec)
+  }
+}
+  }
+
+  override def onExecutorMetricsUpdate(event: 
SparkListenerExecutorMetricsUpdate): Unit = {
+event.accumUpdates.foreach { case (taskId, stageId, attemptId, 
accumUpdates) =>
+  updateStageMetrics(stageId, attemptId, taskId, accumUpdates, false)
+}
+  }
+
+  override def onTaskEnd(event: 

[GitHub] spark pull request #19681: [SPARK-20652][sql] Store SQL UI data in the new a...

2017-11-09 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/19681#discussion_r150091458
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala ---
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.ui
+
+import java.lang.{Long => JLong}
+import java.util.Date
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize
+
+import org.apache.spark.{JobExecutionStatus, SparkConf}
+import org.apache.spark.scheduler.SparkListener
+import org.apache.spark.status.AppStatusPlugin
+import org.apache.spark.status.KVUtils.KVIndexParam
+import org.apache.spark.ui.SparkUI
+import org.apache.spark.util.Utils
+import org.apache.spark.util.kvstore.KVStore
+
+/**
+ * Provides a view of a KVStore with methods that make it easy to query SQL-specific state. There's
+ * no state kept in this class, so it's ok to have multiple instances of it in an application.
+ */
+private[sql] class SQLAppStatusStore(
+store: KVStore,
+listener: Option[SQLAppStatusListener] = None) {
+
+  def executionsList(): Seq[SQLExecutionUIData] = {
+store.view(classOf[SQLExecutionUIData]).asScala.toSeq
+  }
+
+  def execution(executionId: Long): Option[SQLExecutionUIData] = {
+try {
+  Some(store.read(classOf[SQLExecutionUIData], executionId))
+} catch {
+  case _: NoSuchElementException => None
+}
+  }
+
+  def executionsCount(): Long = {
+store.count(classOf[SQLExecutionUIData])
+  }
+
+  def executionMetrics(executionId: Long): Map[Long, String] = {
+val exec = store.read(classOf[SQLExecutionUIData], executionId)
+Option(exec.metricValues)
+  .orElse(listener.map(_.executionMetrics(executionId)))
+  .getOrElse(Map())
--- End diff --

Is there a race here when the execution ends?

- T1 (UI thread, calling this method): the execution hasn't ended yet, so `exec.metricValues` is null
- T2 (listener): the execution ends and is dropped from `liveExecutions`
- T1: `_.executionMetrics(executionId)` throws an exception because the execution has been dropped from `liveExecutions`
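
For reference, a self-contained sketch of the double-read pattern that closes this kind of race (the same shape as the later version of `executionMetrics` quoted earlier in this thread); the map-backed store and listener below are simplified stand-ins, not Spark classes:

```scala
import java.util.concurrent.ConcurrentHashMap

// "store" holds metrics of finished executions, "live" those of in-flight ones;
// the listener moves an entry from live to store when an execution ends.
final class MetricsView(
    store: ConcurrentHashMap[Long, Map[Long, String]],
    live: ConcurrentHashMap[Long, Map[Long, String]]) {

  def executionMetrics(executionId: Long): Map[Long, String] = {
    def fromStore(): Option[Map[Long, String]] = Option(store.get(executionId))
    def fromListener(): Option[Map[Long, String]] = Option(live.get(executionId))

    fromStore()
      .orElse(fromListener())
      // If the execution finished between the two reads above, the listener
      // side has already dropped it, but the final metrics are now in the
      // store, so a second store read sees them instead of failing.
      .orElse(fromStore())
      .getOrElse(Map())
  }
}
```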


---




[GitHub] spark pull request #19681: [SPARK-20652][sql] Store SQL UI data in the new a...

2017-11-08 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19681#discussion_r149801087
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
 ---
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.ui
+
+import java.util.Date
+import java.util.concurrent.ConcurrentHashMap
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.HashMap
+
+import org.apache.spark.{JobExecutionStatus, SparkConf}
+import org.apache.spark.internal.Logging
+import org.apache.spark.scheduler._
+import org.apache.spark.sql.execution.SQLExecution
+import org.apache.spark.sql.execution.metric._
+import org.apache.spark.sql.internal.StaticSQLConf._
+import org.apache.spark.status.LiveEntity
+import org.apache.spark.status.config._
+import org.apache.spark.ui.SparkUI
+import org.apache.spark.util.kvstore.KVStore
+
+private[sql] class SQLAppStatusListener(
+conf: SparkConf,
+kvstore: KVStore,
+live: Boolean,
+ui: Option[SparkUI] = None)
+  extends SparkListener with Logging {
+
+  // How often to flush intermediate state of a live execution to the 
store. When replaying logs,
+  // never flush (only do the very last write).
+  private val liveUpdatePeriodNs = if (live) 
conf.get(LIVE_ENTITY_UPDATE_PERIOD) else -1L
+
+  private val liveExecutions = new HashMap[Long, LiveExecutionData]()
+  private val stageMetrics = new HashMap[Int, LiveStageMetrics]()
+
+  private var uiInitialized = false
+
+  override def onJobStart(event: SparkListenerJobStart): Unit = {
+val executionIdString = 
event.properties.getProperty(SQLExecution.EXECUTION_ID_KEY)
+if (executionIdString == null) {
+  // This is not a job created by SQL
+  return
+}
+
+val executionId = executionIdString.toLong
+val jobId = event.jobId
+val exec = getOrCreateExecution(executionId)
+
+// Record the accumulator IDs for the stages of this job, so that the 
code that keeps
+// track of the metrics knows which accumulators to look at.
+val accumIds = exec.metrics.map(_.accumulatorId).sorted.toList
+event.stageIds.foreach { id =>
+  stageMetrics.put(id, new LiveStageMetrics(id, 0, accumIds.toArray, 
new ConcurrentHashMap()))
+}
+
+exec.jobs = exec.jobs + (jobId -> JobExecutionStatus.RUNNING)
+exec.stages = event.stageIds.toSet
+update(exec)
+  }
+
+  override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit 
= {
+if (!isSQLStage(event.stageInfo.stageId)) {
+  return
+}
+
+// Reset the metrics tracking object for the new attempt.
+stageMetrics.get(event.stageInfo.stageId).foreach { metrics =>
+  metrics.taskMetrics.clear()
+  metrics.attemptId = event.stageInfo.attemptId
+}
+  }
+
+  override def onJobEnd(event: SparkListenerJobEnd): Unit = {
+liveExecutions.values.foreach { exec =>
+  if (exec.jobs.contains(event.jobId)) {
+val result = event.jobResult match {
+  case JobSucceeded => JobExecutionStatus.SUCCEEDED
+  case _ => JobExecutionStatus.FAILED
+}
+exec.jobs = exec.jobs + (event.jobId -> result)
+exec.endEvents += 1
+update(exec)
+  }
+}
+  }
+
+  override def onExecutorMetricsUpdate(event: 
SparkListenerExecutorMetricsUpdate): Unit = {
+event.accumUpdates.foreach { case (taskId, stageId, attemptId, 
accumUpdates) =>
+  updateStageMetrics(stageId, attemptId, taskId, accumUpdates, false)
+}
+  }
+
+  override def onTaskEnd(event: SparkListenerTaskEnd): Unit = {
+if (!isSQLStage(event.stageId)) {
+  return
+}
+
+val info = event.taskInfo
+// SPARK-20342. If processing events from a 

[GitHub] spark pull request #19681: [SPARK-20652][sql] Store SQL UI data in the new a...

2017-11-08 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19681#discussion_r149791151
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala ---
@@ -420,21 +428,19 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
 withTempPath { file =>
   // person creates a temporary view. get the DF before listing previous execution IDs
   val data = person.select('name)
-  sparkContext.listenerBus.waitUntilEmpty(1)
--- End diff --

`person.select()` doesn't generate any events.
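For context, `select` is a lazy transformation: it builds a new DataFrame but fires no jobs, so there are no listener events to drain at that point; the bus only needs to be drained after an action has run. A minimal sketch of that pattern (a hypothetical test snippet under the suite's usual setup, where `person` is a DataFrame, `file` is the temp path and the test implicits are in scope; not the suite's exact code):

    // Lazy transformation: no jobs are submitted, so no listener events are produced yet.
    val data = person.select('name)

    // Only an action submits jobs and emits the SQL execution events.
    data.write.format("json").save(file.getCanonicalPath)

    // Now it makes sense to drain the listener bus before asserting on the recorded executions.
    sparkContext.listenerBus.waitUntilEmpty(10000)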


---




[GitHub] spark pull request #19681: [SPARK-20652][sql] Store SQL UI data in the new a...

2017-11-08 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/19681#discussion_r149767509
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
 ---
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.ui
+
+import java.util.Date
+import java.util.concurrent.ConcurrentHashMap
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.HashMap
+
+import org.apache.spark.{JobExecutionStatus, SparkConf}
+import org.apache.spark.internal.Logging
+import org.apache.spark.scheduler._
+import org.apache.spark.sql.execution.SQLExecution
+import org.apache.spark.sql.execution.metric._
+import org.apache.spark.sql.internal.StaticSQLConf._
+import org.apache.spark.status.LiveEntity
+import org.apache.spark.status.config._
+import org.apache.spark.ui.SparkUI
+import org.apache.spark.util.kvstore.KVStore
+
+private[sql] class SQLAppStatusListener(
+conf: SparkConf,
+kvstore: KVStore,
+live: Boolean,
+ui: Option[SparkUI] = None)
+  extends SparkListener with Logging {
+
+  // How often to flush intermediate state of a live execution to the 
store. When replaying logs,
+  // never flush (only do the very last write).
+  private val liveUpdatePeriodNs = if (live) 
conf.get(LIVE_ENTITY_UPDATE_PERIOD) else -1L
+
+  private val liveExecutions = new HashMap[Long, LiveExecutionData]()
+  private val stageMetrics = new HashMap[Int, LiveStageMetrics]()
+
+  private var uiInitialized = false
+
+  override def onJobStart(event: SparkListenerJobStart): Unit = {
+val executionIdString = 
event.properties.getProperty(SQLExecution.EXECUTION_ID_KEY)
+if (executionIdString == null) {
+  // This is not a job created by SQL
+  return
+}
+
+val executionId = executionIdString.toLong
+val jobId = event.jobId
+val exec = getOrCreateExecution(executionId)
+
+// Record the accumulator IDs for the stages of this job, so that the 
code that keeps
+// track of the metrics knows which accumulators to look at.
+val accumIds = exec.metrics.map(_.accumulatorId).sorted.toList
+event.stageIds.foreach { id =>
+  stageMetrics.put(id, new LiveStageMetrics(id, 0, accumIds.toArray, 
new ConcurrentHashMap()))
+}
+
+exec.jobs = exec.jobs + (jobId -> JobExecutionStatus.RUNNING)
+exec.stages = event.stageIds.toSet
+update(exec)
+  }
+
+  override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit 
= {
+if (!isSQLStage(event.stageInfo.stageId)) {
+  return
+}
+
+// Reset the metrics tracking object for the new attempt.
+stageMetrics.get(event.stageInfo.stageId).foreach { metrics =>
+  metrics.taskMetrics.clear()
+  metrics.attemptId = event.stageInfo.attemptId
+}
+  }
+
+  override def onJobEnd(event: SparkListenerJobEnd): Unit = {
+liveExecutions.values.foreach { exec =>
+  if (exec.jobs.contains(event.jobId)) {
+val result = event.jobResult match {
+  case JobSucceeded => JobExecutionStatus.SUCCEEDED
+  case _ => JobExecutionStatus.FAILED
+}
+exec.jobs = exec.jobs + (event.jobId -> result)
+exec.endEvents += 1
+update(exec)
+  }
+}
+  }
+
+  override def onExecutorMetricsUpdate(event: 
SparkListenerExecutorMetricsUpdate): Unit = {
+event.accumUpdates.foreach { case (taskId, stageId, attemptId, 
accumUpdates) =>
+  updateStageMetrics(stageId, attemptId, taskId, accumUpdates, false)
+}
+  }
+
+  override def onTaskEnd(event: SparkListenerTaskEnd): Unit = {
+if (!isSQLStage(event.stageId)) {
+  return
+}
+
+val info = event.taskInfo
+// SPARK-20342. If processing events from a 

[GitHub] spark pull request #19681: [SPARK-20652][sql] Store SQL UI data in the new a...

2017-11-08 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/19681#discussion_r149780801
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
 ---
@@ -420,21 +428,19 @@ class SQLMetricsSuite extends SparkFunSuite with 
SQLMetricsTestUtils with Shared
 withTempPath { file =>
   // person creates a temporary view. get the DF before listing 
previous execution IDs
   val data = person.select('name)
-  sparkContext.listenerBus.waitUntilEmpty(1)
--- End diff --

don't you still need this `waitUntilEmpty`?


---




[GitHub] spark pull request #19681: [SPARK-20652][sql] Store SQL UI data in the new a...

2017-11-08 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/19681#discussion_r149778794
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala ---
@@ -89,398 +83,3 @@ private class LongLongTupleConverter extends 
Converter[(Object, Object), (Long,
 typeFactory.constructSimpleType(classOf[(_, _)], classOf[(_, _)], 
Array(longType, longType))
   }
 }
-
-class SQLHistoryListenerFactory extends SparkHistoryListenerFactory {
-
-  override def createListeners(conf: SparkConf, sparkUI: SparkUI): 
Seq[SparkListener] = {
-List(new SQLHistoryListener(conf, sparkUI))
-  }
-}
-
-class SQLListener(conf: SparkConf) extends SparkListener with Logging {
-
-  private val retainedExecutions = 
conf.getInt("spark.sql.ui.retainedExecutions", 1000)
-
-  private val activeExecutions = mutable.HashMap[Long, 
SQLExecutionUIData]()
-
-  // Old data in the following fields must be removed in 
"trimExecutionsIfNecessary".
-  // If adding new fields, make sure "trimExecutionsIfNecessary" can clean 
up old data
-  private val _executionIdToData = mutable.HashMap[Long, 
SQLExecutionUIData]()
-
-  /**
-   * Maintain the relation between job id and execution id so that we can 
get the execution id in
-   * the "onJobEnd" method.
-   */
-  private val _jobIdToExecutionId = mutable.HashMap[Long, Long]()
-
-  private val _stageIdToStageMetrics = mutable.HashMap[Long, 
SQLStageMetrics]()
-
-  private val failedExecutions = mutable.ListBuffer[SQLExecutionUIData]()
-
-  private val completedExecutions = 
mutable.ListBuffer[SQLExecutionUIData]()
-
-  def executionIdToData: Map[Long, SQLExecutionUIData] = synchronized {
-_executionIdToData.toMap
-  }
-
-  def jobIdToExecutionId: Map[Long, Long] = synchronized {
-_jobIdToExecutionId.toMap
-  }
-
-  def stageIdToStageMetrics: Map[Long, SQLStageMetrics] = synchronized {
-_stageIdToStageMetrics.toMap
-  }
-
-  private def trimExecutionsIfNecessary(
-  executions: mutable.ListBuffer[SQLExecutionUIData]): Unit = {
-if (executions.size > retainedExecutions) {
-  val toRemove = math.max(retainedExecutions / 10, 1)
-  executions.take(toRemove).foreach { execution =>
-for (executionUIData <- 
_executionIdToData.remove(execution.executionId)) {
-  for (jobId <- executionUIData.jobs.keys) {
-_jobIdToExecutionId.remove(jobId)
-  }
-  for (stageId <- executionUIData.stages) {
-_stageIdToStageMetrics.remove(stageId)
-  }
-}
-  }
-  executions.trimStart(toRemove)
-}
-  }
-
-  override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
-val executionIdString = 
jobStart.properties.getProperty(SQLExecution.EXECUTION_ID_KEY)
-if (executionIdString == null) {
-  // This is not a job created by SQL
-  return
-}
-val executionId = executionIdString.toLong
-val jobId = jobStart.jobId
-val stageIds = jobStart.stageIds
-
-synchronized {
-  activeExecutions.get(executionId).foreach { executionUIData =>
-executionUIData.jobs(jobId) = JobExecutionStatus.RUNNING
-executionUIData.stages ++= stageIds
-stageIds.foreach(stageId =>
-  _stageIdToStageMetrics(stageId) = new 
SQLStageMetrics(stageAttemptId = 0))
-_jobIdToExecutionId(jobId) = executionId
-  }
-}
-  }
-
-  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = synchronized {
-val jobId = jobEnd.jobId
-for (executionId <- _jobIdToExecutionId.get(jobId);
- executionUIData <- _executionIdToData.get(executionId)) {
-  jobEnd.jobResult match {
-case JobSucceeded => executionUIData.jobs(jobId) = 
JobExecutionStatus.SUCCEEDED
-case JobFailed(_) => executionUIData.jobs(jobId) = 
JobExecutionStatus.FAILED
-  }
-  if (executionUIData.completionTime.nonEmpty && 
!executionUIData.hasRunningJobs) {
-// We are the last job of this execution, so mark the execution as 
finished. Note that
-// `onExecutionEnd` also does this, but currently that can be 
called before `onJobEnd`
-// since these are called on different threads.
-markExecutionFinished(executionId)
-  }
-}
-  }
-
-  override def onExecutorMetricsUpdate(
-  executorMetricsUpdate: SparkListenerExecutorMetricsUpdate): Unit = 
synchronized {
-for ((taskId, stageId, stageAttemptID, accumUpdates) <- 
executorMetricsUpdate.accumUpdates) {
-  updateTaskAccumulatorValues(taskId, stageId, stageAttemptID, 
accumUpdates, 

[GitHub] spark pull request #19681: [SPARK-20652][sql] Store SQL UI data in the new a...

2017-11-08 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/19681#discussion_r149767574
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
 ---
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.ui
+
+import java.util.Date
+import java.util.concurrent.ConcurrentHashMap
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.HashMap
+
+import org.apache.spark.{JobExecutionStatus, SparkConf}
+import org.apache.spark.internal.Logging
+import org.apache.spark.scheduler._
+import org.apache.spark.sql.execution.SQLExecution
+import org.apache.spark.sql.execution.metric._
+import org.apache.spark.sql.internal.StaticSQLConf._
--- End diff --

unused import


---




[GitHub] spark pull request #19681: [SPARK-20652][sql] Store SQL UI data in the new a...

2017-11-08 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/19681#discussion_r149767731
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala
 ---
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.ui
+
+import java.lang.{Long => JLong}
+import java.util.Date
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize
+
+import org.apache.spark.{JobExecutionStatus, SparkConf}
+import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
--- End diff --

`SparkListenerEvent` is unused


---




[GitHub] spark pull request #19681: [SPARK-20652][sql] Store SQL UI data in the new a...

2017-11-07 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19681#discussion_r149579972
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
 ---
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.ui
+
+import java.util.Date
+import java.util.concurrent.ConcurrentHashMap
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.HashMap
+
+import org.apache.spark.{JobExecutionStatus, SparkConf}
+import org.apache.spark.internal.Logging
+import org.apache.spark.scheduler._
+import org.apache.spark.sql.execution.SQLExecution
+import org.apache.spark.sql.execution.metric._
+import org.apache.spark.sql.internal.StaticSQLConf._
+import org.apache.spark.status.LiveEntity
+import org.apache.spark.status.config._
+import org.apache.spark.ui.SparkUI
+import org.apache.spark.util.kvstore.KVStore
+
+private[sql] class SQLAppStatusListener(
+conf: SparkConf,
+kvstore: KVStore,
+live: Boolean,
+ui: Option[SparkUI] = None)
+  extends SparkListener with Logging {
+
+  // How often to flush intermediate stage of a live execution to the 
store. When replaying logs,
+  // never flush (only do the very last write).
+  private val liveUpdatePeriodNs = if (live) 
conf.get(LIVE_ENTITY_UPDATE_PERIOD) else -1L
+
+  private val liveExecutions = new HashMap[Long, LiveExecutionData]()
+  private val stageMetrics = new HashMap[Int, LiveStageMetrics]()
+
+  private var uiInitialized = false
+
+  override def onJobStart(event: SparkListenerJobStart): Unit = {
+val executionIdString = 
event.properties.getProperty(SQLExecution.EXECUTION_ID_KEY)
+if (executionIdString == null) {
+  // This is not a job created by SQL
+  return
+}
+
+val executionId = executionIdString.toLong
+val jobId = event.jobId
+val exec = getOrCreateExecution(executionId)
+
+// Record the accumulator IDs for the stages of this job, so that the 
code that keeps
+// track of the metrics knows which accumulators to look at.
+val accumIds = exec.metrics.map(_.accumulatorId).sorted.toList
+event.stageIds.foreach { id =>
+  stageMetrics.put(id, new LiveStageMetrics(id, 0, accumIds.toArray, 
new ConcurrentHashMap()))
+}
+
+exec.jobs = exec.jobs + (jobId -> JobExecutionStatus.RUNNING)
+exec.stages = event.stageIds
+update(exec)
+  }
+
+  override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit 
= {
+if (!isSQLStage(event.stageInfo.stageId)) {
+  return
+}
+
+// Reset the metrics tracking object for the new attempt.
+stageMetrics.get(event.stageInfo.stageId).foreach { metrics =>
+  metrics.taskMetrics.clear()
+  metrics.attemptId = event.stageInfo.attemptId
+}
+  }
+
+  override def onJobEnd(event: SparkListenerJobEnd): Unit = {
+liveExecutions.values.foreach { exec =>
+  if (exec.jobs.contains(event.jobId)) {
+val result = event.jobResult match {
+  case JobSucceeded => JobExecutionStatus.SUCCEEDED
+  case _ => JobExecutionStatus.FAILED
+}
+exec.jobs = exec.jobs + (event.jobId -> result)
+exec.endEvents += 1
+update(exec)
+  }
+}
+  }
+
+  override def onExecutorMetricsUpdate(event: 
SparkListenerExecutorMetricsUpdate): Unit = {
+event.accumUpdates.foreach { case (taskId, stageId, attemptId, 
accumUpdates) =>
+  updateStageMetrics(stageId, attemptId, taskId, accumUpdates, false)
+}
+  }
+
+  override def onTaskEnd(event: SparkListenerTaskEnd): Unit = {
+if (!isSQLStage(event.stageId)) {
+  return
+}
+
+val info = event.taskInfo
+// SPARK-20342. If processing events from a live 

[GitHub] spark pull request #19681: [SPARK-20652][sql] Store SQL UI data in the new a...

2017-11-07 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/19681#discussion_r149578181
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
 ---
@@ -40,7 +40,7 @@ private[sql] class SQLAppStatusListener(
 ui: Option[SparkUI] = None)
   extends SparkListener with Logging {
 
-  // How often to flush intermediate statge of a live execution to the 
store. When replaying logs,
+  // How often to flush intermediate stage of a live execution to the 
store. When replaying logs,
--- End diff --

err, was this supposed to be "state"?


---




[GitHub] spark pull request #19681: [SPARK-20652][sql] Store SQL UI data in the new a...

2017-11-07 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/19681#discussion_r149578074
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
 ---
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.ui
+
+import java.util.Date
+import java.util.concurrent.ConcurrentHashMap
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.HashMap
+
+import org.apache.spark.{JobExecutionStatus, SparkConf}
+import org.apache.spark.internal.Logging
+import org.apache.spark.scheduler._
+import org.apache.spark.sql.execution.SQLExecution
+import org.apache.spark.sql.execution.metric._
+import org.apache.spark.sql.internal.StaticSQLConf._
+import org.apache.spark.status.LiveEntity
+import org.apache.spark.status.config._
+import org.apache.spark.ui.SparkUI
+import org.apache.spark.util.kvstore.KVStore
+
+private[sql] class SQLAppStatusListener(
+conf: SparkConf,
+kvstore: KVStore,
+live: Boolean,
+ui: Option[SparkUI] = None)
+  extends SparkListener with Logging {
+
+  // How often to flush intermediate stage of a live execution to the 
store. When replaying logs,
+  // never flush (only do the very last write).
+  private val liveUpdatePeriodNs = if (live) 
conf.get(LIVE_ENTITY_UPDATE_PERIOD) else -1L
+
+  private val liveExecutions = new HashMap[Long, LiveExecutionData]()
+  private val stageMetrics = new HashMap[Int, LiveStageMetrics]()
+
+  private var uiInitialized = false
+
+  override def onJobStart(event: SparkListenerJobStart): Unit = {
+val executionIdString = 
event.properties.getProperty(SQLExecution.EXECUTION_ID_KEY)
+if (executionIdString == null) {
+  // This is not a job created by SQL
+  return
+}
+
+val executionId = executionIdString.toLong
+val jobId = event.jobId
+val exec = getOrCreateExecution(executionId)
+
+// Record the accumulator IDs for the stages of this job, so that the 
code that keeps
+// track of the metrics knows which accumulators to look at.
+val accumIds = exec.metrics.map(_.accumulatorId).sorted.toList
+event.stageIds.foreach { id =>
+  stageMetrics.put(id, new LiveStageMetrics(id, 0, accumIds.toArray, 
new ConcurrentHashMap()))
+}
+
+exec.jobs = exec.jobs + (jobId -> JobExecutionStatus.RUNNING)
+exec.stages = event.stageIds
+update(exec)
+  }
+
+  override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit 
= {
+if (!isSQLStage(event.stageInfo.stageId)) {
+  return
+}
+
+// Reset the metrics tracking object for the new attempt.
+stageMetrics.get(event.stageInfo.stageId).foreach { metrics =>
+  metrics.taskMetrics.clear()
+  metrics.attemptId = event.stageInfo.attemptId
+}
+  }
+
+  override def onJobEnd(event: SparkListenerJobEnd): Unit = {
+liveExecutions.values.foreach { exec =>
+  if (exec.jobs.contains(event.jobId)) {
+val result = event.jobResult match {
+  case JobSucceeded => JobExecutionStatus.SUCCEEDED
+  case _ => JobExecutionStatus.FAILED
+}
+exec.jobs = exec.jobs + (event.jobId -> result)
+exec.endEvents += 1
+update(exec)
+  }
+}
+  }
+
+  override def onExecutorMetricsUpdate(event: 
SparkListenerExecutorMetricsUpdate): Unit = {
+event.accumUpdates.foreach { case (taskId, stageId, attemptId, 
accumUpdates) =>
+  updateStageMetrics(stageId, attemptId, taskId, accumUpdates, false)
+}
+  }
+
+  override def onTaskEnd(event: SparkListenerTaskEnd): Unit = {
+if (!isSQLStage(event.stageId)) {
+  return
+}
+
+val info = event.taskInfo
+// SPARK-20342. If processing events from a live 

[GitHub] spark pull request #19681: [SPARK-20652][sql] Store SQL UI data in the new a...

2017-11-07 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19681#discussion_r149537039
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
 ---
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.ui
+
+import java.util.Date
+import java.util.concurrent.ConcurrentHashMap
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.HashMap
+
+import org.apache.spark.{JobExecutionStatus, SparkConf}
+import org.apache.spark.internal.Logging
+import org.apache.spark.scheduler._
+import org.apache.spark.sql.execution.SQLExecution
+import org.apache.spark.sql.execution.metric._
+import org.apache.spark.sql.internal.StaticSQLConf._
+import org.apache.spark.status.LiveEntity
+import org.apache.spark.status.config._
+import org.apache.spark.ui.SparkUI
+import org.apache.spark.util.kvstore.KVStore
+
+private[sql] class SQLAppStatusListener(
+conf: SparkConf,
+kvstore: KVStore,
+live: Boolean,
+ui: Option[SparkUI] = None)
+  extends SparkListener with Logging {
+
+  // How often to flush intermediate stage of a live execution to the 
store. When replaying logs,
+  // never flush (only do the very last write).
+  private val liveUpdatePeriodNs = if (live) 
conf.get(LIVE_ENTITY_UPDATE_PERIOD) else -1L
+
+  private val liveExecutions = new HashMap[Long, LiveExecutionData]()
+  private val stageMetrics = new HashMap[Int, LiveStageMetrics]()
+
+  private var uiInitialized = false
+
+  override def onJobStart(event: SparkListenerJobStart): Unit = {
+val executionIdString = 
event.properties.getProperty(SQLExecution.EXECUTION_ID_KEY)
+if (executionIdString == null) {
+  // This is not a job created by SQL
+  return
+}
+
+val executionId = executionIdString.toLong
+val jobId = event.jobId
+val exec = getOrCreateExecution(executionId)
+
+// Record the accumulator IDs for the stages of this job, so that the 
code that keeps
+// track of the metrics knows which accumulators to look at.
+val accumIds = exec.metrics.map(_.accumulatorId).sorted.toList
+event.stageIds.foreach { id =>
+  stageMetrics.put(id, new LiveStageMetrics(id, 0, accumIds.toArray, 
new ConcurrentHashMap()))
+}
+
+exec.jobs = exec.jobs + (jobId -> JobExecutionStatus.RUNNING)
+exec.stages = event.stageIds
+update(exec)
+  }
+
+  override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit 
= {
+if (!isSQLStage(event.stageInfo.stageId)) {
+  return
+}
+
+// Reset the metrics tracking object for the new attempt.
+stageMetrics.get(event.stageInfo.stageId).foreach { metrics =>
+  metrics.taskMetrics.clear()
+  metrics.attemptId = event.stageInfo.attemptId
+}
+  }
+
+  override def onJobEnd(event: SparkListenerJobEnd): Unit = {
+liveExecutions.values.foreach { exec =>
+  if (exec.jobs.contains(event.jobId)) {
+val result = event.jobResult match {
+  case JobSucceeded => JobExecutionStatus.SUCCEEDED
+  case _ => JobExecutionStatus.FAILED
+}
+exec.jobs = exec.jobs + (event.jobId -> result)
+exec.endEvents += 1
+update(exec)
+  }
+}
+  }
+
+  override def onExecutorMetricsUpdate(event: 
SparkListenerExecutorMetricsUpdate): Unit = {
+event.accumUpdates.foreach { case (taskId, stageId, attemptId, 
accumUpdates) =>
+  updateStageMetrics(stageId, attemptId, taskId, accumUpdates, false)
+}
+  }
+
+  override def onTaskEnd(event: SparkListenerTaskEnd): Unit = {
+if (!isSQLStage(event.stageId)) {
+  return
+}
+
+val info = event.taskInfo
+// SPARK-20342. If processing events from a live 

[GitHub] spark pull request #19681: [SPARK-20652][sql] Store SQL UI data in the new a...

2017-11-07 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/19681#discussion_r149529181
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
 ---
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.ui
+
+import java.util.Date
+import java.util.concurrent.ConcurrentHashMap
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.HashMap
+
+import org.apache.spark.{JobExecutionStatus, SparkConf}
+import org.apache.spark.internal.Logging
+import org.apache.spark.scheduler._
+import org.apache.spark.sql.execution.SQLExecution
+import org.apache.spark.sql.execution.metric._
+import org.apache.spark.sql.internal.StaticSQLConf._
+import org.apache.spark.status.LiveEntity
+import org.apache.spark.status.config._
+import org.apache.spark.ui.SparkUI
+import org.apache.spark.util.kvstore.KVStore
+
+private[sql] class SQLAppStatusListener(
+conf: SparkConf,
+kvstore: KVStore,
+live: Boolean,
+ui: Option[SparkUI] = None)
+  extends SparkListener with Logging {
+
+  // How often to flush intermediate stage of a live execution to the 
store. When replaying logs,
+  // never flush (only do the very last write).
+  private val liveUpdatePeriodNs = if (live) 
conf.get(LIVE_ENTITY_UPDATE_PERIOD) else -1L
+
+  private val liveExecutions = new HashMap[Long, LiveExecutionData]()
+  private val stageMetrics = new HashMap[Int, LiveStageMetrics]()
+
+  private var uiInitialized = false
+
+  override def onJobStart(event: SparkListenerJobStart): Unit = {
+val executionIdString = 
event.properties.getProperty(SQLExecution.EXECUTION_ID_KEY)
+if (executionIdString == null) {
+  // This is not a job created by SQL
+  return
+}
+
+val executionId = executionIdString.toLong
+val jobId = event.jobId
+val exec = getOrCreateExecution(executionId)
+
+// Record the accumulator IDs for the stages of this job, so that the 
code that keeps
+// track of the metrics knows which accumulators to look at.
+val accumIds = exec.metrics.map(_.accumulatorId).sorted.toList
+event.stageIds.foreach { id =>
+  stageMetrics.put(id, new LiveStageMetrics(id, 0, accumIds.toArray, 
new ConcurrentHashMap()))
+}
+
+exec.jobs = exec.jobs + (jobId -> JobExecutionStatus.RUNNING)
+exec.stages = event.stageIds
+update(exec)
+  }
+
+  override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit 
= {
+if (!isSQLStage(event.stageInfo.stageId)) {
+  return
+}
+
+// Reset the metrics tracking object for the new attempt.
+stageMetrics.get(event.stageInfo.stageId).foreach { metrics =>
+  metrics.taskMetrics.clear()
+  metrics.attemptId = event.stageInfo.attemptId
+}
+  }
+
+  override def onJobEnd(event: SparkListenerJobEnd): Unit = {
+liveExecutions.values.foreach { exec =>
+  if (exec.jobs.contains(event.jobId)) {
+val result = event.jobResult match {
+  case JobSucceeded => JobExecutionStatus.SUCCEEDED
+  case _ => JobExecutionStatus.FAILED
+}
+exec.jobs = exec.jobs + (event.jobId -> result)
+exec.endEvents += 1
+update(exec)
+  }
+}
+  }
+
+  override def onExecutorMetricsUpdate(event: 
SparkListenerExecutorMetricsUpdate): Unit = {
+event.accumUpdates.foreach { case (taskId, stageId, attemptId, 
accumUpdates) =>
+  updateStageMetrics(stageId, attemptId, taskId, accumUpdates, false)
+}
+  }
+
+  override def onTaskEnd(event: SparkListenerTaskEnd): Unit = {
+if (!isSQLStage(event.stageId)) {
+  return
+}
+
+val info = event.taskInfo
+// SPARK-20342. If processing events from a live 

[GitHub] spark pull request #19681: [SPARK-20652][sql] Store SQL UI data in the new a...

2017-11-07 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/19681#discussion_r149530162
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
 ---
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.ui
+
+import java.util.Date
+import java.util.concurrent.ConcurrentHashMap
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.HashMap
+
+import org.apache.spark.{JobExecutionStatus, SparkConf}
+import org.apache.spark.internal.Logging
+import org.apache.spark.scheduler._
+import org.apache.spark.sql.execution.SQLExecution
+import org.apache.spark.sql.execution.metric._
+import org.apache.spark.sql.internal.StaticSQLConf._
+import org.apache.spark.status.LiveEntity
+import org.apache.spark.status.config._
+import org.apache.spark.ui.SparkUI
+import org.apache.spark.util.kvstore.KVStore
+
+private[sql] class SQLAppStatusListener(
+conf: SparkConf,
+kvstore: KVStore,
+live: Boolean,
+ui: Option[SparkUI] = None)
+  extends SparkListener with Logging {
+
+  // How often to flush intermediate stage of a live execution to the 
store. When replaying logs,
+  // never flush (only do the very last write).
+  private val liveUpdatePeriodNs = if (live) 
conf.get(LIVE_ENTITY_UPDATE_PERIOD) else -1L
+
+  private val liveExecutions = new HashMap[Long, LiveExecutionData]()
+  private val stageMetrics = new HashMap[Int, LiveStageMetrics]()
+
+  private var uiInitialized = false
+
+  override def onJobStart(event: SparkListenerJobStart): Unit = {
+val executionIdString = 
event.properties.getProperty(SQLExecution.EXECUTION_ID_KEY)
+if (executionIdString == null) {
+  // This is not a job created by SQL
+  return
+}
+
+val executionId = executionIdString.toLong
+val jobId = event.jobId
+val exec = getOrCreateExecution(executionId)
+
+// Record the accumulator IDs for the stages of this job, so that the 
code that keeps
+// track of the metrics knows which accumulators to look at.
+val accumIds = exec.metrics.map(_.accumulatorId).sorted.toList
+event.stageIds.foreach { id =>
+  stageMetrics.put(id, new LiveStageMetrics(id, 0, accumIds.toArray, 
new ConcurrentHashMap()))
+}
+
+exec.jobs = exec.jobs + (jobId -> JobExecutionStatus.RUNNING)
+exec.stages = event.stageIds
+update(exec)
+  }
+
+  override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit 
= {
+if (!isSQLStage(event.stageInfo.stageId)) {
+  return
+}
+
+// Reset the metrics tracking object for the new attempt.
+stageMetrics.get(event.stageInfo.stageId).foreach { metrics =>
+  metrics.taskMetrics.clear()
+  metrics.attemptId = event.stageInfo.attemptId
+}
+  }
+
+  override def onJobEnd(event: SparkListenerJobEnd): Unit = {
+liveExecutions.values.foreach { exec =>
+  if (exec.jobs.contains(event.jobId)) {
+val result = event.jobResult match {
+  case JobSucceeded => JobExecutionStatus.SUCCEEDED
+  case _ => JobExecutionStatus.FAILED
+}
+exec.jobs = exec.jobs + (event.jobId -> result)
+exec.endEvents += 1
+update(exec)
+  }
+}
+  }
+
+  override def onExecutorMetricsUpdate(event: 
SparkListenerExecutorMetricsUpdate): Unit = {
+event.accumUpdates.foreach { case (taskId, stageId, attemptId, 
accumUpdates) =>
+  updateStageMetrics(stageId, attemptId, taskId, accumUpdates, false)
+}
+  }
+
+  override def onTaskEnd(event: SparkListenerTaskEnd): Unit = {
+if (!isSQLStage(event.stageId)) {
+  return
+}
+
+val info = event.taskInfo
+// SPARK-20342. If processing events from a live 

[GitHub] spark pull request #19681: [SPARK-20652][sql] Store SQL UI data in the new a...

2017-11-07 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/19681#discussion_r149500473
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
 ---
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.ui
+
+import java.util.Date
+import java.util.concurrent.ConcurrentHashMap
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.HashMap
+
+import org.apache.spark.{JobExecutionStatus, SparkConf}
+import org.apache.spark.internal.Logging
+import org.apache.spark.scheduler._
+import org.apache.spark.sql.execution.SQLExecution
+import org.apache.spark.sql.execution.metric._
+import org.apache.spark.sql.internal.StaticSQLConf._
+import org.apache.spark.status.LiveEntity
+import org.apache.spark.status.config._
+import org.apache.spark.ui.SparkUI
+import org.apache.spark.util.kvstore.KVStore
+
+private[sql] class SQLAppStatusListener(
--- End diff --

> SQLAppStatusStore.executionMetrics has logic to call the listener 
directly when the final metrics are not yet computed.

ahh, that is the part I hadn't noticed.  thanks


---




[GitHub] spark pull request #19681: [SPARK-20652][sql] Store SQL UI data in the new a...

2017-11-07 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19681#discussion_r149466906
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
 ---
@@ -36,13 +36,14 @@ import org.apache.spark.sql.catalyst.util.quietly
 import org.apache.spark.sql.execution.{LeafExecNode, QueryExecution, 
SparkPlanInfo, SQLExecution}
 import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 import org.apache.spark.sql.test.SharedSQLContext
-import org.apache.spark.ui.SparkUI
+import org.apache.spark.status.config._
 import org.apache.spark.util.{AccumulatorMetadata, JsonProtocol, 
LongAccumulator}
-
+import org.apache.spark.util.kvstore.InMemoryStore
 
 class SQLListenerSuite extends SparkFunSuite with SharedSQLContext with 
JsonTestUtils {
--- End diff --

Actually this suite mixes tests for the listener, which would belong in
`SQLAppStatusListenerSuite`, with tests for things that aren't related to the
listener. My original changes broke it into two different suites, but I chose
to postpone that to reduce the size of the diff for now (and also to make the
diff a little easier to read).


---




[GitHub] spark pull request #19681: [SPARK-20652][sql] Store SQL UI data in the new a...

2017-11-07 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19681#discussion_r149466466
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
 ---
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.ui
+
+import java.util.Date
+import java.util.concurrent.ConcurrentHashMap
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.HashMap
+
+import org.apache.spark.{JobExecutionStatus, SparkConf}
+import org.apache.spark.internal.Logging
+import org.apache.spark.scheduler._
+import org.apache.spark.sql.execution.SQLExecution
+import org.apache.spark.sql.execution.metric._
+import org.apache.spark.sql.internal.StaticSQLConf._
+import org.apache.spark.status.LiveEntity
+import org.apache.spark.status.config._
+import org.apache.spark.ui.SparkUI
+import org.apache.spark.util.kvstore.KVStore
+
+private[sql] class SQLAppStatusListener(
--- End diff --

The live UI is updated from the info in `LiveStageMetrics`, which is not 
written to the store. That's kept in memory while executions are running, and 
aggregated into the final metrics view when the execution finishes (see 
`aggregateMetrics`).

`SQLAppStatusStore.executionMetrics` has logic to call the listener 
directly when the final metrics are not yet computed.
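In other words, the store-backed view can serve the aggregated metrics persisted for a finished execution, and fall back to the live listener for executions that are still running. A rough sketch of that read path, where `store`, `listener`, `metricValues` and `liveExecutionMetrics` are illustrative names rather than the PR's exact signatures:

    // Sketch only: return the final metrics from the KVStore when the execution has
    // finished, otherwise ask the live listener to aggregate the in-flight values.
    def executionMetrics(executionId: Long): Map[Long, String] = {
      val exec = store.read(classOf[SQLExecutionUIData], executionId)
      Option(exec.metricValues)
        .orElse(listener.flatMap(_.liveExecutionMetrics(executionId)))
        .getOrElse(Map.empty)
    }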


---




[GitHub] spark pull request #19681: [SPARK-20652][sql] Store SQL UI data in the new a...

2017-11-07 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/19681#discussion_r149457439
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
 ---
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.ui
+
+import java.util.Date
+import java.util.concurrent.ConcurrentHashMap
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.HashMap
+
+import org.apache.spark.{JobExecutionStatus, SparkConf}
+import org.apache.spark.internal.Logging
+import org.apache.spark.scheduler._
+import org.apache.spark.sql.execution.SQLExecution
+import org.apache.spark.sql.execution.metric._
+import org.apache.spark.sql.internal.StaticSQLConf._
+import org.apache.spark.status.LiveEntity
+import org.apache.spark.status.config._
+import org.apache.spark.ui.SparkUI
+import org.apache.spark.util.kvstore.KVStore
+
+private[sql] class SQLAppStatusListener(
--- End diff --

I don't see calls to `update` in `onTaskEnd` or `onExecutorMetricsUpdate`.
Does that mean the live UI won't update until a stage is finished? But after
looking at the tests, I guess I'm wrong, it does update ... where is the
update I'm missing?


---




[GitHub] spark pull request #19681: [SPARK-20652][sql] Store SQL UI data in the new a...

2017-11-07 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/19681#discussion_r149458539
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
 ---
@@ -36,13 +36,14 @@ import org.apache.spark.sql.catalyst.util.quietly
 import org.apache.spark.sql.execution.{LeafExecNode, QueryExecution, 
SparkPlanInfo, SQLExecution}
 import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 import org.apache.spark.sql.test.SharedSQLContext
-import org.apache.spark.ui.SparkUI
+import org.apache.spark.status.config._
 import org.apache.spark.util.{AccumulatorMetadata, JsonProtocol, 
LongAccumulator}
-
+import org.apache.spark.util.kvstore.InMemoryStore
 
 class SQLListenerSuite extends SparkFunSuite with SharedSQLContext with 
JsonTestUtils {
--- End diff --

rename test to `SQLAppStatusListenerSuite`


---




[GitHub] spark pull request #19681: [SPARK-20652][sql] Store SQL UI data in the new a...

2017-11-07 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/19681#discussion_r149450470
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
 ---
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.ui
+
+import java.util.Date
+import java.util.concurrent.ConcurrentHashMap
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.HashMap
+
+import org.apache.spark.{JobExecutionStatus, SparkConf}
+import org.apache.spark.internal.Logging
+import org.apache.spark.scheduler._
+import org.apache.spark.sql.execution.SQLExecution
+import org.apache.spark.sql.execution.metric._
+import org.apache.spark.sql.internal.StaticSQLConf._
+import org.apache.spark.status.LiveEntity
+import org.apache.spark.status.config._
+import org.apache.spark.ui.SparkUI
+import org.apache.spark.util.kvstore.KVStore
+
+private[sql] class SQLAppStatusListener(
+conf: SparkConf,
+kvstore: KVStore,
+live: Boolean,
+ui: Option[SparkUI] = None)
+  extends SparkListener with Logging {
+
+  // How often to flush intermediate statge of a live execution to the 
store. When replaying logs,
--- End diff --

typo: statge


---




[GitHub] spark pull request #19681: [SPARK-20652][sql] Store SQL UI data in the new a...

2017-11-06 Thread vanzin
GitHub user vanzin opened a pull request:

https://github.com/apache/spark/pull/19681

[SPARK-20652][sql] Store SQL UI data in the new app status store.

This change replaces the SQLListener with a new implementation that
saves the data to the same store used by the SparkContext's status
store. For that, the types used by the old SQLListener had to be
updated a bit so that they're more serialization-friendly.

The interface for getting data from the store was abstracted into
a new class, SQLAppStatusStore (following the convention used in
core).

Another change is the way that the SQL UI hooks up into the core
UI or the SHS. The old "SparkHistoryListenerFactory" was replaced
with a new "AppStatePlugin" that more explicitly differentiates
between the two use cases: processing events, and showing the UI.
Both live apps and the SHS use this new API (previously, it was
restricted to the SHS).

Note on the above: this causes a slight change of behavior for
live apps; the SQL tab will only show up after the first execution
is started.

The metrics gathering code was re-worked a bit so that the types
used are less memory hungry and more serialization-friendly. This
reduces memory usage when using in-memory stores, and reduces load
times when using disk stores.

Tested with existing and added unit tests. Note one unit test was
disabled because it depends on SPARK-20653, which isn't in yet.
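As a rough illustration of the new read path, the store wrapper could be
queried along these lines (method and field names here are assumptions based
on the description above, not a committed API):

    // Hypothetical usage sketch: list executions and look up metrics through the
    // KVStore-backed wrapper instead of the old in-memory SQLListener maps.
    val statusStore = new SQLAppStatusStore(kvstore, None)

    statusStore.executionsList().foreach { exec =>
      println(s"execution ${exec.executionId}: ${exec.description}")
    }

    // Per-execution metric values, keyed by accumulator id.
    val metrics: Map[Long, String] = statusStore.executionMetrics(0L)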

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/vanzin/spark SPARK-20652

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19681.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19681


commit ccd5adc1d6273b92fd6c9a0d4817451a5acb566a
Author: Marcelo Vanzin 
Date:   2017-04-06T17:00:25Z

[SPARK-20652][sql] Store SQL UI data in the new app status store.

This change replaces the SQLListener with a new implementation that
saves the data to the same store used by the SparkContext's status
store. For that, the types used by the old SQLListener had to be
updated a bit so that they're more serialization-friendly.

The interface for getting data from the store was abstracted into
a new class, SQLAppStatusStore (following the convention used in
core).

Another change is the way that the SQL UI hooks up into the core
UI or the SHS. The old "SparkHistoryListenerFactory" was replaced
with a new "AppStatePlugin" that more explicitly differentiates
between the two use cases: processing events, and showing the UI.
Both live apps and the SHS use this new API (previously, it was
restricted to the SHS).

Note on the above: this causes a slight change of behavior for
live apps; the SQL tab will only show up after the first execution
is started.

The metrics gathering code was re-worked a bit so that the types
used are less memory hungry and more serialization-friendly. This
reduces memory usage when using in-memory stores, and reduces load
times when using disk stores.

Tested with existing and added unit tests. Note one unit test was
disabled because it depends on SPARK-20653, which isn't in yet.




---
