Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19681#discussion_r150094936
  
    --- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
 ---
    @@ -0,0 +1,367 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.sql.execution.ui
    +
    +import java.util.Date
    +import java.util.concurrent.ConcurrentHashMap
    +import java.util.function.Function
    +
    +import scala.collection.JavaConverters._
    +
    +import org.apache.spark.{JobExecutionStatus, SparkConf}
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.scheduler._
    +import org.apache.spark.sql.execution.SQLExecution
    +import org.apache.spark.sql.execution.metric._
    +import org.apache.spark.status.LiveEntity
    +import org.apache.spark.status.config._
    +import org.apache.spark.ui.SparkUI
    +import org.apache.spark.util.kvstore.KVStore
    +
    +private[sql] class SQLAppStatusListener(
    +    conf: SparkConf,
    +    kvstore: KVStore,
    +    live: Boolean,
    +    ui: Option[SparkUI] = None)
    +  extends SparkListener with Logging {
    +
    +  // How often to flush intermediate state of a live execution to the 
store. When replaying logs,
    +  // never flush (only do the very last write).
    +  private val liveUpdatePeriodNs = if (live) 
conf.get(LIVE_ENTITY_UPDATE_PERIOD) else -1L
    +
    +  // Live tracked data is needed by the SQL status store to calculate 
metrics for in-flight
    +  // executions; that means arbitrary threads may be querying these maps, 
so they need to be
    +  // thread-safe.
    +  private val liveExecutions = new ConcurrentHashMap[Long, 
LiveExecutionData]()
    +  private val stageMetrics = new ConcurrentHashMap[Int, LiveStageMetrics]()
    +
    +  private var uiInitialized = false
    +
    +  override def onJobStart(event: SparkListenerJobStart): Unit = {
    +    val executionIdString = 
event.properties.getProperty(SQLExecution.EXECUTION_ID_KEY)
    +    if (executionIdString == null) {
    +      // This is not a job created by SQL
    +      return
    +    }
    +
    +    val executionId = executionIdString.toLong
    +    val jobId = event.jobId
    +    val exec = getOrCreateExecution(executionId)
    +
    +    // Record the accumulator IDs for the stages of this job, so that the 
code that keeps
    +    // track of the metrics knows which accumulators to look at.
    +    val accumIds = exec.metrics.map(_.accumulatorId).sorted.toList
    +    event.stageIds.foreach { id =>
    +      stageMetrics.put(id, new LiveStageMetrics(id, 0, accumIds.toArray, 
new ConcurrentHashMap()))
    +    }
    +
    +    exec.jobs = exec.jobs + (jobId -> JobExecutionStatus.RUNNING)
    +    exec.stages = event.stageIds.toSet
    +    update(exec)
    +  }
    +
    +  override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit 
= {
    +    if (!isSQLStage(event.stageInfo.stageId)) {
    +      return
    +    }
    +
    +    // Reset the metrics tracking object for the new attempt.
    +    Option(stageMetrics.get(event.stageInfo.stageId)).foreach { metrics =>
    +      metrics.taskMetrics.clear()
    +      metrics.attemptId = event.stageInfo.attemptId
    +    }
    +  }
    +
    +  override def onJobEnd(event: SparkListenerJobEnd): Unit = {
    +    liveExecutions.values().asScala.foreach { exec =>
    +      if (exec.jobs.contains(event.jobId)) {
    +        val result = event.jobResult match {
    +          case JobSucceeded => JobExecutionStatus.SUCCEEDED
    +          case _ => JobExecutionStatus.FAILED
    +        }
    +        exec.jobs = exec.jobs + (event.jobId -> result)
    +        exec.endEvents += 1
    +        update(exec)
    +      }
    +    }
    +  }
    +
    +  override def onExecutorMetricsUpdate(event: 
SparkListenerExecutorMetricsUpdate): Unit = {
    +    event.accumUpdates.foreach { case (taskId, stageId, attemptId, 
accumUpdates) =>
    +      updateStageMetrics(stageId, attemptId, taskId, accumUpdates, false)
    +    }
    +  }
    +
    +  override def onTaskEnd(event: SparkListenerTaskEnd): Unit = {
    +    if (!isSQLStage(event.stageId)) {
    +      return
    +    }
    +
    +    val info = event.taskInfo
    +    // SPARK-20342. If processing events from a live application, use the 
task metrics info to
    +    // work around a race in the DAGScheduler. The metrics info does not 
contain accumulator info
    +    // when reading event logs in the SHS, so we have to rely on the 
accumulator in that case.
    +    val accums = if (live && event.taskMetrics != null) {
    +      event.taskMetrics.externalAccums.flatMap { a =>
    +        // This call may fail if the accumulator is gc'ed, so account for 
that.
    +        try {
    +          Some(a.toInfo(Some(a.value), None))
    +        } catch {
    +          case _: IllegalAccessError => None
    +        }
    +      }
    +    } else {
    +      info.accumulables
    +    }
    +    updateStageMetrics(event.stageId, event.stageAttemptId, info.taskId, 
accums,
    +      info.successful)
    +  }
    +
    +  def executionMetrics(executionId: Long): Map[Long, String] = {
    +    Option(liveExecutions.get(executionId)).map { exec =>
    +      if (exec.metricsValues != null) {
    +        exec.metricsValues
    +      } else {
    +        aggregateMetrics(exec)
    +      }
    +    }.getOrElse {
    +      throw new NoSuchElementException(s"execution $executionId not found")
    +    }
    +  }
    +
    +  private def aggregateMetrics(exec: LiveExecutionData): Map[Long, String] 
= {
    +    val metricIds = exec.metrics.map(_.accumulatorId).sorted
    +    val metricTypes = exec.metrics.map { m => (m.accumulatorId, 
m.metricType) }.toMap
    +    val metrics = exec.stages.toSeq
    +      .flatMap { stageId => Option(stageMetrics.get(stageId)) }
    +      .flatMap(_.taskMetrics.values().asScala)
    +      .flatMap { metrics => metrics.ids.zip(metrics.values) }
    +
    +    val aggregatedMetrics = (metrics ++ exec.driverAccumUpdates.toSeq)
    +      .filter { case (id, _) => metricIds.contains(id) }
    +      .groupBy(_._1)
    +      .map { case (id, values) =>
    +        id -> SQLMetrics.stringValue(metricTypes(id), 
values.map(_._2).toSeq)
    +      }
    +
    +    // Check the execution again for whether the aggregated metrics data 
has been calculated.
    +    // This can happen if the UI is requesting this data, and the 
onExecutionEnd handler is
    +    // running at the same time. The metrics calculated for the UI can be 
inaccurate in that
    +    // case, since the onExecutionEnd handler will clean up tracked stage 
metrics.
    +    if (exec.metricsValues != null) {
    +      exec.metricsValues
    +    } else {
    +      aggregatedMetrics
    +    }
    +  }
    +
    +  private def updateStageMetrics(
    +      stageId: Int,
    +      attemptId: Int,
    +      taskId: Long,
    +      accumUpdates: Seq[AccumulableInfo],
    +      succeeded: Boolean): Unit = {
    +    Option(stageMetrics.get(stageId)).foreach { metrics =>
    +      if (metrics.attemptId != attemptId || 
metrics.accumulatorIds.isEmpty) {
    +        return
    +      }
    +
    +      val oldTaskMetrics = metrics.taskMetrics.get(taskId)
    +      if (oldTaskMetrics != null && oldTaskMetrics.succeeded) {
    +        return
    +      }
    +
    +      val updates = accumUpdates
    +        .filter { acc => acc.update.isDefined && 
metrics.accumulatorIds.contains(acc.id) }
    +        .sortBy(_.id)
    +
    +      if (updates.isEmpty) {
    +        return
    +      }
    +
    +      val ids = new Array[Long](updates.size)
    +      val values = new Array[Long](updates.size)
    +      updates.zipWithIndex.foreach { case (acc, idx) =>
    +        ids(idx) = acc.id
    +        // In a live application, accumulators have Long values, but when 
reading from event
    +        // logs, they have String values. For now, assume all accumulators 
are Long and convert
    +        // accordingly.
    +        values(idx) = acc.update.get match {
    +          case s: String => s.toLong
    +          case l: Long => l
    +          case o => throw new IllegalArgumentException(s"Unexpected: $o")
    +        }
    +      }
    +
    +      // TODO: storing metrics by task ID can lead to inaccurate metrics 
when speculation is on.
    --- End diff --
    
    I think it's more general than this.  I'd say:
    
    Since we store metrics by task ID, in a way we'll double-count the stage 
metrics when there are multiple tasks for a given index -- in particular, if 
there is speculation, or if there are multiple attempts for a task.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to