Repository: hive
Updated Branches:
  refs/heads/master d9801d9c6 -> 72684f10d
HIVE-18273 : add LLAP-level counters for WM (Sergey Shelukhin, reviewed by Harish Jaiprakash and Gunther Hagleitner)

Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/72684f10
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/72684f10
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/72684f10

Branch: refs/heads/master
Commit: 72684f10da8f124dfa624b16f3ffebeb2155664d
Parents: d9801d9
Author: sergey <ser...@apache.org>
Authored: Tue Jan 16 13:07:51 2018 -0800
Committer: sergey <ser...@apache.org>
Committed: Tue Jan 16 13:48:01 2018 -0800

----------------------------------------------------------------------
 .../hive/llap/counters/LlapWmCounters.java | 27 ++++
 .../hive/llap/counters/WmFragmentCounters.java | 133 +++++++++++++++++++
 .../llap/daemon/impl/ContainerRunnerImpl.java | 15 ++-
 .../llap/daemon/impl/TaskExecutorService.java | 13 +-
 .../llap/daemon/impl/TaskRunnerCallable.java | 37 +++++-
 .../hive/llap/io/api/impl/LlapIoImpl.java | 3 +-
 .../daemon/impl/TaskExecutorTestHelpers.java | 2 +-
 .../ql/exec/tez/monitoring/LlapWmSummary.java | 108 +++++++++++++++
 .../ql/exec/tez/monitoring/TezJobMonitor.java | 5 +
 9 files changed, 328 insertions(+), 15 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/72684f10/llap-common/src/java/org/apache/hadoop/hive/llap/counters/LlapWmCounters.java
