Repository: spark
Updated Branches:
  refs/heads/master 444bce1c9 -> 1ffe03d9e


[SPARK-22190][CORE] Add Spark executor task metrics to Dropwizard metrics

## What changes were proposed in this pull request?

This patch exposes Spark executor task metrics as Dropwizard metrics. It is 
intended to help with monitoring Spark jobs and with drilling down into 
performance issues during troubleshooting.

## How was this patch tested?

Manually tested on a Spark cluster (see JIRA for an example screenshot).

Author: LucaCanali <luca.can...@cern.ch>

Closes #19426 from LucaCanali/SparkTaskMetricsDropWizard.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1ffe03d9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1ffe03d9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1ffe03d9

Branch: refs/heads/master
Commit: 1ffe03d9e87fb784cc8a0bae232c81c7b14deac9
Parents: 444bce1
Author: LucaCanali <luca.can...@cern.ch>
Authored: Wed Nov 1 15:40:25 2017 +0100
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Wed Nov 1 15:40:25 2017 +0100

----------------------------------------------------------------------
 .../org/apache/spark/executor/Executor.scala    | 41 +++++++++++++++++
 .../apache/spark/executor/ExecutorSource.scala  | 48 ++++++++++++++++++++
 2 files changed, 89 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1ffe03d9/core/src/main/scala/org/apache/spark/executor/Executor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala 
b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 2ecbb74..e3e555e 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -406,6 +406,47 @@ private[spark] class Executor(
         task.metrics.setJvmGCTime(computeTotalGcTime() - startGCTime)
         task.metrics.setResultSerializationTime(afterSerialization - 
beforeSerialization)
 
+        // Expose task metrics using the Dropwizard metrics system.
+        // Update task metrics counters
+        executorSource.METRIC_CPU_TIME.inc(task.metrics.executorCpuTime)
+        executorSource.METRIC_RUN_TIME.inc(task.metrics.executorRunTime)
+        executorSource.METRIC_JVM_GC_TIME.inc(task.metrics.jvmGCTime)
+        
executorSource.METRIC_DESERIALIZE_TIME.inc(task.metrics.executorDeserializeTime)
+        
executorSource.METRIC_DESERIALIZE_CPU_TIME.inc(task.metrics.executorDeserializeCpuTime)
+        
executorSource.METRIC_RESULT_SERIALIZE_TIME.inc(task.metrics.resultSerializationTime)
+        executorSource.METRIC_SHUFFLE_FETCH_WAIT_TIME
+          .inc(task.metrics.shuffleReadMetrics.fetchWaitTime)
+        
executorSource.METRIC_SHUFFLE_WRITE_TIME.inc(task.metrics.shuffleWriteMetrics.writeTime)
+        executorSource.METRIC_SHUFFLE_TOTAL_BYTES_READ
+          .inc(task.metrics.shuffleReadMetrics.totalBytesRead)
+        executorSource.METRIC_SHUFFLE_REMOTE_BYTES_READ
+          .inc(task.metrics.shuffleReadMetrics.remoteBytesRead)
+        executorSource.METRIC_SHUFFLE_REMOTE_BYTES_READ_TO_DISK
+          .inc(task.metrics.shuffleReadMetrics.remoteBytesReadToDisk)
+        executorSource.METRIC_SHUFFLE_LOCAL_BYTES_READ
+          .inc(task.metrics.shuffleReadMetrics.localBytesRead)
+        executorSource.METRIC_SHUFFLE_RECORDS_READ
+          .inc(task.metrics.shuffleReadMetrics.recordsRead)
+        executorSource.METRIC_SHUFFLE_REMOTE_BLOCKS_FETCHED
+          .inc(task.metrics.shuffleReadMetrics.remoteBlocksFetched)
+        executorSource.METRIC_SHUFFLE_LOCAL_BLOCKS_FETCHED
+          .inc(task.metrics.shuffleReadMetrics.localBlocksFetched)
+        executorSource.METRIC_SHUFFLE_BYTES_WRITTEN
+          .inc(task.metrics.shuffleWriteMetrics.bytesWritten)
+        executorSource.METRIC_SHUFFLE_RECORDS_WRITTEN
+          .inc(task.metrics.shuffleWriteMetrics.recordsWritten)
+        executorSource.METRIC_INPUT_BYTES_READ
+          .inc(task.metrics.inputMetrics.bytesRead)
+        executorSource.METRIC_INPUT_RECORDS_READ
+          .inc(task.metrics.inputMetrics.recordsRead)
+        executorSource.METRIC_OUTPUT_BYTES_WRITTEN
+          .inc(task.metrics.outputMetrics.bytesWritten)
+        executorSource.METRIC_OUTPUT_RECORDS_WRITTEN
+          .inc(task.metrics.outputMetrics.recordsWritten)
+        executorSource.METRIC_RESULT_SIZE.inc(task.metrics.resultSize)
+        
executorSource.METRIC_DISK_BYTES_SPILLED.inc(task.metrics.diskBytesSpilled)
+        
executorSource.METRIC_MEMORY_BYTES_SPILLED.inc(task.metrics.memoryBytesSpilled)
+
         // Note: accumulator updates must be collected after TaskMetrics is 
updated
         val accumUpdates = task.collectAccumulatorUpdates()
         // TODO: do not serialize value twice

http://git-wip-us.apache.org/repos/asf/spark/blob/1ffe03d9/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala 
b/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala
index d16f4a1..669ce63 100644
--- a/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala
+++ b/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala
@@ -72,4 +72,52 @@ class ExecutorSource(threadPool: ThreadPoolExecutor, 
executorId: String) extends
     registerFileSystemStat(scheme, "largeRead_ops", _.getLargeReadOps(), 0)
     registerFileSystemStat(scheme, "write_ops", _.getWriteOps(), 0)
   }
