Github user squito commented on a diff in the pull request:
https://github.com/apache/spark/pull/19383#discussion_r141962046
--- Diff:
core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
@@ -0,0 +1,535 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status
+
+import java.util.Date
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.HashMap
+
+import org.apache.spark._
+import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.internal.Logging
+import org.apache.spark.scheduler._
+import org.apache.spark.status.api.v1
+import org.apache.spark.storage._
+import org.apache.spark.ui.SparkUI
+import org.apache.spark.ui.scope._
+import org.apache.spark.util.kvstore.KVStore
+
+/**
+ * A Spark listener that writes application information to a data store. The types written to the
+ * store are defined in the `storeTypes.scala` file and are based on the public REST API.
+ */
+private class AppStatusListener(kvstore: KVStore) extends SparkListener with Logging {
+
+ private var sparkVersion = SPARK_VERSION
+ private var appInfo: v1.ApplicationInfo = null
+ private var coresPerTask: Int = 1
+
+ // Keep track of live entities, so that task metrics can be efficiently updated (without
+ // causing too many writes to the underlying store, and other expensive operations).
+ private val liveStages = new HashMap[(Int, Int), LiveStage]()
+ private val liveJobs = new HashMap[Int, LiveJob]()
+ private val liveExecutors = new HashMap[String, LiveExecutor]()
+ private val liveTasks = new HashMap[Long, LiveTask]()
+ private val liveRDDs = new HashMap[Int, LiveRDD]()
+
+ override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
+ case SparkListenerLogStart(version) => sparkVersion = version
+ case _ =>
+ }
+
+ override def onApplicationStart(event: SparkListenerApplicationStart): Unit = {
+ assert(event.appId.isDefined, "Application without IDs are not supported.")
+
+ val attempt = new v1.ApplicationAttemptInfo(
+ event.appAttemptId,
+ new Date(event.time),
+ new Date(-1),
+ new Date(event.time),
+ -1L,
+ event.sparkUser,
+ false,
+ sparkVersion)
+
+ appInfo = new v1.ApplicationInfo(
+ event.appId.get,
+ event.appName,
+ None,
+ None,
+ None,
+ None,
+ Seq(attempt))
+
+ kvstore.write(new ApplicationInfoWrapper(appInfo))
+ }
+
+ override def onApplicationEnd(event: SparkListenerApplicationEnd): Unit = {
+ val old = appInfo.attempts.head
+ val attempt = new v1.ApplicationAttemptInfo(
+ old.attemptId,
+ old.startTime,
+ new Date(event.time),
+ new Date(event.time),
+ event.time - old.startTime.getTime(),
+ old.sparkUser,
+ true,
+ old.appSparkVersion)
+
+ appInfo = new v1.ApplicationInfo(
+ appInfo.id,
+ appInfo.name,
+ None,
+ None,
+ None,
+ None,
+ Seq(attempt))
+ kvstore.write(new ApplicationInfoWrapper(appInfo))
+ }
+
+ override def onExecutorAdded(event: SparkListenerExecutorAdded): Unit = {
+ // This needs to be an update in case an executor re-registers after the driver has
+ // marked it as "dead".
+ val exec = getOrCreateExecutor(event.executorId)
+ exec.host = event.executorInfo.executorHost
+ exec.isActive = true
+ exec.totalCores = event.executorInfo.totalCores
+ exec.maxTasks = event.executorInfo.totalCores / coresPerTask
+ exec.executorLogs = event.executorInfo.logUrlMap
+ update(exec)
+ }
+
+ override def onExecutorRemoved(event: SparkListenerExecutorRemoved): Unit = {
+ liveExecutors.remove(event.executorId).foreach { exec =>
+ exec.isActive = false
+ update(exec)
+ }
+ }
+
+ override def onExecutorBlacklisted(event: SparkListenerExecutorBlacklisted): Unit = {
+ updateBlackListStatus(event.executorId, true)
+ }
+
+ override def onExecutorUnblacklisted(event: SparkListenerExecutorUnblacklisted): Unit = {
+ updateBlackListStatus(event.executorId, false)
+ }
+
+ override def onNodeBlacklisted(event: SparkListenerNodeBlacklisted): Unit = {
+ updateNodeBlackList(event.hostId, true)
+ }
+
+ override def onNodeUnblacklisted(event: SparkListenerNodeUnblacklisted): Unit = {
+ updateNodeBlackList(event.hostId, false)
+ }
+
+ private def updateBlackListStatus(execId: String, blacklisted: Boolean): Unit = {
+ liveExecutors.get(execId).foreach { exec =>
+ exec.isBlacklisted = blacklisted
+ update(exec)
+ }
+ }
+
+ private def updateNodeBlackList(host: String, blacklisted: Boolean): Unit = {
+ // Implicitly (un)blacklist every executor associated with the node.
+ liveExecutors.values.foreach { exec =>
+ if (exec.hostname == host) {
+ exec.isBlacklisted = blacklisted
+ update(exec)
+ }
+ }
+ }
+
+ override def onJobStart(event: SparkListenerJobStart): Unit = {
+ // Compute (a potential underestimate of) the number of tasks that will be run by this job.
--- End diff --
I realize you are copying this comment, but it seems wrong. It's a potential under-estimate of the job progress, and a potential *over*-estimate of the number of tasks that will be run: stages whose output is already available get skipped at runtime, so summing the task counts over all of a job's stages counts work that may never happen. I looked at the referenced PR, and I think it agrees with that understanding -- the PR description says "If a job contains stages that aren't run, then its overall job progress bar may be an underestimate of the total job progress"
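
To make the distinction concrete, here's a minimal sketch with made-up numbers (the two-stage job and its task counts are hypothetical, not taken from this PR):

```scala
// Hypothetical job: the DAG lists two stages of 100 tasks each, but the
// second stage's output is already available, so it is skipped at runtime.
val plannedTasksPerStage = Seq(100, 100)      // what summing over the job's stage infos would see
val estimatedTotal = plannedTasksPerStage.sum // 200 -- an *over*-estimate of tasks that will run
val tasksActuallyRun = 100                    // only the first stage executes

// Progress computed against the over-estimated total under-states the job's
// real progress: 100 / 200 = 50% reported, even though the job is 100% done.
val reportedProgress = tasksActuallyRun.toDouble / estimatedTotal // 0.5
```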
---