Repository: hive
Updated Branches:
  refs/heads/master d9801d9c6 -> 72684f10d


HIVE-18273 : add LLAP-level counters for WM (Sergey Shelukhin, reviewed by Harish Jaiprakash and Gunther Hagleitner)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/72684f10
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/72684f10
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/72684f10

Branch: refs/heads/master
Commit: 72684f10da8f124dfa624b16f3ffebeb2155664d
Parents: d9801d9
Author: sergey <ser...@apache.org>
Authored: Tue Jan 16 13:07:51 2018 -0800
Committer: sergey <ser...@apache.org>
Committed: Tue Jan 16 13:48:01 2018 -0800

----------------------------------------------------------------------
 .../hive/llap/counters/LlapWmCounters.java      |  27 ++++
 .../hive/llap/counters/WmFragmentCounters.java  | 133 +++++++++++++++++++
 .../llap/daemon/impl/ContainerRunnerImpl.java   |  15 ++-
 .../llap/daemon/impl/TaskExecutorService.java   |  13 +-
 .../llap/daemon/impl/TaskRunnerCallable.java    |  37 +++++-
 .../hive/llap/io/api/impl/LlapIoImpl.java       |   3 +-
 .../daemon/impl/TaskExecutorTestHelpers.java    |   2 +-
 .../ql/exec/tez/monitoring/LlapWmSummary.java   | 108 +++++++++++++++
 .../ql/exec/tez/monitoring/TezJobMonitor.java   |   5 +
 9 files changed, 328 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/72684f10/llap-common/src/java/org/apache/hadoop/hive/llap/counters/LlapWmCounters.java
