Github user squito commented on a diff in the pull request:
https://github.com/apache/spark/pull/19383#discussion_r146667371
--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
@@ -0,0 +1,534 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status
+
+import java.util.Date
+
+import scala.collection.mutable.HashMap
+
+import org.apache.spark._
+import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.internal.Logging
+import org.apache.spark.scheduler._
+import org.apache.spark.status.api.v1
+import org.apache.spark.storage._
+import org.apache.spark.ui.SparkUI
+import org.apache.spark.util.kvstore.KVStore
+
+/**
+ * A Spark listener that writes application information to a data store. The types written to the
+ * store are defined in the `storeTypes.scala` file and are based on the public REST API.
+ */
+private class AppStatusListener(kvstore: KVStore) extends SparkListener with Logging {
+
+ private var sparkVersion = SPARK_VERSION
+ private var appInfo: v1.ApplicationInfo = null
+ private var coresPerTask: Int = 1
+
+ // Keep track of live entities, so that task metrics can be efficiently updated (without
+ // causing too many writes to the underlying store, and other expensive operations).
+ private val liveStages = new HashMap[(Int, Int), LiveStage]()
+ private val liveJobs = new HashMap[Int, LiveJob]()
+ private val liveExecutors = new HashMap[String, LiveExecutor]()
+ private val liveTasks = new HashMap[Long, LiveTask]()
+ private val liveRDDs = new HashMap[Int, LiveRDD]()
+
+ override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
+ case SparkListenerLogStart(version) => sparkVersion = version
+ case _ =>
+ }
+
+ override def onApplicationStart(event: SparkListenerApplicationStart): Unit = {
+ assert(event.appId.isDefined, "Application without IDs are not supported.")
+
+ val attempt = new v1.ApplicationAttemptInfo(
+ event.appAttemptId,
+ new Date(event.time),
+ new Date(-1),
+ new Date(event.time),
+ -1L,
+ event.sparkUser,
+ false,
+ sparkVersion)
+
+ appInfo = new v1.ApplicationInfo(
+ event.appId.get,
+ event.appName,
+ None,
+ None,
+ None,
+ None,
+ Seq(attempt))
+
+ kvstore.write(new ApplicationInfoWrapper(appInfo))
+ }
+
+ override def onApplicationEnd(event: SparkListenerApplicationEnd): Unit = {
+ val old = appInfo.attempts.head
+ val attempt = new v1.ApplicationAttemptInfo(
+ old.attemptId,
+ old.startTime,
+ new Date(event.time),
+ new Date(event.time),
+ event.time - old.startTime.getTime(),
+ old.sparkUser,
+ true,
+ old.appSparkVersion)
+
+ appInfo = new v1.ApplicationInfo(
+ appInfo.id,
+ appInfo.name,
+ None,
+ None,
+ None,
+ None,
+ Seq(attempt))
+ kvstore.write(new ApplicationInfoWrapper(appInfo))
+ }
+
+ override def onExecutorAdded(event: SparkListenerExecutorAdded): Unit = {
+ // This needs to be an update in case an executor re-registers after the driver has
+ // marked it as "dead".
+ val exec = getOrCreateExecutor(event.executorId)
+ exec.host = event.executorInfo.executorHost
+ exec.isActive = true
+ exec.totalCores = event.executorInfo.totalCores
+ exec.maxTasks = event.executorInfo.totalCores / coresPerTask
+ exec.executorLogs = event.executorInfo.logUrlMap
+ update(exec)
+ }
+
+ override def onExecutorRemoved(event: SparkListenerExecutorRemoved): Unit = {
+ liveExecutors.remove(event.executorId).foreach { exec =>
+ exec.isActive = false
+ update(exec)
+ }
+ }
+
+ override def onExecutorBlacklisted(event: SparkListenerExecutorBlacklisted): Unit = {
+ updateBlackListStatus(event.executorId, true)
+ }
+
+ override def onExecutorUnblacklisted(event: SparkListenerExecutorUnblacklisted): Unit = {
+ updateBlackListStatus(event.executorId, false)
+ }
+
+ override def onNodeBlacklisted(event: SparkListenerNodeBlacklisted): Unit = {
+ updateNodeBlackList(event.hostId, true)
+ }
+
+ override def onNodeUnblacklisted(event: SparkListenerNodeUnblacklisted): Unit = {
+ updateNodeBlackList(event.hostId, false)
+ }
+
+ private def updateBlackListStatus(execId: String, blacklisted: Boolean): Unit = {
+ liveExecutors.get(execId).foreach { exec =>
+ exec.isBlacklisted = blacklisted
+ update(exec)
+ }
+ }
+
+ private def updateNodeBlackList(host: String, blacklisted: Boolean): Unit = {
+ // Implicitly (un)blacklist every executor associated with the node.
+ liveExecutors.values.foreach { exec =>
+ if (exec.hostname == host) {
+ exec.isBlacklisted = blacklisted
+ update(exec)
+ }
+ }
+ }
+
+ override def onJobStart(event: SparkListenerJobStart): Unit = {
+ // Compute (a potential over-estimate of) the number of tasks that will be run by this job.
+ // This may be an over-estimate because the job start event references all of the result
+ // stages' transitive stage dependencies, but some of these stages might be skipped if their
+ // output is available from earlier runs.
+ // See https://github.com/apache/spark/pull/3009 for a more extensive discussion.
+ val numTasks = {
+ val missingStages = event.stageInfos.filter(_.completionTime.isEmpty)
+ missingStages.map(_.numTasks).sum
+ }
+
+ val lastStageInfo = event.stageInfos.lastOption
+ val lastStageName = lastStageInfo.map(_.name).getOrElse("(Unknown Stage Name)")
+
+ val jobGroup = Option(event.properties)
+ .flatMap { p => Option(p.getProperty(SparkContext.SPARK_JOB_GROUP_ID)) }
+
+ val job = new LiveJob(
+ event.jobId,
+ lastStageName,
+ Some(new Date(event.time)),
+ event.stageIds,
+ jobGroup,
+ numTasks)
+ liveJobs.put(event.jobId, job)
+ update(job)
+
+ val schedulingPool = Option(event.properties).flatMap { p =>
+ Option(p.getProperty("spark.scheduler.pool"))
+ }.getOrElse(SparkUI.DEFAULT_POOL_NAME)
--- End diff ---
You actually need to set the scheduling pool in `onStageSubmitted` instead. If a stage is
shared by multiple jobs with different pools, this will just use the scheduling pool of the
job that was submitted last, rather than the one that is actually used when the stage is
submitted. `onStageSubmitted` has a handle on the `properties` of the submitting job, so
this should be easy.
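A minimal sketch of what that could look like, assuming a hypothetical `getOrCreateStage`
helper and a mutable `schedulingPool` field on `LiveStage` (neither is shown in this diff):

```scala
// Sketch only: getOrCreateStage and LiveStage.schedulingPool are assumed
// helpers/fields not shown in this diff. The stage-submitted event carries
// the submitting job's properties, so the pool can be read at stage
// submission time rather than at job start.
override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = {
  val stage = getOrCreateStage(event.stageInfo)
  stage.schedulingPool = Option(event.properties).flatMap { p =>
    Option(p.getProperty("spark.scheduler.pool"))
  }.getOrElse(SparkUI.DEFAULT_POOL_NAME)
  update(stage)
}
```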
---