Repository: hive Updated Branches: refs/heads/master 2f802e908 -> eb40ea57e
HIVE-18651: Expose additional Spark metrics (Sahil Takiar, reviewed by Vihang Karajgaonkar) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/eb40ea57 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/eb40ea57 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/eb40ea57 Branch: refs/heads/master Commit: eb40ea57eac4c3ff46f638cf4ab83bec71b5eda5 Parents: 2f802e9 Author: Sahil Takiar <takiar.sa...@gmail.com> Authored: Fri Apr 6 11:42:23 2018 +0700 Committer: Sahil Takiar <stak...@cloudera.com> Committed: Fri Apr 6 11:42:23 2018 +0700 ---------------------------------------------------------------------- .../hive/ql/exec/spark/TestSparkStatistics.java | 100 +++++++++++++++++++ .../hadoop/hive/ql/exec/spark/SparkTask.java | 18 +++- .../spark/Statistic/SparkStatisticsNames.java | 4 +- .../spark/status/impl/SparkMetricsUtils.java | 5 +- .../hive/ql/exec/spark/TestSparkTask.java | 14 +++ .../hive/spark/client/MetricsCollection.java | 6 ++ .../hive/spark/client/metrics/Metrics.java | 13 ++- .../spark/client/TestMetricsCollection.java | 17 ++-- 8 files changed, 162 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/eb40ea57/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/exec/spark/TestSparkStatistics.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/exec/spark/TestSparkStatistics.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/exec/spark/TestSparkStatistics.java new file mode 100644 index 0000000..be3b501 --- /dev/null +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/exec/spark/TestSparkStatistics.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.spark; + +import com.google.common.collect.Lists; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.Driver; +import org.apache.hadoop.hive.ql.DriverContext; +import org.apache.hadoop.hive.ql.QueryState; +import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatistic; +import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatisticsNames; +import org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory; +import org.apache.hadoop.hive.ql.session.SessionState; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public class TestSparkStatistics { + + @Test + public void testSparkStatistics() { + HiveConf conf = new HiveConf(); + conf.setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER, + SQLStdHiveAuthorizerFactory.class.getName()); + conf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false); + conf.setVar(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE, "spark"); + conf.set("spark.master", "local-cluster[1,2,1024]"); + + SessionState.start(conf); + + Driver driver = null; + + try { + driver = new Driver(new QueryState.Builder() + .withGenerateNewQueryId(true) + .withHiveConf(conf).build(), + null, null); + + Assert.assertEquals(0, driver.run("create table test (col int)").getResponseCode()); + Assert.assertEquals(0, driver.compile("select * from test order by col")); + + List<SparkTask> sparkTasks = Utilities.getSparkTasks(driver.getPlan().getRootTasks()); + Assert.assertEquals(1, sparkTasks.size()); + + SparkTask sparkTask = sparkTasks.get(0); + + DriverContext driverCxt = new DriverContext(driver.getContext()); + driverCxt.prepare(driver.getPlan()); + + sparkTask.initialize(driver.getQueryState(), driver.getPlan(), driverCxt, driver.getContext() + .getOpContext()); + Assert.assertEquals(0, sparkTask.execute(driverCxt)); + + Assert.assertNotNull(sparkTask.getSparkStatistics()); + + List<SparkStatistic> sparkStats = Lists.newArrayList(sparkTask.getSparkStatistics() + .getStatisticGroup(SparkStatisticsNames.SPARK_GROUP_NAME).getStatistics()); + + Assert.assertEquals(18, sparkStats.size()); + + Map<String, String> statsMap = sparkStats.stream().collect( + Collectors.toMap(SparkStatistic::getName, SparkStatistic::getValue)); + + Assert.assertTrue(Long.parseLong(statsMap.get(SparkStatisticsNames.TASK_DURATION_TIME)) > 0); + Assert.assertTrue(Long.parseLong(statsMap.get(SparkStatisticsNames.EXECUTOR_CPU_TIME)) > 0); + Assert.assertTrue( + Long.parseLong(statsMap.get(SparkStatisticsNames.EXECUTOR_DESERIALIZE_CPU_TIME)) > 0); + Assert.assertTrue( + Long.parseLong(statsMap.get(SparkStatisticsNames.EXECUTOR_DESERIALIZE_TIME)) > 0); + Assert.assertTrue(Long.parseLong(statsMap.get(SparkStatisticsNames.EXECUTOR_RUN_TIME)) > 0); + } finally { + if (driver != null) { + Assert.assertEquals(0, driver.run("drop table if exists test").getResponseCode()); + driver.destroy(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/eb40ea57/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java index c240884..3083e30 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java @@ -27,6 +27,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Strings; import com.google.common.base.Throwables; import org.apache.hadoop.hive.common.metrics.common.Metrics; @@ -158,8 +159,7 @@ public class SparkTask extends Task<SparkWork> { sparkStatistics = sparkJobStatus.getSparkStatistics(); printExcessiveGCWarning(); if (LOG.isInfoEnabled() && sparkStatistics != null) { - LOG.info(String.format("=====Spark Job[%s] statistics=====", sparkJobID)); - logSparkStatistic(sparkStatistics); + LOG.info(sparkStatisticsToString(sparkStatistics, sparkJobID)); } LOG.info("Successfully completed Spark job[" + sparkJobID + "] with application ID " + jobID + " and task ID " + getId()); @@ -250,17 +250,25 @@ public class SparkTask extends Task<SparkWork> { } } - private void logSparkStatistic(SparkStatistics sparkStatistic) { + @VisibleForTesting + static String sparkStatisticsToString(SparkStatistics sparkStatistic, int sparkJobID) { + StringBuilder sparkStatsString = new StringBuilder(); + sparkStatsString.append("\n\n"); + sparkStatsString.append(String.format("=====Spark Job[%d] Statistics=====", sparkJobID)); + sparkStatsString.append("\n\n"); + Iterator<SparkStatisticGroup> groupIterator = sparkStatistic.getStatisticGroups(); while (groupIterator.hasNext()) { SparkStatisticGroup group = groupIterator.next(); - LOG.info(group.getGroupName()); + sparkStatsString.append(group.getGroupName()).append("\n"); Iterator<SparkStatistic> statisticIterator = group.getStatistics(); while (statisticIterator.hasNext()) { SparkStatistic statistic = statisticIterator.next(); - LOG.info("\t" + statistic.getName() + ": " + statistic.getValue()); + sparkStatsString.append("\t").append(statistic.getName()).append(": ").append( + statistic.getValue()).append("\n"); } } + return sparkStatsString.toString(); } /** http://git-wip-us.apache.org/repos/asf/hive/blob/eb40ea57/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticsNames.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticsNames.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticsNames.java index ca93a80..68e4f9e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticsNames.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticsNames.java @@ -23,11 +23,13 @@ package org.apache.hadoop.hive.ql.exec.spark.Statistic; public class SparkStatisticsNames { public static final String EXECUTOR_DESERIALIZE_TIME = "ExecutorDeserializeTime"; + public static final String EXECUTOR_DESERIALIZE_CPU_TIME = "ExecutorDeserializeCpuTime"; public static final String EXECUTOR_RUN_TIME = "ExecutorRunTime"; + public static final String EXECUTOR_CPU_TIME = "ExecutorCpuTime"; public static final String RESULT_SIZE = "ResultSize"; public static final String JVM_GC_TIME = "JvmGCTime"; public static final String RESULT_SERIALIZATION_TIME = "ResultSerializationTime"; - public static final String MEMORY_BYTES_SPLIED = "MemoryBytesSpilled"; + public static final String MEMORY_BYTES_SPILLED = "MemoryBytesSpilled"; public static final String DISK_BYTES_SPILLED = "DiskBytesSpilled"; public static final String BYTES_READ = "BytesRead"; public static final String REMOTE_BLOCKS_FETCHED = "RemoteBlocksFetched"; http://git-wip-us.apache.org/repos/asf/hive/blob/eb40ea57/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SparkMetricsUtils.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SparkMetricsUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SparkMetricsUtils.java index f72407e..fab5422 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SparkMetricsUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SparkMetricsUtils.java @@ -31,11 +31,14 @@ final class SparkMetricsUtils { static Map<String, Long> collectMetrics(Metrics allMetrics) { Map<String, Long> results = new LinkedHashMap<String, Long>(); results.put(SparkStatisticsNames.EXECUTOR_DESERIALIZE_TIME, allMetrics.executorDeserializeTime); + results.put(SparkStatisticsNames.EXECUTOR_DESERIALIZE_CPU_TIME, + allMetrics.executorDeserializeCpuTime); results.put(SparkStatisticsNames.EXECUTOR_RUN_TIME, allMetrics.executorRunTime); + results.put(SparkStatisticsNames.EXECUTOR_CPU_TIME, allMetrics.executorCpuTime); results.put(SparkStatisticsNames.RESULT_SIZE, allMetrics.resultSize); results.put(SparkStatisticsNames.JVM_GC_TIME, allMetrics.jvmGCTime); results.put(SparkStatisticsNames.RESULT_SERIALIZATION_TIME, allMetrics.resultSerializationTime); - results.put(SparkStatisticsNames.MEMORY_BYTES_SPLIED, allMetrics.memoryBytesSpilled); + results.put(SparkStatisticsNames.MEMORY_BYTES_SPILLED, allMetrics.memoryBytesSpilled); results.put(SparkStatisticsNames.DISK_BYTES_SPILLED, allMetrics.diskBytesSpilled); results.put(SparkStatisticsNames.TASK_DURATION_TIME, allMetrics.taskDurationTime); if (allMetrics.inputMetrics != null) { http://git-wip-us.apache.org/repos/asf/hive/blob/eb40ea57/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkTask.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkTask.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkTask.java index 435c6b6..75b4151 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkTask.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkTask.java @@ -30,6 +30,7 @@ import org.apache.hadoop.hive.common.metrics.common.Metrics; import org.apache.hadoop.hive.common.metrics.common.MetricsConstant; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatisticsBuilder; import org.apache.hadoop.hive.ql.exec.spark.status.RemoteSparkJobMonitor; import org.apache.hadoop.hive.ql.exec.spark.status.impl.RemoteSparkJobStatus; import org.apache.hadoop.hive.ql.plan.BaseWork; @@ -96,6 +97,19 @@ public class TestSparkTask { Assert.assertEquals(remoteSparkJobMonitor.startMonitor(), 3); } + @Test + public void testSparkStatisticsToString() { + SparkStatisticsBuilder statsBuilder = new SparkStatisticsBuilder(); + statsBuilder.add("TEST", "stat1", "1"); + statsBuilder.add("TEST", "stat2", "1"); + String statsString = SparkTask.sparkStatisticsToString(statsBuilder.build(), 10); + + Assert.assertTrue(statsString.contains("10")); + Assert.assertTrue(statsString.contains("TEST")); + Assert.assertTrue(statsString.contains("stat1")); + Assert.assertTrue(statsString.contains("stat2")); + Assert.assertTrue(statsString.contains("1")); + } private boolean isEmptySparkWork(SparkWork sparkWork) { List<BaseWork> allWorks = sparkWork.getAllWork(); http://git-wip-us.apache.org/repos/asf/hive/blob/eb40ea57/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java ---------------------------------------------------------------------- diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java b/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java index 526aefd..2f3c026 100644 --- a/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java +++ b/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java @@ -141,7 +141,9 @@ public class MetricsCollection { try { // Task metrics. long executorDeserializeTime = 0L; + long executorDeserializeCpuTime = 0L; long executorRunTime = 0L; + long executorCpuTime = 0L; long resultSize = 0L; long jvmGCTime = 0L; long resultSerializationTime = 0L; @@ -167,7 +169,9 @@ public class MetricsCollection { for (TaskInfo info : Collections2.filter(taskMetrics, filter)) { Metrics m = info.metrics; executorDeserializeTime += m.executorDeserializeTime; + executorDeserializeCpuTime += m.executorDeserializeCpuTime; executorRunTime += m.executorRunTime; + executorCpuTime += m.executorCpuTime; resultSize += m.resultSize; jvmGCTime += m.jvmGCTime; resultSerializationTime += m.resultSerializationTime; @@ -217,7 +221,9 @@ public class MetricsCollection { return new Metrics( executorDeserializeTime, + executorDeserializeCpuTime, executorRunTime, + executorCpuTime, resultSize, jvmGCTime, resultSerializationTime, http://git-wip-us.apache.org/repos/asf/hive/blob/eb40ea57/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java ---------------------------------------------------------------------- diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java b/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java index 9da0116..b718b3b 100644 --- a/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java +++ b/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java @@ -18,6 +18,7 @@ package org.apache.hive.spark.client.metrics; import java.io.Serializable; +import java.util.concurrent.TimeUnit; import org.apache.hadoop.hive.common.classification.InterfaceAudience; @@ -35,8 +36,12 @@ public class Metrics implements Serializable { /** Time taken on the executor to deserialize tasks. */ public final long executorDeserializeTime; + /** CPU time taken on the executor to deserialize tasks. */ + public final long executorDeserializeCpuTime; /** Time the executor spends actually running the task (including fetching shuffle data). */ public final long executorRunTime; + /** CPU time the executor spends running the task (including fetching shuffle data). */ + public final long executorCpuTime; /** The number of bytes sent back to the driver by tasks. */ public final long resultSize; /** Amount of time the JVM spent in garbage collection while executing tasks. */ @@ -61,12 +66,14 @@ public class Metrics implements Serializable { private Metrics() { // For Serialization only. - this(0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, null, null, null); + this(0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, null, null, null); } public Metrics( long executorDeserializeTime, + long executorDeserializeCpuTime, long executorRunTime, + long executorCpuTime, long resultSize, long jvmGCTime, long resultSerializationTime, @@ -77,7 +84,9 @@ public class Metrics implements Serializable { ShuffleReadMetrics shuffleReadMetrics, ShuffleWriteMetrics shuffleWriteMetrics) { this.executorDeserializeTime = executorDeserializeTime; + this.executorDeserializeCpuTime = executorDeserializeCpuTime; this.executorRunTime = executorRunTime; + this.executorCpuTime = executorCpuTime; this.resultSize = resultSize; this.jvmGCTime = jvmGCTime; this.resultSerializationTime = resultSerializationTime; @@ -92,7 +101,9 @@ public class Metrics implements Serializable { public Metrics(TaskMetrics metrics, TaskInfo taskInfo) { this( metrics.executorDeserializeTime(), + TimeUnit.NANOSECONDS.toMillis(metrics.executorDeserializeCpuTime()), metrics.executorRunTime(), + TimeUnit.NANOSECONDS.toMillis(metrics.executorCpuTime()), metrics.resultSize(), metrics.jvmGCTime(), metrics.resultSerializationTime(), http://git-wip-us.apache.org/repos/asf/hive/blob/eb40ea57/spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java ---------------------------------------------------------------------- diff --git a/spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java b/spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java index 87b460d..c5884cf 100644 --- a/spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java +++ b/spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java @@ -66,8 +66,8 @@ public class TestMetricsCollection { @Test public void testOptionalMetrics() { long value = taskValue(1, 1, 1L); - Metrics metrics = new Metrics(value, value, value, value, value, value, value, value, - null, null, null); + Metrics metrics = new Metrics(value, value, value, value, value, value, value, value, value, + value, null, null, null); MetricsCollection collection = new MetricsCollection(); for (int i : Arrays.asList(1, 2)) { @@ -94,10 +94,11 @@ public class TestMetricsCollection { MetricsCollection collection = new MetricsCollection(); long value = taskValue(1, 1, 1); - Metrics metrics1 = new Metrics(value, value, value, value, value, value, value, value, - new InputMetrics(value), null, null); - Metrics metrics2 = new Metrics(value, value, value, value, value, value, value, value, - new InputMetrics(value), null, null); + + Metrics metrics1 = new Metrics(value, value, value, value, value, value, value, value, value, + value, new InputMetrics(value), null, null); + Metrics metrics2 = new Metrics(value, value, value, value, value, value, value, value, value, + value, new InputMetrics(value), null, null); collection.addMetrics(1, 1, 1, metrics1); collection.addMetrics(1, 1, 2, metrics2); @@ -108,7 +109,7 @@ public class TestMetricsCollection { private Metrics makeMetrics(int jobId, int stageId, long taskId) { long value = 1000000 * jobId + 1000 * stageId + taskId; - return new Metrics(value, value, value, value, value, value, value, value, + return new Metrics(value, value, value, value, value, value, value, value, value, value, new InputMetrics(value), new ShuffleReadMetrics((int) value, (int) value, value, value), new ShuffleWriteMetrics(value, value)); @@ -148,7 +149,9 @@ public class TestMetricsCollection { private void checkMetrics(Metrics metrics, long expected) { assertEquals(expected, metrics.executorDeserializeTime); + assertEquals(expected, metrics.executorDeserializeCpuTime); assertEquals(expected, metrics.executorRunTime); + assertEquals(expected, metrics.executorCpuTime); assertEquals(expected, metrics.resultSize); assertEquals(expected, metrics.jvmGCTime); assertEquals(expected, metrics.resultSerializationTime);