[GitHub] spark pull request #19681: [SPARK-20652][sql] Store SQL UI data in the new a...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/19681#discussion_r158326122 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala --- @@ -0,0 +1,366 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.ui + +import java.util.Date +import java.util.concurrent.ConcurrentHashMap +import java.util.function.Function + +import scala.collection.JavaConverters._ + +import org.apache.spark.{JobExecutionStatus, SparkConf} +import org.apache.spark.internal.Logging +import org.apache.spark.scheduler._ +import org.apache.spark.sql.execution.SQLExecution +import org.apache.spark.sql.execution.metric._ +import org.apache.spark.status.LiveEntity +import org.apache.spark.status.config._ +import org.apache.spark.ui.SparkUI +import org.apache.spark.util.kvstore.KVStore + +private[sql] class SQLAppStatusListener( +conf: SparkConf, +kvstore: KVStore, +live: Boolean, +ui: Option[SparkUI] = None) + extends SparkListener with Logging { + + // How often to flush intermediate state of a live execution to the store. When replaying logs, + // never flush (only do the very last write). + private val liveUpdatePeriodNs = if (live) conf.get(LIVE_ENTITY_UPDATE_PERIOD) else -1L + + // Live tracked data is needed by the SQL status store to calculate metrics for in-flight + // executions; that means arbitrary threads may be querying these maps, so they need to be + // thread-safe. + private val liveExecutions = new ConcurrentHashMap[Long, LiveExecutionData]() + private val stageMetrics = new ConcurrentHashMap[Int, LiveStageMetrics]() + + private var uiInitialized = false + + override def onJobStart(event: SparkListenerJobStart): Unit = { +val executionIdString = event.properties.getProperty(SQLExecution.EXECUTION_ID_KEY) +if (executionIdString == null) { + // This is not a job created by SQL + return +} + +val executionId = executionIdString.toLong +val jobId = event.jobId +val exec = getOrCreateExecution(executionId) + +// Record the accumulator IDs for the stages of this job, so that the code that keeps +// track of the metrics knows which accumulators to look at. +val accumIds = exec.metrics.map(_.accumulatorId).sorted.toList +event.stageIds.foreach { id => + stageMetrics.put(id, new LiveStageMetrics(id, 0, accumIds.toArray, new ConcurrentHashMap())) +} + +exec.jobs = exec.jobs + (jobId -> JobExecutionStatus.RUNNING) +exec.stages = event.stageIds.toSet --- End diff -- oh good catch. I can submit a fix for this --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19681: [SPARK-20652][sql] Store SQL UI data in the new a...
Github user carsonwang commented on a diff in the pull request: https://github.com/apache/spark/pull/19681#discussion_r158226032 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala --- @@ -0,0 +1,366 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.ui + +import java.util.Date +import java.util.concurrent.ConcurrentHashMap +import java.util.function.Function + +import scala.collection.JavaConverters._ + +import org.apache.spark.{JobExecutionStatus, SparkConf} +import org.apache.spark.internal.Logging +import org.apache.spark.scheduler._ +import org.apache.spark.sql.execution.SQLExecution +import org.apache.spark.sql.execution.metric._ +import org.apache.spark.status.LiveEntity +import org.apache.spark.status.config._ +import org.apache.spark.ui.SparkUI +import org.apache.spark.util.kvstore.KVStore + +private[sql] class SQLAppStatusListener( +conf: SparkConf, +kvstore: KVStore, +live: Boolean, +ui: Option[SparkUI] = None) + extends SparkListener with Logging { + + // How often to flush intermediate state of a live execution to the store. When replaying logs, + // never flush (only do the very last write). + private val liveUpdatePeriodNs = if (live) conf.get(LIVE_ENTITY_UPDATE_PERIOD) else -1L + + // Live tracked data is needed by the SQL status store to calculate metrics for in-flight + // executions; that means arbitrary threads may be querying these maps, so they need to be + // thread-safe. + private val liveExecutions = new ConcurrentHashMap[Long, LiveExecutionData]() + private val stageMetrics = new ConcurrentHashMap[Int, LiveStageMetrics]() + + private var uiInitialized = false + + override def onJobStart(event: SparkListenerJobStart): Unit = { +val executionIdString = event.properties.getProperty(SQLExecution.EXECUTION_ID_KEY) +if (executionIdString == null) { + // This is not a job created by SQL + return +} + +val executionId = executionIdString.toLong +val jobId = event.jobId +val exec = getOrCreateExecution(executionId) + +// Record the accumulator IDs for the stages of this job, so that the code that keeps +// track of the metrics knows which accumulators to look at. +val accumIds = exec.metrics.map(_.accumulatorId).sorted.toList +event.stageIds.foreach { id => + stageMetrics.put(id, new LiveStageMetrics(id, 0, accumIds.toArray, new ConcurrentHashMap())) +} + +exec.jobs = exec.jobs + (jobId -> JobExecutionStatus.RUNNING) +exec.stages = event.stageIds.toSet --- End diff -- @vanzin , shall we add the stageIds to the existing stageIds? Otherwise we will lose the stageIds in previous jobs? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19681: [SPARK-20652][sql] Store SQL UI data in the new a...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19681#discussion_r157331270 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala --- @@ -36,13 +36,14 @@ import org.apache.spark.sql.catalyst.util.quietly import org.apache.spark.sql.execution.{LeafExecNode, QueryExecution, SparkPlanInfo, SQLExecution} import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} import org.apache.spark.sql.test.SharedSQLContext -import org.apache.spark.ui.SparkUI +import org.apache.spark.status.config._ import org.apache.spark.util.{AccumulatorMetadata, JsonProtocol, LongAccumulator} - +import org.apache.spark.util.kvstore.InMemoryStore class SQLListenerSuite extends SparkFunSuite with SharedSQLContext with JsonTestUtils { import testImplicits._ - import org.apache.spark.AccumulatorSuite.makeInfo + + override protected def sparkConf = super.sparkConf.set(LIVE_ENTITY_UPDATE_PERIOD, 0L) --- End diff -- ah you are right, it's only shared in hive tests --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19681: [SPARK-20652][sql] Store SQL UI data in the new a...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19681#discussion_r157279789 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala --- @@ -118,309 +142,286 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext with JsonTest (id, accumulatorValue) }.toMap -listener.onOtherEvent(SparkListenerSQLExecutionStart( +bus.postToAll(SparkListenerSQLExecutionStart( executionId, "test", "test", df.queryExecution.toString, SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan), System.currentTimeMillis())) -val executionUIData = listener.executionIdToData(0) - -listener.onJobStart(SparkListenerJobStart( +bus.postToAll(SparkListenerJobStart( jobId = 0, time = System.currentTimeMillis(), stageInfos = Seq( createStageInfo(0, 0), createStageInfo(1, 0) ), createProperties(executionId))) - listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(0, 0))) +bus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 0))) -assert(listener.getExecutionMetrics(0).isEmpty) +assert(store.executionMetrics(0).isEmpty) - listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq( +bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq( // (task id, stage id, stage attempt, accum updates) - (0L, 0, 0, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo)), - (1L, 0, 0, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo)) + (0L, 0, 0, createAccumulatorInfos(accumulatorUpdates)), + (1L, 0, 0, createAccumulatorInfos(accumulatorUpdates)) ))) -checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 2)) +checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 2)) // Driver accumulator updates don't belong to this execution should be filtered and no // exception will be thrown. -listener.onOtherEvent(SparkListenerDriverAccumUpdates(0, Seq((999L, 2L -checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 2)) +bus.postToAll(SparkListenerDriverAccumUpdates(0, Seq((999L, 2L - listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq( +checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 2)) + +bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq( // (task id, stage id, stage attempt, accum updates) - (0L, 0, 0, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo)), - (1L, 0, 0, -createTaskMetrics(accumulatorUpdates.mapValues(_ * 2)).accumulators().map(makeInfo)) + (0L, 0, 0, createAccumulatorInfos(accumulatorUpdates)), + (1L, 0, 0, createAccumulatorInfos(accumulatorUpdates.mapValues(_ * 2))) ))) -checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 3)) +checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 3)) // Retrying a stage should reset the metrics - listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(0, 1))) +bus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 1))) - listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq( +bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq( // (task id, stage id, stage attempt, accum updates) - (0L, 0, 1, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo)), - (1L, 0, 1, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo)) + (0L, 0, 1, createAccumulatorInfos(accumulatorUpdates)), + (1L, 0, 1, createAccumulatorInfos(accumulatorUpdates)) ))) -checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 2)) +checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 2)) // Ignore the task end for the first attempt -listener.onTaskEnd(SparkListenerTaskEnd( +bus.postToAll(SparkListenerTaskEnd( stageId = 0, stageAttemptId = 0, taskType = "", reason = null, - createTaskInfo(0, 0), - createTaskMetrics(accumulatorUpdates.mapValues(_ * 100 + createTaskInfo(0, 0, accums = accumulatorUpdates.mapValues(_ * 100)), + null)) -checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 2)) +checkAnswer(store.executionMetrics(0),
[GitHub] spark pull request #19681: [SPARK-20652][sql] Store SQL UI data in the new a...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19681#discussion_r157279430 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala --- @@ -118,309 +142,286 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext with JsonTest (id, accumulatorValue) }.toMap -listener.onOtherEvent(SparkListenerSQLExecutionStart( +bus.postToAll(SparkListenerSQLExecutionStart( executionId, "test", "test", df.queryExecution.toString, SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan), System.currentTimeMillis())) -val executionUIData = listener.executionIdToData(0) - -listener.onJobStart(SparkListenerJobStart( +bus.postToAll(SparkListenerJobStart( jobId = 0, time = System.currentTimeMillis(), stageInfos = Seq( createStageInfo(0, 0), createStageInfo(1, 0) ), createProperties(executionId))) - listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(0, 0))) +bus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 0))) -assert(listener.getExecutionMetrics(0).isEmpty) +assert(store.executionMetrics(0).isEmpty) - listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq( +bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq( // (task id, stage id, stage attempt, accum updates) - (0L, 0, 0, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo)), - (1L, 0, 0, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo)) + (0L, 0, 0, createAccumulatorInfos(accumulatorUpdates)), + (1L, 0, 0, createAccumulatorInfos(accumulatorUpdates)) ))) -checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 2)) +checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 2)) // Driver accumulator updates don't belong to this execution should be filtered and no // exception will be thrown. -listener.onOtherEvent(SparkListenerDriverAccumUpdates(0, Seq((999L, 2L -checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 2)) +bus.postToAll(SparkListenerDriverAccumUpdates(0, Seq((999L, 2L - listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq( +checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 2)) + +bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq( // (task id, stage id, stage attempt, accum updates) - (0L, 0, 0, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo)), - (1L, 0, 0, -createTaskMetrics(accumulatorUpdates.mapValues(_ * 2)).accumulators().map(makeInfo)) + (0L, 0, 0, createAccumulatorInfos(accumulatorUpdates)), + (1L, 0, 0, createAccumulatorInfos(accumulatorUpdates.mapValues(_ * 2))) ))) -checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 3)) +checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 3)) // Retrying a stage should reset the metrics - listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(0, 1))) +bus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 1))) - listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq( +bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq( // (task id, stage id, stage attempt, accum updates) - (0L, 0, 1, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo)), - (1L, 0, 1, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo)) + (0L, 0, 1, createAccumulatorInfos(accumulatorUpdates)), + (1L, 0, 1, createAccumulatorInfos(accumulatorUpdates)) ))) -checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 2)) +checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 2)) // Ignore the task end for the first attempt -listener.onTaskEnd(SparkListenerTaskEnd( +bus.postToAll(SparkListenerTaskEnd( stageId = 0, stageAttemptId = 0, taskType = "", reason = null, - createTaskInfo(0, 0), - createTaskMetrics(accumulatorUpdates.mapValues(_ * 100 + createTaskInfo(0, 0, accums = accumulatorUpdates.mapValues(_ * 100)), + null)) -checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 2)) +checkAnswer(store.executionMetrics(0),
[GitHub] spark pull request #19681: [SPARK-20652][sql] Store SQL UI data in the new a...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19681#discussion_r157278467 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala --- @@ -36,13 +36,14 @@ import org.apache.spark.sql.catalyst.util.quietly import org.apache.spark.sql.execution.{LeafExecNode, QueryExecution, SparkPlanInfo, SQLExecution} import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} import org.apache.spark.sql.test.SharedSQLContext -import org.apache.spark.ui.SparkUI +import org.apache.spark.status.config._ import org.apache.spark.util.{AccumulatorMetadata, JsonProtocol, LongAccumulator} - +import org.apache.spark.util.kvstore.InMemoryStore class SQLListenerSuite extends SparkFunSuite with SharedSQLContext with JsonTestUtils { import testImplicits._ - import org.apache.spark.AccumulatorSuite.makeInfo + + override protected def sparkConf = super.sparkConf.set(LIVE_ENTITY_UPDATE_PERIOD, 0L) --- End diff -- Is that true? The suite extends `SharedSQLContext` (which extends `SharedSparkSession`) and `SQLTestUtils`, all of which are traits, not objects. (Unlike `TestHive` which does force sessions to be used across suites for hive tests.) There are also other suites that modify the conf (such as `HDFSMetadataLogSuite`). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19681: [SPARK-20652][sql] Store SQL UI data in the new a...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19681#discussion_r157130281 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala --- @@ -36,13 +36,14 @@ import org.apache.spark.sql.catalyst.util.quietly import org.apache.spark.sql.execution.{LeafExecNode, QueryExecution, SparkPlanInfo, SQLExecution} import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} import org.apache.spark.sql.test.SharedSQLContext -import org.apache.spark.ui.SparkUI +import org.apache.spark.status.config._ import org.apache.spark.util.{AccumulatorMetadata, JsonProtocol, LongAccumulator} - +import org.apache.spark.util.kvstore.InMemoryStore class SQLListenerSuite extends SparkFunSuite with SharedSQLContext with JsonTestUtils { import testImplicits._ - import org.apache.spark.AccumulatorSuite.makeInfo + + override protected def sparkConf = super.sparkConf.set(LIVE_ENTITY_UPDATE_PERIOD, 0L) --- End diff -- the spark context is shared for all test suites, we should only set this conf to 0 in this suite. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19681: [SPARK-20652][sql] Store SQL UI data in the new a...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19681#discussion_r157129866 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala --- @@ -118,309 +142,286 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext with JsonTest (id, accumulatorValue) }.toMap -listener.onOtherEvent(SparkListenerSQLExecutionStart( +bus.postToAll(SparkListenerSQLExecutionStart( executionId, "test", "test", df.queryExecution.toString, SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan), System.currentTimeMillis())) -val executionUIData = listener.executionIdToData(0) - -listener.onJobStart(SparkListenerJobStart( +bus.postToAll(SparkListenerJobStart( jobId = 0, time = System.currentTimeMillis(), stageInfos = Seq( createStageInfo(0, 0), createStageInfo(1, 0) ), createProperties(executionId))) - listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(0, 0))) +bus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 0))) -assert(listener.getExecutionMetrics(0).isEmpty) +assert(store.executionMetrics(0).isEmpty) - listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq( +bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq( // (task id, stage id, stage attempt, accum updates) - (0L, 0, 0, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo)), - (1L, 0, 0, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo)) + (0L, 0, 0, createAccumulatorInfos(accumulatorUpdates)), + (1L, 0, 0, createAccumulatorInfos(accumulatorUpdates)) ))) -checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 2)) +checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 2)) // Driver accumulator updates don't belong to this execution should be filtered and no // exception will be thrown. -listener.onOtherEvent(SparkListenerDriverAccumUpdates(0, Seq((999L, 2L -checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 2)) +bus.postToAll(SparkListenerDriverAccumUpdates(0, Seq((999L, 2L - listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq( +checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 2)) + +bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq( // (task id, stage id, stage attempt, accum updates) - (0L, 0, 0, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo)), - (1L, 0, 0, -createTaskMetrics(accumulatorUpdates.mapValues(_ * 2)).accumulators().map(makeInfo)) + (0L, 0, 0, createAccumulatorInfos(accumulatorUpdates)), + (1L, 0, 0, createAccumulatorInfos(accumulatorUpdates.mapValues(_ * 2))) ))) -checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 3)) +checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 3)) // Retrying a stage should reset the metrics - listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(0, 1))) +bus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 1))) - listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq( +bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq( // (task id, stage id, stage attempt, accum updates) - (0L, 0, 1, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo)), - (1L, 0, 1, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo)) + (0L, 0, 1, createAccumulatorInfos(accumulatorUpdates)), + (1L, 0, 1, createAccumulatorInfos(accumulatorUpdates)) ))) -checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 2)) +checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 2)) // Ignore the task end for the first attempt -listener.onTaskEnd(SparkListenerTaskEnd( +bus.postToAll(SparkListenerTaskEnd( stageId = 0, stageAttemptId = 0, taskType = "", reason = null, - createTaskInfo(0, 0), - createTaskMetrics(accumulatorUpdates.mapValues(_ * 100 + createTaskInfo(0, 0, accums = accumulatorUpdates.mapValues(_ * 100)), + null)) -checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 2)) +checkAnswer(store.executionMetrics(0),
[GitHub] spark pull request #19681: [SPARK-20652][sql] Store SQL UI data in the new a...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19681#discussion_r157129912 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala --- @@ -118,309 +142,286 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext with JsonTest (id, accumulatorValue) }.toMap -listener.onOtherEvent(SparkListenerSQLExecutionStart( +bus.postToAll(SparkListenerSQLExecutionStart( executionId, "test", "test", df.queryExecution.toString, SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan), System.currentTimeMillis())) -val executionUIData = listener.executionIdToData(0) - -listener.onJobStart(SparkListenerJobStart( +bus.postToAll(SparkListenerJobStart( jobId = 0, time = System.currentTimeMillis(), stageInfos = Seq( createStageInfo(0, 0), createStageInfo(1, 0) ), createProperties(executionId))) - listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(0, 0))) +bus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 0))) -assert(listener.getExecutionMetrics(0).isEmpty) +assert(store.executionMetrics(0).isEmpty) - listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq( +bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq( // (task id, stage id, stage attempt, accum updates) - (0L, 0, 0, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo)), - (1L, 0, 0, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo)) + (0L, 0, 0, createAccumulatorInfos(accumulatorUpdates)), + (1L, 0, 0, createAccumulatorInfos(accumulatorUpdates)) ))) -checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 2)) +checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 2)) // Driver accumulator updates don't belong to this execution should be filtered and no // exception will be thrown. -listener.onOtherEvent(SparkListenerDriverAccumUpdates(0, Seq((999L, 2L -checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 2)) +bus.postToAll(SparkListenerDriverAccumUpdates(0, Seq((999L, 2L - listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq( +checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 2)) + +bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq( // (task id, stage id, stage attempt, accum updates) - (0L, 0, 0, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo)), - (1L, 0, 0, -createTaskMetrics(accumulatorUpdates.mapValues(_ * 2)).accumulators().map(makeInfo)) + (0L, 0, 0, createAccumulatorInfos(accumulatorUpdates)), + (1L, 0, 0, createAccumulatorInfos(accumulatorUpdates.mapValues(_ * 2))) ))) -checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 3)) +checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 3)) // Retrying a stage should reset the metrics - listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(0, 1))) +bus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 1))) - listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq( +bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq( // (task id, stage id, stage attempt, accum updates) - (0L, 0, 1, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo)), - (1L, 0, 1, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo)) + (0L, 0, 1, createAccumulatorInfos(accumulatorUpdates)), + (1L, 0, 1, createAccumulatorInfos(accumulatorUpdates)) ))) -checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 2)) +checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 2)) // Ignore the task end for the first attempt -listener.onTaskEnd(SparkListenerTaskEnd( +bus.postToAll(SparkListenerTaskEnd( stageId = 0, stageAttemptId = 0, taskType = "", reason = null, - createTaskInfo(0, 0), - createTaskMetrics(accumulatorUpdates.mapValues(_ * 100 + createTaskInfo(0, 0, accums = accumulatorUpdates.mapValues(_ * 100)), + null)) -checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 2)) +checkAnswer(store.executionMetrics(0),
[GitHub] spark pull request #19681: [SPARK-20652][sql] Store SQL UI data in the new a...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19681#discussion_r157129757 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala --- @@ -118,309 +142,286 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext with JsonTest (id, accumulatorValue) }.toMap -listener.onOtherEvent(SparkListenerSQLExecutionStart( +bus.postToAll(SparkListenerSQLExecutionStart( executionId, "test", "test", df.queryExecution.toString, SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan), System.currentTimeMillis())) -val executionUIData = listener.executionIdToData(0) - -listener.onJobStart(SparkListenerJobStart( +bus.postToAll(SparkListenerJobStart( jobId = 0, time = System.currentTimeMillis(), stageInfos = Seq( createStageInfo(0, 0), createStageInfo(1, 0) ), createProperties(executionId))) - listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(0, 0))) +bus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 0))) -assert(listener.getExecutionMetrics(0).isEmpty) +assert(store.executionMetrics(0).isEmpty) - listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq( +bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq( // (task id, stage id, stage attempt, accum updates) - (0L, 0, 0, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo)), - (1L, 0, 0, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo)) + (0L, 0, 0, createAccumulatorInfos(accumulatorUpdates)), + (1L, 0, 0, createAccumulatorInfos(accumulatorUpdates)) ))) -checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 2)) +checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 2)) // Driver accumulator updates don't belong to this execution should be filtered and no // exception will be thrown. -listener.onOtherEvent(SparkListenerDriverAccumUpdates(0, Seq((999L, 2L -checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 2)) +bus.postToAll(SparkListenerDriverAccumUpdates(0, Seq((999L, 2L - listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq( +checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 2)) + +bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq( // (task id, stage id, stage attempt, accum updates) - (0L, 0, 0, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo)), - (1L, 0, 0, -createTaskMetrics(accumulatorUpdates.mapValues(_ * 2)).accumulators().map(makeInfo)) + (0L, 0, 0, createAccumulatorInfos(accumulatorUpdates)), + (1L, 0, 0, createAccumulatorInfos(accumulatorUpdates.mapValues(_ * 2))) ))) -checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 3)) +checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 3)) // Retrying a stage should reset the metrics - listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(0, 1))) +bus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 1))) - listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq( +bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq( // (task id, stage id, stage attempt, accum updates) - (0L, 0, 1, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo)), - (1L, 0, 1, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo)) + (0L, 0, 1, createAccumulatorInfos(accumulatorUpdates)), + (1L, 0, 1, createAccumulatorInfos(accumulatorUpdates)) ))) -checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 2)) +checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 2)) // Ignore the task end for the first attempt -listener.onTaskEnd(SparkListenerTaskEnd( +bus.postToAll(SparkListenerTaskEnd( stageId = 0, stageAttemptId = 0, taskType = "", reason = null, - createTaskInfo(0, 0), - createTaskMetrics(accumulatorUpdates.mapValues(_ * 100 + createTaskInfo(0, 0, accums = accumulatorUpdates.mapValues(_ * 100)), + null)) -checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 2)) +checkAnswer(store.executionMetrics(0),
[GitHub] spark pull request #19681: [SPARK-20652][sql] Store SQL UI data in the new a...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19681#discussion_r157016366 --- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala --- @@ -455,9 +457,14 @@ class SparkContext(config: SparkConf) extends Logging { // For tests, do not enable the UI None } -// Bind the UI before starting the task scheduler to communicate -// the bound port to the cluster manager properly -_ui.foreach(_.bind()) +_ui.foreach { ui => + // Load any plugins that might want to modify the UI. + AppStatusPlugin.loadPlugins().foreach(_.setupUI(ui)) --- End diff -- Let's continue the discussion on the other PR. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19681: [SPARK-20652][sql] Store SQL UI data in the new a...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19681#discussion_r157015987 --- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala --- @@ -455,9 +457,14 @@ class SparkContext(config: SparkConf) extends Logging { // For tests, do not enable the UI None } -// Bind the UI before starting the task scheduler to communicate -// the bound port to the cluster manager properly -_ui.foreach(_.bind()) +_ui.foreach { ui => + // Load any plugins that might want to modify the UI. + AppStatusPlugin.loadPlugins().foreach(_.setupUI(ui)) --- End diff -- @vanzin the live UI doesn't need a 2-step process to set up the UI, while history server needs. That's why I think they should not share one plugin interface. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19681: [SPARK-20652][sql] Store SQL UI data in the new a...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19681#discussion_r157013164 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala --- @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.ui + +import java.lang.{Long => JLong} +import java.util.Date + +import scala.collection.JavaConverters._ +import scala.collection.mutable.ArrayBuffer + +import com.fasterxml.jackson.databind.annotation.JsonDeserialize + +import org.apache.spark.{JobExecutionStatus, SparkConf} +import org.apache.spark.scheduler.SparkListener +import org.apache.spark.status.AppStatusPlugin +import org.apache.spark.status.KVUtils.KVIndexParam +import org.apache.spark.ui.SparkUI +import org.apache.spark.util.Utils +import org.apache.spark.util.kvstore.KVStore + +/** + * Provides a view of a KVStore with methods that make it easy to query SQL-specific state. There's + * no state kept in this class, so it's ok to have multiple instances of it in an application. + */ +private[sql] class SQLAppStatusStore( +store: KVStore, +listener: Option[SQLAppStatusListener] = None) { + + def executionsList(): Seq[SQLExecutionUIData] = { +store.view(classOf[SQLExecutionUIData]).asScala.toSeq + } + + def execution(executionId: Long): Option[SQLExecutionUIData] = { +try { + Some(store.read(classOf[SQLExecutionUIData], executionId)) +} catch { + case _: NoSuchElementException => None +} + } + + def executionsCount(): Long = { +store.count(classOf[SQLExecutionUIData]) + } + + def executionMetrics(executionId: Long): Map[Long, String] = { +def metricsFromStore(): Option[Map[Long, String]] = { + val exec = store.read(classOf[SQLExecutionUIData], executionId) + Option(exec.metricValues) +} + +metricsFromStore() + .orElse(listener.flatMap(_.liveExecutionMetrics(executionId))) + // Try a second time in case the execution finished while this method is trying to + // get the metrics. + .orElse(metricsFromStore()) + .getOrElse(Map()) + } + + def planGraph(executionId: Long): SparkPlanGraph = { +store.read(classOf[SparkPlanGraphWrapper], executionId).toSparkPlanGraph() + } + +} + +/** + * An AppStatusPlugin for handling the SQL UI and listeners. + */ +private[sql] class SQLAppStatusPlugin extends AppStatusPlugin { + + override def setupListeners( + conf: SparkConf, + store: KVStore, + addListenerFn: SparkListener => Unit, + live: Boolean): Unit = { +// For live applications, the listener is installed in [[setupUI]]. This also avoids adding +// the listener when the UI is disabled. Force installation during testing, though. +if (!live || Utils.isTesting) { + val listener = new SQLAppStatusListener(conf, store, live, None) + addListenerFn(listener) +} + } + + override def setupUI(ui: SparkUI): Unit = { --- End diff -- The calls are made in specific cases (setupListeners when setting up a listener bus, setupUI when setting up the UI, always). But this implementation has to be a little weird because we don't want the SQL UI is SQL hasn't been initialized, if we're to maintain the old behavior. I don't think the listener is installed twice here - here it's only installed for non-live applications (= SHS) and below it's only installed if there's a SparkContext (= live application). If we're ok to modify the existing behavior and always have the SQL tab, this can be simplified a lot. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19681: [SPARK-20652][sql] Store SQL UI data in the new a...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19681#discussion_r156987695 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala --- @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.ui + +import java.lang.{Long => JLong} +import java.util.Date + +import scala.collection.JavaConverters._ +import scala.collection.mutable.ArrayBuffer + +import com.fasterxml.jackson.databind.annotation.JsonDeserialize + +import org.apache.spark.{JobExecutionStatus, SparkConf} +import org.apache.spark.scheduler.SparkListener +import org.apache.spark.status.AppStatusPlugin +import org.apache.spark.status.KVUtils.KVIndexParam +import org.apache.spark.ui.SparkUI +import org.apache.spark.util.Utils +import org.apache.spark.util.kvstore.KVStore + +/** + * Provides a view of a KVStore with methods that make it easy to query SQL-specific state. There's + * no state kept in this class, so it's ok to have multiple instances of it in an application. + */ +private[sql] class SQLAppStatusStore( +store: KVStore, +listener: Option[SQLAppStatusListener] = None) { + + def executionsList(): Seq[SQLExecutionUIData] = { +store.view(classOf[SQLExecutionUIData]).asScala.toSeq + } + + def execution(executionId: Long): Option[SQLExecutionUIData] = { +try { + Some(store.read(classOf[SQLExecutionUIData], executionId)) +} catch { + case _: NoSuchElementException => None +} + } + + def executionsCount(): Long = { +store.count(classOf[SQLExecutionUIData]) + } + + def executionMetrics(executionId: Long): Map[Long, String] = { +def metricsFromStore(): Option[Map[Long, String]] = { + val exec = store.read(classOf[SQLExecutionUIData], executionId) + Option(exec.metricValues) +} + +metricsFromStore() + .orElse(listener.flatMap(_.liveExecutionMetrics(executionId))) + // Try a second time in case the execution finished while this method is trying to + // get the metrics. + .orElse(metricsFromStore()) + .getOrElse(Map()) + } + + def planGraph(executionId: Long): SparkPlanGraph = { +store.read(classOf[SparkPlanGraphWrapper], executionId).toSparkPlanGraph() + } + +} + +/** + * An AppStatusPlugin for handling the SQL UI and listeners. + */ +private[sql] class SQLAppStatusPlugin extends AppStatusPlugin { + + override def setupListeners( + conf: SparkConf, + store: KVStore, + addListenerFn: SparkListener => Unit, + live: Boolean): Unit = { +// For live applications, the listener is installed in [[setupUI]]. This also avoids adding +// the listener when the UI is disabled. Force installation during testing, though. +if (!live || Utils.isTesting) { + val listener = new SQLAppStatusListener(conf, store, live, None) + addListenerFn(listener) +} + } + + override def setupUI(ui: SparkUI): Unit = { --- End diff -- Do we have a clear rule about when `setupListeners` is called and when `setupUI` is called? Here we register `SQLAppStatusListener` in both `setupListeners` and `setupUI`, will we register it twice? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19681: [SPARK-20652][sql] Store SQL UI data in the new a...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/19681 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19681: [SPARK-20652][sql] Store SQL UI data in the new a...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/19681#discussion_r150094936 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala --- @@ -0,0 +1,367 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.ui + +import java.util.Date +import java.util.concurrent.ConcurrentHashMap +import java.util.function.Function + +import scala.collection.JavaConverters._ + +import org.apache.spark.{JobExecutionStatus, SparkConf} +import org.apache.spark.internal.Logging +import org.apache.spark.scheduler._ +import org.apache.spark.sql.execution.SQLExecution +import org.apache.spark.sql.execution.metric._ +import org.apache.spark.status.LiveEntity +import org.apache.spark.status.config._ +import org.apache.spark.ui.SparkUI +import org.apache.spark.util.kvstore.KVStore + +private[sql] class SQLAppStatusListener( +conf: SparkConf, +kvstore: KVStore, +live: Boolean, +ui: Option[SparkUI] = None) + extends SparkListener with Logging { + + // How often to flush intermediate state of a live execution to the store. When replaying logs, + // never flush (only do the very last write). + private val liveUpdatePeriodNs = if (live) conf.get(LIVE_ENTITY_UPDATE_PERIOD) else -1L + + // Live tracked data is needed by the SQL status store to calculate metrics for in-flight + // executions; that means arbitrary threads may be querying these maps, so they need to be + // thread-safe. + private val liveExecutions = new ConcurrentHashMap[Long, LiveExecutionData]() + private val stageMetrics = new ConcurrentHashMap[Int, LiveStageMetrics]() + + private var uiInitialized = false + + override def onJobStart(event: SparkListenerJobStart): Unit = { +val executionIdString = event.properties.getProperty(SQLExecution.EXECUTION_ID_KEY) +if (executionIdString == null) { + // This is not a job created by SQL + return +} + +val executionId = executionIdString.toLong +val jobId = event.jobId +val exec = getOrCreateExecution(executionId) + +// Record the accumulator IDs for the stages of this job, so that the code that keeps +// track of the metrics knows which accumulators to look at. +val accumIds = exec.metrics.map(_.accumulatorId).sorted.toList +event.stageIds.foreach { id => + stageMetrics.put(id, new LiveStageMetrics(id, 0, accumIds.toArray, new ConcurrentHashMap())) +} + +exec.jobs = exec.jobs + (jobId -> JobExecutionStatus.RUNNING) +exec.stages = event.stageIds.toSet +update(exec) + } + + override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = { +if (!isSQLStage(event.stageInfo.stageId)) { + return +} + +// Reset the metrics tracking object for the new attempt. +Option(stageMetrics.get(event.stageInfo.stageId)).foreach { metrics => + metrics.taskMetrics.clear() + metrics.attemptId = event.stageInfo.attemptId +} + } + + override def onJobEnd(event: SparkListenerJobEnd): Unit = { +liveExecutions.values().asScala.foreach { exec => + if (exec.jobs.contains(event.jobId)) { +val result = event.jobResult match { + case JobSucceeded => JobExecutionStatus.SUCCEEDED + case _ => JobExecutionStatus.FAILED +} +exec.jobs = exec.jobs + (event.jobId -> result) +exec.endEvents += 1 +update(exec) + } +} + } + + override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = { +event.accumUpdates.foreach { case (taskId, stageId, attemptId, accumUpdates) => + updateStageMetrics(stageId, attemptId, taskId, accumUpdates, false) +} + } + + override def onTaskEnd(event:
[GitHub] spark pull request #19681: [SPARK-20652][sql] Store SQL UI data in the new a...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/19681#discussion_r150091458 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala --- @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.ui + +import java.lang.{Long => JLong} +import java.util.Date + +import scala.collection.JavaConverters._ +import scala.collection.mutable.ArrayBuffer + +import com.fasterxml.jackson.databind.annotation.JsonDeserialize + +import org.apache.spark.{JobExecutionStatus, SparkConf} +import org.apache.spark.scheduler.SparkListener +import org.apache.spark.status.AppStatusPlugin +import org.apache.spark.status.KVUtils.KVIndexParam +import org.apache.spark.ui.SparkUI +import org.apache.spark.util.Utils +import org.apache.spark.util.kvstore.KVStore + +/** + * Provides a view of a KVStore with methods that make it easy to query SQL-specific state. There's + * no state kept in this class, so it's ok to have multiple instances of it in an application. + */ +private[sql] class SQLAppStatusStore( +store: KVStore, +listener: Option[SQLAppStatusListener] = None) { + + def executionsList(): Seq[SQLExecutionUIData] = { +store.view(classOf[SQLExecutionUIData]).asScala.toSeq + } + + def execution(executionId: Long): Option[SQLExecutionUIData] = { +try { + Some(store.read(classOf[SQLExecutionUIData], executionId)) +} catch { + case _: NoSuchElementException => None +} + } + + def executionsCount(): Long = { +store.count(classOf[SQLExecutionUIData]) + } + + def executionMetrics(executionId: Long): Map[Long, String] = { +val exec = store.read(classOf[SQLExecutionUIData], executionId) +Option(exec.metricValues) + .orElse(listener.map(_.executionMetrics(executionId))) + .getOrElse(Map()) --- End diff -- is there a race here when the execution ends? - T1 (UI thread, calling this method): execution hasn't ended, so `exec.metricValues` is null - T2 (listener): execution ends, drops execution from `liveExecutions` - T1: `_.executionMetrics(executionId)` throws an exception because its dropped from the `liveExecutions` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19681: [SPARK-20652][sql] Store SQL UI data in the new a...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19681#discussion_r149801087 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala --- @@ -0,0 +1,353 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.ui + +import java.util.Date +import java.util.concurrent.ConcurrentHashMap + +import scala.collection.JavaConverters._ +import scala.collection.mutable.HashMap + +import org.apache.spark.{JobExecutionStatus, SparkConf} +import org.apache.spark.internal.Logging +import org.apache.spark.scheduler._ +import org.apache.spark.sql.execution.SQLExecution +import org.apache.spark.sql.execution.metric._ +import org.apache.spark.sql.internal.StaticSQLConf._ +import org.apache.spark.status.LiveEntity +import org.apache.spark.status.config._ +import org.apache.spark.ui.SparkUI +import org.apache.spark.util.kvstore.KVStore + +private[sql] class SQLAppStatusListener( +conf: SparkConf, +kvstore: KVStore, +live: Boolean, +ui: Option[SparkUI] = None) + extends SparkListener with Logging { + + // How often to flush intermediate state of a live execution to the store. When replaying logs, + // never flush (only do the very last write). + private val liveUpdatePeriodNs = if (live) conf.get(LIVE_ENTITY_UPDATE_PERIOD) else -1L + + private val liveExecutions = new HashMap[Long, LiveExecutionData]() + private val stageMetrics = new HashMap[Int, LiveStageMetrics]() + + private var uiInitialized = false + + override def onJobStart(event: SparkListenerJobStart): Unit = { +val executionIdString = event.properties.getProperty(SQLExecution.EXECUTION_ID_KEY) +if (executionIdString == null) { + // This is not a job created by SQL + return +} + +val executionId = executionIdString.toLong +val jobId = event.jobId +val exec = getOrCreateExecution(executionId) + +// Record the accumulator IDs for the stages of this job, so that the code that keeps +// track of the metrics knows which accumulators to look at. +val accumIds = exec.metrics.map(_.accumulatorId).sorted.toList +event.stageIds.foreach { id => + stageMetrics.put(id, new LiveStageMetrics(id, 0, accumIds.toArray, new ConcurrentHashMap())) +} + +exec.jobs = exec.jobs + (jobId -> JobExecutionStatus.RUNNING) +exec.stages = event.stageIds.toSet +update(exec) + } + + override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = { +if (!isSQLStage(event.stageInfo.stageId)) { + return +} + +// Reset the metrics tracking object for the new attempt. +stageMetrics.get(event.stageInfo.stageId).foreach { metrics => + metrics.taskMetrics.clear() + metrics.attemptId = event.stageInfo.attemptId +} + } + + override def onJobEnd(event: SparkListenerJobEnd): Unit = { +liveExecutions.values.foreach { exec => + if (exec.jobs.contains(event.jobId)) { +val result = event.jobResult match { + case JobSucceeded => JobExecutionStatus.SUCCEEDED + case _ => JobExecutionStatus.FAILED +} +exec.jobs = exec.jobs + (event.jobId -> result) +exec.endEvents += 1 +update(exec) + } +} + } + + override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = { +event.accumUpdates.foreach { case (taskId, stageId, attemptId, accumUpdates) => + updateStageMetrics(stageId, attemptId, taskId, accumUpdates, false) +} + } + + override def onTaskEnd(event: SparkListenerTaskEnd): Unit = { +if (!isSQLStage(event.stageId)) { + return +} + +val info = event.taskInfo +// SPARK-20342. If processing events from a
[GitHub] spark pull request #19681: [SPARK-20652][sql] Store SQL UI data in the new a...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19681#discussion_r149791151 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala --- @@ -420,21 +428,19 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared withTempPath { file => // person creates a temporary view. get the DF before listing previous execution IDs val data = person.select('name) - sparkContext.listenerBus.waitUntilEmpty(1) --- End diff -- `person.select()` doesn't generate any events. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19681: [SPARK-20652][sql] Store SQL UI data in the new a...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/19681#discussion_r149767509 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala --- @@ -0,0 +1,353 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.ui + +import java.util.Date +import java.util.concurrent.ConcurrentHashMap + +import scala.collection.JavaConverters._ +import scala.collection.mutable.HashMap + +import org.apache.spark.{JobExecutionStatus, SparkConf} +import org.apache.spark.internal.Logging +import org.apache.spark.scheduler._ +import org.apache.spark.sql.execution.SQLExecution +import org.apache.spark.sql.execution.metric._ +import org.apache.spark.sql.internal.StaticSQLConf._ +import org.apache.spark.status.LiveEntity +import org.apache.spark.status.config._ +import org.apache.spark.ui.SparkUI +import org.apache.spark.util.kvstore.KVStore + +private[sql] class SQLAppStatusListener( +conf: SparkConf, +kvstore: KVStore, +live: Boolean, +ui: Option[SparkUI] = None) + extends SparkListener with Logging { + + // How often to flush intermediate state of a live execution to the store. When replaying logs, + // never flush (only do the very last write). + private val liveUpdatePeriodNs = if (live) conf.get(LIVE_ENTITY_UPDATE_PERIOD) else -1L + + private val liveExecutions = new HashMap[Long, LiveExecutionData]() + private val stageMetrics = new HashMap[Int, LiveStageMetrics]() + + private var uiInitialized = false + + override def onJobStart(event: SparkListenerJobStart): Unit = { +val executionIdString = event.properties.getProperty(SQLExecution.EXECUTION_ID_KEY) +if (executionIdString == null) { + // This is not a job created by SQL + return +} + +val executionId = executionIdString.toLong +val jobId = event.jobId +val exec = getOrCreateExecution(executionId) + +// Record the accumulator IDs for the stages of this job, so that the code that keeps +// track of the metrics knows which accumulators to look at. +val accumIds = exec.metrics.map(_.accumulatorId).sorted.toList +event.stageIds.foreach { id => + stageMetrics.put(id, new LiveStageMetrics(id, 0, accumIds.toArray, new ConcurrentHashMap())) +} + +exec.jobs = exec.jobs + (jobId -> JobExecutionStatus.RUNNING) +exec.stages = event.stageIds.toSet +update(exec) + } + + override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = { +if (!isSQLStage(event.stageInfo.stageId)) { + return +} + +// Reset the metrics tracking object for the new attempt. +stageMetrics.get(event.stageInfo.stageId).foreach { metrics => + metrics.taskMetrics.clear() + metrics.attemptId = event.stageInfo.attemptId +} + } + + override def onJobEnd(event: SparkListenerJobEnd): Unit = { +liveExecutions.values.foreach { exec => + if (exec.jobs.contains(event.jobId)) { +val result = event.jobResult match { + case JobSucceeded => JobExecutionStatus.SUCCEEDED + case _ => JobExecutionStatus.FAILED +} +exec.jobs = exec.jobs + (event.jobId -> result) +exec.endEvents += 1 +update(exec) + } +} + } + + override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = { +event.accumUpdates.foreach { case (taskId, stageId, attemptId, accumUpdates) => + updateStageMetrics(stageId, attemptId, taskId, accumUpdates, false) +} + } + + override def onTaskEnd(event: SparkListenerTaskEnd): Unit = { +if (!isSQLStage(event.stageId)) { + return +} + +val info = event.taskInfo +// SPARK-20342. If processing events from a
[GitHub] spark pull request #19681: [SPARK-20652][sql] Store SQL UI data in the new a...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/19681#discussion_r149780801 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala --- @@ -420,21 +428,19 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared withTempPath { file => // person creates a temporary view. get the DF before listing previous execution IDs val data = person.select('name) - sparkContext.listenerBus.waitUntilEmpty(1) --- End diff -- don't you still need this `waitUntilEmpty`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19681: [SPARK-20652][sql] Store SQL UI data in the new a...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/19681#discussion_r149778794 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala --- @@ -89,398 +83,3 @@ private class LongLongTupleConverter extends Converter[(Object, Object), (Long, typeFactory.constructSimpleType(classOf[(_, _)], classOf[(_, _)], Array(longType, longType)) } } - -class SQLHistoryListenerFactory extends SparkHistoryListenerFactory { - - override def createListeners(conf: SparkConf, sparkUI: SparkUI): Seq[SparkListener] = { -List(new SQLHistoryListener(conf, sparkUI)) - } -} - -class SQLListener(conf: SparkConf) extends SparkListener with Logging { - - private val retainedExecutions = conf.getInt("spark.sql.ui.retainedExecutions", 1000) - - private val activeExecutions = mutable.HashMap[Long, SQLExecutionUIData]() - - // Old data in the following fields must be removed in "trimExecutionsIfNecessary". - // If adding new fields, make sure "trimExecutionsIfNecessary" can clean up old data - private val _executionIdToData = mutable.HashMap[Long, SQLExecutionUIData]() - - /** - * Maintain the relation between job id and execution id so that we can get the execution id in - * the "onJobEnd" method. - */ - private val _jobIdToExecutionId = mutable.HashMap[Long, Long]() - - private val _stageIdToStageMetrics = mutable.HashMap[Long, SQLStageMetrics]() - - private val failedExecutions = mutable.ListBuffer[SQLExecutionUIData]() - - private val completedExecutions = mutable.ListBuffer[SQLExecutionUIData]() - - def executionIdToData: Map[Long, SQLExecutionUIData] = synchronized { -_executionIdToData.toMap - } - - def jobIdToExecutionId: Map[Long, Long] = synchronized { -_jobIdToExecutionId.toMap - } - - def stageIdToStageMetrics: Map[Long, SQLStageMetrics] = synchronized { -_stageIdToStageMetrics.toMap - } - - private def trimExecutionsIfNecessary( - executions: mutable.ListBuffer[SQLExecutionUIData]): Unit = { -if (executions.size > retainedExecutions) { - val toRemove = math.max(retainedExecutions / 10, 1) - executions.take(toRemove).foreach { execution => -for (executionUIData <- _executionIdToData.remove(execution.executionId)) { - for (jobId <- executionUIData.jobs.keys) { -_jobIdToExecutionId.remove(jobId) - } - for (stageId <- executionUIData.stages) { -_stageIdToStageMetrics.remove(stageId) - } -} - } - executions.trimStart(toRemove) -} - } - - override def onJobStart(jobStart: SparkListenerJobStart): Unit = { -val executionIdString = jobStart.properties.getProperty(SQLExecution.EXECUTION_ID_KEY) -if (executionIdString == null) { - // This is not a job created by SQL - return -} -val executionId = executionIdString.toLong -val jobId = jobStart.jobId -val stageIds = jobStart.stageIds - -synchronized { - activeExecutions.get(executionId).foreach { executionUIData => -executionUIData.jobs(jobId) = JobExecutionStatus.RUNNING -executionUIData.stages ++= stageIds -stageIds.foreach(stageId => - _stageIdToStageMetrics(stageId) = new SQLStageMetrics(stageAttemptId = 0)) -_jobIdToExecutionId(jobId) = executionId - } -} - } - - override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = synchronized { -val jobId = jobEnd.jobId -for (executionId <- _jobIdToExecutionId.get(jobId); - executionUIData <- _executionIdToData.get(executionId)) { - jobEnd.jobResult match { -case JobSucceeded => executionUIData.jobs(jobId) = JobExecutionStatus.SUCCEEDED -case JobFailed(_) => executionUIData.jobs(jobId) = JobExecutionStatus.FAILED - } - if (executionUIData.completionTime.nonEmpty && !executionUIData.hasRunningJobs) { -// We are the last job of this execution, so mark the execution as finished. Note that -// `onExecutionEnd` also does this, but currently that can be called before `onJobEnd` -// since these are called on different threads. -markExecutionFinished(executionId) - } -} - } - - override def onExecutorMetricsUpdate( - executorMetricsUpdate: SparkListenerExecutorMetricsUpdate): Unit = synchronized { -for ((taskId, stageId, stageAttemptID, accumUpdates) <- executorMetricsUpdate.accumUpdates) { - updateTaskAccumulatorValues(taskId, stageId, stageAttemptID, accumUpdates,
[GitHub] spark pull request #19681: [SPARK-20652][sql] Store SQL UI data in the new a...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/19681#discussion_r149767574 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala --- @@ -0,0 +1,353 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.ui + +import java.util.Date +import java.util.concurrent.ConcurrentHashMap + +import scala.collection.JavaConverters._ +import scala.collection.mutable.HashMap + +import org.apache.spark.{JobExecutionStatus, SparkConf} +import org.apache.spark.internal.Logging +import org.apache.spark.scheduler._ +import org.apache.spark.sql.execution.SQLExecution +import org.apache.spark.sql.execution.metric._ +import org.apache.spark.sql.internal.StaticSQLConf._ --- End diff -- unused import --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19681: [SPARK-20652][sql] Store SQL UI data in the new a...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/19681#discussion_r149767731 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala --- @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.ui + +import java.lang.{Long => JLong} +import java.util.Date + +import scala.collection.JavaConverters._ +import scala.collection.mutable.ArrayBuffer + +import com.fasterxml.jackson.databind.annotation.JsonDeserialize + +import org.apache.spark.{JobExecutionStatus, SparkConf} +import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent} --- End diff -- SparkListenerEvent is unused --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19681: [SPARK-20652][sql] Store SQL UI data in the new a...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19681#discussion_r149579972 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala --- @@ -0,0 +1,353 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.ui + +import java.util.Date +import java.util.concurrent.ConcurrentHashMap + +import scala.collection.JavaConverters._ +import scala.collection.mutable.HashMap + +import org.apache.spark.{JobExecutionStatus, SparkConf} +import org.apache.spark.internal.Logging +import org.apache.spark.scheduler._ +import org.apache.spark.sql.execution.SQLExecution +import org.apache.spark.sql.execution.metric._ +import org.apache.spark.sql.internal.StaticSQLConf._ +import org.apache.spark.status.LiveEntity +import org.apache.spark.status.config._ +import org.apache.spark.ui.SparkUI +import org.apache.spark.util.kvstore.KVStore + +private[sql] class SQLAppStatusListener( +conf: SparkConf, +kvstore: KVStore, +live: Boolean, +ui: Option[SparkUI] = None) + extends SparkListener with Logging { + + // How often to flush intermediate stage of a live execution to the store. When replaying logs, + // never flush (only do the very last write). + private val liveUpdatePeriodNs = if (live) conf.get(LIVE_ENTITY_UPDATE_PERIOD) else -1L + + private val liveExecutions = new HashMap[Long, LiveExecutionData]() + private val stageMetrics = new HashMap[Int, LiveStageMetrics]() + + private var uiInitialized = false + + override def onJobStart(event: SparkListenerJobStart): Unit = { +val executionIdString = event.properties.getProperty(SQLExecution.EXECUTION_ID_KEY) +if (executionIdString == null) { + // This is not a job created by SQL + return +} + +val executionId = executionIdString.toLong +val jobId = event.jobId +val exec = getOrCreateExecution(executionId) + +// Record the accumulator IDs for the stages of this job, so that the code that keeps +// track of the metrics knows which accumulators to look at. +val accumIds = exec.metrics.map(_.accumulatorId).sorted.toList +event.stageIds.foreach { id => + stageMetrics.put(id, new LiveStageMetrics(id, 0, accumIds.toArray, new ConcurrentHashMap())) +} + +exec.jobs = exec.jobs + (jobId -> JobExecutionStatus.RUNNING) +exec.stages = event.stageIds +update(exec) + } + + override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = { +if (!isSQLStage(event.stageInfo.stageId)) { + return +} + +// Reset the metrics tracking object for the new attempt. +stageMetrics.get(event.stageInfo.stageId).foreach { metrics => + metrics.taskMetrics.clear() + metrics.attemptId = event.stageInfo.attemptId +} + } + + override def onJobEnd(event: SparkListenerJobEnd): Unit = { +liveExecutions.values.foreach { exec => + if (exec.jobs.contains(event.jobId)) { +val result = event.jobResult match { + case JobSucceeded => JobExecutionStatus.SUCCEEDED + case _ => JobExecutionStatus.FAILED +} +exec.jobs = exec.jobs + (event.jobId -> result) +exec.endEvents += 1 +update(exec) + } +} + } + + override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = { +event.accumUpdates.foreach { case (taskId, stageId, attemptId, accumUpdates) => + updateStageMetrics(stageId, attemptId, taskId, accumUpdates, false) +} + } + + override def onTaskEnd(event: SparkListenerTaskEnd): Unit = { +if (!isSQLStage(event.stageId)) { + return +} + +val info = event.taskInfo +// SPARK-20342. If processing events from a live
[GitHub] spark pull request #19681: [SPARK-20652][sql] Store SQL UI data in the new a...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/19681#discussion_r149578181 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala --- @@ -40,7 +40,7 @@ private[sql] class SQLAppStatusListener( ui: Option[SparkUI] = None) extends SparkListener with Logging { - // How often to flush intermediate statge of a live execution to the store. When replaying logs, + // How often to flush intermediate stage of a live execution to the store. When replaying logs, --- End diff -- err, was this supposed to be "state"? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19681: [SPARK-20652][sql] Store SQL UI data in the new a...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/19681#discussion_r149578074 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala --- @@ -0,0 +1,353 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.ui + +import java.util.Date +import java.util.concurrent.ConcurrentHashMap + +import scala.collection.JavaConverters._ +import scala.collection.mutable.HashMap + +import org.apache.spark.{JobExecutionStatus, SparkConf} +import org.apache.spark.internal.Logging +import org.apache.spark.scheduler._ +import org.apache.spark.sql.execution.SQLExecution +import org.apache.spark.sql.execution.metric._ +import org.apache.spark.sql.internal.StaticSQLConf._ +import org.apache.spark.status.LiveEntity +import org.apache.spark.status.config._ +import org.apache.spark.ui.SparkUI +import org.apache.spark.util.kvstore.KVStore + +private[sql] class SQLAppStatusListener( +conf: SparkConf, +kvstore: KVStore, +live: Boolean, +ui: Option[SparkUI] = None) + extends SparkListener with Logging { + + // How often to flush intermediate stage of a live execution to the store. When replaying logs, + // never flush (only do the very last write). + private val liveUpdatePeriodNs = if (live) conf.get(LIVE_ENTITY_UPDATE_PERIOD) else -1L + + private val liveExecutions = new HashMap[Long, LiveExecutionData]() + private val stageMetrics = new HashMap[Int, LiveStageMetrics]() + + private var uiInitialized = false + + override def onJobStart(event: SparkListenerJobStart): Unit = { +val executionIdString = event.properties.getProperty(SQLExecution.EXECUTION_ID_KEY) +if (executionIdString == null) { + // This is not a job created by SQL + return +} + +val executionId = executionIdString.toLong +val jobId = event.jobId +val exec = getOrCreateExecution(executionId) + +// Record the accumulator IDs for the stages of this job, so that the code that keeps +// track of the metrics knows which accumulators to look at. +val accumIds = exec.metrics.map(_.accumulatorId).sorted.toList +event.stageIds.foreach { id => + stageMetrics.put(id, new LiveStageMetrics(id, 0, accumIds.toArray, new ConcurrentHashMap())) +} + +exec.jobs = exec.jobs + (jobId -> JobExecutionStatus.RUNNING) +exec.stages = event.stageIds +update(exec) + } + + override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = { +if (!isSQLStage(event.stageInfo.stageId)) { + return +} + +// Reset the metrics tracking object for the new attempt. +stageMetrics.get(event.stageInfo.stageId).foreach { metrics => + metrics.taskMetrics.clear() + metrics.attemptId = event.stageInfo.attemptId +} + } + + override def onJobEnd(event: SparkListenerJobEnd): Unit = { +liveExecutions.values.foreach { exec => + if (exec.jobs.contains(event.jobId)) { +val result = event.jobResult match { + case JobSucceeded => JobExecutionStatus.SUCCEEDED + case _ => JobExecutionStatus.FAILED +} +exec.jobs = exec.jobs + (event.jobId -> result) +exec.endEvents += 1 +update(exec) + } +} + } + + override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = { +event.accumUpdates.foreach { case (taskId, stageId, attemptId, accumUpdates) => + updateStageMetrics(stageId, attemptId, taskId, accumUpdates, false) +} + } + + override def onTaskEnd(event: SparkListenerTaskEnd): Unit = { +if (!isSQLStage(event.stageId)) { + return +} + +val info = event.taskInfo +// SPARK-20342. If processing events from a live
[GitHub] spark pull request #19681: [SPARK-20652][sql] Store SQL UI data in the new a...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19681#discussion_r149537039 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala --- @@ -0,0 +1,353 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.ui + +import java.util.Date +import java.util.concurrent.ConcurrentHashMap + +import scala.collection.JavaConverters._ +import scala.collection.mutable.HashMap + +import org.apache.spark.{JobExecutionStatus, SparkConf} +import org.apache.spark.internal.Logging +import org.apache.spark.scheduler._ +import org.apache.spark.sql.execution.SQLExecution +import org.apache.spark.sql.execution.metric._ +import org.apache.spark.sql.internal.StaticSQLConf._ +import org.apache.spark.status.LiveEntity +import org.apache.spark.status.config._ +import org.apache.spark.ui.SparkUI +import org.apache.spark.util.kvstore.KVStore + +private[sql] class SQLAppStatusListener( +conf: SparkConf, +kvstore: KVStore, +live: Boolean, +ui: Option[SparkUI] = None) + extends SparkListener with Logging { + + // How often to flush intermediate stage of a live execution to the store. When replaying logs, + // never flush (only do the very last write). + private val liveUpdatePeriodNs = if (live) conf.get(LIVE_ENTITY_UPDATE_PERIOD) else -1L + + private val liveExecutions = new HashMap[Long, LiveExecutionData]() + private val stageMetrics = new HashMap[Int, LiveStageMetrics]() + + private var uiInitialized = false + + override def onJobStart(event: SparkListenerJobStart): Unit = { +val executionIdString = event.properties.getProperty(SQLExecution.EXECUTION_ID_KEY) +if (executionIdString == null) { + // This is not a job created by SQL + return +} + +val executionId = executionIdString.toLong +val jobId = event.jobId +val exec = getOrCreateExecution(executionId) + +// Record the accumulator IDs for the stages of this job, so that the code that keeps +// track of the metrics knows which accumulators to look at. +val accumIds = exec.metrics.map(_.accumulatorId).sorted.toList +event.stageIds.foreach { id => + stageMetrics.put(id, new LiveStageMetrics(id, 0, accumIds.toArray, new ConcurrentHashMap())) +} + +exec.jobs = exec.jobs + (jobId -> JobExecutionStatus.RUNNING) +exec.stages = event.stageIds +update(exec) + } + + override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = { +if (!isSQLStage(event.stageInfo.stageId)) { + return +} + +// Reset the metrics tracking object for the new attempt. +stageMetrics.get(event.stageInfo.stageId).foreach { metrics => + metrics.taskMetrics.clear() + metrics.attemptId = event.stageInfo.attemptId +} + } + + override def onJobEnd(event: SparkListenerJobEnd): Unit = { +liveExecutions.values.foreach { exec => + if (exec.jobs.contains(event.jobId)) { +val result = event.jobResult match { + case JobSucceeded => JobExecutionStatus.SUCCEEDED + case _ => JobExecutionStatus.FAILED +} +exec.jobs = exec.jobs + (event.jobId -> result) +exec.endEvents += 1 +update(exec) + } +} + } + + override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = { +event.accumUpdates.foreach { case (taskId, stageId, attemptId, accumUpdates) => + updateStageMetrics(stageId, attemptId, taskId, accumUpdates, false) +} + } + + override def onTaskEnd(event: SparkListenerTaskEnd): Unit = { +if (!isSQLStage(event.stageId)) { + return +} + +val info = event.taskInfo +// SPARK-20342. If processing events from a live
[GitHub] spark pull request #19681: [SPARK-20652][sql] Store SQL UI data in the new a...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/19681#discussion_r149529181 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala --- @@ -0,0 +1,353 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.ui + +import java.util.Date +import java.util.concurrent.ConcurrentHashMap + +import scala.collection.JavaConverters._ +import scala.collection.mutable.HashMap + +import org.apache.spark.{JobExecutionStatus, SparkConf} +import org.apache.spark.internal.Logging +import org.apache.spark.scheduler._ +import org.apache.spark.sql.execution.SQLExecution +import org.apache.spark.sql.execution.metric._ +import org.apache.spark.sql.internal.StaticSQLConf._ +import org.apache.spark.status.LiveEntity +import org.apache.spark.status.config._ +import org.apache.spark.ui.SparkUI +import org.apache.spark.util.kvstore.KVStore + +private[sql] class SQLAppStatusListener( +conf: SparkConf, +kvstore: KVStore, +live: Boolean, +ui: Option[SparkUI] = None) + extends SparkListener with Logging { + + // How often to flush intermediate stage of a live execution to the store. When replaying logs, + // never flush (only do the very last write). + private val liveUpdatePeriodNs = if (live) conf.get(LIVE_ENTITY_UPDATE_PERIOD) else -1L + + private val liveExecutions = new HashMap[Long, LiveExecutionData]() + private val stageMetrics = new HashMap[Int, LiveStageMetrics]() + + private var uiInitialized = false + + override def onJobStart(event: SparkListenerJobStart): Unit = { +val executionIdString = event.properties.getProperty(SQLExecution.EXECUTION_ID_KEY) +if (executionIdString == null) { + // This is not a job created by SQL + return +} + +val executionId = executionIdString.toLong +val jobId = event.jobId +val exec = getOrCreateExecution(executionId) + +// Record the accumulator IDs for the stages of this job, so that the code that keeps +// track of the metrics knows which accumulators to look at. +val accumIds = exec.metrics.map(_.accumulatorId).sorted.toList +event.stageIds.foreach { id => + stageMetrics.put(id, new LiveStageMetrics(id, 0, accumIds.toArray, new ConcurrentHashMap())) +} + +exec.jobs = exec.jobs + (jobId -> JobExecutionStatus.RUNNING) +exec.stages = event.stageIds +update(exec) + } + + override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = { +if (!isSQLStage(event.stageInfo.stageId)) { + return +} + +// Reset the metrics tracking object for the new attempt. +stageMetrics.get(event.stageInfo.stageId).foreach { metrics => + metrics.taskMetrics.clear() + metrics.attemptId = event.stageInfo.attemptId +} + } + + override def onJobEnd(event: SparkListenerJobEnd): Unit = { +liveExecutions.values.foreach { exec => + if (exec.jobs.contains(event.jobId)) { +val result = event.jobResult match { + case JobSucceeded => JobExecutionStatus.SUCCEEDED + case _ => JobExecutionStatus.FAILED +} +exec.jobs = exec.jobs + (event.jobId -> result) +exec.endEvents += 1 +update(exec) + } +} + } + + override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = { +event.accumUpdates.foreach { case (taskId, stageId, attemptId, accumUpdates) => + updateStageMetrics(stageId, attemptId, taskId, accumUpdates, false) +} + } + + override def onTaskEnd(event: SparkListenerTaskEnd): Unit = { +if (!isSQLStage(event.stageId)) { + return +} + +val info = event.taskInfo +// SPARK-20342. If processing events from a live
[GitHub] spark pull request #19681: [SPARK-20652][sql] Store SQL UI data in the new a...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/19681#discussion_r149530162 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala --- @@ -0,0 +1,353 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.ui + +import java.util.Date +import java.util.concurrent.ConcurrentHashMap + +import scala.collection.JavaConverters._ +import scala.collection.mutable.HashMap + +import org.apache.spark.{JobExecutionStatus, SparkConf} +import org.apache.spark.internal.Logging +import org.apache.spark.scheduler._ +import org.apache.spark.sql.execution.SQLExecution +import org.apache.spark.sql.execution.metric._ +import org.apache.spark.sql.internal.StaticSQLConf._ +import org.apache.spark.status.LiveEntity +import org.apache.spark.status.config._ +import org.apache.spark.ui.SparkUI +import org.apache.spark.util.kvstore.KVStore + +private[sql] class SQLAppStatusListener( +conf: SparkConf, +kvstore: KVStore, +live: Boolean, +ui: Option[SparkUI] = None) + extends SparkListener with Logging { + + // How often to flush intermediate stage of a live execution to the store. When replaying logs, + // never flush (only do the very last write). + private val liveUpdatePeriodNs = if (live) conf.get(LIVE_ENTITY_UPDATE_PERIOD) else -1L + + private val liveExecutions = new HashMap[Long, LiveExecutionData]() + private val stageMetrics = new HashMap[Int, LiveStageMetrics]() + + private var uiInitialized = false + + override def onJobStart(event: SparkListenerJobStart): Unit = { +val executionIdString = event.properties.getProperty(SQLExecution.EXECUTION_ID_KEY) +if (executionIdString == null) { + // This is not a job created by SQL + return +} + +val executionId = executionIdString.toLong +val jobId = event.jobId +val exec = getOrCreateExecution(executionId) + +// Record the accumulator IDs for the stages of this job, so that the code that keeps +// track of the metrics knows which accumulators to look at. +val accumIds = exec.metrics.map(_.accumulatorId).sorted.toList +event.stageIds.foreach { id => + stageMetrics.put(id, new LiveStageMetrics(id, 0, accumIds.toArray, new ConcurrentHashMap())) +} + +exec.jobs = exec.jobs + (jobId -> JobExecutionStatus.RUNNING) +exec.stages = event.stageIds +update(exec) + } + + override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = { +if (!isSQLStage(event.stageInfo.stageId)) { + return +} + +// Reset the metrics tracking object for the new attempt. +stageMetrics.get(event.stageInfo.stageId).foreach { metrics => + metrics.taskMetrics.clear() + metrics.attemptId = event.stageInfo.attemptId +} + } + + override def onJobEnd(event: SparkListenerJobEnd): Unit = { +liveExecutions.values.foreach { exec => + if (exec.jobs.contains(event.jobId)) { +val result = event.jobResult match { + case JobSucceeded => JobExecutionStatus.SUCCEEDED + case _ => JobExecutionStatus.FAILED +} +exec.jobs = exec.jobs + (event.jobId -> result) +exec.endEvents += 1 +update(exec) + } +} + } + + override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = { +event.accumUpdates.foreach { case (taskId, stageId, attemptId, accumUpdates) => + updateStageMetrics(stageId, attemptId, taskId, accumUpdates, false) +} + } + + override def onTaskEnd(event: SparkListenerTaskEnd): Unit = { +if (!isSQLStage(event.stageId)) { + return +} + +val info = event.taskInfo +// SPARK-20342. If processing events from a live
[GitHub] spark pull request #19681: [SPARK-20652][sql] Store SQL UI data in the new a...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/19681#discussion_r149500473 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala --- @@ -0,0 +1,353 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.ui + +import java.util.Date +import java.util.concurrent.ConcurrentHashMap + +import scala.collection.JavaConverters._ +import scala.collection.mutable.HashMap + +import org.apache.spark.{JobExecutionStatus, SparkConf} +import org.apache.spark.internal.Logging +import org.apache.spark.scheduler._ +import org.apache.spark.sql.execution.SQLExecution +import org.apache.spark.sql.execution.metric._ +import org.apache.spark.sql.internal.StaticSQLConf._ +import org.apache.spark.status.LiveEntity +import org.apache.spark.status.config._ +import org.apache.spark.ui.SparkUI +import org.apache.spark.util.kvstore.KVStore + +private[sql] class SQLAppStatusListener( --- End diff -- > SQLAppStatusStore.executionMetrics has logic to call the listener directly when the final metrics are not yet computed. ahh, that is the part I hadn't noticed. thanks --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19681: [SPARK-20652][sql] Store SQL UI data in the new a...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19681#discussion_r149466906 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala --- @@ -36,13 +36,14 @@ import org.apache.spark.sql.catalyst.util.quietly import org.apache.spark.sql.execution.{LeafExecNode, QueryExecution, SparkPlanInfo, SQLExecution} import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} import org.apache.spark.sql.test.SharedSQLContext -import org.apache.spark.ui.SparkUI +import org.apache.spark.status.config._ import org.apache.spark.util.{AccumulatorMetadata, JsonProtocol, LongAccumulator} - +import org.apache.spark.util.kvstore.InMemoryStore class SQLListenerSuite extends SparkFunSuite with SharedSQLContext with JsonTestUtils { --- End diff -- Actually this suite has a mix of tests for the listener and for stuff that's not related to the listener, which would belong in `SQLAppStatusListenerSuite`. My original changes broke this into two different suites, but I chose to postpone that to reduce the size of the diff for now (and also to make the diff a little easier to read). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19681: [SPARK-20652][sql] Store SQL UI data in the new a...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19681#discussion_r149466466 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala --- @@ -0,0 +1,353 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.ui + +import java.util.Date +import java.util.concurrent.ConcurrentHashMap + +import scala.collection.JavaConverters._ +import scala.collection.mutable.HashMap + +import org.apache.spark.{JobExecutionStatus, SparkConf} +import org.apache.spark.internal.Logging +import org.apache.spark.scheduler._ +import org.apache.spark.sql.execution.SQLExecution +import org.apache.spark.sql.execution.metric._ +import org.apache.spark.sql.internal.StaticSQLConf._ +import org.apache.spark.status.LiveEntity +import org.apache.spark.status.config._ +import org.apache.spark.ui.SparkUI +import org.apache.spark.util.kvstore.KVStore + +private[sql] class SQLAppStatusListener( --- End diff -- The live UI is updated from the info in `LiveStageMetrics`, which is not written to the store. That's kept in memory while executions are running, and aggregated into the final metrics view when the execution finishes (see `aggregateMetrics`). `SQLAppStatusStore.executionMetrics` has logic to call the listener directly when the final metrics are not yet computed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19681: [SPARK-20652][sql] Store SQL UI data in the new a...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/19681#discussion_r149457439 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala --- @@ -0,0 +1,353 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.ui + +import java.util.Date +import java.util.concurrent.ConcurrentHashMap + +import scala.collection.JavaConverters._ +import scala.collection.mutable.HashMap + +import org.apache.spark.{JobExecutionStatus, SparkConf} +import org.apache.spark.internal.Logging +import org.apache.spark.scheduler._ +import org.apache.spark.sql.execution.SQLExecution +import org.apache.spark.sql.execution.metric._ +import org.apache.spark.sql.internal.StaticSQLConf._ +import org.apache.spark.status.LiveEntity +import org.apache.spark.status.config._ +import org.apache.spark.ui.SparkUI +import org.apache.spark.util.kvstore.KVStore + +private[sql] class SQLAppStatusListener( --- End diff -- I don't see calls to `update` in `onTaskEnd` or `onExecutorMetricsUpdate`. Does that mean the live UI wont' update till a stage is finished? But after looking at the tests, I guess I'm wrong, it does update ... where is the update I'm missing? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19681: [SPARK-20652][sql] Store SQL UI data in the new a...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/19681#discussion_r149458539 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala --- @@ -36,13 +36,14 @@ import org.apache.spark.sql.catalyst.util.quietly import org.apache.spark.sql.execution.{LeafExecNode, QueryExecution, SparkPlanInfo, SQLExecution} import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} import org.apache.spark.sql.test.SharedSQLContext -import org.apache.spark.ui.SparkUI +import org.apache.spark.status.config._ import org.apache.spark.util.{AccumulatorMetadata, JsonProtocol, LongAccumulator} - +import org.apache.spark.util.kvstore.InMemoryStore class SQLListenerSuite extends SparkFunSuite with SharedSQLContext with JsonTestUtils { --- End diff -- rename test to `SQLAppStatusListenerSuite` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19681: [SPARK-20652][sql] Store SQL UI data in the new a...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/19681#discussion_r149450470 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala --- @@ -0,0 +1,353 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.ui + +import java.util.Date +import java.util.concurrent.ConcurrentHashMap + +import scala.collection.JavaConverters._ +import scala.collection.mutable.HashMap + +import org.apache.spark.{JobExecutionStatus, SparkConf} +import org.apache.spark.internal.Logging +import org.apache.spark.scheduler._ +import org.apache.spark.sql.execution.SQLExecution +import org.apache.spark.sql.execution.metric._ +import org.apache.spark.sql.internal.StaticSQLConf._ +import org.apache.spark.status.LiveEntity +import org.apache.spark.status.config._ +import org.apache.spark.ui.SparkUI +import org.apache.spark.util.kvstore.KVStore + +private[sql] class SQLAppStatusListener( +conf: SparkConf, +kvstore: KVStore, +live: Boolean, +ui: Option[SparkUI] = None) + extends SparkListener with Logging { + + // How often to flush intermediate statge of a live execution to the store. When replaying logs, --- End diff -- typo: statge --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19681: [SPARK-20652][sql] Store SQL UI data in the new a...
GitHub user vanzin opened a pull request: https://github.com/apache/spark/pull/19681 [SPARK-20652][sql] Store SQL UI data in the new app status store. This change replaces the SQLListener with a new implementation that saves the data to the same store used by the SparkContext's status store. For that, the types used by the old SQLListener had to be updated a bit so that they're more serialization-friendly. The interface for getting data from the store was abstracted into a new class, SQLAppStatusStore (following the convention used in core). Another change is the way that the SQL UI hooks up into the core UI or the SHS. The old "SparkHistoryListenerFactory" was replaced with a new "AppStatePlugin" that more explicitly differentiates between the two use cases: processing events, and showing the UI. Both live apps and the SHS use this new API (previously, it was restricted to the SHS). Note on the above: this causes a slight change of behavior for live apps; the SQL tab will only show up after the first execution is started. The metrics gathering code was re-worked a bit so that the types used are less memory hungry and more serialization-friendly. This reduces memory usage when using in-memory stores, and reduces load times when using disk stores. Tested with existing and added unit tests. Note one unit test was disabled because it depends on SPARK-20653, which isn't in yet. You can merge this pull request into a Git repository by running: $ git pull https://github.com/vanzin/spark SPARK-20652 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/19681.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #19681 commit ccd5adc1d6273b92fd6c9a0d4817451a5acb566a Author: Marcelo VanzinDate: 2017-04-06T17:00:25Z [SPARK-20652][sql] Store SQL UI data in the new app status store. This change replaces the SQLListener with a new implementation that saves the data to the same store used by the SparkContext's status store. For that, the types used by the old SQLListener had to be updated a bit so that they're more serialization-friendly. The interface for getting data from the store was abstracted into a new class, SQLAppStatusStore (following the convention used in core). Another change is the way that the SQL UI hooks up into the core UI or the SHS. The old "SparkHistoryListenerFactory" was replaced with a new "AppStatePlugin" that more explicitly differentiates between the two use cases: processing events, and showing the UI. Both live apps and the SHS use this new API (previously, it was restricted to the SHS). Note on the above: this causes a slight change of behavior for live apps; the SQL tab will only show up after the first execution is started. The metrics gathering code was re-worked a bit so that the types used are less memory hungry and more serialization-friendly. This reduces memory usage when using in-memory stores, and reduces load times when using disk stores. Tested with existing and added unit tests. Note one unit test was disabled because it depends on SPARK-20653, which isn't in yet. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org