Repository: hive Updated Branches: refs/heads/master 0d8323357 -> a3060b307
http://git-wip-us.apache.org/repos/asf/hive/blob/a3060b30/ql/src/java/org/apache/hadoop/hive/ql/hooks/PostExecWMEventsSummaryPrinter.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/PostExecWMEventsSummaryPrinter.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/PostExecWMEventsSummaryPrinter.java new file mode 100644 index 0000000..83cca89 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/PostExecWMEventsSummaryPrinter.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.hooks; + +import java.util.List; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.QueryPlan; +import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.exec.tez.TezTask; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hadoop.hive.ql.wm.WmContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Post execution (success or failure) hook to print hive workload manager events summary. + */ +public class PostExecWMEventsSummaryPrinter implements ExecuteWithHookContext { + private static final Logger LOG = LoggerFactory.getLogger(PostExecWMEventsSummaryPrinter.class.getName()); + + @Override + public void run(HookContext hookContext) throws Exception { + assert (hookContext.getHookType() == HookContext.HookType.POST_EXEC_HOOK || + hookContext.getHookType() == HookContext.HookType.ON_FAILURE_HOOK); + HiveConf conf = hookContext.getConf(); + if (!"tez".equals(HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE))) { + return; + } + + LOG.info("Executing post execution hook to print workload manager events summary.."); + SessionState.LogHelper console = SessionState.getConsole(); + QueryPlan plan = hookContext.getQueryPlan(); + if (plan == null) { + return; + } + + List<TezTask> rootTasks = Utilities.getTezTasks(plan.getRootTasks()); + for (TezTask tezTask : rootTasks) { + WmContext wmContext = tezTask.getDriverContext().getCtx().getWmContext(); + if (wmContext != null) { + wmContext.shortPrint(console); + } + } + } + +} http://git-wip-us.apache.org/repos/asf/hive/blob/a3060b30/ql/src/java/org/apache/hadoop/hive/ql/wm/Trigger.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/Trigger.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/Trigger.java index e41b460..4adad7a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/wm/Trigger.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/Trigger.java @@ -15,11 +15,14 @@ */ package org.apache.hadoop.hive.ql.wm; +import org.codehaus.jackson.map.annotate.JsonSerialize; + /** * Trigger interface which gets mapped to CREATE TRIGGER .. queries. A trigger can have a name, expression and action. * Trigger is a simple expression which gets evaluated during the lifecycle of query and executes an action * if the expression defined in trigger evaluates to true. */ +@JsonSerialize public interface Trigger { /** http://git-wip-us.apache.org/repos/asf/hive/blob/a3060b30/ql/src/java/org/apache/hadoop/hive/ql/wm/TriggerActionHandler.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/TriggerActionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/TriggerActionHandler.java index 8b142da..7995a8f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/wm/TriggerActionHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/TriggerActionHandler.java @@ -22,11 +22,11 @@ import org.apache.hadoop.hive.ql.exec.tez.TezSessionState; /** * Interface for handling rule violations by queries and for performing actions defined in the rules. */ -public interface TriggerActionHandler { +public interface TriggerActionHandler<SessionType> { /** * Applies the action defined in the rule for the specified queries * * @param queriesViolated - violated queries and the rule it violated */ - void applyAction(Map<TezSessionState, Trigger> queriesViolated); + void applyAction(Map<SessionType, Trigger> queriesViolated); } http://git-wip-us.apache.org/repos/asf/hive/blob/a3060b30/ql/src/java/org/apache/hadoop/hive/ql/wm/TriggerContext.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/TriggerContext.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/TriggerContext.java deleted file mode 100644 index 16072c3..0000000 --- a/ql/src/java/org/apache/hadoop/hive/ql/wm/TriggerContext.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hive.ql.wm; - -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; - -/** - * Some context information that are required for rule evaluation. - */ -public class TriggerContext { - private Set<String> desiredCounters = new HashSet<>(); - private Map<String, Long> currentCounters = new HashMap<>(); - private String queryId; - private long queryStartTime; - private boolean queryCompleted; - - public TriggerContext(final long queryStartTime, final String queryId) { - this.queryStartTime = queryStartTime; - this.queryId = queryId; - this.queryCompleted = false; - } - - public String getQueryId() { - return queryId; - } - - public void setQueryId(final String queryId) { - this.queryId = queryId; - } - - public Set<String> getDesiredCounters() { - return desiredCounters; - } - - public void setDesiredCounters(final Set<String> desiredCounters) { - this.desiredCounters = desiredCounters; - } - - public Map<String, Long> getCurrentCounters() { - return currentCounters; - } - - public void setCurrentCounters(final Map<String, Long> currentCounters) { - this.currentCounters = currentCounters; - } - - public long getElapsedTime() { - return System.currentTimeMillis() - queryStartTime; - } - - public boolean isQueryCompleted() { - return queryCompleted; - } - - public void setQueryCompleted(final boolean queryCompleted) { - this.queryCompleted = queryCompleted; - } -} http://git-wip-us.apache.org/repos/asf/hive/blob/a3060b30/ql/src/java/org/apache/hadoop/hive/ql/wm/WmContext.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/WmContext.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/WmContext.java new file mode 100644 index 0000000..7a7ef50 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/WmContext.java @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.wm; + +import static org.apache.hadoop.hive.ql.exec.tez.monitoring.Constants.SEPARATOR; + +import java.text.DecimalFormat; +import java.time.Instant; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +import javax.management.MXBean; + +import org.apache.hadoop.hive.ql.exec.tez.WmEvent; +import org.apache.hadoop.hive.ql.exec.tez.monitoring.PrintSummary; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.codehaus.jackson.annotate.JsonAutoDetect; +import org.codehaus.jackson.annotate.JsonIgnore; +import org.codehaus.jackson.annotate.JsonProperty; +import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.map.SerializationConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Some context information that are required for rule evaluation. + */ +@MXBean +public class WmContext implements PrintSummary { + private static final Logger LOG = LoggerFactory.getLogger(WmContext.class); + @JsonProperty("queryId") + private String queryId; + @JsonProperty("queryStartTime") + private long queryStartTime; + @JsonProperty("queryEndTime") + private long queryEndTime; + @JsonProperty("queryCompleted") + private boolean queryCompleted; + @JsonProperty("queryWmEvents") + private final List<WmEvent> queryWmEvents = new LinkedList<>(); + @JsonProperty("appliedTriggers") + private Set<Trigger> appliedTriggers = new HashSet<>(); + @JsonProperty("subscribedCounters") + private Set<String> subscribedCounters = new HashSet<>(); + @JsonProperty("currentCounters") + private Map<String, Long> currentCounters = new HashMap<>(); + @JsonIgnore // explictly ignoring as Getter visibility is ANY for auto-json serialization of Trigger based on getters + private Future<Boolean> returnEventFuture; + + public WmContext(final long queryStartTime, final String queryId) { + this.queryStartTime = queryStartTime; + this.queryId = queryId; + this.queryCompleted = false; + } + + public Set<Trigger> getAppliedTriggers() { + return appliedTriggers; + } + + public void addTriggers(final List<Trigger> triggers) { + if (triggers != null) { + this.appliedTriggers.addAll(triggers); + // reset and add counters. This can happen during start of query or a session being moved to another pool with its + // own set of triggers + Set<String> counters = new HashSet<>(); + for (Trigger trigger : triggers) { + counters.add(trigger.getExpression().getCounterLimit().getName()); + } + setSubscribedCounters(counters); + setCurrentCounters(new HashMap<>()); + } + } + + public String getQueryId() { + return queryId; + } + + public void setQueryId(final String queryId) { + this.queryId = queryId; + } + + public Set<String> getSubscribedCounters() { + return subscribedCounters; + } + + public void setSubscribedCounters(final Set<String> subscribedCounters) { + this.subscribedCounters = subscribedCounters; + } + + public Map<String, Long> getCurrentCounters() { + return currentCounters; + } + + public void setCurrentCounters(final Map<String, Long> currentCounters) { + this.currentCounters = currentCounters; + } + + public long getElapsedTime() { + return System.currentTimeMillis() - queryStartTime; + } + + public boolean isQueryCompleted() { + return queryCompleted; + } + + public void setQueryCompleted(final boolean queryCompleted) { + this.queryCompleted = queryCompleted; + this.queryEndTime = System.currentTimeMillis(); + } + + public void addWMEvent(WmEvent wmEvent) { + queryWmEvents.add(wmEvent); + } + + public long getQueryStartTime() { + return queryStartTime; + } + + public long getQueryEndTime() { + return queryEndTime; + } + + List<WmEvent> getQueryWmEvents() { + return queryWmEvents; + } + + Future<Boolean> getReturnEventFuture() { + return returnEventFuture; + } + + public void setReturnEventFuture(final Future<Boolean> returnEventFuture) { + this.returnEventFuture = returnEventFuture; + } + + private static final String WM_EVENTS_HEADER_FORMAT = "%7s %24s %24s %11s %9s %13s"; + private static final String WM_EVENTS_TITLE = "Workload Manager Events Summary"; + private static final String WM_EVENTS_TABLE_HEADER = String.format(WM_EVENTS_HEADER_FORMAT, + "EVENT", "START_TIMESTAMP", "END_TIMESTAMP", "ELAPSED_MS", "CLUSTER %", "POOL"); + private static final DecimalFormat DECIMAL_FORMAT = new DecimalFormat("#0.00"); + + @Override + public void print(final SessionState.LogHelper console) { + try { + waitForReturnSessionEvent(); + boolean first = false; + console.printInfo(""); + console.printInfo(WM_EVENTS_TITLE); + + for (final WmEvent wmEvent : queryWmEvents) { + if (!first) { + console.printInfo(""); + console.printInfo("QueryId: " + queryId); + console.printInfo("SessionId: " + queryWmEvents.get(0).getWmTezSessionInfo().getSessionId()); + console.printInfo("Applied Triggers: " + getAppliedTriggers()); + console.printInfo(SEPARATOR); + console.printInfo(WM_EVENTS_TABLE_HEADER); + console.printInfo(SEPARATOR); + first = true; + } + WmEvent.WmTezSessionInfo wmTezSessionInfo = wmEvent.getWmTezSessionInfo(); + String row = String.format(WM_EVENTS_HEADER_FORMAT, + wmEvent.getEventType(), + Instant.ofEpochMilli(wmEvent.getEventStartTimestamp()).toString(), + Instant.ofEpochMilli(wmEvent.getEventEndTimestamp()).toString(), + wmEvent.getElapsedTime(), + DECIMAL_FORMAT.format(wmTezSessionInfo.getClusterPercent()), + wmTezSessionInfo.getPoolName()); + console.printInfo(row); + } + console.printInfo(SEPARATOR); + console.printInfo(""); + } catch (Exception e) { + LOG.warn("Unable to print WM events summary", e); + } + } + + // TODO: expose all WMContext's via /jmx to use in UI + public void printJson(final SessionState.LogHelper console) { + try { + waitForReturnSessionEvent(); + ObjectMapper objectMapper = new ObjectMapper(); + objectMapper.configure(SerializationConfig.Feature.FAIL_ON_EMPTY_BEANS, false); + // serialize json based on field annotations only + objectMapper.setVisibilityChecker(objectMapper.getSerializationConfig().getDefaultVisibilityChecker() + .withSetterVisibility(JsonAutoDetect.Visibility.NONE)); + String wmContextJson = objectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(this); + console.printInfo(""); + console.printInfo(WM_EVENTS_TITLE); + console.printInfo(SEPARATOR); + console.printInfo(wmContextJson); + console.printInfo(SEPARATOR); + console.printInfo(""); + } catch (Exception e) { + LOG.warn("Unable to serialize WMContext to json.", e); + } + } + + private void waitForReturnSessionEvent() throws ExecutionException, InterruptedException { + if (getReturnEventFuture() != null && !Thread.currentThread().isInterrupted()) { + getReturnEventFuture().get(); + } + } + + // prints short events information that are safe for consistent testing + public void shortPrint(final SessionState.LogHelper console) throws ExecutionException, InterruptedException { + waitForReturnSessionEvent(); + console.printInfo(WmContext.WM_EVENTS_TITLE, false); + for (WmEvent wmEvent : getQueryWmEvents()) { + console.printInfo("Event: " + wmEvent.getEventType() + + " Pool: " + wmEvent.getWmTezSessionInfo().getPoolName() + + " Cluster %: " + WmContext.DECIMAL_FORMAT.format(wmEvent.getWmTezSessionInfo().getClusterPercent())); + } + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/a3060b30/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java index 78df962..c58e450 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java @@ -51,6 +51,7 @@ import org.apache.hadoop.hive.metastore.api.WMPool; import org.apache.hadoop.hive.metastore.api.WMResourcePlan; import org.apache.hadoop.hive.ql.exec.tez.UserPoolMapping.MappingInput; import org.apache.hadoop.hive.ql.wm.SessionTriggerProvider; +import org.apache.hadoop.hive.ql.wm.WmContext; import org.apache.tez.dag.api.TezConfiguration; import org.junit.Test; import org.slf4j.Logger; @@ -86,7 +87,7 @@ public class TestWorkloadManager { cdl.countDown(); } try { - session.set((WmTezSession) wm.getSession(old, new MappingInput(userName, null), conf)); + session.set((WmTezSession) wm.getSession(old, new MappingInput(userName, null), conf, null)); } catch (Throwable e) { error.compareAndSet(null, e); } @@ -185,10 +186,11 @@ public class TestWorkloadManager { } @Override - public TezSessionState getSession( - TezSessionState session, MappingInput input, HiveConf conf) throws Exception { + public WmTezSession getSession( + TezSessionState session, MappingInput input, HiveConf conf, + final WmContext wmContext) throws Exception { // We want to wait for the iteration to finish and set the cluster fraction. - TezSessionState state = super.getSession(session, input, conf); + WmTezSession state = super.getSession(session, input, conf, null); ensureWm(); return state; } @@ -227,17 +229,17 @@ public class TestWorkloadManager { TezSessionState nonPool = mock(TezSessionState.class); when(nonPool.getConf()).thenReturn(conf); doNothing().when(nonPool).close(anyBoolean()); - TezSessionState session = wm.getSession(nonPool, new MappingInput("user", null), conf); + TezSessionState session = wm.getSession(nonPool, new MappingInput("user", null), conf, null); verify(nonPool).close(anyBoolean()); assertNotSame(nonPool, session); session.returnToSessionManager(); TezSessionPoolSession diffPool = mock(TezSessionPoolSession.class); when(diffPool.getConf()).thenReturn(conf); doNothing().when(diffPool).returnToSessionManager(); - session = wm.getSession(diffPool, new MappingInput("user", null), conf); + session = wm.getSession(diffPool, new MappingInput("user", null), conf, null); verify(diffPool).returnToSessionManager(); assertNotSame(diffPool, session); - TezSessionState session2 = wm.getSession(session, new MappingInput("user", null), conf); + TezSessionState session2 = wm.getSession(session, new MappingInput("user", null), conf, null); assertSame(session, session2); } @@ -249,11 +251,11 @@ public class TestWorkloadManager { wm.start(); // The queue should be ignored. conf.set(TezConfiguration.TEZ_QUEUE_NAME, "test2"); - TezSessionState session = wm.getSession(null, new MappingInput("user", null), conf); + TezSessionState session = wm.getSession(null, new MappingInput("user", null), conf, null); assertEquals("test", session.getQueueName()); assertEquals("test", conf.get(TezConfiguration.TEZ_QUEUE_NAME)); session.setQueueName("test2"); - session = wm.getSession(session, new MappingInput("user", null), conf); + session = wm.getSession(session, new MappingInput("user", null), conf, null); assertEquals("test", session.getQueueName()); } @@ -269,7 +271,7 @@ public class TestWorkloadManager { WorkloadManager wm = new WorkloadManagerForTest("test", conf, 1, qam); wm.start(); WmTezSession session = (WmTezSession) wm.getSession( - null, new MappingInput("user", null), conf); + null, new MappingInput("user", null), conf, null); assertEquals(1.0, session.getClusterFraction(), EPSILON); qam.assertWasCalledAndReset(); WmTezSession session2 = (WmTezSession) session.reopen(conf, null); @@ -287,10 +289,10 @@ public class TestWorkloadManager { MockQam qam = new MockQam(); WorkloadManager wm = new WorkloadManagerForTest("test", conf, 2, qam); wm.start(); - WmTezSession session = (WmTezSession) wm.getSession(null, new MappingInput("user", null), conf); + WmTezSession session = (WmTezSession) wm.getSession(null, new MappingInput("user", null), conf, null); assertEquals(1.0, session.getClusterFraction(), EPSILON); qam.assertWasCalledAndReset(); - WmTezSession session2 = (WmTezSession) wm.getSession(null, new MappingInput("user", null), conf); + WmTezSession session2 = (WmTezSession) wm.getSession(null, new MappingInput("user", null), conf, null); assertEquals(0.5, session.getClusterFraction(), EPSILON); assertEquals(0.5, session2.getClusterFraction(), EPSILON); qam.assertWasCalledAndReset(); @@ -301,7 +303,7 @@ public class TestWorkloadManager { qam.assertWasCalledAndReset(); // We never lose pool session, so we should still be able to get. - session = (WmTezSession) wm.getSession(null, new MappingInput("user", null), conf); + session = (WmTezSession) wm.getSession(null, new MappingInput("user", null), conf, null); session.returnToSessionManager(); assertEquals(1.0, session2.getClusterFraction(), EPSILON); assertEquals(0.0, session.getClusterFraction(), EPSILON); @@ -322,20 +324,20 @@ public class TestWorkloadManager { assertEquals(5, wm.getNumSessions()); // Get all the 5 sessions; validate cluster fractions. WmTezSession session05of06 = (WmTezSession) wm.getSession( - null, new MappingInput("p1", null), conf); + null, new MappingInput("p1", null), conf, null); assertEquals(0.3, session05of06.getClusterFraction(), EPSILON); WmTezSession session03of06 = (WmTezSession) wm.getSession( - null, new MappingInput("p2", null), conf); + null, new MappingInput("p2", null), conf, null); assertEquals(0.18, session03of06.getClusterFraction(), EPSILON); WmTezSession session03of06_2 = (WmTezSession) wm.getSession( - null, new MappingInput("p2", null), conf); + null, new MappingInput("p2", null), conf, null); assertEquals(0.09, session03of06.getClusterFraction(), EPSILON); assertEquals(0.09, session03of06_2.getClusterFraction(), EPSILON); WmTezSession session02of06 = (WmTezSession) wm.getSession( - null,new MappingInput("r1", null), conf); + null,new MappingInput("r1", null), conf, null); assertEquals(0.12, session02of06.getClusterFraction(), EPSILON); WmTezSession session04 = (WmTezSession) wm.getSession( - null, new MappingInput("r2", null), conf); + null, new MappingInput("r2", null), conf, null); assertEquals(0.4, session04.getClusterFraction(), EPSILON); session05of06.returnToSessionManager(); session03of06.returnToSessionManager(); @@ -367,7 +369,7 @@ public class TestWorkloadManager { private static void verifyMapping( WorkloadManager wm, HiveConf conf, MappingInput mi, String result) throws Exception { - WmTezSession session = (WmTezSession) wm.getSession(null, mi, conf); + WmTezSession session = (WmTezSession) wm.getSession(null, mi, conf, null); assertEquals(result, session.getPoolName()); session.returnToSessionManager(); } @@ -381,9 +383,9 @@ public class TestWorkloadManager { plan.setMappings(Lists.newArrayList(mapping("A", "A"), mapping("B", "B"))); final WorkloadManager wm = new WorkloadManagerForTest("test", conf, qam, plan); wm.start(); - WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf), - sessionA2 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf), - sessionB1 = (WmTezSession) wm.getSession(null, new MappingInput("B", null), conf); + WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf, null), + sessionA2 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf, null), + sessionB1 = (WmTezSession) wm.getSession(null, new MappingInput("B", null), conf, null); final AtomicReference<WmTezSession> sessionA3 = new AtomicReference<>(), sessionA4 = new AtomicReference<>(); final AtomicReference<Throwable> error = new AtomicReference<>(); @@ -397,7 +399,7 @@ public class TestWorkloadManager { assertNull(sessionA4.get()); checkError(error); // While threads are blocked on A, we should still be able to get and return a B session. - WmTezSession sessionB2 = (WmTezSession) wm.getSession(null, new MappingInput("B", null), conf); + WmTezSession sessionB2 = (WmTezSession) wm.getSession(null, new MappingInput("B", null), conf, null); sessionB1.returnToSessionManager(); sessionB2.returnToSessionManager(); assertNull(sessionA3.get()); @@ -425,8 +427,8 @@ public class TestWorkloadManager { plan.getPlan().setDefaultPoolPath("A"); final WorkloadManager wm = new WorkloadManagerForTest("test", conf, qam, plan); wm.start(); - WmTezSession session1 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf), - session2 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf); + WmTezSession session1 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf, null), + session2 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf, null); assertEquals(0.5, session1.getClusterFraction(), EPSILON); assertEquals(0.5, session2.getClusterFraction(), EPSILON); qam.assertWasCalledAndReset(); @@ -448,19 +450,19 @@ public class TestWorkloadManager { final WorkloadManager wm = new WorkloadManagerForTest("test", conf, 2, qam); wm.start(); WmTezSession session1 = (WmTezSession) wm.getSession( - null, new MappingInput("user", null), conf); + null, new MappingInput("user", null), conf, null); // First, try to reuse from the same pool - should "just work". WmTezSession session1a = (WmTezSession) wm.getSession( - session1, new MappingInput("user", null), conf); + session1, new MappingInput("user", null), conf, null); assertSame(session1, session1a); assertEquals(1.0, session1.getClusterFraction(), EPSILON); // Should still be able to get the 2nd session. WmTezSession session2 = (WmTezSession) wm.getSession( - null, new MappingInput("user", null), conf); + null, new MappingInput("user", null), conf, null); // Now try to reuse with no other sessions remaining. Should still work. WmTezSession session2a = (WmTezSession) wm.getSession( - session2, new MappingInput("user", null), conf); + session2, new MappingInput("user", null), conf, null); assertSame(session2, session2a); assertEquals(0.5, session1.getClusterFraction(), EPSILON); assertEquals(0.5, session2.getClusterFraction(), EPSILON); @@ -517,19 +519,19 @@ public class TestWorkloadManager { plan.setMappings(Lists.newArrayList(mapping("A", "A"), mapping("B", "B"))); final WorkloadManager wm = new WorkloadManagerForTest("test", conf, qam, plan); wm.start(); - WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf), - sessionA2 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf); + WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf, null), + sessionA2 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf, null); assertEquals("A", sessionA1.getPoolName()); assertEquals(0.3f, sessionA1.getClusterFraction(), EPSILON); assertEquals("A", sessionA2.getPoolName()); assertEquals(0.3f, sessionA2.getClusterFraction(), EPSILON); - WmTezSession sessionB1 = (WmTezSession) wm.getSession(sessionA1, new MappingInput("B", null), conf); + WmTezSession sessionB1 = (WmTezSession) wm.getSession(sessionA1, new MappingInput("B", null), conf, null); assertSame(sessionA1, sessionB1); assertEquals("B", sessionB1.getPoolName()); assertEquals(0.4f, sessionB1.getClusterFraction(), EPSILON); assertEquals(0.6f, sessionA2.getClusterFraction(), EPSILON); // A1 removed from A. // Make sure that we can still get a session from A. - WmTezSession sessionA3 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf); + WmTezSession sessionA3 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf, null); assertEquals("A", sessionA3.getPoolName()); assertEquals(0.3f, sessionA3.getClusterFraction(), EPSILON); assertEquals(0.3f, sessionA3.getClusterFraction(), EPSILON); @@ -549,7 +551,7 @@ public class TestWorkloadManager { wm.start(); // One session will be running, the other will be queued in "A" - WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new MappingInput("U", null), conf); + WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new MappingInput("U", null), conf, null); assertEquals("A", sessionA1.getPoolName()); assertEquals(0.5f, sessionA1.getClusterFraction(), EPSILON); final AtomicReference<WmTezSession> sessionA2 = new AtomicReference<>(); @@ -574,7 +576,7 @@ public class TestWorkloadManager { assertEquals(0.4f, sessionA2.get().getClusterFraction(), EPSILON); // The new session will also go to B now. sessionA2.get().returnToSessionManager(); - WmTezSession sessionB1 = (WmTezSession) wm.getSession(null, new MappingInput("U", null), conf); + WmTezSession sessionB1 = (WmTezSession) wm.getSession(null, new MappingInput("U", null), conf, null); assertEquals("B", sessionB1.getPoolName()); assertEquals(0.4f, sessionB1.getClusterFraction(), EPSILON); sessionA1.returnToSessionManager(); @@ -598,11 +600,11 @@ public class TestWorkloadManager { // A: 1/1 running, 1 queued; B: 2/2 running, C: 1/2 running, D: 1/1 running, 1 queued. // Total: 5/6 running. - WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf), - sessionB1 = (WmTezSession) wm.getSession(null, new MappingInput("B", null), conf), - sessionB2 = (WmTezSession) wm.getSession(null, new MappingInput("B", null), conf), - sessionC1 = (WmTezSession) wm.getSession(null, new MappingInput("C", null), conf), - sessionD1 = (WmTezSession) wm.getSession(null, new MappingInput("D", null), conf); + WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf, null), + sessionB1 = (WmTezSession) wm.getSession(null, new MappingInput("B", null), conf, null), + sessionB2 = (WmTezSession) wm.getSession(null, new MappingInput("B", null), conf, null), + sessionC1 = (WmTezSession) wm.getSession(null, new MappingInput("C", null), conf, null), + sessionD1 = (WmTezSession) wm.getSession(null, new MappingInput("D", null), conf, null); final AtomicReference<WmTezSession> sessionA2 = new AtomicReference<>(), sessionD2 = new AtomicReference<>(); final AtomicReference<Throwable> error = new AtomicReference<>(); @@ -738,7 +740,7 @@ public class TestWorkloadManager { final WorkloadManager wm = new WorkloadManagerForTest("test", conf, qam, plan); wm.start(); - WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf); + WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf, null); // [A: 1, B: 0] Map<String, SessionTriggerProvider> allSessionProviders = wm.getAllSessionTriggerProviders(); @@ -762,7 +764,7 @@ public class TestWorkloadManager { assertEquals(0.4f, sessionA1.getClusterFraction(), EPSILON); assertEquals("B", sessionA1.getPoolName()); - WmTezSession sessionA2 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf); + WmTezSession sessionA2 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf, null); // [A: 1, B: 1] allSessionProviders = wm.getAllSessionTriggerProviders(); assertEquals(1, allSessionProviders.get("A").getSessions().size()); @@ -789,7 +791,7 @@ public class TestWorkloadManager { assertEquals("B", sessionA2.getPoolName()); assertEquals("B", sessionA1.getPoolName()); - WmTezSession sessionA3 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf); + WmTezSession sessionA3 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf, null); // [A: 1, B: 2] allSessionProviders = wm.getAllSessionTriggerProviders(); assertEquals(1, allSessionProviders.get("A").getSessions().size()); @@ -829,7 +831,7 @@ public class TestWorkloadManager { final WorkloadManager wm = new WorkloadManagerForTest("test", conf, qam, plan); wm.start(); - WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf); + WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf, null); // [A: 1, B: 0, B.x: 0, B.y: 0, C: 0] Map<String, SessionTriggerProvider> allSessionProviders = wm.getAllSessionTriggerProviders(); @@ -887,7 +889,7 @@ public class TestWorkloadManager { assertTrue(allSessionProviders.get("B.x").getSessions().contains(sessionA1)); assertEquals("B.x", sessionA1.getPoolName()); - WmTezSession sessionA2 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf); + WmTezSession sessionA2 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf, null); // [A: 1, B: 0, B.x: 1, B.y: 0, C: 0] allSessionProviders = wm.getAllSessionTriggerProviders(); assertEquals(1, allSessionProviders.get("A").getSessions().size()); @@ -986,7 +988,7 @@ public class TestWorkloadManager { failedWait.setException(new Exception("foo")); theOnlySession.setWaitForAmRegistryFuture(failedWait); try { - TezSessionState r = wm.getSession(null, new MappingInput("A", null), conf); + TezSessionState r = wm.getSession(null, new MappingInput("A", null), conf, null); fail("Expected an error but got " + r); } catch (Exception ex) { // Expected. @@ -1037,7 +1039,7 @@ public class TestWorkloadManager { assertEquals(0f, oldSession.getClusterFraction(), EPSILON); pool.returnSession(theOnlySession); // Make sure we can actually get a session still - parallelism/etc. should not be affected. - WmTezSession result = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf); + WmTezSession result = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf, null); assertEquals(sessionPoolName, result.getPoolName()); assertEquals(1f, result.getClusterFraction(), EPSILON); result.returnToSessionManager();