Github user vanzin commented on a diff in the pull request:
https://github.com/apache/spark/pull/22381#discussion_r225303236
--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusSource.scala ---
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.status
+
+import java.util.concurrent.atomic.AtomicLong
+
+import AppStatusSource.getCounter
+import com.codahale.metrics.{Counter, Gauge, MetricRegistry}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.config.ConfigBuilder
+import org.apache.spark.metrics.source.Source
+
+private[spark] class JobDuration(val value: AtomicLong) extends Gauge[Long] {
+ override def getValue: Long = value.get()
+}
+
+private[spark] class AppStatusSource extends Source {
+
+ override implicit val metricRegistry = new MetricRegistry()
+
+ override val sourceName = "appStatus"
+
+ val jobDuration = new JobDuration(new AtomicLong(0L))
+
+ // Duration of each job in milliseconds
+ val JOB_DURATION = metricRegistry
+ .register(MetricRegistry.name("jobDuration"), jobDuration)
+
+ val FAILED_STAGES = getCounter("stages", "failedStages")
+
+ val SKIPPED_STAGES = getCounter("stages", "skippedStages")
+
+ val COMPLETED_STAGES = getCounter("stages", "completedStages")
+
+ val SUCCEEDED_JOBS = getCounter("jobs", "succeededJobs")
+
+ val FAILED_JOBS = getCounter("jobs", "failedJobs")
+
+ val COMPLETED_TASKS = getCounter("tasks", "completedTasks")
+
+ val FAILED_TASKS = getCounter("tasks", "failedTasks")
+
+ val KILLED_TASKS = getCounter("tasks", "killedTasks")
+
+ val SKIPPED_TASKS = getCounter("tasks", "skippedTasks")
+
+ val BLACKLISTED_EXECUTORS = getCounter("tasks", "blackListedExecutors")
+
+ val UNBLACKLISTED_EXECUTORS = getCounter("tasks", "unblackListedExecutors")
+}
+
+private[spark] object AppStatusSource {
+
+ def getCounter(prefix: String, name: String)(implicit metricRegistry: MetricRegistry): Counter = {
+ metricRegistry.counter(MetricRegistry.name(prefix, name))
+ }
+
+ def createSource(conf: SparkConf): Option[AppStatusSource] = {
+ Option(conf.get(AppStatusSource.APP_STATUS_METRICS_ENABLED))
+ .filter(identity)
+ .map(_ => new AppStatusSource())
--- End diff ---
`.map { foo => blah }`
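
For illustration, a minimal sketch of that suggestion applied to the helper in the diff above; everything except the brace-delimited closure in the final `.map` is taken from the PR, nothing else is meant to change:

```scala
import org.apache.spark.SparkConf

// Sketch only: assumes AppStatusSource and APP_STATUS_METRICS_ENABLED
// exactly as defined in the diff above. The only difference from the PR
// is using a brace-style closure in the last `.map`, per the comment.
def createSource(conf: SparkConf): Option[AppStatusSource] = {
  Option(conf.get(AppStatusSource.APP_STATUS_METRICS_ENABLED))
    .filter(identity)
    .map { _ => new AppStatusSource() }
}
```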
---