Repository: spark Updated Branches: refs/heads/master 444bce1c9 -> 1ffe03d9e
[SPARK-22190][CORE] Add Spark executor task metrics to Dropwizard metrics ## What changes were proposed in this pull request? This proposed patch is about making Spark executor task metrics available as Dropwizard metrics. This is intended to be of aid in monitoring Spark jobs and when drilling down on performance troubleshooting issues. ## How was this patch tested? Manually tested on a Spark cluster (see JIRA for an example screenshot). Author: LucaCanali <luca.can...@cern.ch> Closes #19426 from LucaCanali/SparkTaskMetricsDropWizard. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1ffe03d9 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1ffe03d9 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1ffe03d9 Branch: refs/heads/master Commit: 1ffe03d9e87fb784cc8a0bae232c81c7b14deac9 Parents: 444bce1 Author: LucaCanali <luca.can...@cern.ch> Authored: Wed Nov 1 15:40:25 2017 +0100 Committer: Wenchen Fan <wenc...@databricks.com> Committed: Wed Nov 1 15:40:25 2017 +0100 ---------------------------------------------------------------------- .../org/apache/spark/executor/Executor.scala | 41 +++++++++++++++++ .../apache/spark/executor/ExecutorSource.scala | 48 ++++++++++++++++++++ 2 files changed, 89 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/1ffe03d9/core/src/main/scala/org/apache/spark/executor/Executor.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 2ecbb74..e3e555e 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -406,6 +406,47 @@ private[spark] class Executor( task.metrics.setJvmGCTime(computeTotalGcTime() - startGCTime) 
task.metrics.setResultSerializationTime(afterSerialization - beforeSerialization) + // Expose task metrics using the Dropwizard metrics system. + // Update task metrics counters + executorSource.METRIC_CPU_TIME.inc(task.metrics.executorCpuTime) + executorSource.METRIC_RUN_TIME.inc(task.metrics.executorRunTime) + executorSource.METRIC_JVM_GC_TIME.inc(task.metrics.jvmGCTime) + executorSource.METRIC_DESERIALIZE_TIME.inc(task.metrics.executorDeserializeTime) + executorSource.METRIC_DESERIALIZE_CPU_TIME.inc(task.metrics.executorDeserializeCpuTime) + executorSource.METRIC_RESULT_SERIALIZE_TIME.inc(task.metrics.resultSerializationTime) + executorSource.METRIC_SHUFFLE_FETCH_WAIT_TIME + .inc(task.metrics.shuffleReadMetrics.fetchWaitTime) + executorSource.METRIC_SHUFFLE_WRITE_TIME.inc(task.metrics.shuffleWriteMetrics.writeTime) + executorSource.METRIC_SHUFFLE_TOTAL_BYTES_READ + .inc(task.metrics.shuffleReadMetrics.totalBytesRead) + executorSource.METRIC_SHUFFLE_REMOTE_BYTES_READ + .inc(task.metrics.shuffleReadMetrics.remoteBytesRead) + executorSource.METRIC_SHUFFLE_REMOTE_BYTES_READ_TO_DISK + .inc(task.metrics.shuffleReadMetrics.remoteBytesReadToDisk) + executorSource.METRIC_SHUFFLE_LOCAL_BYTES_READ + .inc(task.metrics.shuffleReadMetrics.localBytesRead) + executorSource.METRIC_SHUFFLE_RECORDS_READ + .inc(task.metrics.shuffleReadMetrics.recordsRead) + executorSource.METRIC_SHUFFLE_REMOTE_BLOCKS_FETCHED + .inc(task.metrics.shuffleReadMetrics.remoteBlocksFetched) + executorSource.METRIC_SHUFFLE_LOCAL_BLOCKS_FETCHED + .inc(task.metrics.shuffleReadMetrics.localBlocksFetched) + executorSource.METRIC_SHUFFLE_BYTES_WRITTEN + .inc(task.metrics.shuffleWriteMetrics.bytesWritten) + executorSource.METRIC_SHUFFLE_RECORDS_WRITTEN + .inc(task.metrics.shuffleWriteMetrics.recordsWritten) + executorSource.METRIC_INPUT_BYTES_READ + .inc(task.metrics.inputMetrics.bytesRead) + executorSource.METRIC_INPUT_RECORDS_READ + .inc(task.metrics.inputMetrics.recordsRead) + 
executorSource.METRIC_OUTPUT_BYTES_WRITTEN + .inc(task.metrics.outputMetrics.bytesWritten) + executorSource.METRIC_OUTPUT_RECORDS_WRITTEN + .inc(task.metrics.outputMetrics.recordsWritten) + executorSource.METRIC_RESULT_SIZE.inc(task.metrics.resultSize) + executorSource.METRIC_DISK_BYTES_SPILLED.inc(task.metrics.diskBytesSpilled) + executorSource.METRIC_MEMORY_BYTES_SPILLED.inc(task.metrics.memoryBytesSpilled) + // Note: accumulator updates must be collected after TaskMetrics is updated val accumUpdates = task.collectAccumulatorUpdates() // TODO: do not serialize value twice http://git-wip-us.apache.org/repos/asf/spark/blob/1ffe03d9/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala index d16f4a1..669ce63 100644 --- a/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala +++ b/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala @@ -72,4 +72,52 @@ class ExecutorSource(threadPool: ThreadPoolExecutor, executorId: String) extends registerFileSystemStat(scheme, "largeRead_ops", _.getLargeReadOps(), 0) registerFileSystemStat(scheme, "write_ops", _.getWriteOps(), 0) } + + // Expose executor task metrics using the Dropwizard metrics system. 
+ // The list is taken from TaskMetrics.scala + val METRIC_CPU_TIME = metricRegistry.counter(MetricRegistry.name("cpuTime")) + val METRIC_RUN_TIME = metricRegistry.counter(MetricRegistry.name("runTime")) + val METRIC_JVM_GC_TIME = metricRegistry.counter(MetricRegistry.name("jvmGCTime")) + val METRIC_DESERIALIZE_TIME = + metricRegistry.counter(MetricRegistry.name("deserializeTime")) + val METRIC_DESERIALIZE_CPU_TIME = + metricRegistry.counter(MetricRegistry.name("deserializeCpuTime")) + val METRIC_RESULT_SERIALIZE_TIME = + metricRegistry.counter(MetricRegistry.name("resultSerializationTime")) + val METRIC_SHUFFLE_FETCH_WAIT_TIME = + metricRegistry.counter(MetricRegistry.name("shuffleFetchWaitTime")) + val METRIC_SHUFFLE_WRITE_TIME = + metricRegistry.counter(MetricRegistry.name("shuffleWriteTime")) + val METRIC_SHUFFLE_TOTAL_BYTES_READ = + metricRegistry.counter(MetricRegistry.name("shuffleTotalBytesRead")) + val METRIC_SHUFFLE_REMOTE_BYTES_READ = + metricRegistry.counter(MetricRegistry.name("shuffleRemoteBytesRead")) + val METRIC_SHUFFLE_REMOTE_BYTES_READ_TO_DISK = + metricRegistry.counter(MetricRegistry.name("shuffleRemoteBytesReadToDisk")) + val METRIC_SHUFFLE_LOCAL_BYTES_READ = + metricRegistry.counter(MetricRegistry.name("shuffleLocalBytesRead")) + val METRIC_SHUFFLE_RECORDS_READ = + metricRegistry.counter(MetricRegistry.name("shuffleRecordsRead")) + val METRIC_SHUFFLE_REMOTE_BLOCKS_FETCHED = + metricRegistry.counter(MetricRegistry.name("shuffleRemoteBlocksFetched")) + val METRIC_SHUFFLE_LOCAL_BLOCKS_FETCHED = + metricRegistry.counter(MetricRegistry.name("shuffleLocalBlocksFetched")) + val METRIC_SHUFFLE_BYTES_WRITTEN = + metricRegistry.counter(MetricRegistry.name("shuffleBytesWritten")) + val METRIC_SHUFFLE_RECORDS_WRITTEN = + metricRegistry.counter(MetricRegistry.name("shuffleRecordsWritten")) + val METRIC_INPUT_BYTES_READ = + metricRegistry.counter(MetricRegistry.name("bytesRead")) + val METRIC_INPUT_RECORDS_READ = + 
metricRegistry.counter(MetricRegistry.name("recordsRead")) + val METRIC_OUTPUT_BYTES_WRITTEN = + metricRegistry.counter(MetricRegistry.name("bytesWritten")) + val METRIC_OUTPUT_RECORDS_WRITTEN = + metricRegistry.counter(MetricRegistry.name("recordsWritten")) + val METRIC_RESULT_SIZE = + metricRegistry.counter(MetricRegistry.name("resultSize")) + val METRIC_DISK_BYTES_SPILLED = + metricRegistry.counter(MetricRegistry.name("diskBytesSpilled")) + val METRIC_MEMORY_BYTES_SPILLED = + metricRegistry.counter(MetricRegistry.name("memoryBytesSpilled")) } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org