----------------------------------------------------------------------
diff --git 
a/llap-common/src/java/org/apache/hadoop/hive/llap/counters/LlapWmCounters.java 
b/llap-common/src/java/org/apache/hadoop/hive/llap/counters/LlapWmCounters.java
new file mode 100644
index 0000000..8575da4
--- /dev/null
+++ 
b/llap-common/src/java/org/apache/hadoop/hive/llap/counters/LlapWmCounters.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.counters;
+
+
+/**
+ * LLAP WM related counters.
+ */
+public enum LlapWmCounters {
+  SPECULATIVE_QUEUED_NS,
+  SPECULATIVE_RUNNING_NS,
+  GUARANTEED_QUEUED_NS,
+  GUARANTEED_RUNNING_NS,
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/72684f10/llap-server/src/java/org/apache/hadoop/hive/llap/counters/WmFragmentCounters.java
----------------------------------------------------------------------
diff --git 
a/llap-server/src/java/org/apache/hadoop/hive/llap/counters/WmFragmentCounters.java
 
b/llap-server/src/java/org/apache/hadoop/hive/llap/counters/WmFragmentCounters.java
new file mode 100644
index 0000000..babeb37
--- /dev/null
+++ 
b/llap-server/src/java/org/apache/hadoop/hive/llap/counters/WmFragmentCounters.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.counters;
+
+import java.util.concurrent.atomic.AtomicLongArray;
+import org.apache.tez.common.counters.TezCounters;
+
+/**
+ * Per query counters.
+ */
+public class WmFragmentCounters {
+  private static enum State { NONE, QUEUED, RUNNING, DONE };
+  private State currentState = State.NONE;
+  private LlapWmCounters currentCounter = null;
+  private long currentCounterStartTime = 0;
+  private final AtomicLongArray fixedCounters;
+  private final TezCounters tezCounters;
+
+  public WmFragmentCounters(final TezCounters tezCounters) {
+    this.fixedCounters = new AtomicLongArray(LlapWmCounters.values().length);
+    this.tezCounters = tezCounters;
+  }
+
+  public void changeStateQueued(boolean isGuaranteed) {
+    changeState(State.QUEUED, getQueuedCounter(isGuaranteed));
+  }
+
+  public void changeStateRunning(boolean isGuaranteed) {
+    changeState(State.RUNNING, getRunningCounter(isGuaranteed));
+  }
+
+  private static LlapWmCounters getQueuedCounter(boolean isGuaranteed) {
+    return isGuaranteed
+        ? LlapWmCounters.GUARANTEED_QUEUED_NS : LlapWmCounters.SPECULATIVE_QUEUED_NS;
+  }
+
+  private static LlapWmCounters getRunningCounter(boolean isGuaranteed) {
+    return isGuaranteed
+        ? LlapWmCounters.GUARANTEED_RUNNING_NS : LlapWmCounters.SPECULATIVE_RUNNING_NS;
+  }
+
+  public void changeStateDone() {
+    changeState(State.DONE, null);
+  }
+
+  public void changeGuaranteed(boolean isGuaranteed) {
+    long newTime = System.nanoTime();
+    long oldTime = -1;
+    LlapWmCounters oldCounter = null;
+    synchronized (this) {
+      LlapWmCounters counter = null;
+      switch (currentState) {
+      case DONE:
+      case NONE: return;
+      case QUEUED: counter = getQueuedCounter(isGuaranteed); break;
+      case RUNNING: counter = getRunningCounter(isGuaranteed); break;
+      default: throw new AssertionError(currentState);
+      }
+      if (counter == currentCounter) return;
+      if (currentCounter != null) {
+        oldCounter = currentCounter;
+        oldTime = currentCounterStartTime;
+      }
+      currentCounter = counter;
+      currentCounterStartTime = newTime;
+    }
+    if (oldCounter != null) {
+      incrCounter(oldCounter, newTime - oldTime);
+    }
+  }
+
+
+  private void changeState(State newState, LlapWmCounters counter) {
+    long newTime = System.nanoTime();
+    long oldTime = -1;
+    LlapWmCounters oldCounter = null;
+    synchronized (this) {
+      if (newState.ordinal() < currentState.ordinal()) return;
+      if (counter == currentCounter) return;
+      if (currentCounter != null) {
+        oldCounter = currentCounter;
+        oldTime = currentCounterStartTime;
+      }
+      currentCounter = counter;
+      currentState = newState;
+      currentCounterStartTime = newTime;
+    }
+    if (oldCounter != null) {
+      incrCounter(oldCounter, newTime - oldTime);
+    }
+  }
+
+  private void incrCounter(LlapWmCounters counter, long delta) {
+    fixedCounters.addAndGet(counter.ordinal(), delta);
+    if (tezCounters != null) {
+      tezCounters.findCounter(LlapWmCounters.values()[counter.ordinal()]).increment(delta);
+    }
+  }
+
+  @Override
+  public String toString() {
+    // We rely on NDC information in the logs to map counters to attempt.
+    // If that is not available, appId should either be passed in, or extracted from NDC.
+    StringBuilder sb = new StringBuilder("[ ");
+    for (int i = 0; i < fixedCounters.length(); ++i) {
+      if (i != 0) {
+        sb.append(", ");
+      }
+      sb.append(LlapWmCounters.values()[i].name()).append("=").append(fixedCounters.get(i));
+    }
+    sb.append(" ]");
+    return sb.toString();
+  }
+
+  public TezCounters getTezCounters() {
+    return tezCounters;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/72684f10/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
----------------------------------------------------------------------
diff --git 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
index f0a26eb..37de03b 100644
--- 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
+++ 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
@@ -32,6 +32,9 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.llap.DaemonId;
 import org.apache.hadoop.hive.llap.LlapNodeId;
 import org.apache.hadoop.hive.llap.NotTezEventHelper;
+import org.apache.hadoop.hive.llap.counters.FragmentCountersMap;
+import org.apache.hadoop.hive.llap.counters.LlapWmCounters;
+import org.apache.hadoop.hive.llap.counters.WmFragmentCounters;
 import org.apache.hadoop.hive.llap.daemon.ContainerRunner;
 import org.apache.hadoop.hive.llap.daemon.FragmentCompletionHandler;
 import org.apache.hadoop.hive.llap.daemon.HistoryLogger;
@@ -61,6 +64,7 @@ import 
org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorMetrics;
 import org.apache.hadoop.hive.llap.security.LlapSignerImpl;
 import org.apache.hadoop.hive.llap.tez.Converters;
 import org.apache.hadoop.hive.llap.tezplugins.LlapTezUtils;
+import org.apache.hadoop.hive.ql.exec.tez.WorkloadManager;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -70,6 +74,7 @@ import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.util.AuxiliaryServiceHelper;
 import org.apache.log4j.MDC;
 import org.apache.log4j.NDC;
+import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.common.security.JobTokenIdentifier;
 import org.apache.tez.common.security.TokenCache;
 import org.apache.tez.dag.api.TezConfiguration;
@@ -212,17 +217,17 @@ public class ContainerRunnerImpl extends CompositeService 
implements ContainerRu
     // This is the start of container-annotated logging.
     final String dagId = 
attemptId.getTaskID().getVertexID().getDAGId().toString();
     final String queryId = vertex.getHiveQueryId();
-    final String fragId = LlapTezUtils.stripAttemptPrefix(fragmentIdString);
+    final String fragmentId = 
LlapTezUtils.stripAttemptPrefix(fragmentIdString);
     MDC.put("dagId", dagId);
     MDC.put("queryId", queryId);
-    MDC.put("fragmentId", fragId);
+    MDC.put("fragmentId", fragmentId);
     // TODO: Ideally we want tez to use CallableWithMdc that retains the MDC 
for threads created in
     // thread pool. For now, we will push both dagId and queryId into NDC and 
the custom thread
     // pool that we use for task execution and llap io 
(StatsRecordingThreadPool) will pop them
     // using reflection and update the MDC.
     NDC.push(dagId);
     NDC.push(queryId);
-    NDC.push(fragId);
+    NDC.push(fragmentId);
     Scheduler.SubmissionState submissionState;
     SubmitWorkResponseProto.Builder responseBuilder = 
SubmitWorkResponseProto.newBuilder();
     try {
@@ -263,11 +268,13 @@ public class ContainerRunnerImpl extends CompositeService 
implements ContainerRu
       Configuration callableConf = new Configuration(getConfig());
       UserGroupInformation fsTaskUgi = fsUgiFactory == null ? null : 
fsUgiFactory.createUgi();
       boolean isGuaranteed = request.hasIsGuaranteed() && 
request.getIsGuaranteed();
+      WmFragmentCounters wmCounters = new WmFragmentCounters(
+          FragmentCountersMap.getCountersForFragment(fragmentId));
       TaskRunnerCallable callable = new TaskRunnerCallable(request, 
fragmentInfo, callableConf,
           new ExecutionContextImpl(localAddress.get().getHostName()), env,
           credentials, memoryPerExecutor, amReporter, confParams, metrics, 
killedTaskHandler,
           this, tezHadoopShim, attemptId, vertex, initialEvent, fsTaskUgi,
-          completionListener, socketFactory, isGuaranteed);
+          completionListener, socketFactory, isGuaranteed, wmCounters);
       submissionState = executorService.schedule(callable);
 
       if (LOG.isInfoEnabled()) {

http://git-wip-us.apache.org/repos/asf/hive/blob/72684f10/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
----------------------------------------------------------------------
diff --git 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
index c0356af..9f15014 100644
--- 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
+++ 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
@@ -44,6 +44,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.hadoop.hive.llap.counters.FragmentCountersMap;
 import org.apache.hadoop.hive.llap.daemon.FinishableStateUpdateHandler;
 import org.apache.hadoop.hive.llap.daemon.SchedulerFragmentCompletingListener;
 import 
org.apache.hadoop.hive.llap.daemon.impl.comparator.LlapQueueComparatorBase;
@@ -53,6 +54,7 @@ import 
org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorMetrics;
 import org.apache.hadoop.hive.llap.tezplugins.helpers.MonotonicClock;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.util.Clock;
+import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.runtime.task.EndReason;
 import org.apache.tez.runtime.task.TaskRunner2Result;
 import org.slf4j.Logger;
@@ -466,6 +468,7 @@ public class TaskExecutorService extends AbstractService
       if (evictedTask == null || !evictedTask.equals(taskWrapper)) {
         knownTasks.put(taskWrapper.getRequestId(), taskWrapper);
         taskWrapper.setIsInWaitQueue(true);
+        task.setWmCountersQueued();
         if (LOG.isDebugEnabled()) {
           LOG.debug("{} added to wait queue. Current wait queue size={}", 
task.getRequestId(),
               waitQueue.size());
@@ -612,6 +615,7 @@ public class TaskExecutorService extends AbstractService
             LOG.debug("Removing {} from waitQueue", fragmentId);
           }
           taskWrapper.setIsInWaitQueue(false);
+          taskWrapper.getTaskRunnerCallable().setWmCountersDone();
           if (waitQueue.remove(taskWrapper)) {
             if (metrics != null) {
               metrics.setExecutorNumQueuedRequests(waitQueue.size());
@@ -624,6 +628,7 @@ public class TaskExecutorService extends AbstractService
           }
           removeFromPreemptionQueue(taskWrapper);
         }
+        taskWrapper.getTaskRunnerCallable().setWmCountersDone();
         // TODO: this will probably send a message to AM. Is that needed here?
         taskWrapper.getTaskRunnerCallable().killTask();
       } else {
@@ -674,10 +679,12 @@ public class TaskExecutorService extends AbstractService
     if (LOG.isInfoEnabled()) {
       LOG.info("Attempting to execute {}", taskWrapper);
     }
-    ListenableFuture<TaskRunner2Result> future = executorService.submit(
-        taskWrapper.getTaskRunnerCallable());
+    TaskRunnerCallable task = taskWrapper.getTaskRunnerCallable();
+    task.setWmCountersRunning();
+    ListenableFuture<TaskRunner2Result> future = executorService.submit(task);
     runningFragmentCount.incrementAndGet();
     taskWrapper.setIsInWaitQueue(false);
+
     FutureCallback<TaskRunner2Result> wrappedCallback = 
createInternalCompletionListener(
       taskWrapper);
     // Callback on a separate thread so that when a task completes, the thread 
in the main queue
@@ -923,6 +930,7 @@ public class TaskExecutorService extends AbstractService
       knownTasks.remove(taskWrapper.getRequestId());
       taskWrapper.setIsInPreemptableQueue(false);
       taskWrapper.maybeUnregisterForFinishedStateNotifications();
+      taskWrapper.getTaskRunnerCallable().setWmCountersDone();
       updatePreemptionListAndNotify(result.getEndReason());
       taskWrapper.getTaskRunnerCallable().getCallback().onSuccess(result);
     }
@@ -937,6 +945,7 @@ public class TaskExecutorService extends AbstractService
       knownTasks.remove(taskWrapper.getRequestId());
       taskWrapper.setIsInPreemptableQueue(false);
       taskWrapper.maybeUnregisterForFinishedStateNotifications();
+      taskWrapper.getTaskRunnerCallable().setWmCountersDone();
       updatePreemptionListAndNotify(null);
       taskWrapper.getTaskRunnerCallable().getCallback().onFailure(t);
       LOG.error("Failed notification received: Stacktrace: " + 
ExceptionUtils.getStackTrace(t));

http://git-wip-us.apache.org/repos/asf/hive/blob/72684f10/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
----------------------------------------------------------------------
diff --git 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
index 7971eda..8004f33 100644
--- 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
+++ 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
@@ -23,8 +23,11 @@ import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Multimap;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hive.llap.counters.LlapWmCounters;
+import org.apache.hadoop.hive.llap.counters.WmFragmentCounters;
 import org.apache.hadoop.hive.llap.daemon.FragmentCompletionHandler;
 import org.apache.hadoop.hive.llap.daemon.HistoryLogger;
 import org.apache.hadoop.hive.llap.daemon.KilledTaskHandler;
@@ -70,6 +73,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.net.SocketFactory;
+
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.security.PrivilegedExceptionAction;
@@ -125,6 +129,7 @@ public class TaskRunnerCallable extends 
CallableWithNdc<TaskRunner2Result> {
   private UserGroupInformation fsTaskUgi;
   private final SocketFactory socketFactory;
   private boolean isGuaranteed;
+  private WmFragmentCounters wmCounters;
 
   @VisibleForTesting
   public TaskRunnerCallable(SubmitWorkRequestProto request, QueryFragmentInfo 
fragmentInfo,
@@ -134,7 +139,7 @@ public class TaskRunnerCallable extends 
CallableWithNdc<TaskRunner2Result> {
                             FragmentCompletionHandler fragmentCompleteHandler, 
HadoopShim tezHadoopShim,
                             TezTaskAttemptID attemptId, SignableVertexSpec 
vertex, TezEvent initialEvent,
                             UserGroupInformation fsTaskUgi, 
SchedulerFragmentCompletingListener completionListener,
-                            SocketFactory socketFactory, boolean isGuaranteed) 
{
+                            SocketFactory socketFactory, boolean isGuaranteed, 
WmFragmentCounters wmCounters) {
     this.request = request;
     this.fragmentInfo = fragmentInfo;
     this.conf = conf;
@@ -169,6 +174,7 @@ public class TaskRunnerCallable extends 
CallableWithNdc<TaskRunner2Result> {
     this.completionListener = completionListener;
     this.socketFactory = socketFactory;
     this.isGuaranteed = isGuaranteed;
+    this.wmCounters = wmCounters;
   }
 
   public long getStartTime() {
@@ -266,12 +272,9 @@ public class TaskRunnerCallable extends 
CallableWithNdc<TaskRunner2Result> {
         synchronized (this) {
           if (shouldRunTask) {
             taskRunner = new TezTaskRunner2(conf, fsTaskUgi, 
fragmentInfo.getLocalDirs(),
-                taskSpec,
-                vertex.getQueryIdentifier().getAppAttemptNumber(),
+                taskSpec, vertex.getQueryIdentifier().getAppAttemptNumber(),
                 serviceConsumerMetadata, envMap, startedInputsMap, 
taskReporter, executor,
-                objectRegistry,
-                pid,
-                executionContext, memoryAvailable, false, tezHadoopShim);
+                objectRegistry, pid, executionContext, memoryAvailable, false, 
tezHadoopShim);
           }
         }
         if (taskRunner == null) {
@@ -559,6 +562,7 @@ public class TaskRunnerCallable extends 
CallableWithNdc<TaskRunner2Result> {
     }
 
     protected void logFragmentEnd(boolean success) {
+      LOG.info("WM counters: {}", wmCounters);
       
HistoryLogger.logFragmentEnd(vertex.getQueryIdentifier().getApplicationIdString(),
           request.getContainerIdString(), executionContext.getHostName(), 
queryId,
           fragmentInfo.getQueryInfo().getDagIdentifier(), 
vertex.getVertexName(),
@@ -607,5 +611,26 @@ public class TaskRunnerCallable extends 
CallableWithNdc<TaskRunner2Result> {
 
   public void setIsGuaranteed(boolean isGuaranteed) {
     this.isGuaranteed = isGuaranteed;
+    if (wmCounters != null) {
+      wmCounters.changeGuaranteed(isGuaranteed);
+    }
+  }
+
+  public void setWmCountersDone() {
+    if (wmCounters != null) {
+      wmCounters.changeStateDone();
+    }
+  }
+
+  public void setWmCountersQueued() {
+    if (wmCounters != null) {
+      wmCounters.changeStateQueued(isGuaranteed);
+    }
+  }
+
+  public void setWmCountersRunning() {
+    if (wmCounters != null) {
+      wmCounters.changeStateRunning(isGuaranteed);
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/72684f10/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
----------------------------------------------------------------------
diff --git 
a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java 
b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
index eba84c3..1cdbb06 100644
--- 
a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
+++ 
b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
@@ -179,8 +179,7 @@ public class LlapIoImpl implements 
LlapIo<VectorizedRowBatch> {
     }
     // IO thread pool. Listening is used for unhandled errors for now (TODO: 
remove?)
     int numThreads = HiveConf.getIntVar(conf, 
HiveConf.ConfVars.LLAP_IO_THREADPOOL_SIZE);
-    executor = new StatsRecordingThreadPool(numThreads, numThreads,
-        0L, TimeUnit.MILLISECONDS,
+    executor = new StatsRecordingThreadPool(numThreads, numThreads, 0L, 
TimeUnit.MILLISECONDS,
         new LinkedBlockingQueue<Runnable>(),
         new 
ThreadFactoryBuilder().setNameFormat("IO-Elevator-Thread-%d").setDaemon(true).build());
     FixedSizedObjectPool<IoTrace> tracePool = IoTrace.createTracePool(conf);

http://git-wip-us.apache.org/repos/asf/hive/blob/72684f10/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
----------------------------------------------------------------------
diff --git 
a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
 
b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
index 078420d..69e1d87 100644
--- 
a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
+++ 
b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
@@ -217,7 +217,7 @@ public class TaskExecutorTestHelpers {
           LlapDaemonExecutorMetrics.class), mock(KilledTaskHandler.class), 
mock(
           FragmentCompletionHandler.class), new DefaultHadoopShim(), null,
           requestProto.getWorkSpec().getVertex(), initialEvent, null, mock(
-          SchedulerFragmentCompletingListener.class), 
mock(SocketFactory.class), isGuaranteed);
+          SchedulerFragmentCompletingListener.class), 
mock(SocketFactory.class), isGuaranteed, null);
       this.workTime = workTime;
       this.canFinish = canFinish;
       this.canFinishQueue = canFinishQueue;

http://git-wip-us.apache.org/repos/asf/hive/blob/72684f10/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/LlapWmSummary.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/LlapWmSummary.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/LlapWmSummary.java
new file mode 100644
index 0000000..f8f75d0
--- /dev/null
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/LlapWmSummary.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.tez.monitoring;
+
+import org.apache.hadoop.hive.llap.counters.LlapWmCounters;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.Progress;
+import org.apache.tez.dag.api.client.StatusGetOpts;
+
+import java.io.IOException;
+import java.text.DecimalFormat;
+import java.util.*;
+
+import static org.apache.hadoop.hive.ql.exec.tez.monitoring.Constants.SEPARATOR;
+import static org.apache.hadoop.hive.ql.exec.tez.monitoring.TezJobMonitor.getCounterValueByGroupName;
+
+public class LlapWmSummary implements PrintSummary {
+
+  private static final String LLAP_SUMMARY_HEADER_FORMAT = "%10s %20s %20s %20s %20s";
+  private static final String LLAP_SUMMARY_TITLE = "LLAP WM Summary";
+  private static final String LLAP_SUMMARY_HEADER = String.format(LLAP_SUMMARY_HEADER_FORMAT,
+      "VERTICES", "SPECULATIVE QUEUED", "SPECULATIVE RUNNING",
+      "GUARANTEED QUEUED", "GUARANTEED RUNNING");
+
+
+
+  private final DecimalFormat secondsFormatter = new DecimalFormat("#0.00");
+  private Map<String, Progress> progressMap;
+  private DAGClient dagClient;
+  private boolean first = false;
+
+  LlapWmSummary(Map<String, Progress> progressMap, DAGClient dagClient) {
+    this.progressMap = progressMap;
+    this.dagClient = dagClient;
+  }
+
+  @Override
+  public void print(SessionState.LogHelper console) {
+    console.printInfo("");
+    console.printInfo(LLAP_SUMMARY_TITLE);
+
+    SortedSet<String> keys = new TreeSet<>(progressMap.keySet());
+    Set<StatusGetOpts> statusOptions = new HashSet<>(1);
+    statusOptions.add(StatusGetOpts.GET_COUNTERS);
+    String counterGroup = LlapWmCounters.class.getName();
+    for (String vertexName : keys) {
+      TezCounters vertexCounters = vertexCounter(statusOptions, vertexName);
+      if (vertexCounters != null) {
+        if (!first) {
+          console.printInfo(SEPARATOR);
+          console.printInfo(LLAP_SUMMARY_HEADER);
+          console.printInfo(SEPARATOR);
+          first = true;
+        }
+        console.printInfo(vertexSummary(vertexName, counterGroup, vertexCounters));
+      }
+    }
+    console.printInfo(SEPARATOR);
+    console.printInfo("");
+  }
+
+  private String vertexSummary(String vertexName, String counterGroup, TezCounters vertexCounters) {
+    final long sq = getCounterValueByGroupName(
+        vertexCounters, counterGroup, LlapWmCounters.SPECULATIVE_QUEUED_NS.name());
+    final long sr = getCounterValueByGroupName(
+        vertexCounters, counterGroup, LlapWmCounters.SPECULATIVE_RUNNING_NS.name());
+    final long gq = getCounterValueByGroupName(
+        vertexCounters, counterGroup, LlapWmCounters.GUARANTEED_QUEUED_NS.name());
+    final long gr = getCounterValueByGroupName(
+        vertexCounters, counterGroup, LlapWmCounters.GUARANTEED_RUNNING_NS.name());
+
+
+
+    return String.format(LLAP_SUMMARY_HEADER_FORMAT, vertexName,
+        secondsFormatter.format(sq / 1000_000_000.0) + "s",
+        secondsFormatter.format(sr / 1000_000_000.0) + "s",
+        secondsFormatter.format(gq / 1000_000_000.0) + "s",
+        secondsFormatter.format(gr / 1000_000_000.0) + "s");
+  }
+
+  private TezCounters vertexCounter(Set<StatusGetOpts> statusOptions, String vertexName) {
+    try {
+      return dagClient.getVertexStatus(vertexName, statusOptions).getVertexCounters();
+    } catch (IOException | TezException e) {
+      // best attempt, shouldn't really kill DAG for this
+    }
+    return null;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/72684f10/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java
index 5ade1f3..55e7d7d 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java
@@ -35,6 +35,7 @@ import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.hadoop.hive.common.log.InPlaceUpdate;
 import org.apache.hadoop.hive.common.log.ProgressMonitor;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolManager;
@@ -351,6 +352,10 @@ public class TezJobMonitor {
         new LLAPioSummary(progressMap, dagClient).print(console);
         new FSCountersSummary(progressMap, dagClient).print(console);
       }
+      String wmQueue = HiveConf.getVar(hiveConf, 
ConfVars.HIVE_SERVER2_TEZ_INTERACTIVE_QUEUE);
+      if (wmQueue != null && !wmQueue.isEmpty()) {
+        new LlapWmSummary(progressMap, dagClient).print(console);
+      }
 
       console.printInfo("");
     }

Reply via email to