HIVE-12205: Unify metric collection for local and remote spark client. (Chinna via Chengxiang)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/9829f998
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/9829f998
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/9829f998

Branch: refs/heads/llap
Commit: 9829f9985c48742a070b0f09889d8d74d24b5553
Parents: a6d9bf7
Author: chengxiang <chengxi...@apache.com>
Authored: Wed Feb 17 18:36:51 2016 +0800
Committer: Xuefu Zhang <xzh...@cloudera.com>
Committed: Wed Feb 17 06:34:10 2016 -0800

----------------------------------------------------------------------
 .../spark/status/impl/LocalSparkJobStatus.java  | 94 +++-----------------
 .../spark/status/impl/RemoteSparkJobStatus.java | 35 +-------
 .../exec/spark/status/impl/SparkJobUtils.java   | 56 ++++++++++++
 .../hive/spark/client/MetricsCollection.java    |  2 +-
 4 files changed, 73 insertions(+), 114 deletions(-)
----------------------------------------------------------------------
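
Both job status implementations now delegate metric flattening to the new SparkJobUtils helper instead of keeping their own aggregation loops. Below is a minimal sketch of the shared flow, assuming a MetricsCollection already populated through addMetrics(jobId, stageId, taskId, metrics); the class and method names come from the patch, while the wrapper class itself is purely illustrative:

import java.util.Map;

import org.apache.hadoop.hive.ql.exec.spark.status.impl.SparkJobUtils;
import org.apache.hive.spark.client.MetricsCollection;
import org.apache.hive.spark.client.metrics.Metrics;

// Illustrative wrapper (not part of the patch): shows how both status classes
// obtain the flat name -> value map fed into SparkStatisticsBuilder.
public class MetricsReportSketch {
  public static Map<String, Long> flatten(MetricsCollection metricsCollection) {
    // getAllMetrics() merges every Metrics instance registered via addMetrics().
    Metrics allMetrics = metricsCollection.getAllMetrics();
    // collectMetrics() turns the aggregate into named counters
    // (ExecutorRunTime, BytesRead, ShuffleBytesWritten, ...).
    return new SparkJobUtils().collectMetrics(allMetrics);
  }
}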


http://git-wip-us.apache.org/repos/asf/hive/blob/9829f998/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java
index 3c15521..d4819d9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java
@@ -28,6 +28,8 @@ import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatistics;
 import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatisticsBuilder;
 import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobStatus;
 import org.apache.hadoop.hive.ql.exec.spark.status.SparkStageProgress;
+import org.apache.hive.spark.client.MetricsCollection;
+import org.apache.hive.spark.client.metrics.Metrics;
 import org.apache.hive.spark.counter.SparkCounters;
 import org.apache.spark.JobExecutionStatus;
 import org.apache.spark.SparkJobInfo;
@@ -135,7 +137,18 @@ public class LocalSparkJobStatus implements SparkJobStatus {
       return null;
     }
 
-    Map<String, Long> flatJobMetric = combineJobLevelMetrics(jobMetric);
+    MetricsCollection metricsCollection = new MetricsCollection();
+    Set<String> stageIds = jobMetric.keySet();
+    for (String stageId : stageIds) {
+      List<TaskMetrics> taskMetrics = jobMetric.get(stageId);
+      for (TaskMetrics taskMetric : taskMetrics) {
+        Metrics metrics = new Metrics(taskMetric);
+        metricsCollection.addMetrics(jobId, Integer.parseInt(stageId), 0, metrics);
+      }
+    }
+    SparkJobUtils sparkJobUtils = new SparkJobUtils();
+    Map<String, Long> flatJobMetric = sparkJobUtils.collectMetrics(metricsCollection
+        .getAllMetrics());
     for (Map.Entry<String, Long> entry : flatJobMetric.entrySet()) {
       sparkStatisticsBuilder.add(jobIdentifier, entry.getKey(), Long.toString(entry.getValue()));
     }
@@ -153,85 +166,6 @@ public class LocalSparkJobStatus implements SparkJobStatus {
     }
   }
 
-  private Map<String, Long> combineJobLevelMetrics(Map<String, List<TaskMetrics>> jobMetric) {
-    Map<String, Long> results = Maps.newLinkedHashMap();
-
-    long executorDeserializeTime = 0;
-    long executorRunTime = 0;
-    long resultSize = 0;
-    long jvmGCTime = 0;
-    long resultSerializationTime = 0;
-    long memoryBytesSpilled = 0;
-    long diskBytesSpilled = 0;
-    long bytesRead = 0;
-    long remoteBlocksFetched = 0;
-    long localBlocksFetched = 0;
-    long fetchWaitTime = 0;
-    long remoteBytesRead = 0;
-    long shuffleBytesWritten = 0;
-    long shuffleWriteTime = 0;
-    boolean inputMetricExist = false;
-    boolean shuffleReadMetricExist = false;
-    boolean shuffleWriteMetricExist = false;
-
-    for (List<TaskMetrics> stageMetric : jobMetric.values()) {
-      if (stageMetric != null) {
-        for (TaskMetrics taskMetrics : stageMetric) {
-          if (taskMetrics != null) {
-            executorDeserializeTime += taskMetrics.executorDeserializeTime();
-            executorRunTime += taskMetrics.executorRunTime();
-            resultSize += taskMetrics.resultSize();
-            jvmGCTime += taskMetrics.jvmGCTime();
-            resultSerializationTime += taskMetrics.resultSerializationTime();
-            memoryBytesSpilled += taskMetrics.memoryBytesSpilled();
-            diskBytesSpilled += taskMetrics.diskBytesSpilled();
-            if (!taskMetrics.inputMetrics().isEmpty()) {
-              inputMetricExist = true;
-              bytesRead += taskMetrics.inputMetrics().get().bytesRead();
-            }
-            Option<ShuffleReadMetrics> shuffleReadMetricsOption = taskMetrics.shuffleReadMetrics();
-            if (!shuffleReadMetricsOption.isEmpty()) {
-              shuffleReadMetricExist = true;
-              remoteBlocksFetched += shuffleReadMetricsOption.get().remoteBlocksFetched();
-              localBlocksFetched += shuffleReadMetricsOption.get().localBlocksFetched();
-              fetchWaitTime += shuffleReadMetricsOption.get().fetchWaitTime();
-              remoteBytesRead += shuffleReadMetricsOption.get().remoteBytesRead();
-            }
-            Option<ShuffleWriteMetrics> shuffleWriteMetricsOption = taskMetrics.shuffleWriteMetrics();
-            if (!shuffleWriteMetricsOption.isEmpty()) {
-              shuffleWriteMetricExist = true;
-              shuffleBytesWritten += shuffleWriteMetricsOption.get().shuffleBytesWritten();
-              shuffleWriteTime += shuffleWriteMetricsOption.get().shuffleWriteTime();
-            }
-          }
-        }
-      }
-    }
-
-    results.put("ExecutorDeserializeTime", executorDeserializeTime);
-    results.put("ExecutorRunTime", executorRunTime);
-    results.put("ResultSize", resultSize);
-    results.put("JvmGCTime", jvmGCTime);
-    results.put("ResultSerializationTime", resultSerializationTime);
-    results.put("MemoryBytesSpilled", memoryBytesSpilled);
-    results.put("DiskBytesSpilled", diskBytesSpilled);
-    if (inputMetricExist) {
-      results.put("BytesRead", bytesRead);
-    }
-    if (shuffleReadMetricExist) {
-      results.put("RemoteBlocksFetched", remoteBlocksFetched);
-      results.put("LocalBlocksFetched", localBlocksFetched);
-      results.put("TotalBlocksFetched", localBlocksFetched + remoteBlocksFetched);
-      results.put("FetchWaitTime", fetchWaitTime);
-      results.put("RemoteBytesRead", remoteBytesRead);
-    }
-    if (shuffleWriteMetricExist) {
-      results.put("ShuffleBytesWritten", shuffleBytesWritten);
-      results.put("ShuffleWriteTime", shuffleWriteTime);
-    }
-    return results;
-  }
-
   private SparkJobInfo getJobInfo() {
     return sparkContext.statusTracker().getJobInfo(jobId);
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/9829f998/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
index d84c026..2c6818f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
@@ -125,7 +125,8 @@ public class RemoteSparkJobStatus implements SparkJobStatus {
     // add spark job metrics.
     String jobIdentifier = "Spark Job[" + jobHandle.getClientJobId() + "] Metrics";
 
-    Map<String, Long> flatJobMetric = extractMetrics(metricsCollection);
+    SparkJobUtils sparkJobUtils = new SparkJobUtils();
+    Map<String, Long> flatJobMetric = sparkJobUtils.collectMetrics(metricsCollection.getAllMetrics());
     for (Map.Entry<String, Long> entry : flatJobMetric.entrySet()) {
       sparkStatisticsBuilder.add(jobIdentifier, entry.getKey(), Long.toString(entry.getValue()));
     }
@@ -227,38 +228,6 @@ public class RemoteSparkJobStatus implements SparkJobStatus {
     }
   }
 
-  private Map<String, Long> extractMetrics(MetricsCollection metricsCollection) {
-    Map<String, Long> results = new LinkedHashMap<String, Long>();
-    Metrics allMetrics = metricsCollection.getAllMetrics();
-
-    results.put("ExecutorDeserializeTime", allMetrics.executorDeserializeTime);
-    results.put("ExecutorRunTime", allMetrics.executorRunTime);
-    results.put("ResultSize", allMetrics.resultSize);
-    results.put("JvmGCTime", allMetrics.jvmGCTime);
-    results.put("ResultSerializationTime", allMetrics.resultSerializationTime);
-    results.put("MemoryBytesSpilled", allMetrics.memoryBytesSpilled);
-    results.put("DiskBytesSpilled", allMetrics.diskBytesSpilled);
-    if (allMetrics.inputMetrics != null) {
-      results.put("BytesRead", allMetrics.inputMetrics.bytesRead);
-    }
-    if (allMetrics.shuffleReadMetrics != null) {
-      ShuffleReadMetrics shuffleReadMetrics = allMetrics.shuffleReadMetrics;
-      long rbf = shuffleReadMetrics.remoteBlocksFetched;
-      long lbf = shuffleReadMetrics.localBlocksFetched;
-      results.put("RemoteBlocksFetched", rbf);
-      results.put("LocalBlocksFetched", lbf);
-      results.put("TotalBlocksFetched", lbf + rbf);
-      results.put("FetchWaitTime", shuffleReadMetrics.fetchWaitTime);
-      results.put("RemoteBytesRead", shuffleReadMetrics.remoteBytesRead);
-    }
-    if (allMetrics.shuffleWriteMetrics != null) {
-      results.put("ShuffleBytesWritten", allMetrics.shuffleWriteMetrics.shuffleBytesWritten);
-      results.put("ShuffleWriteTime", allMetrics.shuffleWriteMetrics.shuffleWriteTime);
-    }
-
-    return results;
-  }
-
   private static SparkJobInfo getDefaultJobInfo(final Integer jobId,
       final JobExecutionStatus status) {
     return new SparkJobInfo() {

http://git-wip-us.apache.org/repos/asf/hive/blob/9829f998/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SparkJobUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SparkJobUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SparkJobUtils.java
new file mode 100644
index 0000000..383d76f
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SparkJobUtils.java
@@ -0,0 +1,56 @@
+package org.apache.hadoop.hive.ql.exec.spark.status.impl;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import org.apache.hive.spark.client.metrics.Metrics;
+import org.apache.hive.spark.client.metrics.ShuffleReadMetrics;
+
+public class SparkJobUtils {
+
+  private final static String EXECUTOR_DESERIALIZE_TIME = "ExecutorDeserializeTime";
+  private final static String EXECUTOR_RUN_TIME = "ExecutorRunTime";
+  private final static String RESULT_SIZE = "ResultSize";
+  private final static String JVM_GC_TIME = "JvmGCTime";
+  private final static String RESULT_SERIALIZATION_TIME = "ResultSerializationTime";
+  private final static String MEMORY_BYTES_SPILLED = "MemoryBytesSpilled";
+  private final static String DISK_BYTES_SPILLED = "DiskBytesSpilled";
+  private final static String BYTES_READ = "BytesRead";
+  private final static String REMOTE_BLOCKS_FETCHED = "RemoteBlocksFetched";
+  private final static String LOCAL_BLOCKS_FETCHED = "LocalBlocksFetched";
+  private final static String TOTAL_BLOCKS_FETCHED = "TotalBlocksFetched";
+  private final static String FETCH_WAIT_TIME = "FetchWaitTime";
+  private final static String REMOTE_BYTES_READ = "RemoteBytesRead";
+  private final static String SHUFFLE_BYTES_WRITTEN = "ShuffleBytesWritten";
+  private final static String SHUFFLE_WRITE_TIME = "ShuffleWriteTime";
+
+  public Map<String, Long> collectMetrics(Metrics allMetrics) {
+    Map<String, Long> results = new LinkedHashMap<String, Long>();
+    results.put(EXECUTOR_DESERIALIZE_TIME, allMetrics.executorDeserializeTime);
+    results.put(EXECUTOR_RUN_TIME, allMetrics.executorRunTime);
+    results.put(RESULT_SIZE, allMetrics.resultSize);
+    results.put(JVM_GC_TIME, allMetrics.jvmGCTime);
+    results.put(RESULT_SERIALIZATION_TIME, allMetrics.resultSerializationTime);
+    results.put(MEMORY_BYTES_SPILLED, allMetrics.memoryBytesSpilled);
+    results.put(DISK_BYTES_SPILLED, allMetrics.diskBytesSpilled);
+    if (allMetrics.inputMetrics != null) {
+      results.put(BYTES_READ, allMetrics.inputMetrics.bytesRead);
+    }
+    if (allMetrics.shuffleReadMetrics != null) {
+      ShuffleReadMetrics shuffleReadMetrics = allMetrics.shuffleReadMetrics;
+      long rbf = shuffleReadMetrics.remoteBlocksFetched;
+      long lbf = shuffleReadMetrics.localBlocksFetched;
+      results.put(REMOTE_BLOCKS_FETCHED, rbf);
+      results.put(LOCAL_BLOCKS_FETCHED, lbf);
+      results.put(TOTAL_BLOCKS_FETCHED, rbf + lbf);
+      results.put(FETCH_WAIT_TIME, shuffleReadMetrics.fetchWaitTime);
+      results.put(REMOTE_BYTES_READ, shuffleReadMetrics.remoteBytesRead);
+    }
+    if (allMetrics.shuffleWriteMetrics != null) {
+      results.put(SHUFFLE_BYTES_WRITTEN, allMetrics.shuffleWriteMetrics.shuffleBytesWritten);
+      results.put(SHUFFLE_WRITE_TIME, allMetrics.shuffleWriteMetrics.shuffleWriteTime);
+    }
+    return results;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/9829f998/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java b/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java
index 97863f8..e77aa78 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java
@@ -118,7 +118,7 @@ public class MetricsCollection {
     }
   }
 
-  void addMetrics(int jobId, int stageId, long taskId, Metrics metrics) {
+  public void addMetrics(int jobId, int stageId, long taskId, Metrics metrics) {
     lock.writeLock().lock();
     try {
       taskMetrics.add(new TaskInfo(jobId, stageId, taskId, metrics));