+
+  // Expose executor task metrics using the Dropwizard metrics system.
+  // The list is taken from TaskMetrics.scala
+  val METRIC_CPU_TIME = metricRegistry.counter(MetricRegistry.name("cpuTime"))
+  val METRIC_RUN_TIME = metricRegistry.counter(MetricRegistry.name("runTime"))
+  val METRIC_JVM_GC_TIME = 
metricRegistry.counter(MetricRegistry.name("jvmGCTime"))
+  val METRIC_DESERIALIZE_TIME =
+    metricRegistry.counter(MetricRegistry.name("deserializeTime"))
+  val METRIC_DESERIALIZE_CPU_TIME =
+    metricRegistry.counter(MetricRegistry.name("deserializeCpuTime"))
+  val METRIC_RESULT_SERIALIZE_TIME =
+    metricRegistry.counter(MetricRegistry.name("resultSerializationTime"))
+  val METRIC_SHUFFLE_FETCH_WAIT_TIME =
+    metricRegistry.counter(MetricRegistry.name("shuffleFetchWaitTime"))
+  val METRIC_SHUFFLE_WRITE_TIME =
+    metricRegistry.counter(MetricRegistry.name("shuffleWriteTime"))
+  val METRIC_SHUFFLE_TOTAL_BYTES_READ =
+    metricRegistry.counter(MetricRegistry.name("shuffleTotalBytesRead"))
+  val METRIC_SHUFFLE_REMOTE_BYTES_READ =
+    metricRegistry.counter(MetricRegistry.name("shuffleRemoteBytesRead"))
+  val METRIC_SHUFFLE_REMOTE_BYTES_READ_TO_DISK =
+    metricRegistry.counter(MetricRegistry.name("shuffleRemoteBytesReadToDisk"))
+  val METRIC_SHUFFLE_LOCAL_BYTES_READ =
+    metricRegistry.counter(MetricRegistry.name("shuffleLocalBytesRead"))
+  val METRIC_SHUFFLE_RECORDS_READ =
+    metricRegistry.counter(MetricRegistry.name("shuffleRecordsRead"))
+  val METRIC_SHUFFLE_REMOTE_BLOCKS_FETCHED =
+    metricRegistry.counter(MetricRegistry.name("shuffleRemoteBlocksFetched"))
+  val METRIC_SHUFFLE_LOCAL_BLOCKS_FETCHED =
+    metricRegistry.counter(MetricRegistry.name("shuffleLocalBlocksFetched"))
+  val METRIC_SHUFFLE_BYTES_WRITTEN =
+    metricRegistry.counter(MetricRegistry.name("shuffleBytesWritten"))
+  val METRIC_SHUFFLE_RECORDS_WRITTEN =
+    metricRegistry.counter(MetricRegistry.name("shuffleRecordsWritten"))
+  val METRIC_INPUT_BYTES_READ =
+    metricRegistry.counter(MetricRegistry.name("bytesRead"))
+  val METRIC_INPUT_RECORDS_READ =
+    metricRegistry.counter(MetricRegistry.name("recordsRead"))
+  val METRIC_OUTPUT_BYTES_WRITTEN =
+    metricRegistry.counter(MetricRegistry.name("bytesWritten"))
+  val METRIC_OUTPUT_RECORDS_WRITTEN =
+    metricRegistry.counter(MetricRegistry.name("recordsWritten"))
+  val METRIC_RESULT_SIZE =
+    metricRegistry.counter(MetricRegistry.name("resultSize"))
+  val METRIC_DISK_BYTES_SPILLED =
+    metricRegistry.counter(MetricRegistry.name("diskBytesSpilled"))
+  val METRIC_MEMORY_BYTES_SPILLED =
+    metricRegistry.counter(MetricRegistry.name("memoryBytesSpilled"))
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to