Github user squito commented on a diff in the pull request:
https://github.com/apache/spark/pull/19681#discussion_r150094936
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
---
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.ui
+
+import java.util.Date
+import java.util.concurrent.ConcurrentHashMap
+import java.util.function.Function
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.{JobExecutionStatus, SparkConf}
+import org.apache.spark.internal.Logging
+import org.apache.spark.scheduler._
+import org.apache.spark.sql.execution.SQLExecution
+import org.apache.spark.sql.execution.metric._
+import org.apache.spark.status.LiveEntity
+import org.apache.spark.status.config._
+import org.apache.spark.ui.SparkUI
+import org.apache.spark.util.kvstore.KVStore
+
+private[sql] class SQLAppStatusListener(
+ conf: SparkConf,
+ kvstore: KVStore,
+ live: Boolean,
+ ui: Option[SparkUI] = None)
+ extends SparkListener with Logging {
+
+ // How often to flush intermediate state of a live execution to the
store. When replaying logs,
+ // never flush (only do the very last write).
+ private val liveUpdatePeriodNs = if (live)
conf.get(LIVE_ENTITY_UPDATE_PERIOD) else -1L
+
+ // Live tracked data is needed by the SQL status store to calculate
metrics for in-flight
+ // executions; that means arbitrary threads may be querying these maps,
so they need to be
+ // thread-safe.
+ private val liveExecutions = new ConcurrentHashMap[Long,
LiveExecutionData]() // looked up by execution ID (see executionMetrics)
+ private val stageMetrics = new ConcurrentHashMap[Int, LiveStageMetrics]() // keyed by stage ID; entries are added in onJobStart
+
+ private var uiInitialized = false // NOTE(review): not referenced in the visible part of this diff
+
+  /**
+   * Starts tracking a job that belongs to a SQL execution.
+   *
+   * Jobs whose properties carry no `SQLExecution.EXECUTION_ID_KEY` are ignored. Otherwise a
+   * fresh `LiveStageMetrics` tracker is registered for every stage of the job, the job is
+   * recorded as RUNNING on its execution, the job's stages are added to the execution and the
+   * execution is handed to `update`.
+   *
+   * @param event the job start event to process
+   */
+  override def onJobStart(event: SparkListenerJobStart): Unit = {
+    val executionIdString = event.properties.getProperty(SQLExecution.EXECUTION_ID_KEY)
+    if (executionIdString == null) {
+      // This is not a job created by SQL
+      return
+    }
+
+    val executionId = executionIdString.toLong
+    val jobId = event.jobId
+    val exec = getOrCreateExecution(executionId)
+
+    // Record the accumulator IDs for the stages of this job, so that the code that keeps
+    // track of the metrics knows which accumulators to look at.
+    val accumIds = exec.metrics.map(_.accumulatorId).sorted.toList
+    event.stageIds.foreach { id =>
+      stageMetrics.put(id,
+        new LiveStageMetrics(id, 0, accumIds.toArray, new ConcurrentHashMap()))
+    }
+
+    exec.jobs = exec.jobs + (jobId -> JobExecutionStatus.RUNNING)
+    // A single execution may start several jobs. Add this job's stages to the ones already
+    // recorded instead of replacing them; otherwise aggregateMetrics(), which walks
+    // exec.stages, would only see the stages of the most recently started job.
+    exec.stages = exec.stages ++ event.stageIds.toSet
+    update(exec)
+  }
+
+  /**
+   * Resets the metrics tracker of a SQL stage when a new attempt of that stage is submitted.
+   * Stages for which `isSQLStage` is false, and stages without a registered tracker, are
+   * left untouched.
+   *
+   * @param event the stage submission event to process
+   */
+  override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = {
+    val submittedStageId = event.stageInfo.stageId
+    if (isSQLStage(submittedStageId)) {
+      val tracker = stageMetrics.get(event.stageInfo.stageId)
+      if (tracker != null) {
+        // Start over for the new attempt: drop per-task values collected so far and remember
+        // which attempt the tracker now belongs to.
+        tracker.taskMetrics.clear()
+        tracker.attemptId = event.stageInfo.attemptId
+      }
+    }
+  }
+
+  /**
+   * Records the final status of a job on every live execution that contains it, counts one
+   * more end event for that execution and hands it to `update`.
+   *
+   * @param event the job end event to process
+   */
+  override def onJobEnd(event: SparkListenerJobEnd): Unit = {
+    val finishedJobId = event.jobId
+    val tracked = liveExecutions.values().iterator()
+    while (tracked.hasNext) {
+      val exec = tracked.next()
+      if (exec.jobs.contains(finishedJobId)) {
+        // Any result other than JobSucceeded is reported as a failure.
+        val finalStatus = event.jobResult match {
+          case JobSucceeded => JobExecutionStatus.SUCCEEDED
+          case _ => JobExecutionStatus.FAILED
+        }
+        exec.jobs = exec.jobs + (finishedJobId -> finalStatus)
+        exec.endEvents += 1
+        update(exec)
+      }
+    }
+  }
+
+  /**
+   * Forwards every accumulator update carried by an executor metrics update to
+   * `updateStageMetrics`. The updates are always passed with `succeeded = false`.
+   *
+   * @param event the executor metrics update to process
+   */
+  override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = {
+    event.accumUpdates.foreach { taskUpdate =>
+      val (taskId, stageId, attemptId, accumUpdates) = taskUpdate
+      updateStageMetrics(stageId, attemptId, taskId, accumUpdates, succeeded = false)
+    }
+  }
+
+  /**
+   * Feeds the accumulator values reported at the end of a task into `updateStageMetrics`.
+   * Tasks of stages for which `isSQLStage` is false are ignored.
+   *
+   * @param event the task end event to process
+   */
+  override def onTaskEnd(event: SparkListenerTaskEnd): Unit = {
+    if (isSQLStage(event.stageId)) {
+      val taskInfo = event.taskInfo
+      val useTaskMetrics = live && event.taskMetrics != null
+
+      // SPARK-20342. If processing events from a live application, use the task metrics info
+      // to work around a race in the DAGScheduler. The metrics info does not contain
+      // accumulator info when reading event logs in the SHS, so we have to rely on the
+      // accumulator in that case.
+      val finalAccums = if (!useTaskMetrics) {
+        taskInfo.accumulables
+      } else {
+        event.taskMetrics.externalAccums.flatMap { acc =>
+          // This call may fail if the accumulator is gc'ed, so account for that.
+          try {
+            Some(acc.toInfo(Some(acc.value), None))
+          } catch {
+            case _: IllegalAccessError => None
+          }
+        }
+      }
+
+      updateStageMetrics(
+        event.stageId,
+        event.stageAttemptId,
+        taskInfo.taskId,
+        finalAccums,
+        taskInfo.successful)
+    }
+  }
+
+  /**
+   * Returns the metric values of a live execution, keyed by accumulator ID.
+   *
+   * If the execution already has `metricsValues` set, those are returned; otherwise the
+   * values are computed by `aggregateMetrics`.
+   *
+   * @param executionId ID of the execution to look up
+   * @return the metric values of the execution
+   * @throws NoSuchElementException if no live execution with that ID is being tracked
+   */
+  def executionMetrics(executionId: Long): Map[Long, String] = {
+    val exec = liveExecutions.get(executionId)
+    if (exec == null) {
+      throw new NoSuchElementException(s"execution $executionId not found")
+    }
+    Option(exec.metricsValues).getOrElse(aggregateMetrics(exec))
+  }
+
+  /**
+   * Computes the display value of every metric of an execution from the per-task values
+   * tracked for its stages plus the driver-side accumulator updates.
+   *
+   * @param exec the live execution whose metrics should be aggregated
+   * @return the metric values keyed by accumulator ID; if `exec.metricsValues` has been set by
+   *         the time the aggregation finishes, that value is returned instead
+   */
+  private def aggregateMetrics(exec: LiveExecutionData): Map[Long, String] = {
+    // Maps each accumulator ID of the execution to its metric type. It is also used to decide
+    // which values belong to this execution: a map lookup per value instead of a linear scan
+    // over a separate list of IDs.
+    val metricTypes = exec.metrics.map { m => (m.accumulatorId, m.metricType) }.toMap
+    val taskValues = exec.stages.toSeq
+      .flatMap { stageId => Option(stageMetrics.get(stageId)) }
+      .flatMap(_.taskMetrics.values().asScala)
+      .flatMap { metrics => metrics.ids.zip(metrics.values) }
+
+    val aggregatedMetrics = (taskValues ++ exec.driverAccumUpdates.toSeq)
+      .filter { case (id, _) => metricTypes.contains(id) }
+      .groupBy(_._1)
+      .map { case (id, values) =>
+        id -> SQLMetrics.stringValue(metricTypes(id), values.map(_._2).toSeq)
+      }
+
+    // Check the execution again for whether the aggregated metrics data has been calculated.
+    // This can happen if the UI is requesting this data, and the onExecutionEnd handler is
+    // running at the same time. The metrics calculated for the UI can be inaccurate in that
+    // case, since the onExecutionEnd handler will clean up tracked stage metrics.
+    if (exec.metricsValues != null) {
+      exec.metricsValues
+    } else {
+      aggregatedMetrics
+    }
+  }
+
+ private def updateStageMetrics(
+ stageId: Int,
+ attemptId: Int,
+ taskId: Long,
+ accumUpdates: Seq[AccumulableInfo],
+ succeeded: Boolean): Unit = {
+ Option(stageMetrics.get(stageId)).foreach { metrics =>
+ if (metrics.attemptId != attemptId ||
metrics.accumulatorIds.isEmpty) {
+ return
+ }
+
+ val oldTaskMetrics = metrics.taskMetrics.get(taskId)
+ if (oldTaskMetrics != null && oldTaskMetrics.succeeded) {
+ return
+ }
+
+ val updates = accumUpdates
+ .filter { acc => acc.update.isDefined &&
metrics.accumulatorIds.contains(acc.id) }
+ .sortBy(_.id)
+
+ if (updates.isEmpty) {
+ return
+ }
+
+ val ids = new Array[Long](updates.size)
+ val values = new Array[Long](updates.size)
+ updates.zipWithIndex.foreach { case (acc, idx) =>
+ ids(idx) = acc.id
+ // In a live application, accumulators have Long values, but when
reading from event
+ // logs, they have String values. For now, assume all accumulators
are Long and covert
+ // accordingly.
+ values(idx) = acc.update.get match {
+ case s: String => s.toLong
+ case l: Long => l
+ case o => throw new IllegalArgumentException(s"Unexpected: $o")
+ }
+ }
+
+ // TODO: storing metrics by task ID can lead to inaccurate metrics
when speculation is on.
--- End diff --
I think it's more general than this. I'd say:
Since we store metrics by task ID, in a way we'll double-count the stage
metrics when there are multiple tasks for a given index -- in particular, if
there is speculation, or if there are multiple attempts for a task.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]