Repository: hive Updated Branches: refs/heads/master 21008897c -> 6c79fa75b
HIVE-18025: Push resource plan changes to tez/unmanaged sessions (Prasanth Jayachandran reviewed by Sergey Shelukhin) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/6c79fa75 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/6c79fa75 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/6c79fa75 Branch: refs/heads/master Commit: 6c79fa75bef225f7ed1ac5c858879323d2e36825 Parents: 2100889 Author: Prasanth Jayachandran <prasan...@apache.org> Authored: Mon Nov 20 18:30:33 2017 -0800 Committer: Prasanth Jayachandran <prasan...@apache.org> Committed: Mon Nov 20 18:30:33 2017 -0800 ---------------------------------------------------------------------- .../hive/jdbc/TestTriggersNoTezSessionPool.java | 16 +++--- .../jdbc/TestTriggersTezSessionPoolManager.java | 16 +++--- .../org/apache/hadoop/hive/ql/exec/DDLTask.java | 58 +++++++++++--------- .../hive/ql/exec/tez/TezSessionPoolManager.java | 50 +++++++++-------- .../hive/ql/exec/tez/TezSessionPoolSession.java | 22 ++++++-- .../hive/ql/exec/tez/WorkloadManager.java | 19 ++++--- .../ql/wm/MetastoreGlobalTriggersFetcher.java | 38 ------------- .../apache/hive/service/server/HiveServer2.java | 58 +++++++++----------- 8 files changed, 128 insertions(+), 149 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/6c79fa75/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersNoTezSessionPool.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersNoTezSessionPool.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersNoTezSessionPool.java index bcce3dc..33ef2eb 100644 --- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersNoTezSessionPool.java +++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersNoTezSessionPool.java @@ -16,17 +16,16 @@ package org.apache.hive.jdbc; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - import java.util.List; +import org.apache.hadoop.hive.metastore.api.WMFullResourcePlan; +import org.apache.hadoop.hive.metastore.api.WMPool; +import org.apache.hadoop.hive.metastore.api.WMResourcePlan; import org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolManager; import org.apache.hadoop.hive.ql.wm.Action; import org.apache.hadoop.hive.ql.wm.ExecutionTrigger; import org.apache.hadoop.hive.ql.wm.Expression; import org.apache.hadoop.hive.ql.wm.ExpressionFactory; -import org.apache.hadoop.hive.ql.wm.MetastoreGlobalTriggersFetcher; import org.apache.hadoop.hive.ql.wm.Trigger; import org.junit.Test; @@ -56,8 +55,11 @@ public class TestTriggersNoTezSessionPool extends AbstractJdbcTriggersTest { @Override void setupTriggers(final List<Trigger> triggers) throws Exception { - MetastoreGlobalTriggersFetcher triggersFetcher = mock(MetastoreGlobalTriggersFetcher.class); - when(triggersFetcher.fetch()).thenReturn(triggers); - TezSessionPoolManager.getInstance().setGlobalTriggersFetcher(triggersFetcher); + WMFullResourcePlan rp = new WMFullResourcePlan( + new WMResourcePlan("rp"), null); + for (Trigger trigger : triggers) { + rp.addToTriggers(wmTriggerFromTrigger(trigger)); + } + TezSessionPoolManager.getInstance().updateTriggers(rp); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/6c79fa75/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersTezSessionPoolManager.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersTezSessionPoolManager.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersTezSessionPoolManager.java index b377275..a00c2e0 100644 --- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersTezSessionPoolManager.java +++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersTezSessionPoolManager.java @@ -16,18 +16,17 @@ package org.apache.hive.jdbc; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - import java.util.ArrayList; import java.util.List; +import org.apache.hadoop.hive.metastore.api.WMFullResourcePlan; +import org.apache.hadoop.hive.metastore.api.WMPool; +import org.apache.hadoop.hive.metastore.api.WMResourcePlan; import org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolManager; import org.apache.hadoop.hive.ql.wm.Action; import org.apache.hadoop.hive.ql.wm.ExecutionTrigger; import org.apache.hadoop.hive.ql.wm.Expression; import org.apache.hadoop.hive.ql.wm.ExpressionFactory; -import org.apache.hadoop.hive.ql.wm.MetastoreGlobalTriggersFetcher; import org.apache.hadoop.hive.ql.wm.Trigger; import org.junit.Test; @@ -245,8 +244,11 @@ public class TestTriggersTezSessionPoolManager extends AbstractJdbcTriggersTest @Override protected void setupTriggers(final List<Trigger> triggers) throws Exception { - MetastoreGlobalTriggersFetcher triggersFetcher = mock(MetastoreGlobalTriggersFetcher.class); - when(triggersFetcher.fetch()).thenReturn(triggers); - TezSessionPoolManager.getInstance().setGlobalTriggersFetcher(triggersFetcher); + WMFullResourcePlan rp = new WMFullResourcePlan( + new WMResourcePlan("rp"), null); + for (Trigger trigger : triggers) { + rp.addToTriggers(wmTriggerFromTrigger(trigger)); + } + TezSessionPoolManager.getInstance().updateTriggers(rp); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/6c79fa75/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java index e7af5e0..b3d7a03 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java @@ -114,6 +114,7 @@ import org.apache.hadoop.hive.ql.QueryPlan; import org.apache.hadoop.hive.ql.QueryState; import org.apache.hadoop.hive.ql.exec.ArchiveUtils.PartSpecInfo; import org.apache.hadoop.hive.ql.exec.FunctionInfo.FunctionResource; +import org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolManager; import org.apache.hadoop.hive.ql.exec.tez.TezTask; import org.apache.hadoop.hive.ql.exec.tez.WorkloadManager; import org.apache.hadoop.hive.ql.hooks.LineageInfo.DataContainer; @@ -713,48 +714,51 @@ public class DDLTask extends Task<DDLWork> implements Serializable { resourcePlan.setDefaultPoolPath(desc.getDefaultPoolPath()); } + final WorkloadManager wm = WorkloadManager.getInstance(); + final TezSessionPoolManager pm = TezSessionPoolManager.getInstance(); boolean isActivate = false, isInTest = HiveConf.getBoolVar(conf, ConfVars.HIVE_IN_TEST); - WorkloadManager wm = null; if (desc.getStatus() != null) { resourcePlan.setStatus(desc.getStatus()); isActivate = desc.getStatus() == WMResourcePlanStatus.ACTIVE; - if (isActivate) { - wm = WorkloadManager.getInstance(); - if (wm == null && !isInTest) { - throw new HiveException("Resource plan can only be activated when WM is enabled"); - } - } } WMFullResourcePlan appliedRp = db.alterResourcePlan( - desc.getRpName(), resourcePlan, desc.isEnableActivate()); - if (!isActivate || (wm == null && isInTest)) return 0; - assert wm != null; + desc.getRpName(), resourcePlan, desc.isEnableActivate()); + if (!isActivate || (wm == null && isInTest) || (pm == null && isInTest)) { + return 0; + } if (appliedRp == null) { throw new HiveException("Cannot get a resource plan to apply"); // TODO: shut down HS2? } final String name = (desc.getNewName() != null) ? desc.getNewName() : desc.getRpName(); LOG.info("Activating a new resource plan " + name + ": " + appliedRp); - // Note: as per our current constraints, the behavior of two parallel activates is - // undefined; although only one will succeed and the other will receive exception. - // We need proper (semi-)transactional modifications to support this without hacks. - ListenableFuture<Boolean> future = wm.updateResourcePlanAsync(appliedRp); - boolean isOk = false; - try { - // Note: we may add an async option in future. For now, let the task fail for the user. - future.get(); - isOk = true; - LOG.info("Successfully activated resource plan " + name); - return 0; - } catch (InterruptedException | ExecutionException e) { - throw new HiveException(e); - } finally { - if (!isOk) { - LOG.error("Failed to activate resource plan " + name); - // TODO: shut down HS2? + if (wm != null) { + // Note: as per our current constraints, the behavior of two parallel activates is + // undefined; although only one will succeed and the other will receive exception. + // We need proper (semi-)transactional modifications to support this without hacks. + ListenableFuture<Boolean> future = wm.updateResourcePlanAsync(appliedRp); + boolean isOk = false; + try { + // Note: we may add an async option in future. For now, let the task fail for the user. + future.get(); + isOk = true; + LOG.info("Successfully activated resource plan " + name); + return 0; + } catch (InterruptedException | ExecutionException e) { + throw new HiveException(e); + } finally { + if (!isOk) { + LOG.error("Failed to activate resource plan " + name); + // TODO: shut down HS2? + } } } + if (pm != null) { + pm.updateTriggers(appliedRp); + LOG.info("Updated tez session pool manager with active resource plan: {}", appliedRp.getPlan().getName()); + } + return 0; } private int dropResourcePlan(Hive db, DropResourcePlanDesc desc) throws HiveException { http://git-wip-us.apache.org/repos/asf/hive/blob/6c79fa75/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java index 4e48f15..8417ebb 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java @@ -30,10 +30,11 @@ import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.metastore.api.WMFullResourcePlan; +import org.apache.hadoop.hive.metastore.api.WMTrigger; import org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolSession.Manager; -import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.hive.ql.wm.MetastoreGlobalTriggersFetcher; +import org.apache.hadoop.hive.ql.wm.ExecutionTrigger; import org.apache.hadoop.hive.ql.wm.SessionTriggerProvider; import org.apache.hadoop.hive.ql.wm.Trigger; import org.apache.hadoop.hive.ql.wm.TriggerActionHandler; @@ -83,13 +84,13 @@ public class TezSessionPoolManager extends TezSessionPoolSession.AbstractTrigger /** This is used to close non-default sessions, and also all sessions when stopping. */ private final List<TezSessionState> openSessions = new LinkedList<>(); - private MetastoreGlobalTriggersFetcher globalTriggersFetcher; + private final List<Trigger> triggers = new LinkedList<>(); private SessionTriggerProvider sessionTriggerProvider; private TriggerActionHandler triggerActionHandler; private TriggerValidatorRunnable triggerValidatorRunnable; /** Note: this is not thread-safe. */ - public static TezSessionPoolManager getInstance() throws Exception { + public static TezSessionPoolManager getInstance() { TezSessionPoolManager local = instance; if (local == null) { instance = local = new TezSessionPoolManager(); @@ -183,15 +184,10 @@ public class TezSessionPoolManager extends TezSessionPoolSession.AbstractTrigger } public void initTriggers(final HiveConf conf) throws HiveException { - if (globalTriggersFetcher == null) { - Hive db = Hive.get(conf); - globalTriggersFetcher = new MetastoreGlobalTriggersFetcher(db); - } - if (triggerValidatorRunnable == null) { final long triggerValidationIntervalMs = HiveConf.getTimeVar(conf, ConfVars .HIVE_TRIGGER_VALIDATION_INTERVAL_MS, TimeUnit.MILLISECONDS); - sessionTriggerProvider = new SessionTriggerProvider(openSessions, globalTriggersFetcher.fetch()); + sessionTriggerProvider = new SessionTriggerProvider(openSessions, triggers); triggerActionHandler = new KillTriggerActionHandler(); triggerValidatorRunnable = new TriggerValidatorRunnable(sessionTriggerProvider, triggerActionHandler); startTriggerValidator(triggerValidationIntervalMs); @@ -349,6 +345,10 @@ public class TezSessionPoolManager extends TezSessionPoolSession.AbstractTrigger expirationTracker.stop(); } + if (triggerValidatorRunnable != null) { + stopTriggerValidator(); + } + instance = null; } @@ -502,14 +502,26 @@ public class TezSessionPoolManager extends TezSessionPoolSession.AbstractTrigger public void registerOpenSession(TezSessionPoolSession session) { synchronized (openSessions) { openSessions.add(session); + updateSessions(); } - updateSessionsTriggers(); } - private void updateSessionsTriggers() { - if (sessionTriggerProvider != null && globalTriggersFetcher != null) { + private void updateSessions() { + if (sessionTriggerProvider != null) { sessionTriggerProvider.setSessions(Collections.unmodifiableList(openSessions)); - sessionTriggerProvider.setTriggers(Collections.unmodifiableList(globalTriggersFetcher.fetch())); + } + } + + public void updateTriggers(final WMFullResourcePlan appliedRp) { + if (sessionTriggerProvider != null && appliedRp != null) { + List<WMTrigger> wmTriggers = appliedRp.getTriggers(); + List<Trigger> triggers = new ArrayList<>(); + if (appliedRp.isSetTriggers()) { + for (WMTrigger wmTrigger : wmTriggers) { + triggers.add(ExecutionTrigger.fromWMTrigger(wmTrigger)); + } + } + sessionTriggerProvider.setTriggers(Collections.unmodifiableList(triggers)); } } @@ -521,11 +533,11 @@ public class TezSessionPoolManager extends TezSessionPoolSession.AbstractTrigger } synchronized (openSessions) { openSessions.remove(session); + updateSessions(); } if (defaultSessionPool != null) { defaultSessionPool.notifyClosed(session); } - updateSessionsTriggers(); } @VisibleForTesting @@ -534,13 +546,7 @@ public class TezSessionPoolManager extends TezSessionPoolSession.AbstractTrigger } - @VisibleForTesting - public void setGlobalTriggersFetcher(MetastoreGlobalTriggersFetcher metastoreGlobalTriggersFetcher) { - this.globalTriggersFetcher = metastoreGlobalTriggersFetcher; - updateSessionsTriggers(); - } - - public List<String> getTriggerCounterNames() { + List<String> getTriggerCounterNames() { List<String> counterNames = new ArrayList<>(); if (sessionTriggerProvider != null) { List<Trigger> activeTriggers = sessionTriggerProvider.getTriggers(); http://git-wip-us.apache.org/repos/asf/hive/blob/6c79fa75/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java index 769b24a..b3ccd24 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java @@ -73,14 +73,24 @@ class TezSessionPoolSession extends TezSessionState { } public static abstract class AbstractTriggerValidator { + private ScheduledExecutorService scheduledExecutorService = null; abstract Runnable getTriggerValidatorRunnable(); - public void startTriggerValidator(long triggerValidationIntervalMs) { - final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor( - new ThreadFactoryBuilder().setDaemon(true).setNameFormat("TriggerValidator").build()); - Runnable triggerValidatorRunnable = getTriggerValidatorRunnable(); - scheduledExecutorService.scheduleWithFixedDelay(triggerValidatorRunnable, triggerValidationIntervalMs, - triggerValidationIntervalMs, TimeUnit.MILLISECONDS); + void startTriggerValidator(long triggerValidationIntervalMs) { + if (scheduledExecutorService == null) { + scheduledExecutorService = Executors.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder().setDaemon(true).setNameFormat("TriggerValidator").build()); + Runnable triggerValidatorRunnable = getTriggerValidatorRunnable(); + scheduledExecutorService.scheduleWithFixedDelay(triggerValidatorRunnable, triggerValidationIntervalMs, + triggerValidationIntervalMs, TimeUnit.MILLISECONDS); + } + } + + void stopTriggerValidator() { + if (scheduledExecutorService != null) { + scheduledExecutorService.shutdownNow(); + scheduledExecutorService = null; + } } } http://git-wip-us.apache.org/repos/asf/hive/blob/6c79fa75/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java index 3990f95..d304701 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java @@ -209,15 +209,6 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida wmThread = new Thread(() -> runWmThread(), "Workload management master"); wmThread.setDaemon(true); - - final long triggerValidationIntervalMs = HiveConf.getTimeVar(conf, - HiveConf.ConfVars.HIVE_TRIGGER_VALIDATION_INTERVAL_MS, TimeUnit.MILLISECONDS); - TriggerActionHandler triggerActionHandler = new KillMoveTriggerActionHandler(this); - triggerValidatorRunnable = new PerPoolTriggerValidatorRunnable(perPoolProviders, triggerActionHandler, - triggerValidationIntervalMs); - startTriggerValidator(triggerValidationIntervalMs); // TODO: why is this not in start - - org.apache.hadoop.metrics2.util.MBeans.register("HiveServer2", "WorkloadManager", this); } private static int determineQueryParallelism(WMFullResourcePlan plan) { @@ -236,6 +227,13 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida allocationManager.start(); wmThread.start(); initRpFuture.get(); // Wait for the initial resource plan to be applied. + + final long triggerValidationIntervalMs = HiveConf.getTimeVar(conf, + HiveConf.ConfVars.HIVE_TRIGGER_VALIDATION_INTERVAL_MS, TimeUnit.MILLISECONDS); + TriggerActionHandler triggerActionHandler = new KillMoveTriggerActionHandler(this); + triggerValidatorRunnable = new PerPoolTriggerValidatorRunnable(perPoolProviders, triggerActionHandler, + triggerValidationIntervalMs); + startTriggerValidator(triggerValidationIntervalMs); } public void stop() throws Exception { @@ -256,6 +254,9 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida workPool.shutdownNow(); timeoutPool.shutdownNow(); + if (triggerValidatorRunnable != null) { + stopTriggerValidator(); + } INSTANCE = null; } http://git-wip-us.apache.org/repos/asf/hive/blob/6c79fa75/ql/src/java/org/apache/hadoop/hive/ql/wm/MetastoreGlobalTriggersFetcher.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/MetastoreGlobalTriggersFetcher.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/MetastoreGlobalTriggersFetcher.java deleted file mode 100644 index 87c007f..0000000 --- a/ql/src/java/org/apache/hadoop/hive/ql/wm/MetastoreGlobalTriggersFetcher.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hive.ql.wm; - -import java.util.ArrayList; -import java.util.List; - -import org.apache.hadoop.hive.ql.metadata.Hive; - -/** - * Fetch global (non-llap) rules from metastore - */ -public class MetastoreGlobalTriggersFetcher { - private static final String GLOBAL_TRIGGER_NAME = "global"; - private Hive db; - - public MetastoreGlobalTriggersFetcher(final Hive db) { - this.db = db; - } - - public List<Trigger> fetch() { - // TODO: this entire class will go away, DDLTask will push RP to TezSessionPoolManager where triggers are available - return new ArrayList<>(); - } -} http://git-wip-us.apache.org/repos/asf/hive/blob/6c79fa75/service/src/java/org/apache/hive/service/server/HiveServer2.java ---------------------------------------------------------------------- diff --git a/service/src/java/org/apache/hive/service/server/HiveServer2.java b/service/src/java/org/apache/hive/service/server/HiveServer2.java index 5a6d0c8..c3afa19 100644 --- a/service/src/java/org/apache/hive/service/server/HiveServer2.java +++ b/service/src/java/org/apache/hive/service/server/HiveServer2.java @@ -18,16 +18,6 @@ package org.apache.hive.service.server; -import org.apache.hadoop.hive.metastore.api.WMMapping; - -import org.apache.hadoop.hive.metastore.api.WMPool; - -import org.apache.hadoop.hive.metastore.api.WMResourcePlan; - -import org.apache.hadoop.hive.metastore.api.WMFullResourcePlan; - -import com.google.common.collect.Lists; - import java.io.IOException; import java.nio.charset.Charset; import java.util.ArrayList; @@ -66,11 +56,10 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.llap.coordinator.LlapCoordinator; import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService; -import org.apache.hadoop.hive.metastore.MetaStoreUtils; -import org.apache.hadoop.hive.metastore.RawStore; -import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.WMFullResourcePlan; +import org.apache.hadoop.hive.metastore.api.WMPool; +import org.apache.hadoop.hive.metastore.api.WMResourcePlan; import org.apache.hadoop.hive.metastore.cache.CachedStore; -import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManagerImpl; import org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolManager; import org.apache.hadoop.hive.ql.exec.tez.WorkloadManager; @@ -107,6 +96,7 @@ import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Joiner; +import com.google.common.collect.Lists; /** * HiveServer2. @@ -195,29 +185,31 @@ public class HiveServer2 extends CompositeService { throw new RuntimeException("Failed to get metastore connection", e); } - // Initialize workload management. - String wmQueue = HiveConf.getVar(hiveConf, ConfVars.HIVE_SERVER2_TEZ_INTERACTIVE_QUEUE); - if (wmQueue != null && !wmQueue.isEmpty()) { - WMFullResourcePlan resourcePlan; - try { - resourcePlan = sessionHive.getActiveResourcePlan(); - } catch (HiveException e) { - throw new RuntimeException(e); - } - if (resourcePlan == null) { - if (!HiveConf.getBoolVar(hiveConf, ConfVars.HIVE_IN_TEST)) { - LOG.error("Cannot activate workload management - no active resource plan"); - } else { - LOG.info("Creating a default resource plan for test"); - resourcePlan = createTestResourcePlan(); - } + WMFullResourcePlan resourcePlan; + try { + resourcePlan = sessionHive.getActiveResourcePlan(); + } catch (HiveException e) { + throw new RuntimeException(e); + } + + if (resourcePlan == null) { + if (!HiveConf.getBoolVar(hiveConf, ConfVars.HIVE_IN_TEST)) { + LOG.error("Cannot activate workload management - no active resource plan"); + } else { + LOG.info("Creating a default resource plan for test"); + resourcePlan = createTestResourcePlan(); } - if (resourcePlan != null) { + } + + if (resourcePlan != null) { + // Initialize workload management. + String wmQueue = HiveConf.getVar(hiveConf, ConfVars.HIVE_SERVER2_TEZ_INTERACTIVE_QUEUE); + if (wmQueue != null && !wmQueue.isEmpty()) { LOG.info("Initializing workload management"); wm = WorkloadManager.create(wmQueue, hiveConf, resourcePlan); - } else { - wm = null; } + tezSessionPoolManager.updateTriggers(resourcePlan); + LOG.info("Updated tez session pool manager with active resource plan: {}", resourcePlan.getPlan().getName()); } // Create views registry