Github user stoader commented on a diff in the pull request:
https://github.com/apache/spark/pull/19775#discussion_r167359010
--- Diff: core/src/main/scala/org/apache/spark/metrics/sink/PrometheusSink.scala ---
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.metrics.sink
+
+import java.net.URI
+import java.util
+import java.util.Properties
+import java.util.concurrent.TimeUnit
+
+import scala.collection.JavaConverters._
+import scala.util.Try
+
+import com.codahale.metrics._
+import io.prometheus.client.CollectorRegistry
+import io.prometheus.client.dropwizard.DropwizardExports
+
+import org.apache.spark.{SecurityManager, SparkConf, SparkContext, SparkEnv}
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.METRICS_NAMESPACE
+import org.apache.spark.metrics.MetricsSystem
+import org.apache.spark.metrics.prometheus.client.exporter.PushGatewayWithTimestamp
+
+
+private[spark] class PrometheusSink(
+    val property: Properties,
+    val registry: MetricRegistry,
+    securityMgr: SecurityManager)
+  extends Sink with Logging {
+
+  protected class Reporter(registry: MetricRegistry)
+    extends ScheduledReporter(
+      registry,
+      "prometheus-reporter",
+      MetricFilter.ALL,
+      TimeUnit.SECONDS,
+      TimeUnit.MILLISECONDS) {
+
+    val defaultSparkConf: SparkConf = new SparkConf(true)
+
+    override def report(
+        gauges: util.SortedMap[String, Gauge[_]],
+        counters: util.SortedMap[String, Counter],
+        histograms: util.SortedMap[String, Histogram],
+        meters: util.SortedMap[String, Meter],
+        timers: util.SortedMap[String, Timer]): Unit = {
+
+      // SparkEnv may become available only after metrics sink creation thus retrieving
+      // SparkConf from spark env here and not during the creation/initialisation of PrometheusSink.
+      val sparkConf: SparkConf = Option(SparkEnv.get).map(_.conf).getOrElse(defaultSparkConf)
+
+      val metricsNamespace: Option[String] = sparkConf.get(METRICS_NAMESPACE)
+      val sparkAppId: Option[String] = sparkConf.getOption("spark.app.id")
+      val executorId: Option[String] = sparkConf.getOption("spark.executor.id")
+
+      logInfo(s"metricsNamespace=$metricsNamespace, sparkAppId=$sparkAppId, " +
+        s"executorId=$executorId")
+
+      val role: String = (sparkAppId, executorId) match {
+        case (Some(_), Some(SparkContext.DRIVER_IDENTIFIER)) => "driver"
+        case (Some(_), Some(_)) => "executor"
+        case _ => "shuffle"
+      }
+
+      val job: String = role match {
+        case "driver" => metricsNamespace.getOrElse(sparkAppId.get)
+        case "executor" => metricsNamespace.getOrElse(sparkAppId.get)
+        case _ => metricsNamespace.getOrElse("shuffle")
+      }
+      logInfo(s"role=$role, job=$job")
+
+      val groupingKey: Map[String, String] = (role, executorId) match {
+        case ("driver", _) => Map("role" -> role)
+        case ("executor", Some(id)) => Map ("role" -> role, "number" -> id)
+        case _ => Map("role" -> role)
+      }
+
+
--- End diff ---
Empty line removed.
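
For anyone trying the sink out while reviewing: like the other org.apache.spark.metrics.sink.* classes, it would be wired up through Spark's metrics configuration (conf/metrics.properties), where MetricsSystem instantiates the configured sink class reflectively. A minimal sketch follows; only the class property comes from this diff, while the period/unit and pushgateway-address option names are assumed for illustration and are not confirmed by the lines quoted above:

    # conf/metrics.properties -- sketch only
    # Register the sink for all instances; syntax is [instance].sink.[name].[option]=[value].
    *.sink.prometheus.class=org.apache.spark.metrics.sink.PrometheusSink
    # Assumed option names below -- the property parsing is not part of the lines quoted above.
    *.sink.prometheus.period=10
    *.sink.prometheus.unit=seconds
    *.sink.prometheus.pushgateway-address=pushgateway-host:9091

With a configuration along these lines, the Reporter above would presumably push the Dropwizard registry to the pushgateway on each period, tagged with the role/number grouping key computed in report().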
---