Repository: hive
Updated Branches:
  refs/heads/master 0d8323357 -> a3060b307


http://git-wip-us.apache.org/repos/asf/hive/blob/a3060b30/ql/src/java/org/apache/hadoop/hive/ql/hooks/PostExecWMEventsSummaryPrinter.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/hooks/PostExecWMEventsSummaryPrinter.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/hooks/PostExecWMEventsSummaryPrinter.java
new file mode 100644
index 0000000..83cca89
--- /dev/null
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/hooks/PostExecWMEventsSummaryPrinter.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.hooks;
+
+import java.util.List;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.QueryPlan;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.tez.TezTask;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.ql.wm.WmContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Post execution (success or failure) hook to print hive workload manager 
events summary.
+ */
+public class PostExecWMEventsSummaryPrinter implements ExecuteWithHookContext {
+  private static final Logger LOG = 
LoggerFactory.getLogger(PostExecWMEventsSummaryPrinter.class.getName());
+
+  @Override
+  public void run(HookContext hookContext) throws Exception {
+    assert (hookContext.getHookType() == HookContext.HookType.POST_EXEC_HOOK ||
+      hookContext.getHookType() == HookContext.HookType.ON_FAILURE_HOOK);
+    HiveConf conf = hookContext.getConf();
+    if (!"tez".equals(HiveConf.getVar(conf, 
HiveConf.ConfVars.HIVE_EXECUTION_ENGINE))) {
+      return;
+    }
+
+    LOG.info("Executing post execution hook to print workload manager events 
summary..");
+    SessionState.LogHelper console = SessionState.getConsole();
+    QueryPlan plan = hookContext.getQueryPlan();
+    if (plan == null) {
+      return;
+    }
+
+    List<TezTask> rootTasks = Utilities.getTezTasks(plan.getRootTasks());
+    for (TezTask tezTask : rootTasks) {
+      WmContext wmContext = tezTask.getDriverContext().getCtx().getWmContext();
+      if (wmContext != null) {
+        wmContext.shortPrint(console);
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/a3060b30/ql/src/java/org/apache/hadoop/hive/ql/wm/Trigger.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/Trigger.java 
b/ql/src/java/org/apache/hadoop/hive/ql/wm/Trigger.java
index e41b460..4adad7a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/wm/Trigger.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/Trigger.java
@@ -15,11 +15,14 @@
  */
 package org.apache.hadoop.hive.ql.wm;
 
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
 /**
  * Trigger interface which gets mapped to CREATE TRIGGER .. queries. A trigger 
can have a name, expression and action.
  * Trigger is a simple expression which gets evaluated during the lifecycle of 
query and executes an action
  * if the expression defined in trigger evaluates to true.
  */
+@JsonSerialize
 public interface Trigger {
 
   /**

http://git-wip-us.apache.org/repos/asf/hive/blob/a3060b30/ql/src/java/org/apache/hadoop/hive/ql/wm/TriggerActionHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/TriggerActionHandler.java 
b/ql/src/java/org/apache/hadoop/hive/ql/wm/TriggerActionHandler.java
index 8b142da..7995a8f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/wm/TriggerActionHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/TriggerActionHandler.java
@@ -22,11 +22,11 @@ import org.apache.hadoop.hive.ql.exec.tez.TezSessionState;
 /**
  * Interface for handling rule violations by queries and for performing 
actions defined in the rules.
  */
-public interface TriggerActionHandler {
+public interface TriggerActionHandler<SessionType> {
   /**
    * Applies the action defined in the rule for the specified queries
    *
    * @param queriesViolated - violated queries and the rule it violated
    */
-  void applyAction(Map<TezSessionState, Trigger> queriesViolated);
+  void applyAction(Map<SessionType, Trigger> queriesViolated);
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/a3060b30/ql/src/java/org/apache/hadoop/hive/ql/wm/TriggerContext.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/TriggerContext.java 
b/ql/src/java/org/apache/hadoop/hive/ql/wm/TriggerContext.java
deleted file mode 100644
index 16072c3..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/wm/TriggerContext.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.wm;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-/**
- * Some context information that are required for rule evaluation.
- */
-public class TriggerContext {
-  private Set<String> desiredCounters = new HashSet<>();
-  private Map<String, Long> currentCounters = new HashMap<>();
-  private String queryId;
-  private long queryStartTime;
-  private boolean queryCompleted;
-
-  public TriggerContext(final long queryStartTime, final String queryId) {
-    this.queryStartTime = queryStartTime;
-    this.queryId = queryId;
-    this.queryCompleted = false;
-  }
-
-  public String getQueryId() {
-    return queryId;
-  }
-
-  public void setQueryId(final String queryId) {
-    this.queryId = queryId;
-  }
-
-  public Set<String> getDesiredCounters() {
-    return desiredCounters;
-  }
-
-  public void setDesiredCounters(final Set<String> desiredCounters) {
-    this.desiredCounters = desiredCounters;
-  }
-
-  public Map<String, Long> getCurrentCounters() {
-    return currentCounters;
-  }
-
-  public void setCurrentCounters(final Map<String, Long> currentCounters) {
-    this.currentCounters = currentCounters;
-  }
-
-  public long getElapsedTime() {
-    return System.currentTimeMillis() - queryStartTime;
-  }
-
-  public boolean isQueryCompleted() {
-    return queryCompleted;
-  }
-
-  public void setQueryCompleted(final boolean queryCompleted) {
-    this.queryCompleted = queryCompleted;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/a3060b30/ql/src/java/org/apache/hadoop/hive/ql/wm/WmContext.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/WmContext.java 
b/ql/src/java/org/apache/hadoop/hive/ql/wm/WmContext.java
new file mode 100644
index 0000000..7a7ef50
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/WmContext.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.wm;
+
+import static 
org.apache.hadoop.hive.ql.exec.tez.monitoring.Constants.SEPARATOR;
+
+import java.text.DecimalFormat;
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import javax.management.MXBean;
+
+import org.apache.hadoop.hive.ql.exec.tez.WmEvent;
+import org.apache.hadoop.hive.ql.exec.tez.monitoring.PrintSummary;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.codehaus.jackson.annotate.JsonAutoDetect;
+import org.codehaus.jackson.annotate.JsonIgnore;
+import org.codehaus.jackson.annotate.JsonProperty;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.SerializationConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Some context information that are required for rule evaluation.
+ */
+@MXBean
+public class WmContext implements PrintSummary {
+  private static final Logger LOG = LoggerFactory.getLogger(WmContext.class);
+  @JsonProperty("queryId")
+  private String queryId;
+  @JsonProperty("queryStartTime")
+  private long queryStartTime;
+  @JsonProperty("queryEndTime")
+  private long queryEndTime;
+  @JsonProperty("queryCompleted")
+  private boolean queryCompleted;
+  @JsonProperty("queryWmEvents")
+  private final List<WmEvent> queryWmEvents = new LinkedList<>();
+  @JsonProperty("appliedTriggers")
+  private Set<Trigger> appliedTriggers = new HashSet<>();
+  @JsonProperty("subscribedCounters")
+  private Set<String> subscribedCounters = new HashSet<>();
+  @JsonProperty("currentCounters")
+  private Map<String, Long> currentCounters = new HashMap<>();
+  @JsonIgnore // explictly ignoring as Getter visibility is ANY for auto-json 
serialization of Trigger based on getters
+  private Future<Boolean> returnEventFuture;
+
+  public WmContext(final long queryStartTime, final String queryId) {
+    this.queryStartTime = queryStartTime;
+    this.queryId = queryId;
+    this.queryCompleted = false;
+  }
+
+  public Set<Trigger> getAppliedTriggers() {
+    return appliedTriggers;
+  }
+
+  public void addTriggers(final List<Trigger> triggers) {
+    if (triggers != null) {
+      this.appliedTriggers.addAll(triggers);
+      // reset and add counters. This can happen during start of query or a 
session being moved to another pool with its
+      // own set of triggers
+      Set<String> counters = new HashSet<>();
+      for (Trigger trigger : triggers) {
+        counters.add(trigger.getExpression().getCounterLimit().getName());
+      }
+      setSubscribedCounters(counters);
+      setCurrentCounters(new HashMap<>());
+    }
+  }
+
+  public String getQueryId() {
+    return queryId;
+  }
+
+  public void setQueryId(final String queryId) {
+    this.queryId = queryId;
+  }
+
+  public Set<String> getSubscribedCounters() {
+    return subscribedCounters;
+  }
+
+  public void setSubscribedCounters(final Set<String> subscribedCounters) {
+    this.subscribedCounters = subscribedCounters;
+  }
+
+  public Map<String, Long> getCurrentCounters() {
+    return currentCounters;
+  }
+
+  public void setCurrentCounters(final Map<String, Long> currentCounters) {
+    this.currentCounters = currentCounters;
+  }
+
+  public long getElapsedTime() {
+    return System.currentTimeMillis() - queryStartTime;
+  }
+
+  public boolean isQueryCompleted() {
+    return queryCompleted;
+  }
+
+  public void setQueryCompleted(final boolean queryCompleted) {
+    this.queryCompleted = queryCompleted;
+    this.queryEndTime = System.currentTimeMillis();
+  }
+
+  public void addWMEvent(WmEvent wmEvent) {
+    queryWmEvents.add(wmEvent);
+  }
+
+  public long getQueryStartTime() {
+    return queryStartTime;
+  }
+
+  public long getQueryEndTime() {
+    return queryEndTime;
+  }
+
+  List<WmEvent> getQueryWmEvents() {
+    return queryWmEvents;
+  }
+
+  Future<Boolean> getReturnEventFuture() {
+    return returnEventFuture;
+  }
+
+  public void setReturnEventFuture(final Future<Boolean> returnEventFuture) {
+    this.returnEventFuture = returnEventFuture;
+  }
+
+  private static final String WM_EVENTS_HEADER_FORMAT = "%7s %24s %24s %11s 
%9s %13s";
+  private static final String WM_EVENTS_TITLE = "Workload Manager Events 
Summary";
+  private static final String WM_EVENTS_TABLE_HEADER = 
String.format(WM_EVENTS_HEADER_FORMAT,
+    "EVENT", "START_TIMESTAMP", "END_TIMESTAMP", "ELAPSED_MS", "CLUSTER %", 
"POOL");
+  private static final DecimalFormat DECIMAL_FORMAT = new 
DecimalFormat("#0.00");
+
+  @Override
+  public void print(final SessionState.LogHelper console) {
+    try {
+      waitForReturnSessionEvent();
+      boolean first = false;
+      console.printInfo("");
+      console.printInfo(WM_EVENTS_TITLE);
+
+      for (final WmEvent wmEvent : queryWmEvents) {
+        if (!first) {
+          console.printInfo("");
+          console.printInfo("QueryId: " + queryId);
+          console.printInfo("SessionId: " + 
queryWmEvents.get(0).getWmTezSessionInfo().getSessionId());
+          console.printInfo("Applied Triggers: " + getAppliedTriggers());
+          console.printInfo(SEPARATOR);
+          console.printInfo(WM_EVENTS_TABLE_HEADER);
+          console.printInfo(SEPARATOR);
+          first = true;
+        }
+        WmEvent.WmTezSessionInfo wmTezSessionInfo = 
wmEvent.getWmTezSessionInfo();
+        String row = String.format(WM_EVENTS_HEADER_FORMAT,
+          wmEvent.getEventType(),
+          Instant.ofEpochMilli(wmEvent.getEventStartTimestamp()).toString(),
+          Instant.ofEpochMilli(wmEvent.getEventEndTimestamp()).toString(),
+          wmEvent.getElapsedTime(),
+          DECIMAL_FORMAT.format(wmTezSessionInfo.getClusterPercent()),
+          wmTezSessionInfo.getPoolName());
+        console.printInfo(row);
+      }
+      console.printInfo(SEPARATOR);
+      console.printInfo("");
+    } catch (Exception e) {
+      LOG.warn("Unable to print WM events summary", e);
+    }
+  }
+
+  // TODO: expose all WMContext's via /jmx to use in UI
+  public void printJson(final SessionState.LogHelper console) {
+    try {
+      waitForReturnSessionEvent();
+      ObjectMapper objectMapper = new ObjectMapper();
+      objectMapper.configure(SerializationConfig.Feature.FAIL_ON_EMPTY_BEANS, 
false);
+      // serialize json based on field annotations only
+      
objectMapper.setVisibilityChecker(objectMapper.getSerializationConfig().getDefaultVisibilityChecker()
+        .withSetterVisibility(JsonAutoDetect.Visibility.NONE));
+      String wmContextJson = 
objectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(this);
+      console.printInfo("");
+      console.printInfo(WM_EVENTS_TITLE);
+      console.printInfo(SEPARATOR);
+      console.printInfo(wmContextJson);
+      console.printInfo(SEPARATOR);
+      console.printInfo("");
+    } catch (Exception e) {
+      LOG.warn("Unable to serialize WMContext to json.", e);
+    }
+  }
+
+  private void waitForReturnSessionEvent() throws ExecutionException, 
InterruptedException {
+    if (getReturnEventFuture() != null && 
!Thread.currentThread().isInterrupted()) {
+      getReturnEventFuture().get();
+    }
+  }
+
+  // prints short events information that are safe for consistent testing
+  public void shortPrint(final SessionState.LogHelper console) throws 
ExecutionException, InterruptedException {
+    waitForReturnSessionEvent();
+    console.printInfo(WmContext.WM_EVENTS_TITLE, false);
+    for (WmEvent wmEvent : getQueryWmEvents()) {
+      console.printInfo("Event: " + wmEvent.getEventType() +
+        " Pool: " + wmEvent.getWmTezSessionInfo().getPoolName() +
+        " Cluster %: " + 
WmContext.DECIMAL_FORMAT.format(wmEvent.getWmTezSessionInfo().getClusterPercent()));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/a3060b30/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
----------------------------------------------------------------------
diff --git 
a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java 
b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
index 78df962..c58e450 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
@@ -51,6 +51,7 @@ import org.apache.hadoop.hive.metastore.api.WMPool;
 import org.apache.hadoop.hive.metastore.api.WMResourcePlan;
 import org.apache.hadoop.hive.ql.exec.tez.UserPoolMapping.MappingInput;
 import org.apache.hadoop.hive.ql.wm.SessionTriggerProvider;
+import org.apache.hadoop.hive.ql.wm.WmContext;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -86,7 +87,7 @@ public class TestWorkloadManager {
         cdl.countDown();
       }
       try {
-       session.set((WmTezSession) wm.getSession(old, new 
MappingInput(userName, null), conf));
+       session.set((WmTezSession) wm.getSession(old, new 
MappingInput(userName, null), conf, null));
       } catch (Throwable e) {
         error.compareAndSet(null, e);
       }
@@ -185,10 +186,11 @@ public class TestWorkloadManager {
     }
 
     @Override
-    public TezSessionState getSession(
-        TezSessionState session, MappingInput input, HiveConf conf) throws 
Exception {
+    public WmTezSession getSession(
+      TezSessionState session, MappingInput input, HiveConf conf,
+      final WmContext wmContext) throws Exception {
       // We want to wait for the iteration to finish and set the cluster 
fraction.
-      TezSessionState state = super.getSession(session, input, conf);
+      WmTezSession state = super.getSession(session, input, conf, null);
       ensureWm();
       return state;
     }
@@ -227,17 +229,17 @@ public class TestWorkloadManager {
     TezSessionState nonPool = mock(TezSessionState.class);
     when(nonPool.getConf()).thenReturn(conf);
     doNothing().when(nonPool).close(anyBoolean());
-    TezSessionState session = wm.getSession(nonPool, new MappingInput("user", 
null), conf);
+    TezSessionState session = wm.getSession(nonPool, new MappingInput("user", 
null), conf, null);
     verify(nonPool).close(anyBoolean());
     assertNotSame(nonPool, session);
     session.returnToSessionManager();
     TezSessionPoolSession diffPool = mock(TezSessionPoolSession.class);
     when(diffPool.getConf()).thenReturn(conf);
     doNothing().when(diffPool).returnToSessionManager();
-    session = wm.getSession(diffPool, new MappingInput("user", null), conf);
+    session = wm.getSession(diffPool, new MappingInput("user", null), conf, 
null);
     verify(diffPool).returnToSessionManager();
     assertNotSame(diffPool, session);
-    TezSessionState session2 = wm.getSession(session, new MappingInput("user", 
null), conf);
+    TezSessionState session2 = wm.getSession(session, new MappingInput("user", 
null), conf, null);
     assertSame(session, session2);
   }
 
@@ -249,11 +251,11 @@ public class TestWorkloadManager {
     wm.start();
     // The queue should be ignored.
     conf.set(TezConfiguration.TEZ_QUEUE_NAME, "test2");
-    TezSessionState session = wm.getSession(null, new MappingInput("user", 
null), conf);
+    TezSessionState session = wm.getSession(null, new MappingInput("user", 
null), conf, null);
     assertEquals("test", session.getQueueName());
     assertEquals("test", conf.get(TezConfiguration.TEZ_QUEUE_NAME));
     session.setQueueName("test2");
-    session = wm.getSession(session, new MappingInput("user", null), conf);
+    session = wm.getSession(session, new MappingInput("user", null), conf, 
null);
     assertEquals("test", session.getQueueName());
   }
 
@@ -269,7 +271,7 @@ public class TestWorkloadManager {
     WorkloadManager wm = new WorkloadManagerForTest("test", conf, 1, qam);
     wm.start();
     WmTezSession session = (WmTezSession) wm.getSession(
-        null, new MappingInput("user", null), conf);
+        null, new MappingInput("user", null), conf, null);
     assertEquals(1.0, session.getClusterFraction(), EPSILON);
     qam.assertWasCalledAndReset();
     WmTezSession session2 = (WmTezSession) session.reopen(conf, null);
@@ -287,10 +289,10 @@ public class TestWorkloadManager {
     MockQam qam = new MockQam();
     WorkloadManager wm = new WorkloadManagerForTest("test", conf, 2, qam);
     wm.start();
-    WmTezSession session = (WmTezSession) wm.getSession(null, new 
MappingInput("user", null), conf);
+    WmTezSession session = (WmTezSession) wm.getSession(null, new 
MappingInput("user", null), conf, null);
     assertEquals(1.0, session.getClusterFraction(), EPSILON);
     qam.assertWasCalledAndReset();
-    WmTezSession session2 = (WmTezSession) wm.getSession(null, new 
MappingInput("user", null), conf);
+    WmTezSession session2 = (WmTezSession) wm.getSession(null, new 
MappingInput("user", null), conf, null);
     assertEquals(0.5, session.getClusterFraction(), EPSILON);
     assertEquals(0.5, session2.getClusterFraction(), EPSILON);
     qam.assertWasCalledAndReset();
@@ -301,7 +303,7 @@ public class TestWorkloadManager {
     qam.assertWasCalledAndReset();
 
     // We never lose pool session, so we should still be able to get.
-    session = (WmTezSession) wm.getSession(null, new MappingInput("user", 
null), conf);
+    session = (WmTezSession) wm.getSession(null, new MappingInput("user", 
null), conf, null);
     session.returnToSessionManager();
     assertEquals(1.0, session2.getClusterFraction(), EPSILON);
     assertEquals(0.0, session.getClusterFraction(), EPSILON);
@@ -322,20 +324,20 @@ public class TestWorkloadManager {
     assertEquals(5, wm.getNumSessions());
     // Get all the 5 sessions; validate cluster fractions.
     WmTezSession session05of06 = (WmTezSession) wm.getSession(
-        null, new MappingInput("p1", null), conf);
+        null, new MappingInput("p1", null), conf, null);
     assertEquals(0.3, session05of06.getClusterFraction(), EPSILON);
     WmTezSession session03of06 = (WmTezSession) wm.getSession(
-        null, new MappingInput("p2", null), conf);
+        null, new MappingInput("p2", null), conf, null);
     assertEquals(0.18, session03of06.getClusterFraction(), EPSILON);
     WmTezSession session03of06_2 = (WmTezSession) wm.getSession(
-        null, new MappingInput("p2", null), conf);
+        null, new MappingInput("p2", null), conf, null);
     assertEquals(0.09, session03of06.getClusterFraction(), EPSILON);
     assertEquals(0.09, session03of06_2.getClusterFraction(), EPSILON);
     WmTezSession session02of06 = (WmTezSession) wm.getSession(
-        null,new MappingInput("r1", null), conf);
+        null,new MappingInput("r1", null), conf, null);
     assertEquals(0.12, session02of06.getClusterFraction(), EPSILON);
     WmTezSession session04 = (WmTezSession) wm.getSession(
-        null, new MappingInput("r2", null), conf);
+        null, new MappingInput("r2", null), conf, null);
     assertEquals(0.4, session04.getClusterFraction(), EPSILON);
     session05of06.returnToSessionManager();
     session03of06.returnToSessionManager();
@@ -367,7 +369,7 @@ public class TestWorkloadManager {
 
   private static void verifyMapping(
       WorkloadManager wm, HiveConf conf, MappingInput mi, String result) 
throws Exception {
-    WmTezSession session = (WmTezSession) wm.getSession(null, mi, conf);
+    WmTezSession session = (WmTezSession) wm.getSession(null, mi, conf, null);
     assertEquals(result, session.getPoolName());
     session.returnToSessionManager();
   }
@@ -381,9 +383,9 @@ public class TestWorkloadManager {
     plan.setMappings(Lists.newArrayList(mapping("A", "A"), mapping("B", "B")));
     final WorkloadManager wm = new WorkloadManagerForTest("test", conf, qam, 
plan);
     wm.start();
-    WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new 
MappingInput("A", null), conf),
-        sessionA2 = (WmTezSession) wm.getSession(null, new MappingInput("A", 
null), conf),
-        sessionB1 = (WmTezSession) wm.getSession(null, new MappingInput("B", 
null), conf);
+    WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new 
MappingInput("A", null), conf, null),
+        sessionA2 = (WmTezSession) wm.getSession(null, new MappingInput("A", 
null), conf, null),
+        sessionB1 = (WmTezSession) wm.getSession(null, new MappingInput("B", 
null), conf, null);
     final AtomicReference<WmTezSession> sessionA3 = new AtomicReference<>(),
         sessionA4 = new AtomicReference<>();
     final AtomicReference<Throwable> error = new AtomicReference<>();
@@ -397,7 +399,7 @@ public class TestWorkloadManager {
     assertNull(sessionA4.get());
     checkError(error);
     // While threads are blocked on A, we should still be able to get and 
return a B session.
-    WmTezSession sessionB2 = (WmTezSession) wm.getSession(null, new 
MappingInput("B", null), conf);
+    WmTezSession sessionB2 = (WmTezSession) wm.getSession(null, new 
MappingInput("B", null), conf, null);
     sessionB1.returnToSessionManager();
     sessionB2.returnToSessionManager();
     assertNull(sessionA3.get());
@@ -425,8 +427,8 @@ public class TestWorkloadManager {
     plan.getPlan().setDefaultPoolPath("A");
     final WorkloadManager wm = new WorkloadManagerForTest("test", conf, qam, 
plan);
     wm.start();
-    WmTezSession session1 = (WmTezSession) wm.getSession(null, new 
MappingInput("A", null), conf),
-        session2 = (WmTezSession) wm.getSession(null, new MappingInput("A", 
null), conf);
+    WmTezSession session1 = (WmTezSession) wm.getSession(null, new 
MappingInput("A", null), conf, null),
+        session2 = (WmTezSession) wm.getSession(null, new MappingInput("A", 
null), conf, null);
     assertEquals(0.5, session1.getClusterFraction(), EPSILON);
     assertEquals(0.5, session2.getClusterFraction(), EPSILON);
     qam.assertWasCalledAndReset();
@@ -448,19 +450,19 @@ public class TestWorkloadManager {
     final WorkloadManager wm = new WorkloadManagerForTest("test", conf, 2, 
qam);
     wm.start();
     WmTezSession session1 = (WmTezSession) wm.getSession(
-        null, new MappingInput("user", null), conf);
+        null, new MappingInput("user", null), conf, null);
     // First, try to reuse from the same pool - should "just work".
     WmTezSession session1a = (WmTezSession) wm.getSession(
-        session1, new MappingInput("user", null), conf);
+        session1, new MappingInput("user", null), conf, null);
     assertSame(session1, session1a);
     assertEquals(1.0, session1.getClusterFraction(), EPSILON);
     // Should still be able to get the 2nd session.
     WmTezSession session2 = (WmTezSession) wm.getSession(
-        null, new MappingInput("user", null), conf);
+        null, new MappingInput("user", null), conf, null);
 
     // Now try to reuse with no other sessions remaining. Should still work.
     WmTezSession session2a = (WmTezSession) wm.getSession(
-        session2, new MappingInput("user", null), conf);
+        session2, new MappingInput("user", null), conf, null);
     assertSame(session2, session2a);
     assertEquals(0.5, session1.getClusterFraction(), EPSILON);
     assertEquals(0.5, session2.getClusterFraction(), EPSILON);
@@ -517,19 +519,19 @@ public class TestWorkloadManager {
     plan.setMappings(Lists.newArrayList(mapping("A", "A"), mapping("B", "B")));
     final WorkloadManager wm = new WorkloadManagerForTest("test", conf, qam, 
plan);
     wm.start();
-    WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new 
MappingInput("A", null), conf),
-        sessionA2 = (WmTezSession) wm.getSession(null, new MappingInput("A", 
null), conf);
+    WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new 
MappingInput("A", null), conf, null),
+        sessionA2 = (WmTezSession) wm.getSession(null, new MappingInput("A", 
null), conf, null);
     assertEquals("A", sessionA1.getPoolName());
     assertEquals(0.3f, sessionA1.getClusterFraction(), EPSILON);
     assertEquals("A", sessionA2.getPoolName());
     assertEquals(0.3f, sessionA2.getClusterFraction(), EPSILON);
-    WmTezSession sessionB1 = (WmTezSession) wm.getSession(sessionA1, new 
MappingInput("B", null), conf);
+    WmTezSession sessionB1 = (WmTezSession) wm.getSession(sessionA1, new 
MappingInput("B", null), conf, null);
     assertSame(sessionA1, sessionB1);
     assertEquals("B", sessionB1.getPoolName());
     assertEquals(0.4f, sessionB1.getClusterFraction(), EPSILON);
     assertEquals(0.6f, sessionA2.getClusterFraction(), EPSILON); // A1 removed 
from A.
     // Make sure that we can still get a session from A.
-    WmTezSession sessionA3 = (WmTezSession) wm.getSession(null, new 
MappingInput("A", null), conf);
+    WmTezSession sessionA3 = (WmTezSession) wm.getSession(null, new 
MappingInput("A", null), conf, null);
     assertEquals("A", sessionA3.getPoolName());
     assertEquals(0.3f, sessionA3.getClusterFraction(), EPSILON);
     assertEquals(0.3f, sessionA3.getClusterFraction(), EPSILON);
@@ -549,7 +551,7 @@ public class TestWorkloadManager {
     wm.start();
  
     // One session will be running, the other will be queued in "A"
-    WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new 
MappingInput("U", null), conf);
+    WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new 
MappingInput("U", null), conf, null);
     assertEquals("A", sessionA1.getPoolName());
     assertEquals(0.5f, sessionA1.getClusterFraction(), EPSILON);
     final AtomicReference<WmTezSession> sessionA2 = new AtomicReference<>();
@@ -574,7 +576,7 @@ public class TestWorkloadManager {
     assertEquals(0.4f, sessionA2.get().getClusterFraction(), EPSILON);
     // The new session will also go to B now.
     sessionA2.get().returnToSessionManager();
-    WmTezSession sessionB1 = (WmTezSession) wm.getSession(null, new 
MappingInput("U", null), conf);
+    WmTezSession sessionB1 = (WmTezSession) wm.getSession(null, new 
MappingInput("U", null), conf, null);
     assertEquals("B", sessionB1.getPoolName());
     assertEquals(0.4f, sessionB1.getClusterFraction(), EPSILON);
     sessionA1.returnToSessionManager();
@@ -598,11 +600,11 @@ public class TestWorkloadManager {
  
     // A: 1/1 running, 1 queued; B: 2/2 running, C: 1/2 running, D: 1/1 
running, 1 queued.
     // Total: 5/6 running.
-    WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new 
MappingInput("A", null), conf),
-        sessionB1 = (WmTezSession) wm.getSession(null, new MappingInput("B", 
null), conf),
-        sessionB2 = (WmTezSession) wm.getSession(null, new MappingInput("B", 
null), conf),
-        sessionC1 = (WmTezSession) wm.getSession(null, new MappingInput("C", 
null), conf),
-        sessionD1 = (WmTezSession) wm.getSession(null, new MappingInput("D", 
null), conf);
+    WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new 
MappingInput("A", null), conf, null),
+        sessionB1 = (WmTezSession) wm.getSession(null, new MappingInput("B", 
null), conf, null),
+        sessionB2 = (WmTezSession) wm.getSession(null, new MappingInput("B", 
null), conf, null),
+        sessionC1 = (WmTezSession) wm.getSession(null, new MappingInput("C", 
null), conf, null),
+        sessionD1 = (WmTezSession) wm.getSession(null, new MappingInput("D", 
null), conf, null);
     final AtomicReference<WmTezSession> sessionA2 = new AtomicReference<>(),
         sessionD2 = new AtomicReference<>();
     final AtomicReference<Throwable> error = new AtomicReference<>();
@@ -738,7 +740,7 @@ public class TestWorkloadManager {
     final WorkloadManager wm = new WorkloadManagerForTest("test", conf, qam, 
plan);
     wm.start();
 
-    WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new 
MappingInput("A", null), conf);
+    WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new 
MappingInput("A", null), conf, null);
 
     // [A: 1, B: 0]
     Map<String, SessionTriggerProvider> allSessionProviders = 
wm.getAllSessionTriggerProviders();
@@ -762,7 +764,7 @@ public class TestWorkloadManager {
     assertEquals(0.4f, sessionA1.getClusterFraction(), EPSILON);
     assertEquals("B", sessionA1.getPoolName());
 
-    WmTezSession sessionA2 = (WmTezSession) wm.getSession(null, new 
MappingInput("A", null), conf);
+    WmTezSession sessionA2 = (WmTezSession) wm.getSession(null, new 
MappingInput("A", null), conf, null);
     // [A: 1, B: 1]
     allSessionProviders = wm.getAllSessionTriggerProviders();
     assertEquals(1, allSessionProviders.get("A").getSessions().size());
@@ -789,7 +791,7 @@ public class TestWorkloadManager {
     assertEquals("B", sessionA2.getPoolName());
     assertEquals("B", sessionA1.getPoolName());
 
-    WmTezSession sessionA3 = (WmTezSession) wm.getSession(null, new 
MappingInput("A", null), conf);
+    WmTezSession sessionA3 = (WmTezSession) wm.getSession(null, new 
MappingInput("A", null), conf, null);
     // [A: 1, B: 2]
     allSessionProviders = wm.getAllSessionTriggerProviders();
     assertEquals(1, allSessionProviders.get("A").getSessions().size());
@@ -829,7 +831,7 @@ public class TestWorkloadManager {
     final WorkloadManager wm = new WorkloadManagerForTest("test", conf, qam, 
plan);
     wm.start();
 
-    WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new 
MappingInput("A", null), conf);
+    WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new 
MappingInput("A", null), conf, null);
 
     // [A: 1, B: 0, B.x: 0, B.y: 0, C: 0]
     Map<String, SessionTriggerProvider> allSessionProviders = 
wm.getAllSessionTriggerProviders();
@@ -887,7 +889,7 @@ public class TestWorkloadManager {
     
assertTrue(allSessionProviders.get("B.x").getSessions().contains(sessionA1));
     assertEquals("B.x", sessionA1.getPoolName());
 
-    WmTezSession sessionA2 = (WmTezSession) wm.getSession(null, new 
MappingInput("A", null), conf);
+    WmTezSession sessionA2 = (WmTezSession) wm.getSession(null, new 
MappingInput("A", null), conf, null);
     // [A: 1, B: 0, B.x: 1, B.y: 0, C: 0]
     allSessionProviders = wm.getAllSessionTriggerProviders();
     assertEquals(1, allSessionProviders.get("A").getSessions().size());
@@ -986,7 +988,7 @@ public class TestWorkloadManager {
     failedWait.setException(new Exception("foo"));
     theOnlySession.setWaitForAmRegistryFuture(failedWait);
     try {
-      TezSessionState r = wm.getSession(null, new MappingInput("A", null), 
conf);
+      TezSessionState r = wm.getSession(null, new MappingInput("A", null), 
conf, null);
       fail("Expected an error but got " + r);
     } catch (Exception ex) {
       // Expected.
@@ -1037,7 +1039,7 @@ public class TestWorkloadManager {
     assertEquals(0f, oldSession.getClusterFraction(), EPSILON);
     pool.returnSession(theOnlySession);
     // Make sure we can actually get a session still - parallelism/etc. should 
not be affected.
-    WmTezSession result = (WmTezSession) wm.getSession(null, new 
MappingInput("A", null), conf);
+    WmTezSession result = (WmTezSession) wm.getSession(null, new 
MappingInput("A", null), conf, null);
     assertEquals(sessionPoolName, result.getPoolName());
     assertEquals(1f, result.getClusterFraction(), EPSILON);
     result.returnToSessionManager();

Reply via email to