Github user squito commented on a diff in the pull request:
    --- Diff: 
    @@ -0,0 +1,353 @@
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.sql.execution.ui
    +import java.util.Date
    +import java.util.concurrent.ConcurrentHashMap
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable.HashMap
    +import org.apache.spark.{JobExecutionStatus, SparkConf}
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.scheduler._
    +import org.apache.spark.sql.execution.SQLExecution
    +import org.apache.spark.sql.execution.metric._
    +import org.apache.spark.sql.internal.StaticSQLConf._
    +import org.apache.spark.status.LiveEntity
    +import org.apache.spark.status.config._
    +import org.apache.spark.ui.SparkUI
    +import org.apache.spark.util.kvstore.KVStore
    +private[sql] class SQLAppStatusListener(
    +    conf: SparkConf,
    +    kvstore: KVStore,
    +    live: Boolean,
    +    ui: Option[SparkUI] = None)
    +  extends SparkListener with Logging {
    +  // How often to flush intermediate state of a live execution to the 
store. When replaying logs,
    +  // never flush (only do the very last write).
    +  private val liveUpdatePeriodNs = if (live) 
    +  private val liveExecutions = new HashMap[Long, LiveExecutionData]()
    +  private val stageMetrics = new HashMap[Int, LiveStageMetrics]()
    +  private var uiInitialized = false
    +  override def onJobStart(event: SparkListenerJobStart): Unit = {
    +    val executionIdString =
    +    if (executionIdString == null) {
    +      // This is not a job created by SQL
    +      return
    +    }
    +    val executionId = executionIdString.toLong
    +    val jobId = event.jobId
    +    val exec = getOrCreateExecution(executionId)
    +    // Record the accumulator IDs for the stages of this job, so that the 
code that keeps
    +    // track of the metrics knows which accumulators to look at.
    +    val accumIds =
    +    event.stageIds.foreach { id =>
    +      stageMetrics.put(id, new LiveStageMetrics(id, 0, accumIds.toArray, 
new ConcurrentHashMap()))
    +    }
    + = + (jobId -> JobExecutionStatus.RUNNING)
    +    exec.stages = event.stageIds
    +    update(exec)
    +  }
    +  override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit 
= {
    +    if (!isSQLStage(event.stageInfo.stageId)) {
    +      return
    +    }
    +    // Reset the metrics tracking object for the new attempt.
    +    stageMetrics.get(event.stageInfo.stageId).foreach { metrics =>
    +      metrics.taskMetrics.clear()
    +      metrics.attemptId = event.stageInfo.attemptId
    +    }
    +  }
    +  override def onJobEnd(event: SparkListenerJobEnd): Unit = {
    +    liveExecutions.values.foreach { exec =>
    +      if ( {
    +        val result = event.jobResult match {
    +          case JobSucceeded => JobExecutionStatus.SUCCEEDED
    +          case _ => JobExecutionStatus.FAILED
    +        }
    + = + (event.jobId -> result)
    +        exec.endEvents += 1
    +        update(exec)
    +      }
    +    }
    +  }
    +  override def onExecutorMetricsUpdate(event: 
SparkListenerExecutorMetricsUpdate): Unit = {
    +    event.accumUpdates.foreach { case (taskId, stageId, attemptId, 
accumUpdates) =>
    +      updateStageMetrics(stageId, attemptId, taskId, accumUpdates, false)
    +    }
    +  }
    +  override def onTaskEnd(event: SparkListenerTaskEnd): Unit = {
    +    if (!isSQLStage(event.stageId)) {
    +      return
    +    }
    +    val info = event.taskInfo
    +    // SPARK-20342. If processing events from a live application, use the 
task metrics info to
    +    // work around a race in the DAGScheduler. The metrics info does not 
contain accumulator info
    +    // when reading event logs in the SHS, so we have to rely on the 
accumulator in that case.
    +    val accums = if (live && event.taskMetrics != null) {
    +      event.taskMetrics.externalAccums.flatMap { a =>
    +        // This call may fail if the accumulator is gc'ed, so account for that.
    +        try {
    +          Some(a.toInfo(Some(a.value), None))
    +        } catch {
    +          case _: IllegalAccessError => None
    +        }
    +      }
    +    } else {
    +      info.accumulables
    +    }
    +    updateStageMetrics(event.stageId, event.stageAttemptId, info.taskId, 
    +      info.successful)
    +  }
    +  def executionMetrics(executionId: Long): Map[Long, String] = 
synchronized {
    +    liveExecutions.get(executionId).map { exec =>
    +      if (exec.metricsValues != null) {
    +        exec.metricsValues
    +      } else {
    +        aggregateMetrics(exec)
    +      }
    +    }.getOrElse {
    +      throw new NoSuchElementException(s"execution $executionId not found")
    +    }
    +  }
    +  private def aggregateMetrics(exec: LiveExecutionData): Map[Long, String] 
= synchronized {
    +    val metricIds =
    +    val metricTypes = { m => (m.accumulatorId, 
m.metricType) }.toMap
    +    val metrics = exec.stages
    +      .flatMap(stageMetrics.get)
    +      .flatMap(_.taskMetrics.values().asScala)
    +      .flatMap { metrics => }
    +    (metrics ++ exec.driverAccumUpdates.toSeq)
    +      .filter { case (id, _) => metricIds.contains(id) }
    +      .groupBy(_._1)
    +      .map { case (id, values) =>
    +        id -> SQLMetrics.stringValue(metricTypes(id),
    +      }
    +  }
    +  private def updateStageMetrics(
    +      stageId: Int,
    +      attemptId: Int,
    +      taskId: Long,
    +      accumUpdates: Seq[AccumulableInfo],
    +      succeeded: Boolean): Unit = {
    +    stageMetrics.get(stageId).foreach { metrics =>
    +      if (metrics.attemptId != attemptId || 
metrics.accumulatorIds.isEmpty) {
    +        return
    +      }
    +      val oldTaskMetrics = metrics.taskMetrics.get(taskId)
    +      if (oldTaskMetrics != null && oldTaskMetrics.succeeded) {
    +        return
    +      }
    +      val updates = accumUpdates
    +        .filter { acc => acc.update.isDefined && 
metrics.accumulatorIds.contains( }
    +        .sortBy(
    +      if (updates.isEmpty) {
    +        return
    +      }
    +      val ids = new Array[Long](updates.size)
    +      val values = new Array[Long](updates.size)
    +      updates.zipWithIndex.foreach { case (acc, idx) =>
    +        ids(idx) =
    +        // In a live application, accumulators have Long values, but when 
reading from event
    +        // logs, they have String values. For now, assume all accumulators 
are Long and convert
    +        // accordingly.
    +        values(idx) = acc.update.get match {
    +          case s: String => s.toLong
    +          case l: Long => l
    +          case o => throw new IllegalArgumentException(s"Unexpected: $o")
    +        }
    +      }
    +      metrics.taskMetrics.put(taskId, new LiveTaskMetrics(ids, values, 
    +    }
    +  }
    +  private def onExecutionStart(event: SparkListenerSQLExecutionStart): 
Unit = {
    +    // Install the SQL tab in a live app if it hasn't been initialized yet.
    +    if (!uiInitialized) {
    +      ui.foreach { _ui =>
    +        new SQLTab(new SQLAppStatusStore(kvstore, Some(this)), _ui)
    +      }
    +      uiInitialized = true
    +    }
    +    val SparkListenerSQLExecutionStart(executionId, description, details,
    +      physicalPlanDescription, sparkPlanInfo, time) = event
    +    def toStoredNodes(nodes: Seq[SparkPlanGraphNode]): 
Seq[SparkPlanGraphNodeWrapper] = {
    + {
    +        case cluster: SparkPlanGraphCluster =>
    +          val storedCluster = new SparkPlanGraphClusterWrapper(
    +  ,
    +  ,
    +            cluster.desc,
    +            toStoredNodes(cluster.nodes),
    +            cluster.metrics)
    +          new SparkPlanGraphNodeWrapper(null, storedCluster)
    +        case node =>
    +          new SparkPlanGraphNodeWrapper(node, null)
    +      }
    +    }
    +    val planGraph = SparkPlanGraph(sparkPlanInfo)
    +    val sqlPlanMetrics = planGraph.allNodes.flatMap { node =>
    + { metric => (metric.accumulatorId, metric) }
    +    }.toMap.values.toList
    +    val graphToStore = new SparkPlanGraphWrapper(
    +      executionId,
    +      toStoredNodes(planGraph.nodes),
    +      planGraph.edges)
    +    kvstore.write(graphToStore)
    +    val exec = getOrCreateExecution(executionId)
    +    exec.description = description
    +    exec.details = details
    +    exec.physicalPlanDescription = physicalPlanDescription
    +    exec.metrics = sqlPlanMetrics
    +    exec.submissionTime = time
    +    update(exec)
    +  }
    +  private def onExecutionEnd(event: SparkListenerSQLExecutionEnd): Unit = {
    +    val SparkListenerSQLExecutionEnd(executionId, time) = event
    +    liveExecutions.get(executionId).foreach { exec =>
    +      synchronized {
    +        exec.metricsValues = aggregateMetrics(exec)
    +        // Remove stale LiveStageMetrics objects for stages that are not 
active anymore.
    +        val activeStages = liveExecutions.values.flatMap { other =>
    +          if (other != exec) other.stages else Nil
    +        }.toSet
    +        stageMetrics.retain { case (id, _) => activeStages.contains(id) }
    +        exec.completionTime = Some(new Date(time))
    +        exec.endEvents += 1
    +        update(exec)
    +      }
    +    }
    +  }
    +  private def onDriverAccumUpdates(event: 
SparkListenerDriverAccumUpdates): Unit = {
    +    val SparkListenerDriverAccumUpdates(executionId, accumUpdates) = event
    +    liveExecutions.get(executionId).foreach { exec =>
    +      exec.driverAccumUpdates = accumUpdates.toMap
    +      update(exec)
    +    }
    +  }
    +  override def onOtherEvent(event: SparkListenerEvent): Unit = event match 
    +    case e: SparkListenerSQLExecutionStart => onExecutionStart(e)
    +    case e: SparkListenerSQLExecutionEnd => onExecutionEnd(e)
    +    case e: SparkListenerDriverAccumUpdates => onDriverAccumUpdates(e)
    +    case _ => // Ignore
    +  }
    +  private def getOrCreateExecution(executionId: Long): LiveExecutionData = 
    +    liveExecutions.getOrElseUpdate(executionId, new 
    +  }
    +  private def update(exec: LiveExecutionData): Unit = {
    +    val now = System.nanoTime()
    +    if (exec.endEvents >= + 1) {
    +      liveExecutions.remove(exec.executionId)
    +      exec.write(kvstore, now)
    +    } else if (liveUpdatePeriodNs >= 0) {
    +      if (now - exec.lastWriteTime > liveUpdatePeriodNs) {
    +        exec.write(kvstore, now)
    +      }
    +    }
    +  }
    +  private def isSQLStage(stageId: Int): Boolean = {
    +    liveExecutions.values.exists { exec =>
    +      exec.stages.contains(stageId)
    --- End diff --
    Any reason not to make `exec.stages` a `Set`? I guess it would normally be 
small, so it doesn't matter, but it seems like it couldn't hurt to protect against 
some strange SQL query with a huge number of stages.


To unsubscribe, e-mail:
For additional commands, e-mail:

Reply via email to