Repository: spark
Updated Branches:
  refs/heads/master 72e336997 -> f1957e116


SPARK-2134: Report metrics before application finishes

Author: Rahul Singhal <rahul.sing...@guavus.com>

Closes #1076 from rahulsinghaliitd/SPARK-2134 and squashes the following 
commits:

15f18b6 [Rahul Singhal] SPARK-2134: Report metrics before application finishes


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f1957e11
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f1957e11
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f1957e11

Branch: refs/heads/master
Commit: f1957e11652a537efd40771f843591a4c9341014
Parents: 72e3369
Author: Rahul Singhal <rahul.sing...@guavus.com>
Authored: Fri Aug 1 00:33:15 2014 -0700
Committer: Matei Zaharia <ma...@databricks.com>
Committed: Fri Aug 1 00:33:15 2014 -0700

----------------------------------------------------------------------
 core/src/main/scala/org/apache/spark/SparkContext.scala          | 1 +
 core/src/main/scala/org/apache/spark/deploy/master/Master.scala  | 2 ++
 core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala  | 1 +
 .../org/apache/spark/executor/CoarseGrainedExecutorBackend.scala | 1 +
 core/src/main/scala/org/apache/spark/executor/Executor.scala     | 4 ++++
 core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala | 4 ++++
 .../main/scala/org/apache/spark/metrics/sink/ConsoleSink.scala   | 4 ++++
 core/src/main/scala/org/apache/spark/metrics/sink/CsvSink.scala  | 4 ++++
 .../main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala  | 4 ++++
 core/src/main/scala/org/apache/spark/metrics/sink/JmxSink.scala  | 2 ++
 .../scala/org/apache/spark/metrics/sink/MetricsServlet.scala     | 2 ++
 core/src/main/scala/org/apache/spark/metrics/sink/Sink.scala     | 1 +
 .../main/scala/org/apache/spark/metrics/sink/GangliaSink.scala   | 4 ++++
 13 files changed, 34 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f1957e11/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala 
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index b25f081..f5a0549 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -990,6 +990,7 @@ class SparkContext(config: SparkConf) extends Logging {
     val dagSchedulerCopy = dagScheduler
     dagScheduler = null
     if (dagSchedulerCopy != null) {
+      env.metricsSystem.report()
       metadataCleaner.cancel()
       cleaner.foreach(_.stop())
       dagSchedulerCopy.stop()

http://git-wip-us.apache.org/repos/asf/spark/blob/f1957e11/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala 
b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 21f8667..a70ecdb 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -154,6 +154,8 @@ private[spark] class Master(
   }
 
   override def postStop() {
+    masterMetricsSystem.report()
+    applicationMetricsSystem.report()
     // prevent the CompleteRecovery message sending to restarted master
     if (recoveryCompletionTask != null) {
       recoveryCompletionTask.cancel()

http://git-wip-us.apache.org/repos/asf/spark/blob/f1957e11/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala 
b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index ce42544..fb5252d 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -357,6 +357,7 @@ private[spark] class Worker(
   }
 
   override def postStop() {
+    metricsSystem.report()
     registrationRetryTimer.foreach(_.cancel())
     executors.values.foreach(_.kill())
     drivers.values.foreach(_.kill())

http://git-wip-us.apache.org/repos/asf/spark/blob/f1957e11/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
 
b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 860b47e..af736de 100644
--- 
a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ 
b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -88,6 +88,7 @@ private[spark] class CoarseGrainedExecutorBackend(
 
     case StopExecutor =>
       logInfo("Driver commanded a shutdown")
+      executor.stop()
       context.stop(self)
       context.system.shutdown()
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/f1957e11/core/src/main/scala/org/apache/spark/executor/Executor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala 
b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 3b69bc4..99d650a 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -121,6 +121,10 @@ private[spark] class Executor(
     }
   }
 
+  def stop(): Unit = {
+    env.metricsSystem.report()
+  }
+
   /** Get the Yarn approved local directories. */
   private def getYarnLocalDirs(): String = {
     // Hadoop 0.23 and 2.x have different Environment variable names for the

http://git-wip-us.apache.org/repos/asf/spark/blob/f1957e11/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala 
b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
index 651511d..6ef817d 100644
--- a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
@@ -91,6 +91,10 @@ private[spark] class MetricsSystem private (val instance: 
String,
     sinks.foreach(_.stop)
   }
 
+  def report(): Unit = {
+    sinks.foreach(_.report())
+  }
+
   def registerSource(source: Source) {
     sources += source
     try {

http://git-wip-us.apache.org/repos/asf/spark/blob/f1957e11/core/src/main/scala/org/apache/spark/metrics/sink/ConsoleSink.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/metrics/sink/ConsoleSink.scala 
b/core/src/main/scala/org/apache/spark/metrics/sink/ConsoleSink.scala
index 05852f1..81b9056 100644
--- a/core/src/main/scala/org/apache/spark/metrics/sink/ConsoleSink.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/sink/ConsoleSink.scala
@@ -57,5 +57,9 @@ private[spark] class ConsoleSink(val property: Properties, 
val registry: MetricR
   override def stop() {
     reporter.stop()
   }
+
+  override def report() {
+    reporter.report()
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/f1957e11/core/src/main/scala/org/apache/spark/metrics/sink/CsvSink.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/CsvSink.scala 
b/core/src/main/scala/org/apache/spark/metrics/sink/CsvSink.scala
index 542dce6..9d5f2ae 100644
--- a/core/src/main/scala/org/apache/spark/metrics/sink/CsvSink.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/sink/CsvSink.scala
@@ -66,5 +66,9 @@ private[spark] class CsvSink(val property: Properties, val 
registry: MetricRegis
   override def stop() {
     reporter.stop()
   }
+
+  override def report() {
+    reporter.report()
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/f1957e11/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala 
b/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala
index aeb4ad4..d7b5f5c 100644
--- a/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala
@@ -81,4 +81,8 @@ private[spark] class GraphiteSink(val property: Properties, 
val registry: Metric
   override def stop() {
     reporter.stop()
   }
+
+  override def report() {
+    reporter.report()
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/f1957e11/core/src/main/scala/org/apache/spark/metrics/sink/JmxSink.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/JmxSink.scala 
b/core/src/main/scala/org/apache/spark/metrics/sink/JmxSink.scala
index ed27234..2588fe2 100644
--- a/core/src/main/scala/org/apache/spark/metrics/sink/JmxSink.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/sink/JmxSink.scala
@@ -35,4 +35,6 @@ private[spark] class JmxSink(val property: Properties, val 
registry: MetricRegis
     reporter.stop()
   }
 
+  override def report() { }
+
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/f1957e11/core/src/main/scala/org/apache/spark/metrics/sink/MetricsServlet.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/metrics/sink/MetricsServlet.scala 
b/core/src/main/scala/org/apache/spark/metrics/sink/MetricsServlet.scala
index 571539b..2f65bc8 100644
--- a/core/src/main/scala/org/apache/spark/metrics/sink/MetricsServlet.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/sink/MetricsServlet.scala
@@ -57,4 +57,6 @@ private[spark] class MetricsServlet(val property: Properties, 
val registry: Metr
   override def start() { }
 
   override def stop() { }
+
+  override def report() { }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/f1957e11/core/src/main/scala/org/apache/spark/metrics/sink/Sink.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/Sink.scala 
b/core/src/main/scala/org/apache/spark/metrics/sink/Sink.scala
index 6f2b5a0..0d83d8c 100644
--- a/core/src/main/scala/org/apache/spark/metrics/sink/Sink.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/sink/Sink.scala
@@ -20,4 +20,5 @@ package org.apache.spark.metrics.sink
 private[spark] trait Sink {
   def start: Unit
   def stop: Unit
+  def report(): Unit
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/f1957e11/extras/spark-ganglia-lgpl/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala
----------------------------------------------------------------------
diff --git 
a/extras/spark-ganglia-lgpl/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala
 
b/extras/spark-ganglia-lgpl/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala
index d03d777..3b1880e 100644
--- 
a/extras/spark-ganglia-lgpl/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala
+++ 
b/extras/spark-ganglia-lgpl/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala
@@ -82,5 +82,9 @@ class GangliaSink(val property: Properties, val registry: 
MetricRegistry,
   override def stop() {
     reporter.stop()
   }
+
+  override def report() {
+    reporter.report()
+  }
 }
 

Reply via email to