Github user squito commented on a diff in the pull request:
https://github.com/apache/spark/pull/19383#discussion_r141967159
--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
@@ -0,0 +1,535 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status
+
+import java.util.Date
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.HashMap
+
+import org.apache.spark._
+import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.internal.Logging
+import org.apache.spark.scheduler._
+import org.apache.spark.status.api.v1
+import org.apache.spark.storage._
+import org.apache.spark.ui.SparkUI
+import org.apache.spark.ui.scope._
+import org.apache.spark.util.kvstore.KVStore
+
+/**
+ * A Spark listener that writes application information to a data store. The types written to the
+ * store are defined in the `storeTypes.scala` file and are based on the public REST API.
+ */
+private class AppStatusListener(kvstore: KVStore) extends SparkListener with Logging {
+
+ private var sparkVersion = SPARK_VERSION
+ private var appInfo: v1.ApplicationInfo = null
+ private var coresPerTask: Int = 1
+
+  // Keep track of live entities, so that task metrics can be efficiently updated (without
+  // causing too many writes to the underlying store, and other expensive operations).
+ private val liveStages = new HashMap[(Int, Int), LiveStage]()
+ private val liveJobs = new HashMap[Int, LiveJob]()
+ private val liveExecutors = new HashMap[String, LiveExecutor]()
+ private val liveTasks = new HashMap[Long, LiveTask]()
+ private val liveRDDs = new HashMap[Int, LiveRDD]()
+
+  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
+ case SparkListenerLogStart(version) => sparkVersion = version
+ case _ =>
+ }
+
+  override def onApplicationStart(event: SparkListenerApplicationStart): Unit = {
+    assert(event.appId.isDefined, "Application without IDs are not supported.")
+
+ val attempt = new v1.ApplicationAttemptInfo(
+ event.appAttemptId,
+ new Date(event.time),
+ new Date(-1),
+ new Date(event.time),
+ -1L,
+ event.sparkUser,
+ false,
+ sparkVersion)
+
+ appInfo = new v1.ApplicationInfo(
+ event.appId.get,
+ event.appName,
+ None,
+ None,
+ None,
+ None,
+ Seq(attempt))
+
+ kvstore.write(new ApplicationInfoWrapper(appInfo))
+ }
+
+  override def onApplicationEnd(event: SparkListenerApplicationEnd): Unit = {
+ val old = appInfo.attempts.head
+ val attempt = new v1.ApplicationAttemptInfo(
+ old.attemptId,
+ old.startTime,
+ new Date(event.time),
+ new Date(event.time),
+ event.time - old.startTime.getTime(),
+ old.sparkUser,
+ true,
+ old.appSparkVersion)
+
+ appInfo = new v1.ApplicationInfo(
+ appInfo.id,
+ appInfo.name,
+ None,
+ None,
+ None,
+ None,
+ Seq(attempt))
+ kvstore.write(new ApplicationInfoWrapper(appInfo))
+ }
+
+ override def onExecutorAdded(event: SparkListenerExecutorAdded): Unit = {
+    // This needs to be an update in case an executor re-registers after the driver has
+ // marked it as "dead".
+ val exec = getOrCreateExecutor(event.executorId)
+ exec.host = event.executorInfo.executorHost
+ exec.isActive = true
+ exec.totalCores = event.executorInfo.totalCores
+ exec.maxTasks = event.executorInfo.totalCores / coresPerTask
+ exec.executorLogs = event.executorInfo.logUrlMap
+ update(exec)
+ }
+
+  override def onExecutorRemoved(event: SparkListenerExecutorRemoved): Unit = {
+ liveExecutors.remove(event.executorId).foreach { exec =>
+ exec.isActive = false
+ update(exec)
+ }
+ }
+
+  override def onExecutorBlacklisted(event: SparkListenerExecutorBlacklisted): Unit = {
+ updateBlackListStatus(event.executorId, true)
+ }
+
+  override def onExecutorUnblacklisted(event: SparkListenerExecutorUnblacklisted): Unit = {
+ updateBlackListStatus(event.executorId, false)
+ }
+
+  override def onNodeBlacklisted(event: SparkListenerNodeBlacklisted): Unit = {
+ updateNodeBlackList(event.hostId, true)
+ }
+
+  override def onNodeUnblacklisted(event: SparkListenerNodeUnblacklisted): Unit = {
+ updateNodeBlackList(event.hostId, false)
+ }
+
+  private def updateBlackListStatus(execId: String, blacklisted: Boolean): Unit = {
+ liveExecutors.get(execId).foreach { exec =>
+ exec.isBlacklisted = blacklisted
+ update(exec)
+ }
+ }
+
+  private def updateNodeBlackList(host: String, blacklisted: Boolean): Unit = {
+ // Implicitly (un)blacklist every executor associated with the node.
+ liveExecutors.values.foreach { exec =>
+ if (exec.hostname == host) {
+ exec.isBlacklisted = blacklisted
+ update(exec)
+ }
+ }
+ }
+
+ override def onJobStart(event: SparkListenerJobStart): Unit = {
+    // Compute (a potential underestimate of) the number of tasks that will be run by this job.
+    // This may be an underestimate because the job start event references all of the result
+    // stages' transitive stage dependencies, but some of these stages might be skipped if their
+    // output is available from earlier runs.
+    // See https://github.com/apache/spark/pull/3009 for a more extensive discussion.
+ val numTasks = {
+ val missingStages = event.stageInfos.filter(_.completionTime.isEmpty)
+ missingStages.map(_.numTasks).sum
+ }
+
+ val lastStageInfo = event.stageInfos.lastOption
+    val lastStageName = lastStageInfo.map(_.name).getOrElse("(Unknown Stage Name)")
+
+ val jobGroup = Option(event.properties)
+      .flatMap { p => Option(p.getProperty(SparkContext.SPARK_JOB_GROUP_ID)) }
+
+ val job = new LiveJob(
+ event.jobId,
+ lastStageName,
+ Option(event.time).filter(_ >= 0).map(new Date(_)),
+ event.stageIds,
+ jobGroup,
+ numTasks)
+ liveJobs.put(event.jobId, job)
+ update(job)
+
+ event.stageInfos.foreach { stageInfo =>
+      // A new job submission may re-use an existing stage, so this code needs to do an update
+ // instead of just a write.
+ val stage = getOrCreateStage(stageInfo)
+ stage.jobs = stage.jobs :+ job
+ stage.jobIds += event.jobId
+ update(stage)
+ }
+
+ }
+
+ override def onJobEnd(event: SparkListenerJobEnd): Unit = {
+ liveJobs.remove(event.jobId).foreach { job =>
+ job.status = event.jobResult match {
+ case JobSucceeded => JobExecutionStatus.SUCCEEDED
+ case JobFailed(_) => JobExecutionStatus.FAILED
+ }
+
+      job.completionTime = if (event.time != -1) Some(new Date(event.time)) else None
+ update(job)
+ }
+ }
+
+  override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = {
+ val stage = getOrCreateStage(event.stageInfo)
+ stage.status = v1.StageStatus.ACTIVE
+
+ // Look at all active jobs to find the ones that mention this stage.
+ stage.jobs = liveJobs.values
+ .filter(_.stageIds.contains(event.stageInfo.stageId))
+ .toSeq
+ stage.jobIds = stage.jobs.map(_.jobId).toSet
+
+ stage.schedulingPool = Option(event.properties).flatMap { p =>
+ Option(p.getProperty("spark.scheduler.pool"))
+ }.getOrElse(SparkUI.DEFAULT_POOL_NAME)
+
+ stage.jobs.foreach { job =>
+ job.completedStages = job.completedStages - event.stageInfo.stageId
+ job.activeStages += 1
+ update(job)
+ }
+
+ event.stageInfo.rddInfos.foreach { info =>
+ if (info.storageLevel.isValid) {
+ update(liveRDDs.getOrElseUpdate(info.id, new LiveRDD(info)))
+ }
+ }
+
+ update(stage)
+ }
+
+ override def onTaskStart(event: SparkListenerTaskStart): Unit = {
+    val task = new LiveTask(event.taskInfo, event.stageId, event.stageAttemptId)
+ liveTasks.put(event.taskInfo.taskId, task)
+ update(task)
+
+    liveStages.get((event.stageId, event.stageAttemptId)).foreach { stage =>
+ stage.activeTasks += 1
+      stage.firstLaunchTime = math.min(stage.firstLaunchTime, event.taskInfo.launchTime)
+ update(stage)
+
+ stage.jobs.foreach { job =>
+ job.activeTasks += 1
+ update(job)
+ }
+ }
+
+ liveExecutors.get(event.taskInfo.executorId).foreach { exec =>
+ exec.activeTasks += 1
+ exec.totalTasks += 1
+ update(exec)
+ }
+ }
+
+  override def onTaskGettingResult(event: SparkListenerTaskGettingResult): Unit = {
+ liveTasks.get(event.taskInfo.taskId).foreach { task =>
+ update(task)
+ }
+ }
+
+ override def onTaskEnd(event: SparkListenerTaskEnd): Unit = {
+    // If stage attempt id is -1, it means the DAGScheduler had no idea which attempt this task
+    // completion event is for. Let's just drop it here. This means we might have some speculation
+ // tasks on the web ui that are never marked as complete.
+ if (event.taskInfo == null || event.stageAttemptId == -1) {
+ return
+ }
+
+    val metricsDelta = liveTasks.remove(event.taskInfo.taskId).map { task =>
+ val errorMessage = event.reason match {
+ case Success =>
+ None
+ case k: TaskKilled =>
+ Some(k.reason)
+        case e: ExceptionFailure => // Handle ExceptionFailure because we might have accumUpdates
+ Some(e.toErrorString)
+ case e: TaskFailedReason => // All other failure cases
+ Some(e.toErrorString)
+ case other =>
+ logInfo(s"Unhandled task end reason: $other")
+ None
+ }
+ task.errorMessage = errorMessage
+ val delta = task.updateMetrics(event.taskMetrics)
+ update(task)
+ delta
+ }.orNull
+
+ val (completedDelta, failedDelta) = event.reason match {
+ case Success =>
+ (1, 0)
+ case _ =>
+ (0, 1)
+ }
+
+    liveStages.get((event.stageId, event.stageAttemptId)).foreach { stage =>
+ if (metricsDelta != null) {
+ stage.metrics.update(metricsDelta)
+ }
+ stage.activeTasks -= 1
+ stage.completedTasks += completedDelta
+ stage.failedTasks += failedDelta
+ update(stage)
+
+ stage.jobs.foreach { job =>
+ job.activeTasks -= 1
+ job.completedTasks += completedDelta
+ job.failedTasks += failedDelta
+ update(job)
+ }
+
+ val esummary = stage.executorSummary(event.taskInfo.executorId)
+ esummary.taskTime += event.taskInfo.duration
+ esummary.succeededTasks += completedDelta
+ esummary.failedTasks += failedDelta
+ if (metricsDelta != null) {
+ esummary.metrics.update(metricsDelta)
+ }
+ update(esummary)
+ }
+
+ liveExecutors.get(event.taskInfo.executorId).foreach { exec =>
+ if (event.taskMetrics != null) {
+ val readMetrics = event.taskMetrics.shuffleReadMetrics
+ exec.totalGcTime += event.taskMetrics.jvmGCTime
+ exec.totalInputBytes += event.taskMetrics.inputMetrics.bytesRead
+        exec.totalShuffleRead += readMetrics.localBytesRead + readMetrics.remoteBytesRead
+        exec.totalShuffleWrite += event.taskMetrics.shuffleWriteMetrics.bytesWritten
+ }
+
+ exec.activeTasks -= 1
+ exec.completedTasks += completedDelta
+ exec.failedTasks += failedDelta
+ exec.totalDuration += event.taskInfo.duration
+ update(exec)
+ }
+ }
+
+  override def onStageCompleted(event: SparkListenerStageCompleted): Unit = {
+    liveStages.remove((event.stageInfo.stageId, event.stageInfo.attemptId)).foreach { stage =>
+ stage.info = event.stageInfo
+
+      // Because of SPARK-20205, old event logs may contain valid stages without a submission time
+      // in their start event. In those cases, we can only detect whether a stage was skipped by
+      // waiting until the completion event, at which point the field would have been set.
+ val skipped = !event.stageInfo.submissionTime.isDefined
+ stage.status = event.stageInfo.failureReason match {
+ case Some(_) => v1.StageStatus.FAILED
+        case None => if (skipped) v1.StageStatus.SKIPPED else v1.StageStatus.COMPLETE
--- End diff --
It's slightly confusing that `skipped` doesn't actually indicate that the stage was skipped.
Maybe rename it to `hasSubmissionTime` (with the corresponding change in logic)? Or even fold
the check directly into the match, something like
```scala
stage.status = event.stageInfo.failureReason match {
  case Some(_) => v1.StageStatus.FAILED
  case None if event.stageInfo.submissionTime.isDefined => v1.StageStatus.COMPLETE
  case _ => v1.StageStatus.SKIPPED
}
```
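For comparison, a rough sketch of the rename option (just to show the corresponding logic change, not tested):
```scala
// Hypothetical rename of `skipped`: the flag now says exactly what it checks.
val hasSubmissionTime = event.stageInfo.submissionTime.isDefined
stage.status = event.stageInfo.failureReason match {
  case Some(_) => v1.StageStatus.FAILED
  case None => if (hasSubmissionTime) v1.StageStatus.COMPLETE else v1.StageStatus.SKIPPED
}
```
Either way the behavior is the same; it's only about making the intent of the flag clearer.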