HIVE-12205: Unify metric collection for local and remote spark client. (Chinna via Chengxiang)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/9829f998
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/9829f998
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/9829f998

Branch: refs/heads/llap
Commit: 9829f9985c48742a070b0f09889d8d74d24b5553
Parents: a6d9bf7
Author: chengxiang <chengxi...@apache.com>
Authored: Wed Feb 17 18:36:51 2016 +0800
Committer: Xuefu Zhang <xzh...@cloudera.com>
Committed: Wed Feb 17 06:34:10 2016 -0800

----------------------------------------------------------------------
 .../spark/status/impl/LocalSparkJobStatus.java  | 94 +++-----------------
 .../spark/status/impl/RemoteSparkJobStatus.java | 35 +-------
 .../exec/spark/status/impl/SparkJobUtils.java   | 56 ++++++++++++
 .../hive/spark/client/MetricsCollection.java    |  2 +-
 4 files changed, 73 insertions(+), 114 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/9829f998/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java
index 3c15521..d4819d9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java
@@ -28,6 +28,8 @@ import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatistics;
 import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatisticsBuilder;
 import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobStatus;
 import org.apache.hadoop.hive.ql.exec.spark.status.SparkStageProgress;
+import org.apache.hive.spark.client.MetricsCollection;
+import org.apache.hive.spark.client.metrics.Metrics;
 import org.apache.hive.spark.counter.SparkCounters;
 import org.apache.spark.JobExecutionStatus;
 import org.apache.spark.SparkJobInfo;
@@ -135,7 +137,18 @@ public class LocalSparkJobStatus implements SparkJobStatus {
       return null;
     }
-    Map<String, Long> flatJobMetric = combineJobLevelMetrics(jobMetric);
+    MetricsCollection metricsCollection = new MetricsCollection();
+    Set<String> stageIds = jobMetric.keySet();
+    for (String stageId : stageIds) {
+      List<TaskMetrics> taskMetrics = jobMetric.get(stageId);
+      for (TaskMetrics taskMetric : taskMetrics) {
+        Metrics metrics = new Metrics(taskMetric);
+        metricsCollection.addMetrics(jobId, Integer.parseInt(stageId), 0, metrics);
+      }
+    }
+    SparkJobUtils sparkJobUtils = new SparkJobUtils();
+    Map<String, Long> flatJobMetric = sparkJobUtils.collectMetrics(metricsCollection
+        .getAllMetrics());
 
     for (Map.Entry<String, Long> entry : flatJobMetric.entrySet()) {
       sparkStatisticsBuilder.add(jobIdentifier, entry.getKey(), Long.toString(entry.getValue()));
     }
@@ -153,85 +166,6 @@ public class LocalSparkJobStatus implements SparkJobStatus {
     }
   }
 
-  private Map<String, Long> combineJobLevelMetrics(Map<String, List<TaskMetrics>> jobMetric) {
-    Map<String, Long> results = Maps.newLinkedHashMap();
-
-    long executorDeserializeTime = 0;
-    long executorRunTime = 0;
-    long resultSize = 0;
-    long jvmGCTime = 0;
-    long resultSerializationTime = 0;
-    long memoryBytesSpilled = 0;
-    long diskBytesSpilled = 0;
-    long bytesRead = 0;
-    long remoteBlocksFetched = 0;
-    long localBlocksFetched = 0;
-    long fetchWaitTime = 0;
-    long remoteBytesRead = 0;
-    long shuffleBytesWritten = 0;
-    long shuffleWriteTime = 0;
-    boolean inputMetricExist = false;
-    boolean shuffleReadMetricExist = false;
-    boolean shuffleWriteMetricExist = false;
-
-    for (List<TaskMetrics> stageMetric : jobMetric.values()) {
-      if (stageMetric != null) {
-        for (TaskMetrics taskMetrics : stageMetric) {
-          if (taskMetrics != null) {
-            executorDeserializeTime += taskMetrics.executorDeserializeTime();
-            executorRunTime += taskMetrics.executorRunTime();
-            resultSize += taskMetrics.resultSize();
-            jvmGCTime += taskMetrics.jvmGCTime();
-            resultSerializationTime += taskMetrics.resultSerializationTime();
-            memoryBytesSpilled += taskMetrics.memoryBytesSpilled();
-            diskBytesSpilled += taskMetrics.diskBytesSpilled();
-            if (!taskMetrics.inputMetrics().isEmpty()) {
-              inputMetricExist = true;
-              bytesRead += taskMetrics.inputMetrics().get().bytesRead();
-            }
-            Option<ShuffleReadMetrics> shuffleReadMetricsOption = taskMetrics.shuffleReadMetrics();
-            if (!shuffleReadMetricsOption.isEmpty()) {
-              shuffleReadMetricExist = true;
-              remoteBlocksFetched += shuffleReadMetricsOption.get().remoteBlocksFetched();
-              localBlocksFetched += shuffleReadMetricsOption.get().localBlocksFetched();
-              fetchWaitTime += shuffleReadMetricsOption.get().fetchWaitTime();
-              remoteBytesRead += shuffleReadMetricsOption.get().remoteBytesRead();
-            }
-            Option<ShuffleWriteMetrics> shuffleWriteMetricsOption = taskMetrics.shuffleWriteMetrics();
-            if (!shuffleWriteMetricsOption.isEmpty()) {
-              shuffleWriteMetricExist = true;
-              shuffleBytesWritten += shuffleWriteMetricsOption.get().shuffleBytesWritten();
-              shuffleWriteTime += shuffleWriteMetricsOption.get().shuffleWriteTime();
-            }
-          }
-        }
-      }
-    }
-
-    results.put("ExecutorDeserializeTime", executorDeserializeTime);
-    results.put("ExecutorRunTime", executorRunTime);
-    results.put("ResultSize", resultSize);
-    results.put("JvmGCTime", jvmGCTime);
-    results.put("ResultSerializationTime", resultSerializationTime);
-    results.put("MemoryBytesSpilled", memoryBytesSpilled);
-    results.put("DiskBytesSpilled", diskBytesSpilled);
-    if (inputMetricExist) {
-      results.put("BytesRead", bytesRead);
-    }
-    if (shuffleReadMetricExist) {
-      results.put("RemoteBlocksFetched", remoteBlocksFetched);
-      results.put("LocalBlocksFetched", localBlocksFetched);
-      results.put("TotalBlocksFetched", localBlocksFetched + remoteBlocksFetched);
-      results.put("FetchWaitTime", fetchWaitTime);
-      results.put("RemoteBytesRead", remoteBytesRead);
-    }
-    if (shuffleWriteMetricExist) {
-      results.put("ShuffleBytesWritten", shuffleBytesWritten);
-      results.put("ShuffleWriteTime", shuffleWriteTime);
-    }
-    return results;
-  }
-
   private SparkJobInfo getJobInfo() {
     return sparkContext.statusTracker().getJobInfo(jobId);
   }
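
Note: the LocalSparkJobStatus change above drops the hand-rolled
combineJobLevelMetrics() aggregation and instead feeds Spark's per-task
metrics through the shared MetricsCollection. A minimal sketch of the new
local-side flow, written against the Hive classes at this commit (the
wrapper class FlattenLocalMetricsSketch is hypothetical, and jobId and
jobMetric stand in for fields of LocalSparkJobStatus):

import java.util.List;
import java.util.Map;

import org.apache.hive.spark.client.MetricsCollection;
import org.apache.hive.spark.client.metrics.Metrics;
import org.apache.spark.executor.TaskMetrics;

// Hypothetical sketch class, assumed to sit in the same package as SparkJobUtils.
class FlattenLocalMetricsSketch {
  Map<String, Long> flatten(int jobId, Map<String, List<TaskMetrics>> jobMetric) {
    MetricsCollection collection = new MetricsCollection();
    for (Map.Entry<String, List<TaskMetrics>> stage : jobMetric.entrySet()) {
      for (TaskMetrics task : stage.getValue()) {
        // Wrap Spark's TaskMetrics in the spark-client Metrics adapter. The
        // local path does not track task ids, so every task is recorded as taskId 0.
        collection.addMetrics(jobId, Integer.parseInt(stage.getKey()), 0, new Metrics(task));
      }
    }
    // Local and remote clients now flatten through the same helper.
    return new SparkJobUtils().collectMetrics(collection.getAllMetrics());
  }
}

Because every task shares taskId 0, the collection is meaningful here only
for job-level aggregation via getAllMetrics(); per-task lookups would not
distinguish individual tasks.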

http://git-wip-us.apache.org/repos/asf/hive/blob/9829f998/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
index d84c026..2c6818f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
@@ -125,7 +125,8 @@ public class RemoteSparkJobStatus implements SparkJobStatus {
     // add spark job metrics.
     String jobIdentifier = "Spark Job[" + jobHandle.getClientJobId() + "] Metrics";
-    Map<String, Long> flatJobMetric = extractMetrics(metricsCollection);
+    SparkJobUtils sparkJobUtils = new SparkJobUtils();
+    Map<String, Long> flatJobMetric = sparkJobUtils.collectMetrics(metricsCollection.getAllMetrics());
 
     for (Map.Entry<String, Long> entry : flatJobMetric.entrySet()) {
       sparkStatisticsBuilder.add(jobIdentifier, entry.getKey(), Long.toString(entry.getValue()));
     }
@@ -227,38 +228,6 @@ public class RemoteSparkJobStatus implements SparkJobStatus {
     }
   }
 
-  private Map<String, Long> extractMetrics(MetricsCollection metricsCollection) {
-    Map<String, Long> results = new LinkedHashMap<String, Long>();
-    Metrics allMetrics = metricsCollection.getAllMetrics();
-
-    results.put("ExecutorDeserializeTime", allMetrics.executorDeserializeTime);
-    results.put("ExecutorRunTime", allMetrics.executorRunTime);
-    results.put("ResultSize", allMetrics.resultSize);
-    results.put("JvmGCTime", allMetrics.jvmGCTime);
-    results.put("ResultSerializationTime", allMetrics.resultSerializationTime);
-    results.put("MemoryBytesSpilled", allMetrics.memoryBytesSpilled);
-    results.put("DiskBytesSpilled", allMetrics.diskBytesSpilled);
-    if (allMetrics.inputMetrics != null) {
-      results.put("BytesRead", allMetrics.inputMetrics.bytesRead);
-    }
-    if (allMetrics.shuffleReadMetrics != null) {
-      ShuffleReadMetrics shuffleReadMetrics = allMetrics.shuffleReadMetrics;
-      long rbf = shuffleReadMetrics.remoteBlocksFetched;
-      long lbf = shuffleReadMetrics.localBlocksFetched;
-      results.put("RemoteBlocksFetched", rbf);
-      results.put("LocalBlocksFetched", lbf);
-      results.put("TotalBlocksFetched", lbf + rbf);
-      results.put("FetchWaitTime", shuffleReadMetrics.fetchWaitTime);
-      results.put("RemoteBytesRead", shuffleReadMetrics.remoteBytesRead);
-    }
-    if (allMetrics.shuffleWriteMetrics != null) {
-      results.put("ShuffleBytesWritten", allMetrics.shuffleWriteMetrics.shuffleBytesWritten);
-      results.put("ShuffleWriteTime", allMetrics.shuffleWriteMetrics.shuffleWriteTime);
-    }
-
-    return results;
-  }
-
   private static SparkJobInfo getDefaultJobInfo(final Integer jobId,
       final JobExecutionStatus status) {
     return new SparkJobInfo() {
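
Note: the remote side is the mirror image: the private extractMetrics()
copy is deleted and RemoteSparkJobStatus delegates to the same helper, so
the two clients can no longer drift apart. For reference, the map returned
by collectMetrics() (defined in the new SparkJobUtils below) always
contains ExecutorDeserializeTime, ExecutorRunTime, ResultSize, JvmGCTime,
ResultSerializationTime, MemoryBytesSpilled and DiskBytesSpilled;
BytesRead appears only when input metrics were reported, the shuffle-read
group (RemoteBlocksFetched, LocalBlocksFetched, TotalBlocksFetched,
FetchWaitTime, RemoteBytesRead) only when shuffle-read metrics were, and
ShuffleBytesWritten and ShuffleWriteTime only when shuffle-write metrics
were. A sketch of consuming that map, with metricsCollection,
sparkStatisticsBuilder and jobIdentifier standing in for the surrounding
RemoteSparkJobStatus state:

Map<String, Long> flatJobMetric =
    new SparkJobUtils().collectMetrics(metricsCollection.getAllMetrics());
for (Map.Entry<String, Long> entry : flatJobMetric.entrySet()) {
  // One statistic per metric key, grouped under the job identifier.
  sparkStatisticsBuilder.add(jobIdentifier, entry.getKey(), Long.toString(entry.getValue()));
}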

http://git-wip-us.apache.org/repos/asf/hive/blob/9829f998/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SparkJobUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SparkJobUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SparkJobUtils.java
new file mode 100644
index 0000000..383d76f
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SparkJobUtils.java
@@ -0,0 +1,56 @@
+package org.apache.hadoop.hive.ql.exec.spark.status.impl;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import org.apache.hive.spark.client.metrics.Metrics;
+import org.apache.hive.spark.client.metrics.ShuffleReadMetrics;
+
+public class SparkJobUtils {
+
+  private final static String EXECUTOR_DESERIALIZE_TIME = "ExecutorDeserializeTime";
+  private final static String EXECUTOR_RUN_TIME = "ExecutorRunTime";
+  private final static String RESULT_SIZE = "ResultSize";
+  private final static String JVM_GC_TIME = "JvmGCTime";
+  private final static String RESULT_SERIALIZATION_TIME = "ResultSerializationTime";
+  private final static String MEMORY_BYTES_SPLIED = "MemoryBytesSpilled";
+  private final static String DISK_BYTES_SPLIED = "DiskBytesSpilled";
+  private final static String BYTES_READ = "BytesRead";
+  private final static String REMOTE_BLOCKS_FETCHED = "RemoteBlocksFetched";
+  private final static String LOCAL_BLOCKS_FETCHED = "LocalBlocksFetched";
+  private final static String TOTAL_BLOCKS_FETCHED = "TotalBlocksFetched";
+  private final static String FETCH_WAIT_TIME = "FetchWaitTime";
+  private final static String REMOTE_BYTES_READ = "RemoteBytesRead";
+  private final static String SHUFFLE_BYTES_WRITTEN = "ShuffleBytesWritten";
+  private final static String SHUFFLE_WRITE_TIME = "ShuffleWriteTime";
+
+  public Map<String, Long> collectMetrics(Metrics allMetrics) {
+    Map<String, Long> results = new LinkedHashMap<String, Long>();
+    results.put(EXECUTOR_DESERIALIZE_TIME, allMetrics.executorDeserializeTime);
+    results.put(EXECUTOR_RUN_TIME, allMetrics.executorRunTime);
+    results.put(RESULT_SIZE, allMetrics.resultSize);
+    results.put(JVM_GC_TIME, allMetrics.jvmGCTime);
+    results.put(RESULT_SERIALIZATION_TIME, allMetrics.resultSerializationTime);
+    results.put(MEMORY_BYTES_SPLIED, allMetrics.memoryBytesSpilled);
+    results.put(DISK_BYTES_SPLIED, allMetrics.diskBytesSpilled);
+    if (allMetrics.inputMetrics != null) {
+      results.put(BYTES_READ, allMetrics.inputMetrics.bytesRead);
+    }
+    if (allMetrics.shuffleReadMetrics != null) {
+      ShuffleReadMetrics shuffleReadMetrics = allMetrics.shuffleReadMetrics;
+      long rbf = shuffleReadMetrics.remoteBlocksFetched;
+      long lbf = shuffleReadMetrics.localBlocksFetched;
+      results.put(REMOTE_BLOCKS_FETCHED, rbf);
+      results.put(LOCAL_BLOCKS_FETCHED, lbf);
+      results.put(TOTAL_BLOCKS_FETCHED, rbf + lbf);
+      results.put(FETCH_WAIT_TIME, shuffleReadMetrics.fetchWaitTime);
+      results.put(REMOTE_BYTES_READ, shuffleReadMetrics.remoteBytesRead);
+    }
+    if (allMetrics.shuffleWriteMetrics != null) {
+      results.put(SHUFFLE_BYTES_WRITTEN, allMetrics.shuffleWriteMetrics.shuffleBytesWritten);
+      results.put(SHUFFLE_WRITE_TIME, allMetrics.shuffleWriteMetrics.shuffleWriteTime);
+    }
+    return results;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/9829f998/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java b/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java
index 97863f8..e77aa78 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java
@@ -118,7 +118,7 @@ public class MetricsCollection {
     }
   }
 
-  void addMetrics(int jobId, int stageId, long taskId, Metrics metrics) {
+  public void addMetrics(int jobId, int stageId, long taskId, Metrics metrics) {
     lock.writeLock().lock();
     try {
       taskMetrics.add(new TaskInfo(jobId, stageId, taskId, metrics));
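
Note: the one-word MetricsCollection change is what makes the reuse
possible. addMetrics() was package-private, and LocalSparkJobStatus lives
in org.apache.hadoop.hive.ql.exec.spark.status.impl (the ql module) while
MetricsCollection lives in org.apache.hive.spark.client (the spark-client
module), so the new local call site could not reach it until the method
became public. A fragment of the cross-module call, with jobId, stageId
and taskMetric standing in for values from the local client's context:

// In the ql module, outside org.apache.hive.spark.client: this call only
// compiles now that addMetrics() is public.
MetricsCollection collection = new MetricsCollection();
collection.addMetrics(jobId, Integer.parseInt(stageId), 0, new Metrics(taskMetric));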