----------------------------------------------------------------------
diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/counters/LlapWmCounters.java b/llap-common/src/java/org/apache/hadoop/hive/llap/counters/LlapWmCounters.java
new file mode 100644
index 0000000..8575da4
--- /dev/null
+++ b/llap-common/src/java/org/apache/hadoop/hive/llap/counters/LlapWmCounters.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.llap.counters; + + +/** + * LLAP WM related counters. + */ +public enum LlapWmCounters { + SPECULATIVE_QUEUED_NS, + SPECULATIVE_RUNNING_NS, + GUARANTEED_QUEUED_NS, + GUARANTEED_RUNNING_NS, +} http://git-wip-us.apache.org/repos/asf/hive/blob/72684f10/llap-server/src/java/org/apache/hadoop/hive/llap/counters/WmFragmentCounters.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/counters/WmFragmentCounters.java b/llap-server/src/java/org/apache/hadoop/hive/llap/counters/WmFragmentCounters.java new file mode 100644 index 0000000..babeb37 --- /dev/null +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/counters/WmFragmentCounters.java @@ -0,0 +1,133 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.llap.counters; + +import java.util.concurrent.atomic.AtomicLongArray; +import org.apache.tez.common.counters.TezCounters; + +/** + * Per query counters. + */ +public class WmFragmentCounters { + private static enum State { NONE, QUEUED, RUNNING, DONE }; + private State currentState = State.NONE; + private LlapWmCounters currentCounter = null; + private long currentCounterStartTime = 0; + private final AtomicLongArray fixedCounters; + private final TezCounters tezCounters; + + public WmFragmentCounters(final TezCounters tezCounters) { + this.fixedCounters = new AtomicLongArray(LlapWmCounters.values().length); + this.tezCounters = tezCounters; + } + + public void changeStateQueued(boolean isGuaranteed) { + changeState(State.QUEUED, getQueuedCounter(isGuaranteed)); + } + + public void changeStateRunning(boolean isGuaranteed) { + changeState(State.RUNNING, getRunningCounter(isGuaranteed)); + } + + private static LlapWmCounters getQueuedCounter(boolean isGuaranteed) { + return isGuaranteed + ? LlapWmCounters.GUARANTEED_QUEUED_NS : LlapWmCounters.SPECULATIVE_QUEUED_NS; + } + + private static LlapWmCounters getRunningCounter(boolean isGuaranteed) { + return isGuaranteed + ? 
LlapWmCounters.GUARANTEED_RUNNING_NS : LlapWmCounters.SPECULATIVE_RUNNING_NS; + } + + public void changeStateDone() { + changeState(State.DONE, null); + } + + public void changeGuaranteed(boolean isGuaranteed) { + long newTime = System.nanoTime(); + long oldTime = -1; + LlapWmCounters oldCounter = null; + synchronized (this) { + LlapWmCounters counter = null; + switch (currentState) { + case DONE: + case NONE: return; + case QUEUED: counter = getQueuedCounter(isGuaranteed); break; + case RUNNING: counter = getRunningCounter(isGuaranteed); break; + default: throw new AssertionError(currentState); + } + if (counter == currentCounter) return; + if (currentCounter != null) { + oldCounter = currentCounter; + oldTime = currentCounterStartTime; + } + currentCounter = counter; + currentCounterStartTime = newTime; + } + if (oldCounter != null) { + incrCounter(oldCounter, newTime - oldTime); + } + } + + + private void changeState(State newState, LlapWmCounters counter) { + long newTime = System.nanoTime(); + long oldTime = -1; + LlapWmCounters oldCounter = null; + synchronized (this) { + if (newState.ordinal() < currentState.ordinal()) return; + if (counter == currentCounter) return; + if (currentCounter != null) { + oldCounter = currentCounter; + oldTime = currentCounterStartTime; + } + currentCounter = counter; + currentState = newState; + currentCounterStartTime = newTime; + } + if (oldCounter != null) { + incrCounter(oldCounter, newTime - oldTime); + } + } + + private void incrCounter(LlapWmCounters counter, long delta) { + fixedCounters.addAndGet(counter.ordinal(), delta); + if (tezCounters != null) { + tezCounters.findCounter(LlapWmCounters.values()[counter.ordinal()]).increment(delta); + } + } + + @Override + public String toString() { + // We rely on NDC information in the logs to map counters to attempt. + // If that is not available, appId should either be passed in, or extracted from NDC. 
+ StringBuilder sb = new StringBuilder("[ "); + for (int i = 0; i < fixedCounters.length(); ++i) { + if (i != 0) { + sb.append(", "); + } + sb.append(LlapWmCounters.values()[i].name()).append("=").append(fixedCounters.get(i)); + } + sb.append(" ]"); + return sb.toString(); + } + + public TezCounters getTezCounters() { + return tezCounters; + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/72684f10/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java index f0a26eb..37de03b 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java @@ -32,6 +32,9 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.llap.DaemonId; import org.apache.hadoop.hive.llap.LlapNodeId; import org.apache.hadoop.hive.llap.NotTezEventHelper; +import org.apache.hadoop.hive.llap.counters.FragmentCountersMap; +import org.apache.hadoop.hive.llap.counters.LlapWmCounters; +import org.apache.hadoop.hive.llap.counters.WmFragmentCounters; import org.apache.hadoop.hive.llap.daemon.ContainerRunner; import org.apache.hadoop.hive.llap.daemon.FragmentCompletionHandler; import org.apache.hadoop.hive.llap.daemon.HistoryLogger; @@ -61,6 +64,7 @@ import org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorMetrics; import org.apache.hadoop.hive.llap.security.LlapSignerImpl; import org.apache.hadoop.hive.llap.tez.Converters; import org.apache.hadoop.hive.llap.tezplugins.LlapTezUtils; +import org.apache.hadoop.hive.ql.exec.tez.WorkloadManager; import org.apache.hadoop.io.DataInputBuffer; import org.apache.hadoop.security.Credentials; import 
org.apache.hadoop.security.UserGroupInformation; @@ -70,6 +74,7 @@ import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.util.AuxiliaryServiceHelper; import org.apache.log4j.MDC; import org.apache.log4j.NDC; +import org.apache.tez.common.counters.TezCounters; import org.apache.tez.common.security.JobTokenIdentifier; import org.apache.tez.common.security.TokenCache; import org.apache.tez.dag.api.TezConfiguration; @@ -212,17 +217,17 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu // This is the start of container-annotated logging. final String dagId = attemptId.getTaskID().getVertexID().getDAGId().toString(); final String queryId = vertex.getHiveQueryId(); - final String fragId = LlapTezUtils.stripAttemptPrefix(fragmentIdString); + final String fragmentId = LlapTezUtils.stripAttemptPrefix(fragmentIdString); MDC.put("dagId", dagId); MDC.put("queryId", queryId); - MDC.put("fragmentId", fragId); + MDC.put("fragmentId", fragmentId); // TODO: Ideally we want tez to use CallableWithMdc that retains the MDC for threads created in // thread pool. For now, we will push both dagId and queryId into NDC and the custom thread // pool that we use for task execution and llap io (StatsRecordingThreadPool) will pop them // using reflection and update the MDC. NDC.push(dagId); NDC.push(queryId); - NDC.push(fragId); + NDC.push(fragmentId); Scheduler.SubmissionState submissionState; SubmitWorkResponseProto.Builder responseBuilder = SubmitWorkResponseProto.newBuilder(); try { @@ -263,11 +268,13 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu Configuration callableConf = new Configuration(getConfig()); UserGroupInformation fsTaskUgi = fsUgiFactory == null ? 
null : fsUgiFactory.createUgi(); boolean isGuaranteed = request.hasIsGuaranteed() && request.getIsGuaranteed(); + WmFragmentCounters wmCounters = new WmFragmentCounters( + FragmentCountersMap.getCountersForFragment(fragmentId)); TaskRunnerCallable callable = new TaskRunnerCallable(request, fragmentInfo, callableConf, new ExecutionContextImpl(localAddress.get().getHostName()), env, credentials, memoryPerExecutor, amReporter, confParams, metrics, killedTaskHandler, this, tezHadoopShim, attemptId, vertex, initialEvent, fsTaskUgi, - completionListener, socketFactory, isGuaranteed); + completionListener, socketFactory, isGuaranteed, wmCounters); submissionState = executorService.schedule(callable); if (LOG.isInfoEnabled()) { http://git-wip-us.apache.org/repos/asf/hive/blob/72684f10/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java index c0356af..9f15014 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java @@ -44,6 +44,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.lang3.exception.ExceptionUtils; +import org.apache.hadoop.hive.llap.counters.FragmentCountersMap; import org.apache.hadoop.hive.llap.daemon.FinishableStateUpdateHandler; import org.apache.hadoop.hive.llap.daemon.SchedulerFragmentCompletingListener; import org.apache.hadoop.hive.llap.daemon.impl.comparator.LlapQueueComparatorBase; @@ -53,6 +54,7 @@ import org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorMetrics; import org.apache.hadoop.hive.llap.tezplugins.helpers.MonotonicClock; import 
org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.util.Clock; +import org.apache.tez.common.counters.TezCounters; import org.apache.tez.runtime.task.EndReason; import org.apache.tez.runtime.task.TaskRunner2Result; import org.slf4j.Logger; @@ -466,6 +468,7 @@ public class TaskExecutorService extends AbstractService if (evictedTask == null || !evictedTask.equals(taskWrapper)) { knownTasks.put(taskWrapper.getRequestId(), taskWrapper); taskWrapper.setIsInWaitQueue(true); + task.setWmCountersQueued(); if (LOG.isDebugEnabled()) { LOG.debug("{} added to wait queue. Current wait queue size={}", task.getRequestId(), waitQueue.size()); @@ -612,6 +615,7 @@ public class TaskExecutorService extends AbstractService LOG.debug("Removing {} from waitQueue", fragmentId); } taskWrapper.setIsInWaitQueue(false); + taskWrapper.getTaskRunnerCallable().setWmCountersDone(); if (waitQueue.remove(taskWrapper)) { if (metrics != null) { metrics.setExecutorNumQueuedRequests(waitQueue.size()); @@ -624,6 +628,7 @@ public class TaskExecutorService extends AbstractService } removeFromPreemptionQueue(taskWrapper); } + taskWrapper.getTaskRunnerCallable().setWmCountersDone(); // TODO: this will probably send a message to AM. Is that needed here? 
taskWrapper.getTaskRunnerCallable().killTask(); } else { @@ -674,10 +679,12 @@ public class TaskExecutorService extends AbstractService if (LOG.isInfoEnabled()) { LOG.info("Attempting to execute {}", taskWrapper); } - ListenableFuture<TaskRunner2Result> future = executorService.submit( - taskWrapper.getTaskRunnerCallable()); + TaskRunnerCallable task = taskWrapper.getTaskRunnerCallable(); + task.setWmCountersRunning(); + ListenableFuture<TaskRunner2Result> future = executorService.submit(task); runningFragmentCount.incrementAndGet(); taskWrapper.setIsInWaitQueue(false); + FutureCallback<TaskRunner2Result> wrappedCallback = createInternalCompletionListener( taskWrapper); // Callback on a separate thread so that when a task completes, the thread in the main queue @@ -923,6 +930,7 @@ public class TaskExecutorService extends AbstractService knownTasks.remove(taskWrapper.getRequestId()); taskWrapper.setIsInPreemptableQueue(false); taskWrapper.maybeUnregisterForFinishedStateNotifications(); + taskWrapper.getTaskRunnerCallable().setWmCountersDone(); updatePreemptionListAndNotify(result.getEndReason()); taskWrapper.getTaskRunnerCallable().getCallback().onSuccess(result); } @@ -937,6 +945,7 @@ public class TaskExecutorService extends AbstractService knownTasks.remove(taskWrapper.getRequestId()); taskWrapper.setIsInPreemptableQueue(false); taskWrapper.maybeUnregisterForFinishedStateNotifications(); + taskWrapper.getTaskRunnerCallable().setWmCountersDone(); updatePreemptionListAndNotify(null); taskWrapper.getTaskRunnerCallable().getCallback().onFailure(t); LOG.error("Failed notification received: Stacktrace: " + ExceptionUtils.getStackTrace(t)); http://git-wip-us.apache.org/repos/asf/hive/blob/72684f10/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java index 7971eda..8004f33 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java @@ -23,8 +23,11 @@ import com.google.common.collect.HashMultimap; import com.google.common.collect.Multimap; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.ThreadFactoryBuilder; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.hive.llap.counters.LlapWmCounters; +import org.apache.hadoop.hive.llap.counters.WmFragmentCounters; import org.apache.hadoop.hive.llap.daemon.FragmentCompletionHandler; import org.apache.hadoop.hive.llap.daemon.HistoryLogger; import org.apache.hadoop.hive.llap.daemon.KilledTaskHandler; @@ -70,6 +73,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.net.SocketFactory; + import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.security.PrivilegedExceptionAction; @@ -125,6 +129,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> { private UserGroupInformation fsTaskUgi; private final SocketFactory socketFactory; private boolean isGuaranteed; + private WmFragmentCounters wmCounters; @VisibleForTesting public TaskRunnerCallable(SubmitWorkRequestProto request, QueryFragmentInfo fragmentInfo, @@ -134,7 +139,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> { FragmentCompletionHandler fragmentCompleteHandler, HadoopShim tezHadoopShim, TezTaskAttemptID attemptId, SignableVertexSpec vertex, TezEvent initialEvent, UserGroupInformation fsTaskUgi, SchedulerFragmentCompletingListener completionListener, - SocketFactory socketFactory, boolean isGuaranteed) { + SocketFactory socketFactory, boolean isGuaranteed, WmFragmentCounters wmCounters) { 
this.request = request; this.fragmentInfo = fragmentInfo; this.conf = conf; @@ -169,6 +174,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> { this.completionListener = completionListener; this.socketFactory = socketFactory; this.isGuaranteed = isGuaranteed; + this.wmCounters = wmCounters; } public long getStartTime() { @@ -266,12 +272,9 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> { synchronized (this) { if (shouldRunTask) { taskRunner = new TezTaskRunner2(conf, fsTaskUgi, fragmentInfo.getLocalDirs(), - taskSpec, - vertex.getQueryIdentifier().getAppAttemptNumber(), + taskSpec, vertex.getQueryIdentifier().getAppAttemptNumber(), serviceConsumerMetadata, envMap, startedInputsMap, taskReporter, executor, - objectRegistry, - pid, - executionContext, memoryAvailable, false, tezHadoopShim); + objectRegistry, pid, executionContext, memoryAvailable, false, tezHadoopShim); } } if (taskRunner == null) { @@ -559,6 +562,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> { } protected void logFragmentEnd(boolean success) { + LOG.info("WM counters: {}", wmCounters); HistoryLogger.logFragmentEnd(vertex.getQueryIdentifier().getApplicationIdString(), request.getContainerIdString(), executionContext.getHostName(), queryId, fragmentInfo.getQueryInfo().getDagIdentifier(), vertex.getVertexName(), @@ -607,5 +611,26 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> { public void setIsGuaranteed(boolean isGuaranteed) { this.isGuaranteed = isGuaranteed; + if (wmCounters != null) { + wmCounters.changeGuaranteed(isGuaranteed); + } + } + + public void setWmCountersDone() { + if (wmCounters != null) { + wmCounters.changeStateDone(); + } + } + + public void setWmCountersQueued() { + if (wmCounters != null) { + wmCounters.changeStateQueued(isGuaranteed); + } + } + + public void setWmCountersRunning() { + if (wmCounters != null) { + wmCounters.changeStateRunning(isGuaranteed); + 
} } } http://git-wip-us.apache.org/repos/asf/hive/blob/72684f10/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java index eba84c3..1cdbb06 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java @@ -179,8 +179,7 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch> { } // IO thread pool. Listening is used for unhandled errors for now (TODO: remove?) int numThreads = HiveConf.getIntVar(conf, HiveConf.ConfVars.LLAP_IO_THREADPOOL_SIZE); - executor = new StatsRecordingThreadPool(numThreads, numThreads, - 0L, TimeUnit.MILLISECONDS, + executor = new StatsRecordingThreadPool(numThreads, numThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactoryBuilder().setNameFormat("IO-Elevator-Thread-%d").setDaemon(true).build()); FixedSizedObjectPool<IoTrace> tracePool = IoTrace.createTracePool(conf); http://git-wip-us.apache.org/repos/asf/hive/blob/72684f10/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java ---------------------------------------------------------------------- diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java index 078420d..69e1d87 100644 --- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java +++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java @@ -217,7 +217,7 @@ public class TaskExecutorTestHelpers { LlapDaemonExecutorMetrics.class), mock(KilledTaskHandler.class), mock( 
FragmentCompletionHandler.class), new DefaultHadoopShim(), null, requestProto.getWorkSpec().getVertex(), initialEvent, null, mock( - SchedulerFragmentCompletingListener.class), mock(SocketFactory.class), isGuaranteed); + SchedulerFragmentCompletingListener.class), mock(SocketFactory.class), isGuaranteed, null); this.workTime = workTime; this.canFinish = canFinish; this.canFinishQueue = canFinishQueue; http://git-wip-us.apache.org/repos/asf/hive/blob/72684f10/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/LlapWmSummary.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/LlapWmSummary.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/LlapWmSummary.java new file mode 100644 index 0000000..f8f75d0 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/LlapWmSummary.java @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.exec.tez.monitoring; + +import org.apache.hadoop.hive.llap.counters.LlapWmCounters; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.tez.common.counters.TezCounters; +import org.apache.tez.dag.api.TezException; +import org.apache.tez.dag.api.client.DAGClient; +import org.apache.tez.dag.api.client.Progress; +import org.apache.tez.dag.api.client.StatusGetOpts; + +import java.io.IOException; +import java.text.DecimalFormat; +import java.util.*; + +import static org.apache.hadoop.hive.ql.exec.tez.monitoring.Constants.SEPARATOR; +import static org.apache.hadoop.hive.ql.exec.tez.monitoring.TezJobMonitor.getCounterValueByGroupName; + +public class LlapWmSummary implements PrintSummary { + + private static final String LLAP_SUMMARY_HEADER_FORMAT = "%10s %20s %20s %20s %20s"; + private static final String LLAP_SUMMARY_TITLE = "LLAP WM Summary"; + private static final String LLAP_SUMMARY_HEADER = String.format(LLAP_SUMMARY_HEADER_FORMAT, + "VERTICES", "SPECULATIVE QUEUED", "SPECULATIVE RUNNING", + "GUARANTEED QUEUED", "GUARANTEED RUNNING"); + + + + private final DecimalFormat secondsFormatter = new DecimalFormat("#0.00"); + private Map<String, Progress> progressMap; + private DAGClient dagClient; + private boolean first = false; + + LlapWmSummary(Map<String, Progress> progressMap, DAGClient dagClient) { + this.progressMap = progressMap; + this.dagClient = dagClient; + } + + @Override + public void print(SessionState.LogHelper console) { + console.printInfo(""); + console.printInfo(LLAP_SUMMARY_TITLE); + + SortedSet<String> keys = new TreeSet<>(progressMap.keySet()); + Set<StatusGetOpts> statusOptions = new HashSet<>(1); + statusOptions.add(StatusGetOpts.GET_COUNTERS); + String counterGroup = LlapWmCounters.class.getName(); + for (String vertexName : keys) { + TezCounters vertexCounters = vertexCounter(statusOptions, vertexName); + if (vertexCounters != null) { + if (!first) { + console.printInfo(SEPARATOR); + 
console.printInfo(LLAP_SUMMARY_HEADER); + console.printInfo(SEPARATOR); + first = true; + } + console.printInfo(vertexSummary(vertexName, counterGroup, vertexCounters)); + } + } + console.printInfo(SEPARATOR); + console.printInfo(""); + } + + private String vertexSummary(String vertexName, String counterGroup, TezCounters vertexCounters) { + final long sq = getCounterValueByGroupName( + vertexCounters, counterGroup, LlapWmCounters.SPECULATIVE_QUEUED_NS.name()); + final long sr = getCounterValueByGroupName( + vertexCounters, counterGroup, LlapWmCounters.SPECULATIVE_RUNNING_NS.name()); + final long gq = getCounterValueByGroupName( + vertexCounters, counterGroup, LlapWmCounters.GUARANTEED_QUEUED_NS.name()); + final long gr = getCounterValueByGroupName( + vertexCounters, counterGroup, LlapWmCounters.GUARANTEED_RUNNING_NS.name()); + + + + return String.format(LLAP_SUMMARY_HEADER_FORMAT, vertexName, + secondsFormatter.format(sq / 1000_000_000.0) + "s", + secondsFormatter.format(sr / 1000_000_000.0) + "s", + secondsFormatter.format(gq / 1000_000_000.0) + "s", + secondsFormatter.format(gr / 1000_000_000.0) + "s"); + } + + private TezCounters vertexCounter(Set<StatusGetOpts> statusOptions, String vertexName) { + try { + return dagClient.getVertexStatus(vertexName, statusOptions).getVertexCounters(); + } catch (IOException | TezException e) { + // best attempt, shouldn't really kill DAG for this + } + return null; + } + +} http://git-wip-us.apache.org/repos/asf/hive/blob/72684f10/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java index 5ade1f3..55e7d7d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java +++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java @@ -35,6 +35,7 @@ import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.hadoop.hive.common.log.InPlaceUpdate; import org.apache.hadoop.hive.common.log.ProgressMonitor; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.ql.Context; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolManager; @@ -351,6 +352,10 @@ public class TezJobMonitor { new LLAPioSummary(progressMap, dagClient).print(console); new FSCountersSummary(progressMap, dagClient).print(console); } + String wmQueue = HiveConf.getVar(hiveConf, ConfVars.HIVE_SERVER2_TEZ_INTERACTIVE_QUEUE); + if (wmQueue != null && !wmQueue.isEmpty()) { + new LlapWmSummary(progressMap, dagClient).print(console); + } console.printInfo(""); }