Repository: hive Updated Branches: refs/heads/master 2028749b1 -> d682ca926
HIVE-18652: Print Spark metrics on console (Sahil Takiar, reviewed by Vihang Karajgaonkar) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/d682ca92 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/d682ca92 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/d682ca92 Branch: refs/heads/master Commit: d682ca9266df182e977b35ab47771dbac27777ec Parents: 2028749 Author: Sahil Takiar <takiar.sa...@gmail.com> Authored: Mon Jun 4 13:36:04 2018 -0500 Committer: Sahil Takiar <stak...@cloudera.com> Committed: Mon Jun 4 13:36:48 2018 -0500 ---------------------------------------------------------------------- .../hive/ql/exec/spark/TestSparkStatistics.java | 2 +- .../hadoop/hive/ql/exec/spark/SparkTask.java | 83 +++++++++++++++++++- .../spark/Statistic/SparkStatisticGroup.java | 4 + .../spark/Statistic/SparkStatisticsNames.java | 25 ++++-- .../spark/status/impl/SparkMetricsUtils.java | 37 ++++++--- .../hive/spark/client/MetricsCollection.java | 20 ++++- .../hive/spark/client/metrics/InputMetrics.java | 12 ++- .../client/metrics/ShuffleReadMetrics.java | 21 ++++- .../client/metrics/ShuffleWriteMetrics.java | 11 ++- .../spark/client/TestMetricsCollection.java | 15 ++-- 10 files changed, 190 insertions(+), 40 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/d682ca92/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/exec/spark/TestSparkStatistics.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/exec/spark/TestSparkStatistics.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/exec/spark/TestSparkStatistics.java index 4413161..f6c5b17 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/exec/spark/TestSparkStatistics.java +++ 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/exec/spark/TestSparkStatistics.java @@ -81,7 +81,7 @@ public class TestSparkStatistics { List<SparkStatistic> sparkStats = Lists.newArrayList(sparkTask.getSparkStatistics() .getStatisticGroup(SparkStatisticsNames.SPARK_GROUP_NAME).getStatistics()); - Assert.assertEquals(18, sparkStats.size()); + Assert.assertEquals(24, sparkStats.size()); Map<String, String> statsMap = sparkStats.stream().collect( Collectors.toMap(SparkStatistic::getName, SparkStatistic::getValue)); http://git-wip-us.apache.org/repos/asf/hive/blob/d682ca92/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java index 8038771..ddbb6ba 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java @@ -35,6 +35,7 @@ import com.google.common.base.Throwables; import org.apache.hadoop.hive.common.metrics.common.Metrics; import org.apache.hadoop.hive.common.metrics.common.MetricsConstant; import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatisticsNames; +import org.apache.hadoop.hive.ql.exec.spark.status.impl.SparkMetricsUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -161,6 +162,7 @@ public class SparkTask extends Task<SparkWork> { if (rc == 0) { sparkStatistics = sparkJobStatus.getSparkStatistics(); + printConsoleMetrics(); printExcessiveGCWarning(); if (LOG.isInfoEnabled() && sparkStatistics != null) { LOG.info(sparkStatisticsToString(sparkStatistics, sparkJobID)); @@ -222,6 +224,79 @@ public class SparkTask extends Task<SparkWork> { return rc; } + private void printConsoleMetrics() { + SparkStatisticGroup sparkStatisticGroup = sparkStatistics.getStatisticGroup( + SparkStatisticsNames.SPARK_GROUP_NAME); + + if 
(sparkStatisticGroup != null) { + String colon = ": "; + String forwardSlash = " / "; + String separator = ", "; + + String metricsString = String.format("Spark Job[%d] Metrics: ", sparkJobID); + + // Task Duration Time + if (sparkStatisticGroup.containsSparkStatistic(SparkStatisticsNames.TASK_DURATION_TIME)) { + metricsString += SparkStatisticsNames.TASK_DURATION_TIME + colon + + SparkMetricsUtils.getSparkStatisticAsLong(sparkStatisticGroup, + SparkStatisticsNames.TASK_DURATION_TIME) + separator; + } + + // Executor CPU Time + if (sparkStatisticGroup.containsSparkStatistic(SparkStatisticsNames.EXECUTOR_CPU_TIME)) { + metricsString += SparkStatisticsNames.EXECUTOR_CPU_TIME + colon + + SparkMetricsUtils.getSparkStatisticAsLong(sparkStatisticGroup, + SparkStatisticsNames.EXECUTOR_CPU_TIME) + separator; + } + + // JVM GC Time + if (sparkStatisticGroup.containsSparkStatistic(SparkStatisticsNames.JVM_GC_TIME)) { + metricsString += SparkStatisticsNames.JVM_GC_TIME + colon + + SparkMetricsUtils.getSparkStatisticAsLong(sparkStatisticGroup, + SparkStatisticsNames.JVM_GC_TIME) + separator; + } + + // Bytes Read / Records Read + if (sparkStatisticGroup.containsSparkStatistic(SparkStatisticsNames.BYTES_READ) && + sparkStatisticGroup.containsSparkStatistic(SparkStatisticsNames.RECORDS_READ)) { + metricsString += SparkStatisticsNames.BYTES_READ + forwardSlash + + SparkStatisticsNames.RECORDS_READ + colon + + SparkMetricsUtils.getSparkStatisticAsLong(sparkStatisticGroup, + SparkStatisticsNames.BYTES_READ) + forwardSlash + + SparkMetricsUtils.getSparkStatisticAsLong(sparkStatisticGroup, + SparkStatisticsNames.RECORDS_READ) + separator; + } + + // Shuffle Read Bytes / Shuffle Read Records + if (sparkStatisticGroup.containsSparkStatistic( + SparkStatisticsNames.SHUFFLE_TOTAL_BYTES_READ) && + sparkStatisticGroup.containsSparkStatistic( + SparkStatisticsNames.SHUFFLE_RECORDS_READ)) { + metricsString += SparkStatisticsNames.SHUFFLE_TOTAL_BYTES_READ + forwardSlash + + 
SparkStatisticsNames.SHUFFLE_RECORDS_READ + colon + + SparkMetricsUtils.getSparkStatisticAsLong(sparkStatisticGroup, + SparkStatisticsNames.SHUFFLE_TOTAL_BYTES_READ) + forwardSlash + + SparkMetricsUtils.getSparkStatisticAsLong(sparkStatisticGroup, + SparkStatisticsNames.SHUFFLE_RECORDS_READ) + separator; + } + + // Shuffle Write Bytes / Shuffle Write Records + if (sparkStatisticGroup.containsSparkStatistic( + SparkStatisticsNames.SHUFFLE_BYTES_WRITTEN) && + sparkStatisticGroup.containsSparkStatistic( + SparkStatisticsNames.SHUFFLE_RECORDS_WRITTEN)) { + metricsString += SparkStatisticsNames.SHUFFLE_BYTES_WRITTEN + forwardSlash + + SparkStatisticsNames.SHUFFLE_RECORDS_WRITTEN + colon + + SparkMetricsUtils.getSparkStatisticAsLong(sparkStatisticGroup, + SparkStatisticsNames.SHUFFLE_BYTES_WRITTEN) + forwardSlash + + SparkMetricsUtils.getSparkStatisticAsLong(sparkStatisticGroup, + SparkStatisticsNames.SHUFFLE_RECORDS_WRITTEN); + } + + console.printInfo(metricsString); + } + } + /** * Use the Spark metrics and calculate how much task executione time was spent performing GC * operations. 
If more than a defined threshold of time is spent, print out a warning on the @@ -231,10 +306,10 @@ public class SparkTask extends Task<SparkWork> { SparkStatisticGroup sparkStatisticGroup = sparkStatistics.getStatisticGroup( SparkStatisticsNames.SPARK_GROUP_NAME); if (sparkStatisticGroup != null) { - long taskDurationTime = Long.parseLong(sparkStatisticGroup.getSparkStatistic( - SparkStatisticsNames.TASK_DURATION_TIME).getValue()); - long jvmGCTime = Long.parseLong(sparkStatisticGroup.getSparkStatistic( - SparkStatisticsNames.JVM_GC_TIME).getValue()); + long taskDurationTime = SparkMetricsUtils.getSparkStatisticAsLong(sparkStatisticGroup, + SparkStatisticsNames.TASK_DURATION_TIME); + long jvmGCTime = SparkMetricsUtils.getSparkStatisticAsLong(sparkStatisticGroup, + SparkStatisticsNames.JVM_GC_TIME); // Threshold percentage to trigger the GC warning double threshold = 0.1; http://git-wip-us.apache.org/repos/asf/hive/blob/d682ca92/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticGroup.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticGroup.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticGroup.java index e1006e3..d5d628e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticGroup.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticGroup.java @@ -47,4 +47,8 @@ public class SparkStatisticGroup { public SparkStatistic getSparkStatistic(String name) { return this.statistics.get(name); } + + public boolean containsSparkStatistic(String name) { + return this.statistics.containsKey(name); + } } http://git-wip-us.apache.org/repos/asf/hive/blob/d682ca92/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticsNames.java ---------------------------------------------------------------------- diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticsNames.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticsNames.java index 68e4f9e..12c3eac 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticsNames.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticsNames.java @@ -31,15 +31,28 @@ public class SparkStatisticsNames { public static final String RESULT_SERIALIZATION_TIME = "ResultSerializationTime"; public static final String MEMORY_BYTES_SPILLED = "MemoryBytesSpilled"; public static final String DISK_BYTES_SPILLED = "DiskBytesSpilled"; + + public static final String TASK_DURATION_TIME = "TaskDurationTime"; + + // Input Metrics public static final String BYTES_READ = "BytesRead"; - public static final String REMOTE_BLOCKS_FETCHED = "RemoteBlocksFetched"; - public static final String LOCAL_BLOCKS_FETCHED = "LocalBlocksFetched"; - public static final String TOTAL_BLOCKS_FETCHED = "TotalBlocksFetched"; - public static final String FETCH_WAIT_TIME = "FetchWaitTime"; - public static final String REMOTE_BYTES_READ = "RemoteBytesRead"; + public static final String RECORDS_READ = "RecordsRead"; + + // Shuffle Read Metrics + public static final String SHUFFLE_FETCH_WAIT_TIME = "ShuffleFetchWaitTime"; + public static final String SHUFFLE_REMOTE_BYTES_READ = "ShuffleRemoteBytesRead"; + public static final String SHUFFLE_LOCAL_BYTES_READ = "ShuffleLocalBytesRead"; + public static final String SHUFFLE_TOTAL_BYTES_READ = "ShuffleTotalBytesRead"; + public static final String SHUFFLE_REMOTE_BLOCKS_FETCHED = "ShuffleRemoteBlocksFetched"; + public static final String SHUFFLE_LOCAL_BLOCKS_FETCHED = "ShuffleLocalBlocksFetched"; + public static final String SHUFFLE_TOTAL_BLOCKS_FETCHED = "ShuffleTotalBlocksFetched"; + public static final String SHUFFLE_REMOTE_BYTES_READ_TO_DISK = "ShuffleRemoteBytesReadToDisk"; + public static final String SHUFFLE_RECORDS_READ = 
"ShuffleRecordsRead"; + + // Shuffle Write Metrics public static final String SHUFFLE_BYTES_WRITTEN = "ShuffleBytesWritten"; public static final String SHUFFLE_WRITE_TIME = "ShuffleWriteTime"; - public static final String TASK_DURATION_TIME = "TaskDurationTime"; + public static final String SHUFFLE_RECORDS_WRITTEN = "ShuffleRecordsWritten"; public static final String SPARK_GROUP_NAME = "SPARK"; } http://git-wip-us.apache.org/repos/asf/hive/blob/d682ca92/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SparkMetricsUtils.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SparkMetricsUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SparkMetricsUtils.java index fab5422..c73c150 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SparkMetricsUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SparkMetricsUtils.java @@ -20,45 +20,58 @@ package org.apache.hadoop.hive.ql.exec.spark.status.impl; import java.util.LinkedHashMap; import java.util.Map; +import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatisticGroup; import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatisticsNames; import org.apache.hive.spark.client.metrics.Metrics; import org.apache.hive.spark.client.metrics.ShuffleReadMetrics; -final class SparkMetricsUtils { +public final class SparkMetricsUtils { private SparkMetricsUtils(){} static Map<String, Long> collectMetrics(Metrics allMetrics) { Map<String, Long> results = new LinkedHashMap<String, Long>(); + results.put(SparkStatisticsNames.TASK_DURATION_TIME, allMetrics.taskDurationTime); + results.put(SparkStatisticsNames.EXECUTOR_CPU_TIME, allMetrics.executorCpuTime); + results.put(SparkStatisticsNames.EXECUTOR_RUN_TIME, allMetrics.executorRunTime); + results.put(SparkStatisticsNames.JVM_GC_TIME, allMetrics.jvmGCTime); + 
results.put(SparkStatisticsNames.MEMORY_BYTES_SPILLED, allMetrics.memoryBytesSpilled); + results.put(SparkStatisticsNames.DISK_BYTES_SPILLED, allMetrics.diskBytesSpilled); results.put(SparkStatisticsNames.EXECUTOR_DESERIALIZE_TIME, allMetrics.executorDeserializeTime); results.put(SparkStatisticsNames.EXECUTOR_DESERIALIZE_CPU_TIME, allMetrics.executorDeserializeCpuTime); - results.put(SparkStatisticsNames.EXECUTOR_RUN_TIME, allMetrics.executorRunTime); - results.put(SparkStatisticsNames.EXECUTOR_CPU_TIME, allMetrics.executorCpuTime); results.put(SparkStatisticsNames.RESULT_SIZE, allMetrics.resultSize); - results.put(SparkStatisticsNames.JVM_GC_TIME, allMetrics.jvmGCTime); results.put(SparkStatisticsNames.RESULT_SERIALIZATION_TIME, allMetrics.resultSerializationTime); - results.put(SparkStatisticsNames.MEMORY_BYTES_SPILLED, allMetrics.memoryBytesSpilled); - results.put(SparkStatisticsNames.DISK_BYTES_SPILLED, allMetrics.diskBytesSpilled); - results.put(SparkStatisticsNames.TASK_DURATION_TIME, allMetrics.taskDurationTime); if (allMetrics.inputMetrics != null) { results.put(SparkStatisticsNames.BYTES_READ, allMetrics.inputMetrics.bytesRead); + results.put(SparkStatisticsNames.RECORDS_READ, allMetrics.inputMetrics.recordsRead); } if (allMetrics.shuffleReadMetrics != null) { ShuffleReadMetrics shuffleReadMetrics = allMetrics.shuffleReadMetrics; long rbf = shuffleReadMetrics.remoteBlocksFetched; long lbf = shuffleReadMetrics.localBlocksFetched; - results.put(SparkStatisticsNames.REMOTE_BLOCKS_FETCHED, rbf); - results.put(SparkStatisticsNames.LOCAL_BLOCKS_FETCHED, lbf); - results.put(SparkStatisticsNames.TOTAL_BLOCKS_FETCHED, rbf + lbf); - results.put(SparkStatisticsNames.FETCH_WAIT_TIME, shuffleReadMetrics.fetchWaitTime); - results.put(SparkStatisticsNames.REMOTE_BYTES_READ, shuffleReadMetrics.remoteBytesRead); + results.put(SparkStatisticsNames.SHUFFLE_TOTAL_BYTES_READ, + shuffleReadMetrics.remoteBytesRead + shuffleReadMetrics.localBytesRead); + 
results.put(SparkStatisticsNames.SHUFFLE_REMOTE_BYTES_READ, shuffleReadMetrics.remoteBytesRead); + results.put(SparkStatisticsNames.SHUFFLE_LOCAL_BYTES_READ, shuffleReadMetrics.localBytesRead); + results.put(SparkStatisticsNames.SHUFFLE_REMOTE_BYTES_READ_TO_DISK, shuffleReadMetrics + .remoteBytesReadToDisk); + results.put(SparkStatisticsNames.SHUFFLE_RECORDS_READ, shuffleReadMetrics.recordsRead); + results.put(SparkStatisticsNames.SHUFFLE_TOTAL_BLOCKS_FETCHED, rbf + lbf); + results.put(SparkStatisticsNames.SHUFFLE_REMOTE_BLOCKS_FETCHED, rbf); + results.put(SparkStatisticsNames.SHUFFLE_LOCAL_BLOCKS_FETCHED, lbf); + results.put(SparkStatisticsNames.SHUFFLE_FETCH_WAIT_TIME, shuffleReadMetrics.fetchWaitTime); } if (allMetrics.shuffleWriteMetrics != null) { results.put(SparkStatisticsNames.SHUFFLE_BYTES_WRITTEN, allMetrics.shuffleWriteMetrics.shuffleBytesWritten); + results.put(SparkStatisticsNames.SHUFFLE_RECORDS_WRITTEN, + allMetrics.shuffleWriteMetrics.shuffleRecordsWritten); results.put(SparkStatisticsNames.SHUFFLE_WRITE_TIME, allMetrics.shuffleWriteMetrics.shuffleWriteTime); } return results; } + public static long getSparkStatisticAsLong(SparkStatisticGroup group, String name) { + return Long.parseLong(group.getSparkStatistic(name).getValue()); + } } http://git-wip-us.apache.org/repos/asf/hive/blob/d682ca92/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java ---------------------------------------------------------------------- diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java b/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java index 2f3c026..a0db015 100644 --- a/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java +++ b/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java @@ -154,6 +154,7 @@ public class MetricsCollection { // Input metrics. 
boolean hasInputMetrics = false; long bytesRead = 0L; + long recordsRead = 0L; // Shuffle read metrics. boolean hasShuffleReadMetrics = false; @@ -161,10 +162,14 @@ public class MetricsCollection { int localBlocksFetched = 0; long fetchWaitTime = 0L; long remoteBytesRead = 0L; + long localBytesRead = 0L; + long remoteBytesReadToDisk = 0L; + long shuffleRecordsRead = 0L; // Shuffle write metrics. long shuffleBytesWritten = 0L; long shuffleWriteTime = 0L; + long shuffleRecordsWritten = 0L; for (TaskInfo info : Collections2.filter(taskMetrics, filter)) { Metrics m = info.metrics; @@ -182,6 +187,7 @@ public class MetricsCollection { if (m.inputMetrics != null) { hasInputMetrics = true; bytesRead += m.inputMetrics.bytesRead; + recordsRead += m.inputMetrics.recordsRead; } if (m.shuffleReadMetrics != null) { @@ -190,17 +196,21 @@ public class MetricsCollection { localBlocksFetched += m.shuffleReadMetrics.localBlocksFetched; fetchWaitTime += m.shuffleReadMetrics.fetchWaitTime; remoteBytesRead += m.shuffleReadMetrics.remoteBytesRead; + localBytesRead += m.shuffleReadMetrics.localBytesRead; + remoteBytesReadToDisk += m.shuffleReadMetrics.remoteBytesReadToDisk; + shuffleRecordsRead += m.shuffleReadMetrics.recordsRead; } if (m.shuffleWriteMetrics != null) { shuffleBytesWritten += m.shuffleWriteMetrics.shuffleBytesWritten; shuffleWriteTime += m.shuffleWriteMetrics.shuffleWriteTime; + shuffleRecordsWritten += m.shuffleWriteMetrics.shuffleRecordsWritten; } } InputMetrics inputMetrics = null; if (hasInputMetrics) { - inputMetrics = new InputMetrics(bytesRead); + inputMetrics = new InputMetrics(bytesRead, recordsRead); } ShuffleReadMetrics shuffleReadMetrics = null; @@ -209,14 +219,18 @@ public class MetricsCollection { remoteBlocksFetched, localBlocksFetched, fetchWaitTime, - remoteBytesRead); + remoteBytesRead, + localBytesRead, + remoteBytesReadToDisk, + shuffleRecordsRead); } ShuffleWriteMetrics shuffleWriteMetrics = null; if (hasShuffleReadMetrics) { shuffleWriteMetrics = new 
ShuffleWriteMetrics( shuffleBytesWritten, - shuffleWriteTime); + shuffleWriteTime, + shuffleRecordsWritten); } return new Metrics( http://git-wip-us.apache.org/repos/asf/hive/blob/d682ca92/spark-client/src/main/java/org/apache/hive/spark/client/metrics/InputMetrics.java ---------------------------------------------------------------------- diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/metrics/InputMetrics.java b/spark-client/src/main/java/org/apache/hive/spark/client/metrics/InputMetrics.java index 6a13071..a162f48 100644 --- a/spark-client/src/main/java/org/apache/hive/spark/client/metrics/InputMetrics.java +++ b/spark-client/src/main/java/org/apache/hive/spark/client/metrics/InputMetrics.java @@ -28,20 +28,26 @@ import org.apache.hadoop.hive.common.classification.InterfaceAudience; */ @InterfaceAudience.Private public class InputMetrics implements Serializable { + /** Total number of bytes read. */ public final long bytesRead; + /** Total number of records read. */ + public final long recordsRead; private InputMetrics() { // For Serialization only. 
- this(0L); + this(0L, 0L); } public InputMetrics( - long bytesRead) { + long bytesRead, + long recordsRead) { this.bytesRead = bytesRead; + this.recordsRead = recordsRead; } public InputMetrics(TaskMetrics metrics) { - this(metrics.inputMetrics().bytesRead()); + this(metrics.inputMetrics().bytesRead(), + metrics.inputMetrics().recordsRead()); } @Override http://git-wip-us.apache.org/repos/asf/hive/blob/d682ca92/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleReadMetrics.java ---------------------------------------------------------------------- diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleReadMetrics.java b/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleReadMetrics.java index e3d564f..ec71136 100644 --- a/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleReadMetrics.java +++ b/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleReadMetrics.java @@ -42,28 +42,43 @@ public class ShuffleReadMetrics implements Serializable { public final long fetchWaitTime; /** Total number of remote bytes read from the shuffle by tasks. */ public final long remoteBytesRead; + /** Shuffle data that was read from the local disk (as opposed to from a remote executor). */ + public final long localBytesRead; + /** Total number of remotes bytes read to disk from the shuffle by this task. */ + public final long remoteBytesReadToDisk; + /** Total number of records read from the shuffle by this task. */ + public final long recordsRead; private ShuffleReadMetrics() { // For Serialization only. 
- this(0, 0, 0L, 0L); + this(0, 0, 0L, 0L, 0L, 0L, 0L); } public ShuffleReadMetrics( long remoteBlocksFetched, long localBlocksFetched, long fetchWaitTime, - long remoteBytesRead) { + long remoteBytesRead, + long localBytesRead, + long remoteBytesReadToDisk, + long recordsRead) { this.remoteBlocksFetched = remoteBlocksFetched; this.localBlocksFetched = localBlocksFetched; this.fetchWaitTime = fetchWaitTime; this.remoteBytesRead = remoteBytesRead; + this.localBytesRead = localBytesRead; + this.remoteBytesReadToDisk = remoteBytesReadToDisk; + this.recordsRead = recordsRead; } public ShuffleReadMetrics(TaskMetrics metrics) { this(metrics.shuffleReadMetrics().remoteBlocksFetched(), metrics.shuffleReadMetrics().localBlocksFetched(), metrics.shuffleReadMetrics().fetchWaitTime(), - metrics.shuffleReadMetrics().remoteBytesRead()); + metrics.shuffleReadMetrics().remoteBytesRead(), + metrics.shuffleReadMetrics().localBytesRead(), + metrics.shuffleReadMetrics().remoteBytesReadToDisk(), + metrics.shuffleReadMetrics().recordsRead()); } /** http://git-wip-us.apache.org/repos/asf/hive/blob/d682ca92/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleWriteMetrics.java ---------------------------------------------------------------------- diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleWriteMetrics.java b/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleWriteMetrics.java index e9cf6a1..781bf53 100644 --- a/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleWriteMetrics.java +++ b/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleWriteMetrics.java @@ -33,22 +33,27 @@ public class ShuffleWriteMetrics implements Serializable { public final long shuffleBytesWritten; /** Time tasks spent blocking on writes to disk or buffer cache, in nanoseconds. */ public final long shuffleWriteTime; + /** Total number of records written to the shuffle by this task. 
*/ + public final long shuffleRecordsWritten; private ShuffleWriteMetrics() { // For Serialization only. - this(0L, 0L); + this(0L, 0L, 0L); } public ShuffleWriteMetrics( long shuffleBytesWritten, - long shuffleWriteTime) { + long shuffleWriteTime, + long shuffleRecordsWritten) { this.shuffleBytesWritten = shuffleBytesWritten; this.shuffleWriteTime = shuffleWriteTime; + this.shuffleRecordsWritten = shuffleRecordsWritten; } public ShuffleWriteMetrics(TaskMetrics metrics) { this(metrics.shuffleWriteMetrics().shuffleBytesWritten(), - metrics.shuffleWriteMetrics().shuffleWriteTime()); + metrics.shuffleWriteMetrics().shuffleWriteTime(), + metrics.shuffleWriteMetrics().shuffleRecordsWritten()); } @Override http://git-wip-us.apache.org/repos/asf/hive/blob/d682ca92/spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java ---------------------------------------------------------------------- diff --git a/spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java b/spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java index c5884cf..2d4c43d 100644 --- a/spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java +++ b/spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java @@ -96,9 +96,9 @@ public class TestMetricsCollection { long value = taskValue(1, 1, 1); Metrics metrics1 = new Metrics(value, value, value, value, value, value, value, value, value, - value, new InputMetrics(value), null, null); + value, new InputMetrics(value, value), null, null); Metrics metrics2 = new Metrics(value, value, value, value, value, value, value, value, value, - value, new InputMetrics(value), null, null); + value, new InputMetrics(value, value), null, null); collection.addMetrics(1, 1, 1, metrics1); collection.addMetrics(1, 1, 2, metrics2); @@ -110,9 +110,9 @@ public class TestMetricsCollection { private Metrics makeMetrics(int jobId, int stageId, long taskId) { 
long value = 1000000 * jobId + 1000 * stageId + taskId; return new Metrics(value, value, value, value, value, value, value, value, value, value, - new InputMetrics(value), - new ShuffleReadMetrics((int) value, (int) value, value, value), - new ShuffleWriteMetrics(value, value)); + new InputMetrics(value, value), + new ShuffleReadMetrics((int) value, (int) value, value, value, value, value, value), + new ShuffleWriteMetrics(value, value, value)); } /** @@ -160,14 +160,19 @@ public class TestMetricsCollection { assertEquals(expected, metrics.taskDurationTime); assertEquals(expected, metrics.inputMetrics.bytesRead); + assertEquals(expected, metrics.inputMetrics.recordsRead); assertEquals(expected, metrics.shuffleReadMetrics.remoteBlocksFetched); assertEquals(expected, metrics.shuffleReadMetrics.localBlocksFetched); assertEquals(expected, metrics.shuffleReadMetrics.fetchWaitTime); assertEquals(expected, metrics.shuffleReadMetrics.remoteBytesRead); + assertEquals(expected, metrics.shuffleReadMetrics.localBytesRead); + assertEquals(expected, metrics.shuffleReadMetrics.recordsRead); + assertEquals(expected, metrics.shuffleReadMetrics.remoteBytesReadToDisk); assertEquals(expected, metrics.shuffleWriteMetrics.shuffleBytesWritten); assertEquals(expected, metrics.shuffleWriteMetrics.shuffleWriteTime); + assertEquals(expected, metrics.shuffleWriteMetrics.shuffleRecordsWritten); } }