GitHub user skonto commented on a diff in the pull request:
https://github.com/apache/spark/pull/21516#discussion_r194687575
--- Diff:
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSource.scala
---
@@ -17,25 +17,170 @@
package org.apache.spark.scheduler.cluster.mesos
-import com.codahale.metrics.{Gauge, MetricRegistry}
+import java.util.concurrent.TimeUnit
+import java.util.Date
+import scala.collection.mutable.HashMap
+
+import com.codahale.metrics.{Counter, Gauge, MetricRegistry, Timer}
+import org.apache.mesos.Protos.{TaskState => MesosTaskState}
+
+import org.apache.spark.TaskState
+import org.apache.spark.deploy.mesos.MesosDriverDescription
import org.apache.spark.metrics.source.Source
private[mesos] class MesosClusterSchedulerSource(scheduler:
MesosClusterScheduler)
- extends Source {
+ extends Source with MesosSchedulerUtils {
+
+ // Submission state transitions, to derive metrics from:
+ // - submit():
+ // From: NULL
+ // To: queuedDrivers
+ // - offers/scheduleTasks():
+ // From: queuedDrivers and any pendingRetryDrivers scheduled for
retry
+ // To: launchedDrivers if success, or
+ // finishedDrivers(fail) if exception
+ // - taskStatus/statusUpdate():
+ // From: launchedDrivers
+ // To: finishedDrivers(success) if success (or fail and not
eligible to retry), or
+ // pendingRetryDrivers if failed (and eligible to retry)
+ // - pruning/retireDriver():
+ // From: finishedDrivers:
+ // To: NULL
override val sourceName: String = "mesos_cluster"
- override val metricRegistry: MetricRegistry = new MetricRegistry()
+ override val metricRegistry: MetricRegistry = new MetricRegistry
- metricRegistry.register(MetricRegistry.name("waitingDrivers"), new
Gauge[Int] {
+ // PULL METRICS:
+ // These gauge metrics are periodically polled/pulled by the metrics
system
+
+ metricRegistry.register(MetricRegistry.name("driver", "waiting"), new
Gauge[Int] {
override def getValue: Int = scheduler.getQueuedDriversSize
})
- metricRegistry.register(MetricRegistry.name("launchedDrivers"), new
Gauge[Int] {
+ metricRegistry.register(MetricRegistry.name("driver", "launched"), new
Gauge[Int] {
override def getValue: Int = scheduler.getLaunchedDriversSize
})
- metricRegistry.register(MetricRegistry.name("retryDrivers"), new
Gauge[Int] {
+ metricRegistry.register(MetricRegistry.name("driver", "retry"), new
Gauge[Int] {
override def getValue: Int = scheduler.getPendingRetryDriversSize
})
+
+ metricRegistry.register(MetricRegistry.name("driver", "finished"), new
Gauge[Int] {
+ override def getValue: Int = scheduler.getFinishedDriversSize
+ })
+
+ // PUSH METRICS:
+ // These metrics are updated directly as events occur
+
+ private val queuedCounter =
metricRegistry.counter(MetricRegistry.name("driver", "waiting_count"))
+ private val launchedCounter =
+ metricRegistry.counter(MetricRegistry.name("driver", "launched_count"))
+ private val retryCounter =
metricRegistry.counter(MetricRegistry.name("driver", "retry_count"))
+ private val exceptionCounter =
+ metricRegistry.counter(MetricRegistry.name("driver",
"exception_count"))
+ private val finishedCounter =
+ metricRegistry.counter(MetricRegistry.name("driver", "finished_count"))
+
+ // Same as finishedCounter above, except grouped by MesosTaskState.
+ private val finishedMesosStateCounters = MesosTaskState.values
+ // Avoid registering 'finished' metrics for states that aren't
considered finished:
+ .filter(state => TaskState.isFinished(mesosToTaskState(state)))
+ .map(state => (state, metricRegistry.counter(
+ MetricRegistry.name("driver", "finished_count_mesos_state",
state.name.toLowerCase))))
+ .toMap
+ private val finishedMesosUnknownStateCounter =
+ metricRegistry.counter(MetricRegistry.name("driver",
"finished_count_mesos_state", "UNKNOWN"))
+
+ // Duration from submission to FIRST launch.
+ // This omits retries since those would exaggerate the time since
original submission.
+ private val submitToFirstLaunch =
+ metricRegistry.timer(MetricRegistry.name("driver",
"submit_to_first_launch"))
+ // Duration from initial submission to an exception.
+ private val submitToException =
+ metricRegistry.timer(MetricRegistry.name("driver",
"submit_to_exception"))
+
+ // Duration from (most recent) launch to a retry.
+ private val launchToRetry =
metricRegistry.timer(MetricRegistry.name("driver", "launch_to_retry"))
+
+ // Duration from initial submission to finished.
+ private val submitToFinish =
+ metricRegistry.timer(MetricRegistry.name("driver", "submit_to_finish"))
+ // Duration from (most recent) launch to finished.
+ private val launchToFinish =
+ metricRegistry.timer(MetricRegistry.name("driver", "launch_to_finish"))
+
+ // Same as submitToFinish and launchToFinish above, except grouped by
Spark TaskState.
+ class FinishStateTimers(state: String) {
+ val submitToFinish =
+ metricRegistry.timer(MetricRegistry.name("driver",
"submit_to_finish_state", state))
+ val launchToFinish =
+ metricRegistry.timer(MetricRegistry.name("driver",
"launch_to_finish_state", state))
+ }
+ private val finishSparkStateTimers = HashMap.empty[TaskState.TaskState,
FinishStateTimers]
+ for (state <- TaskState.values) {
+ // Avoid registering 'finished' metrics for states that aren't
considered finished:
+ if (TaskState.isFinished(state)) {
+ finishSparkStateTimers += (state -> new
FinishStateTimers(state.toString.toLowerCase))
+ }
+ }
+ private val submitToFinishUnknownState = metricRegistry.timer(
+ MetricRegistry.name("driver", "submit_to_finish_state", "UNKNOWN"))
+ private val launchToFinishUnknownState = metricRegistry.timer(
+ MetricRegistry.name("driver", "launch_to_finish_state", "UNKNOWN"))
+
+ // Histogram of retry counts at retry scheduling
+ private val retryCount =
metricRegistry.histogram(MetricRegistry.name("driver", "retry_counts"))
+
+ // Records when a submission initially enters the launch queue.
+ def recordQueuedDriver(): Unit = queuedCounter.inc
+
+ // Records when a submission has failed an attempt and is eligible to be
retried
+ def recordRetryingDriver(state: MesosClusterSubmissionState): Unit = {
+ state.driverDescription.retryState.foreach(retryState =>
retryCount.update(retryState.retries))
+ recordTimeSince(state.startDate, launchToRetry)
+ retryCounter.inc
+ }
+
+ // Records when a submission is launched.
+ def recordLaunchedDriver(desc: MesosDriverDescription): Unit = {
+ if (!desc.retryState.isDefined) {
--- End diff --
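How about `desc.retryState.isEmpty` instead of `!desc.retryState.isDefined`? (Note: `Option.isEmpty` is a parameterless method, so it is written without parentheses; `isEmpty()` would not compile.)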
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]