Repository: hive Updated Branches: refs/heads/master 9cdc08580 -> 57a1ec211
HIVE-18034: Improving logging when HoS executors spend lots of time in GC (Sahil Takiar, reviewed by Rui Li) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/57a1ec21 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/57a1ec21 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/57a1ec21 Branch: refs/heads/master Commit: 57a1ec211039d5a5c0eb309adb991283b112520e Parents: 9cdc085 Author: Sahil Takiar <takiar.sa...@gmail.com> Authored: Wed Mar 14 09:49:02 2018 -0700 Committer: Sahil Takiar <stak...@cloudera.com> Committed: Wed Mar 14 09:49:02 2018 -0700 ---------------------------------------------------------------------- .../hadoop/hive/ql/exec/spark/SparkTask.java | 73 +++++++++++++++++--- .../spark/Statistic/SparkStatisticGroup.java | 18 +++-- .../exec/spark/Statistic/SparkStatistics.java | 21 ++++-- .../spark/Statistic/SparkStatisticsBuilder.java | 8 +-- .../spark/Statistic/SparkStatisticsNames.java | 43 ++++++++++++ .../spark/status/impl/JobMetricsListener.java | 24 +++---- .../spark/status/impl/LocalSparkJobStatus.java | 19 ++--- .../spark/status/impl/RemoteSparkJobStatus.java | 21 +++--- .../spark/status/impl/SparkMetricsUtils.java | 48 +++++-------- .../hadoop/hive/ql/history/HiveHistory.java | 1 + .../hive/spark/client/MetricsCollection.java | 4 +- .../apache/hive/spark/client/RemoteDriver.java | 2 +- .../hive/spark/client/metrics/Metrics.java | 14 ++-- .../spark/client/TestMetricsCollection.java | 13 ++-- 14 files changed, 212 insertions(+), 97 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/57a1ec21/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java index 76f6ecc..c240884 
100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java @@ -27,9 +27,11 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; +import com.google.common.base.Strings; import com.google.common.base.Throwables; import org.apache.hadoop.hive.common.metrics.common.Metrics; import org.apache.hadoop.hive.common.metrics.common.MetricsConstant; +import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatisticsNames; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.conf.HiveConf; @@ -77,7 +79,10 @@ public class SparkTask extends Task<SparkWork> { private static final LogHelper console = new LogHelper(LOG); private PerfLogger perfLogger; private static final long serialVersionUID = 1L; + // The id of the actual Spark job private transient int sparkJobID; + // The id of the JobHandle used to track the actual Spark job + private transient String sparkJobHandleId; private transient SparkStatistics sparkStatistics; private transient long submitTime; private transient long startTime; @@ -111,36 +116,60 @@ public class SparkTask extends Task<SparkWork> { SparkWork sparkWork = getWork(); sparkWork.setRequiredCounterPrefix(getOperatorCounters()); + // Submit the Spark job perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_SUBMIT_JOB); submitTime = perfLogger.getStartTime(PerfLogger.SPARK_SUBMIT_JOB); jobRef = sparkSession.submit(driverContext, sparkWork); perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_SUBMIT_JOB); + // If the driver context has been shutdown (due to query cancellation) kill the Spark job if (driverContext.isShutdown()) { LOG.warn("Killing Spark job"); killJob(); throw new HiveException("Operation is cancelled."); } - addToHistory(jobRef); - this.jobID = jobRef.getSparkJobStatus().getAppID(); + // Get the Job Handle id associated with the Spark job + sparkJobHandleId = jobRef.getJobId(); + + // Add Spark job 
handle id to the Hive History + addToHistory(Keys.SPARK_JOB_HANDLE_ID, jobRef.getJobId()); + + LOG.debug("Starting Spark job with job handle id " + sparkJobHandleId); + + // Get the application id of the Spark app + jobID = jobRef.getSparkJobStatus().getAppID(); + + // Start monitoring the Spark job, returns when the Spark job has completed / failed, or if + // a timeout occurs rc = jobRef.monitorJob(); + + // Get the id the Spark job that was launched, returns -1 if no Spark job was launched + sparkJobID = jobRef.getSparkJobStatus().getJobId(); + + // Add Spark job id to the Hive History + addToHistory(Keys.SPARK_JOB_ID, Integer.toString(sparkJobID)); + + // Get the final state of the Spark job and parses its job info SparkJobStatus sparkJobStatus = jobRef.getSparkJobStatus(); - sparkJobID = sparkJobStatus.getJobId(); getSparkJobInfo(sparkJobStatus, rc); + if (rc == 0) { sparkStatistics = sparkJobStatus.getSparkStatistics(); + printExcessiveGCWarning(); if (LOG.isInfoEnabled() && sparkStatistics != null) { LOG.info(String.format("=====Spark Job[%s] statistics=====", sparkJobID)); logSparkStatistic(sparkStatistics); } - LOG.info("Successfully completed Spark Job " + sparkJobID + " with application ID " + + LOG.info("Successfully completed Spark job[" + sparkJobID + "] with application ID " + jobID + " and task ID " + getId()); } else if (rc == 2) { // Cancel job if the monitor found job submission timeout. // TODO: If the timeout is because of lack of resources in the cluster, we should // ideally also cancel the app request here. But w/o facilities from Spark or YARN, // it's difficult to do it on hive side alone. See HIVE-12650. - LOG.info("Failed to submit Spark job " + sparkJobID); + LOG.debug("Failed to submit Spark job with job handle id " + sparkJobHandleId); + LOG.info("Failed to submit Spark job for application id " + (Strings.isNullOrEmpty(jobID) + ? 
"UNKNOWN" : jobID)); killJob(); } else if (rc == 4) { LOG.info("The spark job or one stage of it has too many tasks" + @@ -189,12 +218,35 @@ public class SparkTask extends Task<SparkWork> { return rc; } - private void addToHistory(SparkJobRef jobRef) { - console.printInfo("Starting Spark Job = " + jobRef.getJobId()); + /** + * Use the Spark metrics and calculate how much task executione time was spent performing GC + * operations. If more than a defined threshold of time is spent, print out a warning on the + * console. + */ + private void printExcessiveGCWarning() { + SparkStatisticGroup sparkStatisticGroup = sparkStatistics.getStatisticGroup( + SparkStatisticsNames.SPARK_GROUP_NAME); + if (sparkStatisticGroup != null) { + long taskDurationTime = Long.parseLong(sparkStatisticGroup.getSparkStatistic( + SparkStatisticsNames.TASK_DURATION_TIME).getValue()); + long jvmGCTime = Long.parseLong(sparkStatisticGroup.getSparkStatistic( + SparkStatisticsNames.JVM_GC_TIME).getValue()); + + // Threshold percentage to trigger the GC warning + double threshold = 0.1; + + if (jvmGCTime > taskDurationTime * threshold) { + long percentGcTime = Math.round((double) jvmGCTime / taskDurationTime * 100); + String gcWarning = String.format("WARNING: Spark Job[%s] Spent %s%% (%s ms / %s ms) of " + + "task time in GC", sparkJobID, percentGcTime, jvmGCTime, taskDurationTime); + console.printInfo(gcWarning); + } + } + } + + private void addToHistory(Keys key, String value) { if (SessionState.get() != null) { - SessionState.get().getHiveHistory() - .setQueryProperty(queryState.getQueryId(), Keys.SPARK_JOB_ID, - Integer.toString(jobRef.getSparkJobStatus().getJobId())); + SessionState.get().getHiveHistory().setQueryProperty(queryState.getQueryId(), key, value); } } @@ -327,6 +379,7 @@ public class SparkTask extends Task<SparkWork> { } private void killJob() { + LOG.debug("Killing Spark job with job handle id " + sparkJobHandleId); boolean needToKillJob = false; if (jobRef != null && !jobKilled) 
{ synchronized (this) { http://git-wip-us.apache.org/repos/asf/hive/blob/57a1ec21/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticGroup.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticGroup.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticGroup.java index 5ab4d16..e1006e3 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticGroup.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticGroup.java @@ -17,17 +17,20 @@ */ package org.apache.hadoop.hive.ql.exec.spark.Statistic; -import java.util.Collections; import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.List; +import java.util.Map; public class SparkStatisticGroup { private final String groupName; - private final List<SparkStatistic> statisticList; + private final Map<String, SparkStatistic> statistics = new LinkedHashMap<>(); SparkStatisticGroup(String groupName, List<SparkStatistic> statisticList) { this.groupName = groupName; - this.statisticList = Collections.unmodifiableList(statisticList); + for (SparkStatistic sparkStatistic : statisticList) { + this.statistics.put(sparkStatistic.getName(), sparkStatistic); + } } public String getGroupName() { @@ -35,6 +38,13 @@ public class SparkStatisticGroup { } public Iterator<SparkStatistic> getStatistics() { - return this.statisticList.iterator(); + return this.statistics.values().iterator(); + } + + /** + * Get a {@link SparkStatistic} by its given name + */ + public SparkStatistic getSparkStatistic(String name) { + return this.statistics.get(name); } } http://git-wip-us.apache.org/repos/asf/hive/blob/57a1ec21/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatistics.java ---------------------------------------------------------------------- diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatistics.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatistics.java index 584e8bf..946cadc 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatistics.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatistics.java @@ -17,19 +17,26 @@ */ package org.apache.hadoop.hive.ql.exec.spark.Statistic; - -import java.util.Collections; import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.List; +import java.util.Map; public class SparkStatistics { - private final List<SparkStatisticGroup> statisticGroups; - SparkStatistics(List<SparkStatisticGroup> statisticGroups) { - this.statisticGroups = Collections.unmodifiableList(statisticGroups); + private final Map<String, SparkStatisticGroup> statisticGroups = new LinkedHashMap<>(); + + SparkStatistics(List<SparkStatisticGroup> statisticGroupsList) { + for (SparkStatisticGroup group : statisticGroupsList) { + statisticGroups.put(group.getGroupName(), group); + } } public Iterator<SparkStatisticGroup> getStatisticGroups() { - return this.statisticGroups.iterator(); + return this.statisticGroups.values().iterator(); + } + + public SparkStatisticGroup getStatisticGroup(String groupName) { + return this.statisticGroups.get(groupName); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/hive/blob/57a1ec21/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticsBuilder.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticsBuilder.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticsBuilder.java index 6ebc274..d31d60a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticsBuilder.java +++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticsBuilder.java @@ -21,7 +21,7 @@ import org.apache.hive.spark.counter.SparkCounter; import org.apache.hive.spark.counter.SparkCounterGroup; import org.apache.hive.spark.counter.SparkCounters; -import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -31,15 +31,15 @@ public class SparkStatisticsBuilder { private Map<String, List<SparkStatistic>> statisticMap; public SparkStatisticsBuilder() { - statisticMap = new HashMap<String, List<SparkStatistic>>(); + statisticMap = new LinkedHashMap<>(); } public SparkStatistics build() { List<SparkStatisticGroup> statisticGroups = new LinkedList<SparkStatisticGroup>(); for (Map.Entry<String, List<SparkStatistic>> entry : statisticMap.entrySet()) { String groupName = entry.getKey(); - List<SparkStatistic> statisitcList = entry.getValue(); - statisticGroups.add(new SparkStatisticGroup(groupName, statisitcList)); + List<SparkStatistic> statisticList = entry.getValue(); + statisticGroups.add(new SparkStatisticGroup(groupName, statisticList)); } return new SparkStatistics(statisticGroups); http://git-wip-us.apache.org/repos/asf/hive/blob/57a1ec21/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticsNames.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticsNames.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticsNames.java new file mode 100644 index 0000000..ca93a80 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticsNames.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.exec.spark.Statistic; + +/** + * A collection of names that define different {@link SparkStatistic} objects. + */ +public class SparkStatisticsNames { + + public static final String EXECUTOR_DESERIALIZE_TIME = "ExecutorDeserializeTime"; + public static final String EXECUTOR_RUN_TIME = "ExecutorRunTime"; + public static final String RESULT_SIZE = "ResultSize"; + public static final String JVM_GC_TIME = "JvmGCTime"; + public static final String RESULT_SERIALIZATION_TIME = "ResultSerializationTime"; + public static final String MEMORY_BYTES_SPLIED = "MemoryBytesSpilled"; + public static final String DISK_BYTES_SPILLED = "DiskBytesSpilled"; + public static final String BYTES_READ = "BytesRead"; + public static final String REMOTE_BLOCKS_FETCHED = "RemoteBlocksFetched"; + public static final String LOCAL_BLOCKS_FETCHED = "LocalBlocksFetched"; + public static final String TOTAL_BLOCKS_FETCHED = "TotalBlocksFetched"; + public static final String FETCH_WAIT_TIME = "FetchWaitTime"; + public static final String REMOTE_BYTES_READ = "RemoteBytesRead"; + public static final String SHUFFLE_BYTES_WRITTEN = "ShuffleBytesWritten"; + public static final String SHUFFLE_WRITE_TIME = "ShuffleWriteTime"; + public static final String TASK_DURATION_TIME = "TaskDurationTime"; + + public static final String SPARK_GROUP_NAME = "SPARK"; +} 
http://git-wip-us.apache.org/repos/asf/hive/blob/57a1ec21/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobMetricsListener.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobMetricsListener.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobMetricsListener.java index eaeb4dc..773fe97 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobMetricsListener.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobMetricsListener.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hive.ql.exec.spark.status.impl; +import java.util.AbstractMap; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -28,6 +29,7 @@ import org.apache.spark.executor.TaskMetrics; import org.apache.spark.scheduler.SparkListener; import org.apache.spark.scheduler.SparkListenerJobStart; import org.apache.spark.scheduler.SparkListenerTaskEnd; +import org.apache.spark.scheduler.TaskInfo; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -38,7 +40,8 @@ public class JobMetricsListener extends SparkListener { private final Map<Integer, int[]> jobIdToStageId = Maps.newHashMap(); private final Map<Integer, Integer> stageIdToJobId = Maps.newHashMap(); - private final Map<Integer, Map<Integer, List<TaskMetrics>>> allJobMetrics = Maps.newHashMap(); + private final Map<Integer, Map<Integer, List<Map.Entry<TaskMetrics, TaskInfo>>>> allJobMetrics = + Maps.newHashMap(); @Override public synchronized void onTaskEnd(SparkListenerTaskEnd taskEnd) { @@ -47,17 +50,12 @@ public class JobMetricsListener extends SparkListener { if (jobId == null) { LOG.warn("Can not find job id for stage[" + stageId + "]."); } else { - Map<Integer, List<TaskMetrics>> jobMetrics = allJobMetrics.get(jobId); - if (jobMetrics == null) { - jobMetrics = Maps.newHashMap(); - allJobMetrics.put(jobId, jobMetrics); - } 
- List<TaskMetrics> stageMetrics = jobMetrics.get(stageId); - if (stageMetrics == null) { - stageMetrics = Lists.newLinkedList(); - jobMetrics.put(stageId, stageMetrics); - } - stageMetrics.add(taskEnd.taskMetrics()); + Map<Integer, List<Map.Entry<TaskMetrics, TaskInfo>>> jobMetrics = allJobMetrics.computeIfAbsent( + jobId, k -> Maps.newHashMap()); + List<Map.Entry<TaskMetrics, TaskInfo>> stageMetrics = jobMetrics.computeIfAbsent(stageId, + k -> Lists.newLinkedList()); + + stageMetrics.add(new AbstractMap.SimpleEntry<>(taskEnd.taskMetrics(), taskEnd.taskInfo())); } } @@ -74,7 +72,7 @@ public class JobMetricsListener extends SparkListener { jobIdToStageId.put(jobId, intStageIds); } - public synchronized Map<Integer, List<TaskMetrics>> getJobMetric(int jobId) { + public synchronized Map<Integer, List<Map.Entry<TaskMetrics, TaskInfo>>> getJobMetric(int jobId) { return allJobMetrics.get(jobId); } http://git-wip-us.apache.org/repos/asf/hive/blob/57a1ec21/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java index 8b031e7..03f8a0b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java @@ -17,28 +17,31 @@ */ package org.apache.hadoop.hive.ql.exec.spark.status.impl; -import java.net.UnknownHostException; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; -import org.apache.hadoop.hive.ql.exec.spark.SparkUtilities; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + +import org.apache.hadoop.hive.ql.exec.spark.SparkUtilities; import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatistics; import 
org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatisticsBuilder; +import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatisticsNames; import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobStatus; import org.apache.hadoop.hive.ql.exec.spark.status.SparkStageProgress; import org.apache.hive.spark.client.MetricsCollection; import org.apache.hive.spark.client.metrics.Metrics; import org.apache.hive.spark.counter.SparkCounters; + import org.apache.spark.JobExecutionStatus; import org.apache.spark.SparkJobInfo; import org.apache.spark.SparkStageInfo; import org.apache.spark.api.java.JavaFutureAction; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.executor.TaskMetrics; +import org.apache.spark.scheduler.TaskInfo; public class LocalSparkJobStatus implements SparkJobStatus { @@ -129,8 +132,7 @@ public class LocalSparkJobStatus implements SparkJobStatus { // add Hive operator level statistics. sparkStatisticsBuilder.add(sparkCounters); // add spark job metrics. 
- String jobIdentifier = "Spark Job[" + jobId + "] Metrics"; - Map<Integer, List<TaskMetrics>> jobMetric = jobMetricsListener.getJobMetric(jobId); + Map<Integer, List<Map.Entry<TaskMetrics, TaskInfo>>> jobMetric = jobMetricsListener.getJobMetric(jobId); if (jobMetric == null) { return null; } @@ -138,16 +140,17 @@ public class LocalSparkJobStatus implements SparkJobStatus { MetricsCollection metricsCollection = new MetricsCollection(); Set<Integer> stageIds = jobMetric.keySet(); for (int stageId : stageIds) { - List<TaskMetrics> taskMetrics = jobMetric.get(stageId); - for (TaskMetrics taskMetric : taskMetrics) { - Metrics metrics = new Metrics(taskMetric); + List<Map.Entry<TaskMetrics, TaskInfo>> taskMetrics = jobMetric.get(stageId); + for (Map.Entry<TaskMetrics, TaskInfo> taskMetric : taskMetrics) { + Metrics metrics = new Metrics(taskMetric.getKey(), taskMetric.getValue()); metricsCollection.addMetrics(jobId, stageId, 0, metrics); } } Map<String, Long> flatJobMetric = SparkMetricsUtils.collectMetrics(metricsCollection .getAllMetrics()); for (Map.Entry<String, Long> entry : flatJobMetric.entrySet()) { - sparkStatisticsBuilder.add(jobIdentifier, entry.getKey(), Long.toString(entry.getValue())); + sparkStatisticsBuilder.add(SparkStatisticsNames.SPARK_GROUP_NAME, entry.getKey(), + Long.toString(entry.getValue())); } return sparkStatisticsBuilder.build(); http://git-wip-us.apache.org/repos/asf/hive/blob/57a1ec21/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java index ec7ca40..ff969e0 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java @@ 
-19,28 +19,30 @@ package org.apache.hadoop.hive.ql.exec.spark.status.impl; import org.apache.hadoop.hive.ql.exec.spark.SparkUtilities; +import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatisticsNames; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatistics; import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatisticsBuilder; import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hive.spark.client.MetricsCollection; -import org.apache.hive.spark.counter.SparkCounters; import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobStatus; import org.apache.hadoop.hive.ql.exec.spark.status.SparkStageProgress; +import org.apache.hive.spark.client.MetricsCollection; import org.apache.hive.spark.client.Job; import org.apache.hive.spark.client.JobContext; import org.apache.hive.spark.client.JobHandle; import org.apache.hive.spark.client.SparkClient; +import org.apache.hive.spark.counter.SparkCounters; + import org.apache.spark.JobExecutionStatus; import org.apache.spark.SparkJobInfo; import org.apache.spark.SparkStageInfo; import org.apache.spark.api.java.JavaFutureAction; import java.io.Serializable; -import java.net.InetAddress; -import java.net.URI; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -125,16 +127,19 @@ public class RemoteSparkJobStatus implements SparkJobStatus { if (metricsCollection == null || getCounter() == null) { return null; } + SparkStatisticsBuilder sparkStatisticsBuilder = new SparkStatisticsBuilder(); - // add Hive operator level statistics. + + // add Hive operator level statistics. - e.g. RECORDS_IN, RECORDS_OUT sparkStatisticsBuilder.add(getCounter()); - // add spark job metrics. - String jobIdentifier = "Spark Job[" + getJobId() + "] Metrics"; + // add spark job metrics. - e.g. metrics collected by Spark itself (JvmGCTime, + // ExecutorRunTime, etc.) 
Map<String, Long> flatJobMetric = SparkMetricsUtils.collectMetrics( metricsCollection.getAllMetrics()); for (Map.Entry<String, Long> entry : flatJobMetric.entrySet()) { - sparkStatisticsBuilder.add(jobIdentifier, entry.getKey(), Long.toString(entry.getValue())); + sparkStatisticsBuilder.add(SparkStatisticsNames.SPARK_GROUP_NAME, entry.getKey(), + Long.toString(entry.getValue())); } return sparkStatisticsBuilder.build(); http://git-wip-us.apache.org/repos/asf/hive/blob/57a1ec21/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SparkMetricsUtils.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SparkMetricsUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SparkMetricsUtils.java index dd17168..f72407e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SparkMetricsUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SparkMetricsUtils.java @@ -20,54 +20,40 @@ package org.apache.hadoop.hive.ql.exec.spark.status.impl; import java.util.LinkedHashMap; import java.util.Map; +import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatisticsNames; import org.apache.hive.spark.client.metrics.Metrics; import org.apache.hive.spark.client.metrics.ShuffleReadMetrics; final class SparkMetricsUtils { - private final static String EXECUTOR_DESERIALIZE_TIME = "ExecutorDeserializeTime"; - private final static String EXECUTOR_RUN_TIME = "ExecutorRunTime"; - private final static String RESULT_SIZE = "ResultSize"; - private final static String JVM_GC_TIME = "JvmGCTime"; - private final static String RESULT_SERIALIZATION_TIME = "ResultSerializationTime"; - private final static String MEMORY_BYTES_SPLIED = "MemoryBytesSpilled"; - private final static String DISK_BYTES_SPILLED = "DiskBytesSpilled"; - private final static String BYTES_READ = "BytesRead"; - private final static String REMOTE_BLOCKS_FETCHED = 
"RemoteBlocksFetched"; - private final static String LOCAL_BLOCKS_FETCHED = "LocalBlocksFetched"; - private final static String TOTAL_BLOCKS_FETCHED = "TotalBlocksFetched"; - private final static String FETCH_WAIT_TIME = "FetchWaitTime"; - private final static String REMOTE_BYTES_READ = "RemoteBytesRead"; - private final static String SHUFFLE_BYTES_WRITTEN = "ShuffleBytesWritten"; - private final static String SHUFFLE_WRITE_TIME = "ShuffleWriteTime"; - private SparkMetricsUtils(){} static Map<String, Long> collectMetrics(Metrics allMetrics) { Map<String, Long> results = new LinkedHashMap<String, Long>(); - results.put(EXECUTOR_DESERIALIZE_TIME, allMetrics.executorDeserializeTime); - results.put(EXECUTOR_RUN_TIME, allMetrics.executorRunTime); - results.put(RESULT_SIZE, allMetrics.resultSize); - results.put(JVM_GC_TIME, allMetrics.jvmGCTime); - results.put(RESULT_SERIALIZATION_TIME, allMetrics.resultSerializationTime); - results.put(MEMORY_BYTES_SPLIED, allMetrics.memoryBytesSpilled); - results.put(DISK_BYTES_SPILLED, allMetrics.diskBytesSpilled); + results.put(SparkStatisticsNames.EXECUTOR_DESERIALIZE_TIME, allMetrics.executorDeserializeTime); + results.put(SparkStatisticsNames.EXECUTOR_RUN_TIME, allMetrics.executorRunTime); + results.put(SparkStatisticsNames.RESULT_SIZE, allMetrics.resultSize); + results.put(SparkStatisticsNames.JVM_GC_TIME, allMetrics.jvmGCTime); + results.put(SparkStatisticsNames.RESULT_SERIALIZATION_TIME, allMetrics.resultSerializationTime); + results.put(SparkStatisticsNames.MEMORY_BYTES_SPLIED, allMetrics.memoryBytesSpilled); + results.put(SparkStatisticsNames.DISK_BYTES_SPILLED, allMetrics.diskBytesSpilled); + results.put(SparkStatisticsNames.TASK_DURATION_TIME, allMetrics.taskDurationTime); if (allMetrics.inputMetrics != null) { - results.put(BYTES_READ, allMetrics.inputMetrics.bytesRead); + results.put(SparkStatisticsNames.BYTES_READ, allMetrics.inputMetrics.bytesRead); } if (allMetrics.shuffleReadMetrics != null) { ShuffleReadMetrics 
shuffleReadMetrics = allMetrics.shuffleReadMetrics; long rbf = shuffleReadMetrics.remoteBlocksFetched; long lbf = shuffleReadMetrics.localBlocksFetched; - results.put(REMOTE_BLOCKS_FETCHED, rbf); - results.put(LOCAL_BLOCKS_FETCHED, lbf); - results.put(TOTAL_BLOCKS_FETCHED, rbf + lbf); - results.put(FETCH_WAIT_TIME, shuffleReadMetrics.fetchWaitTime); - results.put(REMOTE_BYTES_READ, shuffleReadMetrics.remoteBytesRead); + results.put(SparkStatisticsNames.REMOTE_BLOCKS_FETCHED, rbf); + results.put(SparkStatisticsNames.LOCAL_BLOCKS_FETCHED, lbf); + results.put(SparkStatisticsNames.TOTAL_BLOCKS_FETCHED, rbf + lbf); + results.put(SparkStatisticsNames.FETCH_WAIT_TIME, shuffleReadMetrics.fetchWaitTime); + results.put(SparkStatisticsNames.REMOTE_BYTES_READ, shuffleReadMetrics.remoteBytesRead); } if (allMetrics.shuffleWriteMetrics != null) { - results.put(SHUFFLE_BYTES_WRITTEN, allMetrics.shuffleWriteMetrics.shuffleBytesWritten); - results.put(SHUFFLE_WRITE_TIME, allMetrics.shuffleWriteMetrics.shuffleWriteTime); + results.put(SparkStatisticsNames.SHUFFLE_BYTES_WRITTEN, allMetrics.shuffleWriteMetrics.shuffleBytesWritten); + results.put(SparkStatisticsNames.SHUFFLE_WRITE_TIME, allMetrics.shuffleWriteMetrics.shuffleWriteTime); } return results; } http://git-wip-us.apache.org/repos/asf/hive/blob/57a1ec21/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java b/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java index a5bafbc..327628f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java @@ -69,6 +69,7 @@ public interface HiveHistory { TASK_NUM_MAPPERS, TASK_NUM_REDUCERS, ROWS_INSERTED, + SPARK_JOB_HANDLE_ID, SPARK_JOB_ID }; 
http://git-wip-us.apache.org/repos/asf/hive/blob/57a1ec21/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java ---------------------------------------------------------------------- diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java b/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java index 0f03a64..526aefd 100644 --- a/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java +++ b/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java @@ -25,7 +25,6 @@ import java.util.Set; import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.hadoop.hive.common.classification.InterfaceAudience; -import org.apache.hive.spark.client.metrics.DataReadMethod; import org.apache.hive.spark.client.metrics.InputMetrics; import org.apache.hive.spark.client.metrics.Metrics; import org.apache.hive.spark.client.metrics.ShuffleReadMetrics; @@ -148,6 +147,7 @@ public class MetricsCollection { long resultSerializationTime = 0L; long memoryBytesSpilled = 0L; long diskBytesSpilled = 0L; + long taskDurationTime = 0L; // Input metrics. 
boolean hasInputMetrics = false; @@ -173,6 +173,7 @@ public class MetricsCollection { resultSerializationTime += m.resultSerializationTime; memoryBytesSpilled += m.memoryBytesSpilled; diskBytesSpilled += m.diskBytesSpilled; + taskDurationTime += m.taskDurationTime; if (m.inputMetrics != null) { hasInputMetrics = true; @@ -222,6 +223,7 @@ public class MetricsCollection { resultSerializationTime, memoryBytesSpilled, diskBytesSpilled, + taskDurationTime, inputMetrics, shuffleReadMetrics, shuffleWriteMetrics); http://git-wip-us.apache.org/repos/asf/hive/blob/57a1ec21/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java ---------------------------------------------------------------------- diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java b/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java index e584cbb..f221d0a 100644 --- a/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java +++ b/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java @@ -478,7 +478,7 @@ public class RemoteDriver { public void onTaskEnd(SparkListenerTaskEnd taskEnd) { if (taskEnd.reason() instanceof org.apache.spark.Success$ && !taskEnd.taskInfo().speculative()) { - Metrics metrics = new Metrics(taskEnd.taskMetrics()); + Metrics metrics = new Metrics(taskEnd.taskMetrics(), taskEnd.taskInfo()); Integer jobId; synchronized (stageToJobId) { jobId = stageToJobId.get(taskEnd.stageId()); http://git-wip-us.apache.org/repos/asf/hive/blob/57a1ec21/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java ---------------------------------------------------------------------- diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java b/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java index 418d534..9da0116 100644 --- a/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java +++ 
b/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java @@ -19,10 +19,11 @@ package org.apache.hive.spark.client.metrics; import java.io.Serializable; -import org.apache.spark.executor.TaskMetrics; - import org.apache.hadoop.hive.common.classification.InterfaceAudience; +import org.apache.spark.executor.TaskMetrics; +import org.apache.spark.scheduler.TaskInfo; + /** * Metrics tracked during the execution of a job. * @@ -46,6 +47,8 @@ public class Metrics implements Serializable { public final long memoryBytesSpilled; /** The number of on-disk bytes spilled by tasks. */ public final long diskBytesSpilled; + /** Amount of time spent executing tasks. */ + public final long taskDurationTime; /** If tasks read from a HadoopRDD or from persisted data, metrics on how much data was read. */ public final InputMetrics inputMetrics; /** @@ -58,7 +61,7 @@ public class Metrics implements Serializable { private Metrics() { // For Serialization only. - this(0L, 0L, 0L, 0L, 0L, 0L, 0L, null, null, null); + this(0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, null, null, null); } public Metrics( @@ -69,6 +72,7 @@ public class Metrics implements Serializable { long resultSerializationTime, long memoryBytesSpilled, long diskBytesSpilled, + long taskDurationTime, InputMetrics inputMetrics, ShuffleReadMetrics shuffleReadMetrics, ShuffleWriteMetrics shuffleWriteMetrics) { @@ -79,12 +83,13 @@ public class Metrics implements Serializable { this.resultSerializationTime = resultSerializationTime; this.memoryBytesSpilled = memoryBytesSpilled; this.diskBytesSpilled = diskBytesSpilled; + this.taskDurationTime = taskDurationTime; this.inputMetrics = inputMetrics; this.shuffleReadMetrics = shuffleReadMetrics; this.shuffleWriteMetrics = shuffleWriteMetrics; } - public Metrics(TaskMetrics metrics) { + public Metrics(TaskMetrics metrics, TaskInfo taskInfo) { this( metrics.executorDeserializeTime(), metrics.executorRunTime(), @@ -93,6 +98,7 @@ public class Metrics implements Serializable { 
metrics.resultSerializationTime(), metrics.memoryBytesSpilled(), metrics.diskBytesSpilled(), + taskInfo.duration(), optionalInputMetric(metrics), optionalShuffleReadMetric(metrics), optionalShuffleWriteMetrics(metrics)); http://git-wip-us.apache.org/repos/asf/hive/blob/57a1ec21/spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java ---------------------------------------------------------------------- diff --git a/spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java b/spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java index 8fef66b..87b460d 100644 --- a/spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java +++ b/spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java @@ -66,7 +66,7 @@ public class TestMetricsCollection { @Test public void testOptionalMetrics() { long value = taskValue(1, 1, 1L); - Metrics metrics = new Metrics(value, value, value, value, value, value, value, + Metrics metrics = new Metrics(value, value, value, value, value, value, value, value, null, null, null); MetricsCollection collection = new MetricsCollection(); @@ -94,10 +94,10 @@ public class TestMetricsCollection { MetricsCollection collection = new MetricsCollection(); long value = taskValue(1, 1, 1); - Metrics metrics1 = new Metrics(value, value, value, value, value, value, value, - new InputMetrics(value), null, null); - Metrics metrics2 = new Metrics(value, value, value, value, value, value, value, - new InputMetrics(value), null, null); + Metrics metrics1 = new Metrics(value, value, value, value, value, value, value, value, + new InputMetrics(value), null, null); + Metrics metrics2 = new Metrics(value, value, value, value, value, value, value, value, + new InputMetrics(value), null, null); collection.addMetrics(1, 1, 1, metrics1); collection.addMetrics(1, 1, 2, metrics2); @@ -108,7 +108,7 @@ public class TestMetricsCollection { private 
Metrics makeMetrics(int jobId, int stageId, long taskId) { long value = 1000000 * jobId + 1000 * stageId + taskId; - return new Metrics(value, value, value, value, value, value, value, + return new Metrics(value, value, value, value, value, value, value, value, new InputMetrics(value), new ShuffleReadMetrics((int) value, (int) value, value, value), new ShuffleWriteMetrics(value, value)); @@ -154,6 +154,7 @@ public class TestMetricsCollection { assertEquals(expected, metrics.resultSerializationTime); assertEquals(expected, metrics.memoryBytesSpilled); assertEquals(expected, metrics.diskBytesSpilled); + assertEquals(expected, metrics.taskDurationTime); assertEquals(expected, metrics.inputMetrics.bytesRead);