Github user squito commented on a diff in the pull request:
https://github.com/apache/spark/pull/19681#discussion_r149767509
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala ---
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.ui
+
+import java.util.Date
+import java.util.concurrent.ConcurrentHashMap
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.HashMap
+
+import org.apache.spark.{JobExecutionStatus, SparkConf}
+import org.apache.spark.internal.Logging
+import org.apache.spark.scheduler._
+import org.apache.spark.sql.execution.SQLExecution
+import org.apache.spark.sql.execution.metric._
+import org.apache.spark.sql.internal.StaticSQLConf._
+import org.apache.spark.status.LiveEntity
+import org.apache.spark.status.config._
+import org.apache.spark.ui.SparkUI
+import org.apache.spark.util.kvstore.KVStore
+
+private[sql] class SQLAppStatusListener(
+ conf: SparkConf,
+ kvstore: KVStore,
+ live: Boolean,
+ ui: Option[SparkUI] = None)
+ extends SparkListener with Logging {
+
+  // How often to flush intermediate state of a live execution to the store. When replaying logs,
+  // never flush (only do the very last write).
+  private val liveUpdatePeriodNs = if (live) conf.get(LIVE_ENTITY_UPDATE_PERIOD) else -1L
+
+ private val liveExecutions = new HashMap[Long, LiveExecutionData]()
+ private val stageMetrics = new HashMap[Int, LiveStageMetrics]()
+
+ private var uiInitialized = false
+
+ override def onJobStart(event: SparkListenerJobStart): Unit = {
+    val executionIdString = event.properties.getProperty(SQLExecution.EXECUTION_ID_KEY)
+ if (executionIdString == null) {
+ // This is not a job created by SQL
+ return
+ }
+
+ val executionId = executionIdString.toLong
+ val jobId = event.jobId
+ val exec = getOrCreateExecution(executionId)
+
+    // Record the accumulator IDs for the stages of this job, so that the code that keeps
+ // track of the metrics knows which accumulators to look at.
+ val accumIds = exec.metrics.map(_.accumulatorId).sorted.toList
+ event.stageIds.foreach { id =>
+      stageMetrics.put(id, new LiveStageMetrics(id, 0, accumIds.toArray, new ConcurrentHashMap()))
+ }
+
+ exec.jobs = exec.jobs + (jobId -> JobExecutionStatus.RUNNING)
+ exec.stages = event.stageIds.toSet
+ update(exec)
+ }
+
+  override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = {
+ if (!isSQLStage(event.stageInfo.stageId)) {
+ return
+ }
+
+ // Reset the metrics tracking object for the new attempt.
+ stageMetrics.get(event.stageInfo.stageId).foreach { metrics =>
+ metrics.taskMetrics.clear()
+ metrics.attemptId = event.stageInfo.attemptId
+ }
+ }
+
+ override def onJobEnd(event: SparkListenerJobEnd): Unit = {
+ liveExecutions.values.foreach { exec =>
+ if (exec.jobs.contains(event.jobId)) {
+ val result = event.jobResult match {
+ case JobSucceeded => JobExecutionStatus.SUCCEEDED
+ case _ => JobExecutionStatus.FAILED
+ }
+ exec.jobs = exec.jobs + (event.jobId -> result)
+ exec.endEvents += 1
+ update(exec)
+ }
+ }
+ }
+
+  override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = {
+    event.accumUpdates.foreach { case (taskId, stageId, attemptId, accumUpdates) =>
+ updateStageMetrics(stageId, attemptId, taskId, accumUpdates, false)
+ }
+ }
+
+ override def onTaskEnd(event: SparkListenerTaskEnd): Unit = {
+ if (!isSQLStage(event.stageId)) {
+ return
+ }
+
+ val info = event.taskInfo
+    // SPARK-20342. If processing events from a live application, use the task metrics info to
+    // work around a race in the DAGScheduler. The metrics info does not contain accumulator info
+    // when reading event logs in the SHS, so we have to rely on the accumulator in that case.
+ val accums = if (live && event.taskMetrics != null) {
+ event.taskMetrics.externalAccums.flatMap { a =>
+        // This call may fail if the accumulator is gc'ed, so account for that.
+ try {
+ Some(a.toInfo(Some(a.value), None))
+ } catch {
+ case _: IllegalAccessError => None
+ }
+ }
+ } else {
+ info.accumulables
+ }
+    updateStageMetrics(event.stageId, event.stageAttemptId, info.taskId, accums,
+ info.successful)
+ }
+
+  def executionMetrics(executionId: Long): Map[Long, String] = synchronized {
+ liveExecutions.get(executionId).map { exec =>
+ if (exec.metricsValues != null) {
+ exec.metricsValues
+ } else {
+ aggregateMetrics(exec)
+ }
+ }.getOrElse {
+ throw new NoSuchElementException(s"execution $executionId not found")
+ }
+ }
+
+  private def aggregateMetrics(exec: LiveExecutionData): Map[Long, String] = synchronized {
+ val metricIds = exec.metrics.map(_.accumulatorId).sorted
+    val metricTypes = exec.metrics.map { m => (m.accumulatorId, m.metricType) }.toMap
+ val metrics = exec.stages.toSeq
+ .flatMap(stageMetrics.get)
+ .flatMap(_.taskMetrics.values().asScala)
+ .flatMap { metrics => metrics.ids.zip(metrics.values) }
+
+ (metrics ++ exec.driverAccumUpdates.toSeq)
+ .filter { case (id, _) => metricIds.contains(id) }
+ .groupBy(_._1)
+ .map { case (id, values) =>
+        id -> SQLMetrics.stringValue(metricTypes(id), values.map(_._2).toSeq)
+ }
+ }
+
+ private def updateStageMetrics(
+ stageId: Int,
+ attemptId: Int,
+ taskId: Long,
+ accumUpdates: Seq[AccumulableInfo],
+ succeeded: Boolean): Unit = {
+ stageMetrics.get(stageId).foreach { metrics =>
+      if (metrics.attemptId != attemptId || metrics.accumulatorIds.isEmpty) {
+ return
+ }
+
+ val oldTaskMetrics = metrics.taskMetrics.get(taskId)
+ if (oldTaskMetrics != null && oldTaskMetrics.succeeded) {
+ return
+ }
+
+ val updates = accumUpdates
+        .filter { acc => acc.update.isDefined && metrics.accumulatorIds.contains(acc.id) }
+ .sortBy(_.id)
+
+ if (updates.isEmpty) {
+ return
+ }
+
+ val ids = new Array[Long](updates.size)
+ val values = new Array[Long](updates.size)
+ updates.zipWithIndex.foreach { case (acc, idx) =>
+ ids(idx) = acc.id
+        // In a live application, accumulators have Long values, but when reading from event
+        // logs, they have String values. For now, assume all accumulators are Long and convert
+ // accordingly.
+ values(idx) = acc.update.get match {
+ case s: String => s.toLong
+ case l: Long => l
+ case o => throw new IllegalArgumentException(s"Unexpected: $o")
+ }
+ }
+
+      metrics.taskMetrics.put(taskId, new LiveTaskMetrics(ids, values, succeeded))
+ }
+ }
+
+  private def onExecutionStart(event: SparkListenerSQLExecutionStart): Unit = {
+ // Install the SQL tab in a live app if it hasn't been initialized yet.
+ if (!uiInitialized) {
+ ui.foreach { _ui =>
+ new SQLTab(new SQLAppStatusStore(kvstore, Some(this)), _ui)
+ }
+ uiInitialized = true
+ }
+
+    val SparkListenerSQLExecutionStart(executionId, description, details,
+      physicalPlanDescription, sparkPlanInfo, time) = event
+
+    def toStoredNodes(nodes: Seq[SparkPlanGraphNode]): Seq[SparkPlanGraphNodeWrapper] = {
+ nodes.map {
+ case cluster: SparkPlanGraphCluster =>
+ val storedCluster = new SparkPlanGraphClusterWrapper(
+ cluster.id,
+ cluster.name,
+ cluster.desc,
+ toStoredNodes(cluster.nodes),
+ cluster.metrics)
+ new SparkPlanGraphNodeWrapper(null, storedCluster)
+
+ case node =>
+ new SparkPlanGraphNodeWrapper(node, null)
+ }
+ }
+
+ val planGraph = SparkPlanGraph(sparkPlanInfo)
+ val sqlPlanMetrics = planGraph.allNodes.flatMap { node =>
+ node.metrics.map { metric => (metric.accumulatorId, metric) }
+ }.toMap.values.toList
+
+ val graphToStore = new SparkPlanGraphWrapper(
+ executionId,
+ toStoredNodes(planGraph.nodes),
+ planGraph.edges)
+ kvstore.write(graphToStore)
+
+ val exec = getOrCreateExecution(executionId)
+ exec.description = description
+ exec.details = details
+ exec.physicalPlanDescription = physicalPlanDescription
+ exec.metrics = sqlPlanMetrics
+ exec.submissionTime = time
+ update(exec)
+ }
+
+ private def onExecutionEnd(event: SparkListenerSQLExecutionEnd): Unit = {
+ val SparkListenerSQLExecutionEnd(executionId, time) = event
+ liveExecutions.get(executionId).foreach { exec =>
+ synchronized {
+ exec.metricsValues = aggregateMetrics(exec)
+
+        // Remove stale LiveStageMetrics objects for stages that are not active anymore.
+ val activeStages = liveExecutions.values.flatMap { other =>
+ if (other != exec) other.stages else Nil
+ }.toSet
+ stageMetrics.retain { case (id, _) => activeStages.contains(id) }
+
+ exec.completionTime = Some(new Date(time))
+ exec.endEvents += 1
+
+ update(exec)
+ }
+ }
+ }
+
+  private def onDriverAccumUpdates(event: SparkListenerDriverAccumUpdates): Unit = {
+ val SparkListenerDriverAccumUpdates(executionId, accumUpdates) = event
+ liveExecutions.get(executionId).foreach { exec =>
+ exec.driverAccumUpdates = accumUpdates.toMap
+ update(exec)
+ }
+ }
+
+  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
+ case e: SparkListenerSQLExecutionStart => onExecutionStart(e)
+ case e: SparkListenerSQLExecutionEnd => onExecutionEnd(e)
+ case e: SparkListenerDriverAccumUpdates => onDriverAccumUpdates(e)
+ case _ => // Ignore
+ }
+
+  private def getOrCreateExecution(executionId: Long): LiveExecutionData = {
+    liveExecutions.getOrElseUpdate(executionId, new LiveExecutionData(executionId))
+ }
+
+ private def update(exec: LiveExecutionData): Unit = {
+ val now = System.nanoTime()
+ if (exec.endEvents >= exec.jobs.size + 1) {
+ liveExecutions.remove(exec.executionId)
+ exec.write(kvstore, now)
+ } else if (liveUpdatePeriodNs >= 0) {
+ if (now - exec.lastWriteTime > liveUpdatePeriodNs) {
+ exec.write(kvstore, now)
+ }
+ }
+ }
+
+ private def isSQLStage(stageId: Int): Boolean = {
+ liveExecutions.values.exists { exec =>
+ exec.stages.contains(stageId)
+ }
+ }
+
+}
+
+private class LiveExecutionData(val executionId: Long) extends LiveEntity {
+
+ var description: String = null
+ var details: String = null
+ var physicalPlanDescription: String = null
+ var metrics = Seq[SQLPlanMetric]()
+ var submissionTime = -1L
+ var completionTime: Option[Date] = None
+
+ var jobs = Map[Int, JobExecutionStatus]()
+ var stages = Set[Int]()
+ var driverAccumUpdates = Map[Long, Long]()
+
+ var metricsValues: Map[Long, String] = null
+
+  // Just in case job end and execution end arrive out of order, keep track of how many
+  // end events arrived so that the listener can stop tracking the execution.
+ var endEvents = 0
+
+ override protected def doUpdate(): Any = {
+ new SQLExecutionUIData(
+ executionId,
+ description,
+ details,
+ physicalPlanDescription,
+ metrics,
+ submissionTime,
+ completionTime,
+ jobs,
+ stages,
+ metricsValues)
+ }
+
+}
+
+private class LiveStageMetrics(
+ val stageId: Int,
+ var attemptId: Int,
+ val accumulatorIds: Array[Long],
+ val taskMetrics: ConcurrentHashMap[Long, LiveTaskMetrics])
--- End diff ---
Can you include a comment explaining the threading concerns in this class?
At first I didn't think the CHM or the `synchronized` were necessary, since the
listener methods are only called from one thread, but then I realized the UI
will also call `executionMetrics(executionId)` (I think that is the only
reason?). Also, I think there are more `synchronized` blocks than necessary --
both sites where `aggregateMetrics` is called have already acquired the lock,
so it shouldn't need to take it again. That doesn't hurt, but it can be
confusing when it isn't clear where the lock is supposed to be acquired.
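
To make that concrete, here is a toy sketch of the locking scheme I have in
mind (hypothetical names, nothing from this PR): take the lock only in the
public entry points that the listener bus and UI threads go through, and leave
the private aggregation helper unsynchronized, with a comment saying callers
must already hold the lock.

```scala
import java.util.concurrent.ConcurrentHashMap

import scala.collection.JavaConverters._
import scala.collection.mutable.HashMap

// Toy example only: `ToyStatusTracker` and its members are made-up names used to
// illustrate the threading comment and lock placement, not this PR's actual code.
class ToyStatusTracker {

  // Threading: mutated only from the listener bus thread, but read from UI request
  // threads via `metrics()`, so all access goes through `synchronized` entry points.
  private val liveExecutions = new HashMap[Long, Map[Long, Long]]()

  // Per-accumulator values; a ConcurrentHashMap keeps reads safe even if a UI
  // thread iterates while the listener thread is writing.
  private val accumValues = new ConcurrentHashMap[Long, Long]()

  // Listener bus thread: record an accumulator update and refresh the aggregate.
  def onUpdate(executionId: Long, accumId: Long, value: Long): Unit = synchronized {
    accumValues.put(accumId, value)
    liveExecutions(executionId) = aggregate()
  }

  // UI thread: read the latest aggregated values for one execution.
  def metrics(executionId: Long): Map[Long, Long] = synchronized {
    liveExecutions.getOrElse(executionId, Map.empty)
  }

  // Callers must already hold the lock on `this`, so no `synchronized` here.
  private def aggregate(): Map[Long, Long] = accumValues.asScala.toMap
}
```

The real class obviously has more state to cover, but a class-level comment
along the lines of "mutated from the listener bus thread, read from UI threads
via `executionMetrics`", plus locking only at the entry points, would make the
scheme much easier to follow.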
---