Github user carsonwang commented on a diff in the pull request:
https://github.com/apache/spark/pull/19681#discussion_r158226032
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
---
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.ui
+
+import java.util.Date
+import java.util.concurrent.ConcurrentHashMap
+import java.util.function.Function
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.{JobExecutionStatus, SparkConf}
+import org.apache.spark.internal.Logging
+import org.apache.spark.scheduler._
+import org.apache.spark.sql.execution.SQLExecution
+import org.apache.spark.sql.execution.metric._
+import org.apache.spark.status.LiveEntity
+import org.apache.spark.status.config._
+import org.apache.spark.ui.SparkUI
+import org.apache.spark.util.kvstore.KVStore
+
+private[sql] class SQLAppStatusListener(
+ conf: SparkConf,
+ kvstore: KVStore,
+ live: Boolean,
+ ui: Option[SparkUI] = None)
+ extends SparkListener with Logging {
+
+ // How often to flush intermediate state of a live execution to the
store. When replaying logs,
+ // never flush (only do the very last write).
+ private val liveUpdatePeriodNs = if (live)
conf.get(LIVE_ENTITY_UPDATE_PERIOD) else -1L
+
+ // Live tracked data is needed by the SQL status store to calculate
metrics for in-flight
+ // executions; that means arbitrary threads may be querying these maps,
so they need to be
+ // thread-safe.
+ private val liveExecutions = new ConcurrentHashMap[Long,
LiveExecutionData]()
+ private val stageMetrics = new ConcurrentHashMap[Int, LiveStageMetrics]()
+
+ private var uiInitialized = false
+
+ override def onJobStart(event: SparkListenerJobStart): Unit = {
+ val executionIdString =
event.properties.getProperty(SQLExecution.EXECUTION_ID_KEY)
+ if (executionIdString == null) {
+ // This is not a job created by SQL
+ return
+ }
+
+ val executionId = executionIdString.toLong
+ val jobId = event.jobId
+ val exec = getOrCreateExecution(executionId)
+
+ // Record the accumulator IDs for the stages of this job, so that the
code that keeps
+ // track of the metrics knows which accumulators to look at.
+ val accumIds = exec.metrics.map(_.accumulatorId).sorted.toList
+ event.stageIds.foreach { id =>
+ stageMetrics.put(id, new LiveStageMetrics(id, 0, accumIds.toArray,
new ConcurrentHashMap()))
+ }
+
+ exec.jobs = exec.jobs + (jobId -> JobExecutionStatus.RUNNING)
+ exec.stages = event.stageIds.toSet
--- End diff --
@vanzin, should we add these stageIds to the execution's existing stageIds
instead of overwriting them? Otherwise we will lose the stageIds from previous jobs.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]