Github user squito commented on a diff in the pull request:
https://github.com/apache/spark/pull/19383#discussion_r141962883
--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
@@ -0,0 +1,535 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status
+
+import java.util.Date
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.HashMap
+
+import org.apache.spark._
+import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.internal.Logging
+import org.apache.spark.scheduler._
+import org.apache.spark.status.api.v1
+import org.apache.spark.storage._
+import org.apache.spark.ui.SparkUI
+import org.apache.spark.ui.scope._
+import org.apache.spark.util.kvstore.KVStore
+
+/**
+ * A Spark listener that writes application information to a data store. The types written to the
+ * store are defined in the `storeTypes.scala` file and are based on the public REST API.
+ */
+private class AppStatusListener(kvstore: KVStore) extends SparkListener with Logging {
+
+ private var sparkVersion = SPARK_VERSION
+ private var appInfo: v1.ApplicationInfo = null
+ private var coresPerTask: Int = 1
+
+  // Keep track of live entities, so that task metrics can be efficiently updated (without
+  // causing too many writes to the underlying store, and other expensive operations).
+ private val liveStages = new HashMap[(Int, Int), LiveStage]()
+ private val liveJobs = new HashMap[Int, LiveJob]()
+ private val liveExecutors = new HashMap[String, LiveExecutor]()
+ private val liveTasks = new HashMap[Long, LiveTask]()
+ private val liveRDDs = new HashMap[Int, LiveRDD]()
+
+  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
+ case SparkListenerLogStart(version) => sparkVersion = version
+ case _ =>
+ }
+
+  override def onApplicationStart(event: SparkListenerApplicationStart): Unit = {
+    assert(event.appId.isDefined, "Application without IDs are not supported.")
+
+ val attempt = new v1.ApplicationAttemptInfo(
+ event.appAttemptId,
+ new Date(event.time),
+ new Date(-1),
+ new Date(event.time),
+ -1L,
+ event.sparkUser,
+ false,
+ sparkVersion)
+
+ appInfo = new v1.ApplicationInfo(
+ event.appId.get,
+ event.appName,
+ None,
+ None,
+ None,
+ None,
+ Seq(attempt))
+
+ kvstore.write(new ApplicationInfoWrapper(appInfo))
+ }
+
+  override def onApplicationEnd(event: SparkListenerApplicationEnd): Unit = {
+ val old = appInfo.attempts.head
+ val attempt = new v1.ApplicationAttemptInfo(
+ old.attemptId,
+ old.startTime,
+ new Date(event.time),
+ new Date(event.time),
+ event.time - old.startTime.getTime(),
+ old.sparkUser,
+ true,
+ old.appSparkVersion)
+
+ appInfo = new v1.ApplicationInfo(
+ appInfo.id,
+ appInfo.name,
+ None,
+ None,
+ None,
+ None,
+ Seq(attempt))
+ kvstore.write(new ApplicationInfoWrapper(appInfo))
+ }
+
+ override def onExecutorAdded(event: SparkListenerExecutorAdded): Unit = {
+    // This needs to be an update in case an executor re-registers after the driver has
+    // marked it as "dead".
+ val exec = getOrCreateExecutor(event.executorId)
+ exec.host = event.executorInfo.executorHost
+ exec.isActive = true
+ exec.totalCores = event.executorInfo.totalCores
+ exec.maxTasks = event.executorInfo.totalCores / coresPerTask
+ exec.executorLogs = event.executorInfo.logUrlMap
+ update(exec)
+ }
+
+  override def onExecutorRemoved(event: SparkListenerExecutorRemoved): Unit = {
+ liveExecutors.remove(event.executorId).foreach { exec =>
+ exec.isActive = false
+ update(exec)
+ }
+ }
+
+  override def onExecutorBlacklisted(event: SparkListenerExecutorBlacklisted): Unit = {
+ updateBlackListStatus(event.executorId, true)
+ }
+
+  override def onExecutorUnblacklisted(event: SparkListenerExecutorUnblacklisted): Unit = {
+ updateBlackListStatus(event.executorId, false)
+ }
+
+  override def onNodeBlacklisted(event: SparkListenerNodeBlacklisted): Unit = {
+ updateNodeBlackList(event.hostId, true)
+ }
+
+  override def onNodeUnblacklisted(event: SparkListenerNodeUnblacklisted): Unit = {
+ updateNodeBlackList(event.hostId, false)
+ }
+
+  private def updateBlackListStatus(execId: String, blacklisted: Boolean): Unit = {
+ liveExecutors.get(execId).foreach { exec =>
+ exec.isBlacklisted = blacklisted
+ update(exec)
+ }
+ }
+
+  private def updateNodeBlackList(host: String, blacklisted: Boolean): Unit = {
+ // Implicitly (un)blacklist every executor associated with the node.
+ liveExecutors.values.foreach { exec =>
+ if (exec.hostname == host) {
+ exec.isBlacklisted = blacklisted
+ update(exec)
+ }
+ }
+ }
+
+ override def onJobStart(event: SparkListenerJobStart): Unit = {
+    // Compute (a potential underestimate of) the number of tasks that will be run by this job.
+    // This may be an underestimate because the job start event references all of the result
+    // stages' transitive stage dependencies, but some of these stages might be skipped if their
+    // output is available from earlier runs.
+    // See https://github.com/apache/spark/pull/3009 for a more extensive discussion.
+ val numTasks = {
+ val missingStages = event.stageInfos.filter(_.completionTime.isEmpty)
+ missingStages.map(_.numTasks).sum
+ }
+
+ val lastStageInfo = event.stageInfos.lastOption
+    val lastStageName = lastStageInfo.map(_.name).getOrElse("(Unknown Stage Name)")
+
+ val jobGroup = Option(event.properties)
+      .flatMap { p => Option(p.getProperty(SparkContext.SPARK_JOB_GROUP_ID)) }
+
+ val job = new LiveJob(
+ event.jobId,
+ lastStageName,
+ Option(event.time).filter(_ >= 0).map(new Date(_)),
+ event.stageIds,
+ jobGroup,
+ numTasks)
+ liveJobs.put(event.jobId, job)
+ update(job)
+
+ event.stageInfos.foreach { stageInfo =>
+      // A new job submission may re-use an existing stage, so this code needs to do an update
+      // instead of just a write.
+ val stage = getOrCreateStage(stageInfo)
+ stage.jobs = stage.jobs :+ job
--- End diff --
`stage.jobs :+= job`
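
For context, `:+=` is Scala's compound-assignment sugar: when `jobs` is a `var` whose type defines `:+` (as `Seq` does) but no `:+=` of its own, the compiler rewrites `stage.jobs :+= job` into `stage.jobs = stage.jobs :+ job`, so the suggestion is behaviorally identical, just shorter. A minimal standalone sketch of the desugaring (the simplified `Stage` class and `Int` job ids below are placeholders, not the PR's `LiveStage`/`LiveJob` types):

```scala
// Standalone illustration of the `:+=` desugaring; not the PR's actual classes.
class Stage {
  var jobs: Seq[Int] = Seq.empty
}

object CompoundAppendDemo {
  def main(args: Array[String]): Unit = {
    val stage = new Stage()
    stage.jobs = stage.jobs :+ 1  // form currently used in the diff
    stage.jobs :+= 2              // suggested shorthand; desugars to the line above
    println(stage.jobs)           // prints List(1, 2)
  }
}
```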
---