HIVE-18281: HiveServer2 HA for LLAP and Workload Manager (Prasanth Jayachandran reviewed by Sergey Shelukhin)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/21c6a540 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/21c6a540 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/21c6a540 Branch: refs/heads/master Commit: 21c6a5407cebc5a096cd9aa10157be05a3ea9627 Parents: 3004335 Author: Prasanth Jayachandran <prasan...@apache.org> Authored: Thu Mar 15 23:01:25 2018 -0700 Committer: Prasanth Jayachandran <prasan...@apache.org> Committed: Thu Mar 15 23:01:25 2018 -0700 ---------------------------------------------------------------------- .../org/apache/hadoop/hive/conf/HiveConf.java | 11 + itests/hive-minikdc/pom.xml | 20 ++ itests/hive-unit-hadoop2/pom.xml | 21 +- .../apache/hive/jdbc/TestActivePassiveHA.java | 268 +++++++++++++++ .../hive/jdbc/miniHS2/AbstractHiveService.java | 8 +- .../org/apache/hive/jdbc/miniHS2/MiniHS2.java | 7 + .../hive/llap/registry/ServiceRegistry.java | 11 +- .../registry/impl/LlapFixedRegistryImpl.java | 4 +- .../llap/registry/impl/LlapRegistryService.java | 5 +- .../impl/LlapZookeeperRegistryImpl.java | 12 +- .../hive/llap/security/LlapTokenClient.java | 3 +- .../hadoop/hive/registry/RegistryUtilities.java | 52 +++ .../hadoop/hive/registry/ServiceInstance.java | 18 +- .../hive/registry/ServiceInstanceSet.java | 12 +- .../hive/registry/impl/ServiceInstanceBase.java | 57 +++- .../hive/registry/impl/TezAmInstance.java | 21 +- .../hive/registry/impl/TezAmRegistryImpl.java | 8 +- .../hive/registry/impl/ZkRegistryBase.java | 242 +++++++++----- .../hadoop/hive/llap/LlapBaseInputFormat.java | 5 +- .../daemon/services/impl/LlapWebServices.java | 5 +- .../tezplugins/LlapTaskSchedulerService.java | 10 + .../hive/ql/exec/tez/TezSessionPoolManager.java | 9 +- .../hive/ql/exec/tez/TezSessionPoolSession.java | 11 +- .../hive/ql/exec/tez/TezSessionState.java | 5 +- .../apache/hadoop/hive/ql/exec/tez/Utils.java | 3 +- .../physical/LlapClusterStateForCompile.java | 3 +- 
.../hive/ql/exec/tez/TestTezSessionPool.java | 13 +- service/pom.xml | 43 ++- .../server/HS2ActivePassiveHARegistry.java | 325 +++++++++++++++++++ .../HS2ActivePassiveHARegistryClient.java | 54 +++ .../apache/hive/service/server/HiveServer2.java | 313 ++++++++++++------ .../server/HiveServer2HAInstanceSet.java | 29 ++ .../service/server/HiveServer2Instance.java | 108 ++++++ .../service/servlet/HS2LeadershipStatus.java | 48 +++ .../apache/hive/service/servlet/HS2Peers.java | 75 +++++ 35 files changed, 1559 insertions(+), 280 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 8bbf1be..06efd02 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -1916,6 +1916,10 @@ public class HiveConf extends Configuration { new TimeValidator(TimeUnit.MILLISECONDS), "ZooKeeper client's session timeout (in milliseconds). The client is disconnected, and as a result, all locks released, \n" + "if a heartbeat is not sent in the timeout."), + HIVE_ZOOKEEPER_CONNECTION_TIMEOUT("hive.zookeeper.connection.timeout", "15s", + new TimeValidator(TimeUnit.SECONDS), + "ZooKeeper client's connection timeout in seconds. 
Connection timeout * hive.zookeeper.connection.max.retries\n" + + "with exponential backoff is when curator client deems connection is lost to zookeeper."), HIVE_ZOOKEEPER_NAMESPACE("hive.zookeeper.namespace", "hive_zookeeper_namespace", "The parent node under which all ZooKeeper nodes are created."), HIVE_ZOOKEEPER_CLEAN_EXTRA_NODES("hive.zookeeper.clean.extra.nodes", false, @@ -2465,6 +2469,13 @@ public class HiveConf extends Configuration { "If true, the HiveServer2 WebUI will be secured with PAM."), // Tez session settings + HIVE_SERVER2_ACTIVE_PASSIVE_HA_ENABLE("hive.server2.active.passive.ha.enable", false, + "Whether HiveServer2 Active/Passive High Availability be enabled when Hive Interactive sessions are enabled." + + "This will also require hive.server2.support.dynamic.service.discovery to be enabled."), + HIVE_SERVER2_ACTIVE_PASSIVE_HA_REGISTRY_NAMESPACE("hive.server2.active.passive.ha.registry.namespace", + "hs2ActivePassiveHA", + "When HiveServer2 Active/Passive High Availability is enabled, uses this namespace for registering HS2\n" + + "instances with zookeeper"), HIVE_SERVER2_TEZ_INTERACTIVE_QUEUE("hive.server2.tez.interactive.queue", "", "A single YARN queues to use for Hive Interactive sessions. 
When this is specified,\n" + "workload management is enabled and used for these sessions."), http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/itests/hive-minikdc/pom.xml ---------------------------------------------------------------------- diff --git a/itests/hive-minikdc/pom.xml b/itests/hive-minikdc/pom.xml index 337535a..1e40d9d 100644 --- a/itests/hive-minikdc/pom.xml +++ b/itests/hive-minikdc/pom.xml @@ -117,6 +117,26 @@ <scope>test</scope> <classifier>tests</classifier> </dependency> + <dependency> + <groupId>org.apache.tez</groupId> + <artifactId>tez-dag</artifactId> + <version>${tez.version}</version> + <scope>test</scope> + <exclusions> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </exclusion> + <exclusion> + <groupId>commmons-logging</groupId> + <artifactId>commons-logging</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-server-web-proxy</artifactId> + </exclusion> + </exclusions> + </dependency> <!-- test inter-project --> <dependency> <groupId>junit</groupId> http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/itests/hive-unit-hadoop2/pom.xml ---------------------------------------------------------------------- diff --git a/itests/hive-unit-hadoop2/pom.xml b/itests/hive-unit-hadoop2/pom.xml index fb31fd4..85a6145 100644 --- a/itests/hive-unit-hadoop2/pom.xml +++ b/itests/hive-unit-hadoop2/pom.xml @@ -53,7 +53,26 @@ <artifactId>hive-exec</artifactId> <version>${project.version}</version> </dependency> - + <dependency> + <groupId>org.apache.tez</groupId> + <artifactId>tez-dag</artifactId> + <version>${tez.version}</version> + <scope>test</scope> + <exclusions> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </exclusion> + <exclusion> + <groupId>commmons-logging</groupId> + <artifactId>commons-logging</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.hadoop</groupId> + 
<artifactId>hadoop-yarn-server-web-proxy</artifactId> + </exclusion> + </exclusions> + </dependency> <!-- dependencies are always listed in sorted order by groupId, artifectId --> <!-- test intra-project --> <dependency> http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestActivePassiveHA.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestActivePassiveHA.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestActivePassiveHA.java new file mode 100644 index 0000000..26acbd7 --- /dev/null +++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestActivePassiveHA.java @@ -0,0 +1,268 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hive.jdbc; + +import static org.junit.Assert.assertEquals; + +import java.io.BufferedReader; +import java.io.InputStreamReader; +import java.net.HttpURLConnection; +import java.net.URL; +import java.sql.Connection; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import org.apache.curator.test.TestingServer; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.registry.impl.ZkRegistryBase; +import org.apache.hive.jdbc.miniHS2.MiniHS2; +import org.apache.hive.service.server.HS2ActivePassiveHARegistry; +import org.apache.hive.service.server.HS2ActivePassiveHARegistryClient; +import org.apache.hive.service.server.HiveServer2Instance; +import org.apache.hive.service.servlet.HS2Peers; +import org.codehaus.jackson.map.ObjectMapper; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +public class TestActivePassiveHA { + private MiniHS2 miniHS2_1 = null; + private MiniHS2 miniHS2_2 = null; + private static TestingServer zkServer; + private Connection hs2Conn = null; + private HiveConf hiveConf1; + private HiveConf hiveConf2; + + @BeforeClass + public static void beforeTest() throws Exception { + MiniHS2.cleanupLocalDir(); + zkServer = new TestingServer(); + Class.forName(MiniHS2.getJdbcDriverName()); + } + + @AfterClass + public static void afterTest() throws Exception { + if (zkServer != null) { + zkServer.close(); + zkServer = null; + } + MiniHS2.cleanupLocalDir(); + } + + @Before + public void setUp() throws Exception { + hiveConf1 = new HiveConf(); + hiveConf1.setBoolVar(ConfVars.HIVE_SUPPORT_CONCURRENCY, false); + // Set up zookeeper dynamic service discovery configs + setHAConfigs(hiveConf1); + miniHS2_1 = new 
MiniHS2.Builder().withConf(hiveConf1).cleanupLocalDirOnStartup(false).build(); + hiveConf2 = new HiveConf(); + hiveConf2.setBoolVar(ConfVars.HIVE_SUPPORT_CONCURRENCY, false); + // Set up zookeeper dynamic service discovery configs + setHAConfigs(hiveConf2); + miniHS2_2 = new MiniHS2.Builder().withConf(hiveConf2).cleanupLocalDirOnStartup(false).build(); + } + + @After + public void tearDown() throws Exception { + if (hs2Conn != null) { + hs2Conn.close(); + } + if ((miniHS2_1 != null) && miniHS2_1.isStarted()) { + miniHS2_1.stop(); + } + if ((miniHS2_2 != null) && miniHS2_2.isStarted()) { + miniHS2_2.stop(); + } + } + + private static void setHAConfigs(Configuration conf) { + conf.setBoolean(ConfVars.HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY.varname, true); + conf.set(ConfVars.HIVE_ZOOKEEPER_QUORUM.varname, zkServer.getConnectString()); + final String zkRootNamespace = "hs2test"; + conf.set(ConfVars.HIVE_SERVER2_ZOOKEEPER_NAMESPACE.varname, zkRootNamespace); + conf.setBoolean(ConfVars.HIVE_SERVER2_ACTIVE_PASSIVE_HA_ENABLE.varname, true); + conf.setTimeDuration(ConfVars.HIVE_ZOOKEEPER_CONNECTION_TIMEOUT.varname, 2, TimeUnit.SECONDS); + conf.setTimeDuration(ConfVars.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME.varname, 100, TimeUnit.MILLISECONDS); + conf.setInt(ConfVars.HIVE_ZOOKEEPER_CONNECTION_MAX_RETRIES.varname, 1); + } + + @Test(timeout = 60000) + public void testActivePassive() throws Exception { + Map<String, String> confOverlay = new HashMap<>(); + hiveConf1.set(ZkRegistryBase.UNIQUE_IDENTIFIER, UUID.randomUUID().toString()); + miniHS2_1.start(confOverlay); + while(!miniHS2_1.isStarted()) { + Thread.sleep(100); + } + + hiveConf2.set(ZkRegistryBase.UNIQUE_IDENTIFIER, UUID.randomUUID().toString()); + miniHS2_2.start(confOverlay); + while(!miniHS2_2.isStarted()) { + Thread.sleep(100); + } + + assertEquals(true, miniHS2_1.isLeader()); + String url = "http://localhost:" + hiveConf1.get(ConfVars.HIVE_SERVER2_WEBUI_PORT.varname) + "/leader"; + assertEquals("true", 
sendGet(url)); + + assertEquals(false, miniHS2_2.isLeader()); + url = "http://localhost:" + hiveConf2.get(ConfVars.HIVE_SERVER2_WEBUI_PORT.varname) + "/leader"; + assertEquals("false", sendGet(url)); + + url = "http://localhost:" + hiveConf1.get(ConfVars.HIVE_SERVER2_WEBUI_PORT.varname) + "/peers"; + String resp = sendGet(url); + ObjectMapper objectMapper = new ObjectMapper(); + HS2Peers.HS2Instances hs2Peers = objectMapper.readValue(resp, HS2Peers.HS2Instances.class); + int port1 = Integer.parseInt(hiveConf1.get(ConfVars.HIVE_SERVER2_THRIFT_PORT.varname)); + assertEquals(2, hs2Peers.getHiveServer2Instances().size()); + for (HiveServer2Instance hsi : hs2Peers.getHiveServer2Instances()) { + if (hsi.getRpcPort() == port1) { + assertEquals(true, hsi.isLeader()); + } else { + assertEquals(false, hsi.isLeader()); + } + } + + Configuration conf = new Configuration(); + setHAConfigs(conf); + HS2ActivePassiveHARegistry client = HS2ActivePassiveHARegistryClient.getClient(conf); + List<HiveServer2Instance> hs2Instances = new ArrayList<>(client.getAll()); + assertEquals(2, hs2Instances.size()); + List<HiveServer2Instance> leaders = new ArrayList<>(); + List<HiveServer2Instance> standby = new ArrayList<>(); + for (HiveServer2Instance instance : hs2Instances) { + if (instance.isLeader()) { + leaders.add(instance); + } else { + standby.add(instance); + } + } + assertEquals(1, leaders.size()); + assertEquals(1, standby.size()); + + miniHS2_1.stop(); + + while(!miniHS2_2.isStarted()) { + Thread.sleep(100); + } + assertEquals(true, miniHS2_2.isLeader()); + url = "http://localhost:" + hiveConf2.get(ConfVars.HIVE_SERVER2_WEBUI_PORT.varname) + "/leader"; + assertEquals("true", sendGet(url)); + + while (client.getAll().size() != 1) { + Thread.sleep(100); + } + + client = HS2ActivePassiveHARegistryClient.getClient(conf); + hs2Instances = new ArrayList<>(client.getAll()); + assertEquals(1, hs2Instances.size()); + leaders = new ArrayList<>(); + standby = new ArrayList<>(); + for 
(HiveServer2Instance instance : hs2Instances) { + if (instance.isLeader()) { + leaders.add(instance); + } else { + standby.add(instance); + } + } + assertEquals(1, leaders.size()); + assertEquals(0, standby.size()); + + url = "http://localhost:" + hiveConf2.get(ConfVars.HIVE_SERVER2_WEBUI_PORT.varname) + "/peers"; + resp = sendGet(url); + objectMapper = new ObjectMapper(); + hs2Peers = objectMapper.readValue(resp, HS2Peers.HS2Instances.class); + int port2 = Integer.parseInt(hiveConf2.get(ConfVars.HIVE_SERVER2_THRIFT_PORT.varname)); + assertEquals(1, hs2Peers.getHiveServer2Instances().size()); + for (HiveServer2Instance hsi : hs2Peers.getHiveServer2Instances()) { + if (hsi.getRpcPort() == port2) { + assertEquals(true, hsi.isLeader()); + } else { + assertEquals(false, hsi.isLeader()); + } + } + + // start 1st server again + hiveConf1.set(ZkRegistryBase.UNIQUE_IDENTIFIER, UUID.randomUUID().toString()); + miniHS2_1.start(confOverlay); + + while(!miniHS2_1.isStarted()) { + Thread.sleep(100); + } + assertEquals(false, miniHS2_1.isLeader()); + url = "http://localhost:" + hiveConf1.get(ConfVars.HIVE_SERVER2_WEBUI_PORT.varname) + "/leader"; + assertEquals("false", sendGet(url)); + + while (client.getAll().size() != 2) { + Thread.sleep(100); + } + + client = HS2ActivePassiveHARegistryClient.getClient(conf); + hs2Instances = new ArrayList<>(client.getAll()); + assertEquals(2, hs2Instances.size()); + leaders = new ArrayList<>(); + standby = new ArrayList<>(); + for (HiveServer2Instance instance : hs2Instances) { + if (instance.isLeader()) { + leaders.add(instance); + } else { + standby.add(instance); + } + } + assertEquals(1, leaders.size()); + assertEquals(1, standby.size()); + + url = "http://localhost:" + hiveConf1.get(ConfVars.HIVE_SERVER2_WEBUI_PORT.varname) + "/peers"; + resp = sendGet(url); + objectMapper = new ObjectMapper(); + hs2Peers = objectMapper.readValue(resp, HS2Peers.HS2Instances.class); + port2 = 
Integer.parseInt(hiveConf2.get(ConfVars.HIVE_SERVER2_THRIFT_PORT.varname)); + assertEquals(2, hs2Peers.getHiveServer2Instances().size()); + for (HiveServer2Instance hsi : hs2Peers.getHiveServer2Instances()) { + if (hsi.getRpcPort() == port2) { + assertEquals(true, hsi.isLeader()); + } else { + assertEquals(false, hsi.isLeader()); + } + } + } + + private String sendGet(String url) throws Exception { + URL obj = new URL(url); + HttpURLConnection con = (HttpURLConnection) obj.openConnection(); + con.setRequestMethod("GET"); + BufferedReader in = new BufferedReader(new InputStreamReader(con.getInputStream())); + String inputLine; + StringBuilder response = new StringBuilder(); + while ((inputLine = in.readLine()) != null) { + response.append(inputLine); + } + in.close(); + return response.toString(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/AbstractHiveService.java ---------------------------------------------------------------------- diff --git a/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/AbstractHiveService.java b/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/AbstractHiveService.java index 6cab8cd..d21b764 100644 --- a/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/AbstractHiveService.java +++ b/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/AbstractHiveService.java @@ -35,14 +35,16 @@ public abstract class AbstractHiveService { private String hostname; private int binaryPort; private int httpPort; + private int webPort; private boolean startedHiveService = false; private List<String> addedProperties = new ArrayList<String>(); - public AbstractHiveService(HiveConf hiveConf, String hostname, int binaryPort, int httpPort) { + public AbstractHiveService(HiveConf hiveConf, String hostname, int binaryPort, int httpPort, int webPort) { this.hiveConf = hiveConf; this.hostname = hostname; this.binaryPort = binaryPort; this.httpPort = 
httpPort; + this.webPort = webPort; } /** @@ -136,6 +138,10 @@ public abstract class AbstractHiveService { return httpPort; } + public int getWebPort() { + return webPort; + } + public boolean isStarted() { return startedHiveService; } http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java ---------------------------------------------------------------------- diff --git a/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java b/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java index 8bbf8a4..997726c 100644 --- a/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java +++ b/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java @@ -217,6 +217,8 @@ public class MiniHS2 extends AbstractHiveService { (usePortsFromConf ? hiveConf.getIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_PORT) : MetaStoreTestUtils .findFreePort()), (usePortsFromConf ? hiveConf.getIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_HTTP_PORT) : MetaStoreTestUtils + .findFreePort()), + (usePortsFromConf ? hiveConf.getIntVar(ConfVars.HIVE_SERVER2_WEBUI_PORT) : MetaStoreTestUtils .findFreePort())); hiveConf.setLongVar(ConfVars.HIVE_SERVER2_MAX_START_ATTEMPTS, 3l); hiveConf.setTimeVar(ConfVars.HIVE_SERVER2_SLEEP_INTERVAL_BETWEEN_START_ATTEMPTS, 10, @@ -306,6 +308,7 @@ public class MiniHS2 extends AbstractHiveService { hiveConf.setVar(ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST, getHost()); hiveConf.setIntVar(ConfVars.HIVE_SERVER2_THRIFT_PORT, getBinaryPort()); hiveConf.setIntVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_PORT, getHttpPort()); + hiveConf.setIntVar(ConfVars.HIVE_SERVER2_WEBUI_PORT, getWebPort()); Path scratchDir = new Path(baseFsDir, "scratch"); // Create root scratchdir with write all, so that user impersonation has no issues. 
@@ -404,6 +407,10 @@ public class MiniHS2 extends AbstractHiveService { } + public boolean isLeader() { + return hiveServer2.isLeader(); + } + public CLIServiceClient getServiceClient() { verifyStarted(); return getServiceClientInternal(); http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceRegistry.java ---------------------------------------------------------------------- diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceRegistry.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceRegistry.java index 5d7f813..6178b4b 100644 --- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceRegistry.java +++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceRegistry.java @@ -14,13 +14,16 @@ package org.apache.hadoop.hive.llap.registry; import java.io.IOException; + +import org.apache.hadoop.hive.registry.ServiceInstance; +import org.apache.hadoop.hive.registry.ServiceInstanceSet; import org.apache.hadoop.hive.registry.ServiceInstanceStateChangeListener; import org.apache.hadoop.yarn.api.records.ApplicationId; /** * ServiceRegistry interface for switching between fixed host and dynamic registry implementations. */ -public interface ServiceRegistry { +public interface ServiceRegistry<T extends ServiceInstance> { /** * Start the service registry @@ -49,14 +52,14 @@ public interface ServiceRegistry { * @param clusterReadyTimeoutMs The time to wait for the cluster to be ready, if it's not * started yet. 0 means do not wait. */ - LlapServiceInstanceSet getInstances(String component, long clusterReadyTimeoutMs) throws IOException; + ServiceInstanceSet<T> getInstances(String component, long clusterReadyTimeoutMs) throws + IOException; /** * Adds state change listeners for service instances. 
* @param listener - state change listener */ - void registerStateChangeListener( - ServiceInstanceStateChangeListener<LlapServiceInstance> listener) throws IOException; + void registerStateChangeListener(ServiceInstanceStateChangeListener<T> listener) throws IOException; /** * @return The application ID of the LLAP cluster. http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java ---------------------------------------------------------------------- diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java index c88198f..f99d86c 100644 --- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java +++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java @@ -36,6 +36,7 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.llap.registry.LlapServiceInstance; import org.apache.hadoop.hive.llap.registry.LlapServiceInstanceSet; import org.apache.hadoop.hive.llap.registry.ServiceRegistry; +import org.apache.hadoop.hive.registry.ServiceInstance; import org.apache.hadoop.hive.registry.ServiceInstanceStateChangeListener; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.util.StringUtils; @@ -267,8 +268,7 @@ public class LlapFixedRegistryImpl implements ServiceRegistry { } @Override - public void registerStateChangeListener( - final ServiceInstanceStateChangeListener<LlapServiceInstance> listener) { + public void registerStateChangeListener(final ServiceInstanceStateChangeListener listener) throws IOException { // nothing to set LOG.warn("Callbacks for instance state changes are not supported in fixed registry."); } 
http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java ---------------------------------------------------------------------- diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java index 80a6aba..3bda40b 100644 --- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java +++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java @@ -24,6 +24,7 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.llap.registry.LlapServiceInstanceSet; import org.apache.hadoop.hive.llap.registry.ServiceRegistry; +import org.apache.hadoop.hive.registry.ServiceInstanceSet; import org.apache.hadoop.hive.registry.ServiceInstanceStateChangeListener; import org.apache.hadoop.registry.client.binding.RegistryUtils; import org.apache.hadoop.service.AbstractService; @@ -35,7 +36,7 @@ public class LlapRegistryService extends AbstractService { private static final Logger LOG = LoggerFactory.getLogger(LlapRegistryService.class); - private ServiceRegistry registry = null; + private ServiceRegistry<LlapServiceInstance> registry = null; private final boolean isDaemon; private boolean isDynamic = false; private String identity = "(pending)"; @@ -136,7 +137,7 @@ public class LlapRegistryService extends AbstractService { } public LlapServiceInstanceSet getInstances(long clusterReadyTimeoutMs) throws IOException { - return this.registry.getInstances("LLAP", clusterReadyTimeoutMs); + return (LlapServiceInstanceSet) this.registry.getInstances("LLAP", clusterReadyTimeoutMs); } public void registerStateChangeListener( 
http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java ---------------------------------------------------------------------- diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java index 8339230..f5d6202 100644 --- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java +++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java @@ -53,7 +53,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class LlapZookeeperRegistryImpl - extends ZkRegistryBase<LlapServiceInstance> implements ServiceRegistry { + extends ZkRegistryBase<LlapServiceInstance> implements ServiceRegistry<LlapServiceInstance> { private static final Logger LOG = LoggerFactory.getLogger(LlapZookeeperRegistryImpl.class); /** @@ -65,8 +65,6 @@ public class LlapZookeeperRegistryImpl private static final String IPC_LLAP = "llap"; private static final String IPC_OUTPUTFORMAT = "llapoutputformat"; private final static String NAMESPACE_PREFIX = "llap-"; - private final static String USER_SCOPE_PATH_PREFIX = "user-"; - private static final String WORKER_PREFIX = "worker-"; private static final String SLOT_PREFIX = "slot-"; private static final String SASL_LOGIN_CONTEXT_NAME = "LlapZooKeeperClient"; @@ -79,7 +77,7 @@ public class LlapZookeeperRegistryImpl public LlapZookeeperRegistryImpl(String instanceName, Configuration conf) { super(instanceName, conf, HiveConf.getVar(conf, ConfVars.LLAP_ZK_REGISTRY_NAMESPACE), NAMESPACE_PREFIX, - USER_SCOPE_PATH_PREFIX, WORKER_PREFIX, + USER_SCOPE_PATH_PREFIX, WORKER_PREFIX, WORKER_GROUP, LlapProxy.isDaemon() ? 
SASL_LOGIN_CONTEXT_NAME : null, HiveConf.getVar(conf, ConfVars.LLAP_KERBEROS_PRINCIPAL), HiveConf.getVar(conf, ConfVars.LLAP_KERBEROS_KEYTAB_FILE), @@ -225,7 +223,7 @@ public class LlapZookeeperRegistryImpl @Override public String toString() { - return "DynamicServiceInstance [id=" + getWorkerIdentity() + ", host=" + host + ":" + rpcPort + + return "DynamicServiceInstance [id=" + getWorkerIdentity() + ", host=" + getHost() + ":" + getRpcPort() + " with resources=" + getResource() + ", shufflePort=" + getShufflePort() + ", servicesAddress=" + getServicesAddress() + ", mgmtPort=" + getManagementPort() + "]"; } @@ -327,9 +325,9 @@ public class LlapZookeeperRegistryImpl if (data == null) continue; String nodeName = extractNodeName(childData); if (nodeName.startsWith(WORKER_PREFIX)) { - Set<LlapServiceInstance> instances = getInstancesByPath(childData.getPath()); + LlapServiceInstance instances = getInstanceByPath(childData.getPath()); if (instances != null) { - unsorted.addAll(instances); + unsorted.add(instances); } } else if (nodeName.startsWith(SLOT_PREFIX)) { slotByWorker.put(extractWorkerIdFromSlot(childData), http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenClient.java ---------------------------------------------------------------------- diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenClient.java b/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenClient.java index 32d5caa..3208e21 100644 --- a/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenClient.java +++ b/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenClient.java @@ -36,6 +36,7 @@ import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService; import org.apache.hadoop.hive.llap.registry.LlapServiceInstance; import org.apache.hadoop.hive.llap.registry.LlapServiceInstanceSet; import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier; 
+import org.apache.hadoop.hive.registry.ServiceInstanceSet; import org.apache.hadoop.io.DataInputByteBuffer; import org.apache.hadoop.io.retry.RetryPolicies; import org.apache.hadoop.io.retry.RetryPolicy; @@ -55,7 +56,7 @@ public class LlapTokenClient { private final SocketFactory socketFactory; private final RetryPolicy retryPolicy; private final Configuration conf; - private LlapServiceInstanceSet activeInstances; + private ServiceInstanceSet<LlapServiceInstance> activeInstances; private Collection<LlapServiceInstance> lastKnownInstances; private LlapManagementProtocolClientImpl client; private LlapServiceInstance clientInstance; http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/llap-client/src/java/org/apache/hadoop/hive/registry/RegistryUtilities.java ---------------------------------------------------------------------- diff --git a/llap-client/src/java/org/apache/hadoop/hive/registry/RegistryUtilities.java b/llap-client/src/java/org/apache/hadoop/hive/registry/RegistryUtilities.java new file mode 100644 index 0000000..e069e43 --- /dev/null +++ b/llap-client/src/java/org/apache/hadoop/hive/registry/RegistryUtilities.java @@ -0,0 +1,52 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.registry; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.UUID; + +public class RegistryUtilities { + private static final String LOCALHOST = "localhost"; + + /** + * Will return hostname stored in InetAddress. + * + * @return hostname + */ + public static String getHostName() { + try { + return InetAddress.getLocalHost().getHostName(); + } catch (UnknownHostException e) { + return LOCALHOST; + } + } + + /** + * Will return FQDN of the host after doing reverse DNS lookup. + * + * @return FQDN of host + */ + public static String getCanonicalHostName() { + try { + return InetAddress.getLocalHost().getCanonicalHostName(); + } catch (UnknownHostException e) { + return LOCALHOST; + } + } + + public static String getUUID() { + return String.valueOf(UUID.randomUUID()); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/llap-client/src/java/org/apache/hadoop/hive/registry/ServiceInstance.java ---------------------------------------------------------------------- diff --git a/llap-client/src/java/org/apache/hadoop/hive/registry/ServiceInstance.java b/llap-client/src/java/org/apache/hadoop/hive/registry/ServiceInstance.java index 908b3bb..4493e99 100644 --- a/llap-client/src/java/org/apache/hadoop/hive/registry/ServiceInstance.java +++ b/llap-client/src/java/org/apache/hadoop/hive/registry/ServiceInstance.java @@ -21,27 +21,27 @@ public interface ServiceInstance { * Worker identity is a UUID (unique across restarts), to identify a node which died & was brought * back on the same host/port */ - public abstract String getWorkerIdentity(); + String getWorkerIdentity(); /** * Hostname of the service instance * - * @return + * @return service hostname */ - public abstract String getHost(); + String getHost(); /** * RPC Endpoint for service instance - * - * @return + * + * @return rpc port */ - public int getRpcPort(); + int getRpcPort(); /** * Config properties
of the Service Instance (llap.daemon.*) - * - * @return + * + * @return properties */ - public abstract Map<String, String> getProperties(); + Map<String, String> getProperties(); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/llap-client/src/java/org/apache/hadoop/hive/registry/ServiceInstanceSet.java ---------------------------------------------------------------------- diff --git a/llap-client/src/java/org/apache/hadoop/hive/registry/ServiceInstanceSet.java b/llap-client/src/java/org/apache/hadoop/hive/registry/ServiceInstanceSet.java index 34fba5c..63178cc 100644 --- a/llap-client/src/java/org/apache/hadoop/hive/registry/ServiceInstanceSet.java +++ b/llap-client/src/java/org/apache/hadoop/hive/registry/ServiceInstanceSet.java @@ -29,15 +29,15 @@ public interface ServiceInstanceSet<InstanceType extends ServiceInstance> { * The worker identity does not collide between restarts, so each restart will have a unique id, * while having the same host/ip pair. * - * @return + * @return instance list */ Collection<InstanceType> getAll(); /** * Get an instance by worker identity. * - * @param name - * @return + * @param name worker id + * @return instance */ InstanceType getInstance(String name); @@ -46,13 +46,13 @@ public interface ServiceInstanceSet<InstanceType extends ServiceInstance> { * * The list could include dead and alive instances. * - * @param host - * @return + * @param host hostname + * @return instance list */ Set<InstanceType> getByHost(String host); /** - * Get number of instances in the currently availabe. + * Get the number of currently available instances. 
* * @return - number of instances */ http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ServiceInstanceBase.java ---------------------------------------------------------------------- diff --git a/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ServiceInstanceBase.java b/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ServiceInstanceBase.java index db3d788..de8910c 100644 --- a/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ServiceInstanceBase.java +++ b/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ServiceInstanceBase.java @@ -15,6 +15,8 @@ package org.apache.hadoop.hive.registry.impl; import java.io.IOException; import java.util.Map; +import java.util.Objects; + import org.apache.hadoop.hive.registry.ServiceInstance; import org.apache.hadoop.registry.client.binding.RegistryTypeUtils; import org.apache.hadoop.registry.client.types.AddressTypes; @@ -25,26 +27,27 @@ import org.slf4j.LoggerFactory; public class ServiceInstanceBase implements ServiceInstance { private static final Logger LOG = LoggerFactory.getLogger(ServiceInstanceBase.class); + private String host; + private int rpcPort; + private String workerIdentity; + private Map<String, String> properties; - protected final ServiceRecord srv; - protected final String host; - protected final int rpcPort; + // empty c'tor to make jackson happy + public ServiceInstanceBase() { - public ServiceInstanceBase(ServiceRecord srv, String rpcName) throws IOException { - this.srv = srv; + } + public ServiceInstanceBase(ServiceRecord srv, String rpcName) throws IOException { if (LOG.isTraceEnabled()) { LOG.trace("Working with ServiceRecord: {}", srv); } - final Endpoint rpc = srv.getInternalEndpoint(rpcName); - - this.host = - RegistryTypeUtils.getAddressField(rpc.addresses.get(0), - AddressTypes.ADDRESS_HOSTNAME_FIELD); - this.rpcPort = - Integer.parseInt(RegistryTypeUtils.getAddressField(rpc.addresses.get(0), - 
AddressTypes.ADDRESS_PORT_FIELD)); + this.host = RegistryTypeUtils.getAddressField(rpc.addresses.get(0), + AddressTypes.ADDRESS_HOSTNAME_FIELD); + this.rpcPort = Integer.parseInt(RegistryTypeUtils.getAddressField(rpc.addresses.get(0), + AddressTypes.ADDRESS_PORT_FIELD)); + this.workerIdentity = srv.get(ZkRegistryBase.UNIQUE_IDENTIFIER); + this.properties = srv.attributes(); } @Override @@ -57,17 +60,19 @@ public class ServiceInstanceBase implements ServiceInstance { } ServiceInstanceBase other = (ServiceInstanceBase) o; - return this.getWorkerIdentity().equals(other.getWorkerIdentity()); + return Objects.equals(getWorkerIdentity(), other.getWorkerIdentity()) + && Objects.equals(host, other.host) + && Objects.equals(rpcPort, other.rpcPort); } @Override public int hashCode() { - return getWorkerIdentity().hashCode(); + return getWorkerIdentity().hashCode() + (31 * host.hashCode()) + (31 * rpcPort); } @Override public String getWorkerIdentity() { - return srv.get(ZkRegistryBase.UNIQUE_IDENTIFIER); + return workerIdentity; } @Override @@ -82,12 +87,28 @@ public class ServiceInstanceBase implements ServiceInstance { @Override public Map<String, String> getProperties() { - return srv.attributes(); + return properties; + } + + public void setHost(final String host) { + this.host = host; + } + + public void setRpcPort(final int rpcPort) { + this.rpcPort = rpcPort; + } + + public void setWorkerIdentity(final String workerIdentity) { + this.workerIdentity = workerIdentity; + } + + public void setProperties(final Map<String, String> properties) { + this.properties = properties; } @Override public String toString() { return "DynamicServiceInstance [id=" + getWorkerIdentity() + ", host=" - + host + ":" + rpcPort + "]"; + + host + ":" + rpcPort + "]"; } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmInstance.java 
---------------------------------------------------------------------- diff --git a/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmInstance.java b/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmInstance.java index 0724cf5..d09cb24 100644 --- a/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmInstance.java +++ b/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmInstance.java @@ -13,29 +13,26 @@ */ package org.apache.hadoop.hive.registry.impl; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import java.io.IOException; import org.apache.commons.codec.binary.Base64; - -import com.google.common.io.ByteStreams; - -import org.apache.tez.common.security.JobTokenIdentifier; - -import org.apache.hadoop.security.token.Token; - -import java.io.IOException; import org.apache.hadoop.registry.client.binding.RegistryTypeUtils; import org.apache.hadoop.registry.client.types.AddressTypes; import org.apache.hadoop.registry.client.types.Endpoint; import org.apache.hadoop.registry.client.types.ServiceRecord; +import org.apache.hadoop.security.token.Token; +import org.apache.tez.common.security.JobTokenIdentifier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.io.ByteStreams; public class TezAmInstance extends ServiceInstanceBase { private static final Logger LOG = LoggerFactory.getLogger(TezAmInstance.class); private final int pluginPort; private Token<JobTokenIdentifier> token; - public TezAmInstance(ServiceRecord srv) throws IOException { + TezAmInstance(ServiceRecord srv) throws IOException { super(srv, TezAmRegistryImpl.IPC_TEZCLIENT); final Endpoint plugin = srv.getInternalEndpoint(TezAmRegistryImpl.IPC_PLUGIN); if (plugin != null) { @@ -76,7 +73,7 @@ public class TezAmInstance extends ServiceInstanceBase { @Override public String toString() { - return "TezAmInstance [" + getSessionId() + ", host=" + host + ", rpcPort=" + rpcPort + + return "TezAmInstance [" + 
getSessionId() + ", host=" + getHost() + ", rpcPort=" + getRpcPort() + ", pluginPort=" + pluginPort + ", token=" + token + "]"; } http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmRegistryImpl.java ---------------------------------------------------------------------- diff --git a/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmRegistryImpl.java b/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmRegistryImpl.java index 417e571..ab02cf4 100644 --- a/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmRegistryImpl.java +++ b/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmRegistryImpl.java @@ -37,21 +37,19 @@ public class TezAmRegistryImpl extends ZkRegistryBase<TezAmInstance> { static final String AM_SESSION_ID = "am.session.id", AM_PLUGIN_TOKEN = "am.plugin.token", AM_PLUGIN_JOBID = "am.plugin.jobid"; private final static String NAMESPACE_PREFIX = "tez-am-"; - private final static String USER_SCOPE_PATH_PREFIX = "user-"; - private static final String WORKER_PREFIX = "worker-"; private static final String SASL_LOGIN_CONTEXT_NAME = "TezAmZooKeeperClient"; private final String registryName; - public static TezAmRegistryImpl create(Configuration conf, boolean b) { + public static TezAmRegistryImpl create(Configuration conf, boolean useSecureZk) { String amRegistryName = HiveConf.getVar(conf, ConfVars.LLAP_TASK_SCHEDULER_AM_REGISTRY_NAME); return StringUtils.isBlank(amRegistryName) ? null - : new TezAmRegistryImpl(amRegistryName, conf, true); + : new TezAmRegistryImpl(amRegistryName, conf, useSecureZk); } private TezAmRegistryImpl(String instanceName, Configuration conf, boolean useSecureZk) { - super(instanceName, conf, null, NAMESPACE_PREFIX, USER_SCOPE_PATH_PREFIX, WORKER_PREFIX, + super(instanceName, conf, null, NAMESPACE_PREFIX, USER_SCOPE_PATH_PREFIX, WORKER_PREFIX, WORKER_GROUP, useSecureZk ? 
SASL_LOGIN_CONTEXT_NAME : null, HiveConf.getVar(conf, ConfVars.LLAP_TASK_SCHEDULER_AM_REGISTRY_PRINCIPAL), HiveConf.getVar(conf, ConfVars.LLAP_TASK_SCHEDULER_AM_REGISTRY_KEYTAB_FILE), http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZkRegistryBase.java ---------------------------------------------------------------------- diff --git a/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZkRegistryBase.java b/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZkRegistryBase.java index 17269dd..e7227a8 100644 --- a/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZkRegistryBase.java +++ b/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZkRegistryBase.java @@ -13,13 +13,7 @@ */ package org.apache.hadoop.hive.registry.impl; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; -import com.google.common.util.concurrent.ThreadFactoryBuilder; import java.io.IOException; -import java.net.InetAddress; -import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Collection; import java.util.HashSet; @@ -34,6 +28,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; + import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.api.ACLProvider; @@ -44,12 +39,15 @@ import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; import org.apache.curator.framework.recipes.nodes.PersistentEphemeralNode; import org.apache.curator.framework.recipes.nodes.PersistentEphemeralNode.Mode; +import org.apache.curator.framework.state.ConnectionState; +import 
org.apache.curator.framework.state.ConnectionStateListener; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.curator.utils.CloseableUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.llap.LlapUtil; +import org.apache.hadoop.hive.registry.RegistryUtilities; import org.apache.hadoop.hive.registry.ServiceInstance; import org.apache.hadoop.hive.registry.ServiceInstanceStateChangeListener; import org.apache.hadoop.registry.client.binding.RegistryUtils; @@ -65,6 +63,12 @@ import org.apache.zookeeper.data.Id; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import com.google.common.util.concurrent.ThreadFactoryBuilder; + /** * This is currently used for implementation inheritance only; it doesn't provide a unified flow * into which one can just plug a few abstract method implementations, because providing one with @@ -77,16 +81,18 @@ public abstract class ZkRegistryBase<InstanceType extends ServiceInstance> { private static final Logger LOG = LoggerFactory.getLogger(ZkRegistryBase.class); private final static String SASL_NAMESPACE = "sasl"; private final static String UNSECURE_NAMESPACE = "unsecure"; - - static final String UNIQUE_IDENTIFIER = "registry.unique.id"; - private static final UUID uniq = UUID.randomUUID(); + protected final static String USER_SCOPE_PATH_PREFIX = "user-"; + protected static final String WORKER_PREFIX = "worker-"; + protected static final String WORKER_GROUP = "workers"; + public static final String UNIQUE_IDENTIFIER = "registry.unique.id"; + protected static final UUID UNIQUE_ID = UUID.randomUUID(); + private static final Joiner PATH_JOINER = Joiner.on("/").skipNulls(); protected final Configuration conf; protected 
final CuratorFramework zooKeeperClient; - // userPathPrefix is the path specific to the user for which ACLs should be restrictive. // workersPath is the directory path where all the worker znodes are located. protected final String workersPath; - private final String userPathPrefix, workerNodePrefix; + private final String workerNodePrefix; protected final ServiceRecordMarshal encoder; // to marshal/unmarshal znode data @@ -99,7 +105,9 @@ public abstract class ZkRegistryBase<InstanceType extends ServiceInstance> { private final String disableMessage; private final Lock instanceCacheLock = new ReentrantLock(); - private final Map<String, Set<InstanceType>> pathToInstanceCache; + // there can be only one instance per path + private final Map<String, InstanceType> pathToInstanceCache; + // there can be multiple instances per node private final Map<String, Set<InstanceType>> nodeToInstanceCache; // The registration znode. @@ -109,29 +117,22 @@ public abstract class ZkRegistryBase<InstanceType extends ServiceInstance> { private PathChildrenCache instancesCache; // Created on demand. /** Local hostname. */ - protected static final String hostname; - static { - String localhost = "localhost"; - try { - localhost = InetAddress.getLocalHost().getCanonicalHostName(); - } catch (UnknownHostException uhe) { - // ignore - } - hostname = localhost; - } + protected static final String hostname = RegistryUtilities.getCanonicalHostName(); /** * @param rootNs A single root namespace override. Not recommended. - * @param nsPrefix The namespace prefix to use with default namespaces. + * @param nsPrefix The namespace prefix to use with default namespaces (appends 'sasl' for secure else 'unsecure' + * to namespace prefix to get effective root namespace). * @param userScopePathPrefix The prefix to use for the user-specific part of the path. * @param workerPrefix The prefix to use for each worker znode. 
+ * @param workerGroup group name to use for all workers * @param zkSaslLoginContextName SASL login context name for ZK security; null if not needed. * @param zkPrincipal ZK security principal. * @param zkKeytab ZK security keytab. * @param aclsConfig A config setting to use to determine if ACLs should be verified. */ public ZkRegistryBase(String instanceName, Configuration conf, String rootNs, String nsPrefix, - String userScopePathPrefix, String workerPrefix, + String userScopePathPrefix, String workerPrefix, String workerGroup, String zkSaslLoginContextName, String zkPrincipal, String zkKeytab, ConfVars aclsConfig) { this.conf = new Configuration(conf); this.saslLoginContextName = zkSaslLoginContextName; @@ -145,29 +146,52 @@ public abstract class ZkRegistryBase<InstanceType extends ServiceInstance> { this.disableMessage = ""; } this.conf.addResource(YarnConfiguration.YARN_SITE_CONFIGURATION_FILE); - String zkEnsemble = getQuorumServers(this.conf); this.encoder = new RegistryUtils.ServiceRecordMarshal(); - int sessionTimeout = (int) HiveConf.getTimeVar(conf, - ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT, TimeUnit.MILLISECONDS); - int baseSleepTime = (int) HiveConf.getTimeVar(conf, - ConfVars.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME, TimeUnit.MILLISECONDS); - int maxRetries = HiveConf.getIntVar(conf, ConfVars.HIVE_ZOOKEEPER_CONNECTION_MAX_RETRIES); // sample path: /llap-sasl/hiveuser/hostname/workers/worker-0000000 // worker-0000000 is the sequence number which will be retained until session timeout. If a // worker does not respond due to communication interruptions it will retain the same sequence // number when it returns back. 
If session timeout expires, the node will be deleted and new // addition of the same node (restart) will get next sequence number - this.userPathPrefix = userScopePathPrefix + getZkPathUser(this.conf); - this.workerNodePrefix = workerPrefix; - this.workersPath = "/" + userPathPrefix + "/" + instanceName + "/workers"; + final String userPathPrefix = userScopePathPrefix == null ? null : userScopePathPrefix + getZkPathUser(conf); + this.workerNodePrefix = workerPrefix == null ? WORKER_PREFIX : workerPrefix; + this.workersPath = "/" + PATH_JOINER.join(userPathPrefix, instanceName, workerGroup); this.instancesCache = null; this.stateChangeListeners = new HashSet<>(); this.pathToInstanceCache = new ConcurrentHashMap<>(); this.nodeToInstanceCache = new ConcurrentHashMap<>(); + final String namespace = getRootNamespace(rootNs, nsPrefix); + ACLProvider aclProvider; + // get acl provider for most outer path that is non-null + if (userPathPrefix == null) { + if (instanceName == null) { + if (workerGroup == null) { + aclProvider = getACLProviderForZKPath(namespace); + } else { + aclProvider = getACLProviderForZKPath(workerGroup); + } + } else { + aclProvider = getACLProviderForZKPath(instanceName); + } + } else { + aclProvider = getACLProviderForZKPath(userScopePathPrefix); + } + this.zooKeeperClient = getZookeeperClient(conf, namespace, aclProvider); + this.zooKeeperClient.getConnectionStateListenable().addListener(new ZkConnectionStateListener()); + } + + public static String getRootNamespace(String userProvidedNamespace, String defaultNamespacePrefix) { + final boolean isSecure = UserGroupInformation.isSecurityEnabled(); + String rootNs = userProvidedNamespace; + if (rootNs == null) { + rootNs = defaultNamespacePrefix + (isSecure ? 
SASL_NAMESPACE : UNSECURE_NAMESPACE); + } + return rootNs; + } + private ACLProvider getACLProviderForZKPath(String zkPath) { final boolean isSecure = UserGroupInformation.isSecurityEnabled(); - ACLProvider zooKeeperAclProvider = new ACLProvider() { + return new ACLProvider() { @Override public List<ACL> getDefaultAcl() { // We always return something from getAclForPath so this should not happen. @@ -177,31 +201,40 @@ public abstract class ZkRegistryBase<InstanceType extends ServiceInstance> { @Override public List<ACL> getAclForPath(String path) { - if (!isSecure || path == null || !path.contains(userPathPrefix)) { + if (!isSecure || path == null || !path.contains(zkPath)) { // No security or the path is below the user path - full access. return Lists.newArrayList(ZooDefs.Ids.OPEN_ACL_UNSAFE); } return createSecureAcls(); } }; - if (rootNs == null) { - rootNs = nsPrefix + (isSecure ? SASL_NAMESPACE : UNSECURE_NAMESPACE); // The normal path. - } + } + + private CuratorFramework getZookeeperClient(Configuration conf, String namespace, ACLProvider zooKeeperAclProvider) { + String zkEnsemble = getQuorumServers(conf); + int sessionTimeout = (int) HiveConf.getTimeVar(conf, + ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT, TimeUnit.MILLISECONDS); + int connectionTimeout = (int) HiveConf.getTimeVar(conf, + ConfVars.HIVE_ZOOKEEPER_CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS); + int baseSleepTime = (int) HiveConf.getTimeVar(conf, + ConfVars.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME, TimeUnit.MILLISECONDS); + int maxRetries = HiveConf.getIntVar(conf, ConfVars.HIVE_ZOOKEEPER_CONNECTION_MAX_RETRIES); // Create a CuratorFramework instance to be used as the ZooKeeper client // Use the zooKeeperAclProvider to create appropriate ACLs - this.zooKeeperClient = CuratorFrameworkFactory.builder() - .connectString(zkEnsemble) - .sessionTimeoutMs(sessionTimeout) - .aclProvider(zooKeeperAclProvider) - .namespace(rootNs) - .retryPolicy(new ExponentialBackoffRetry(baseSleepTime, maxRetries)) - 
.build(); + return CuratorFrameworkFactory.builder() + .connectString(zkEnsemble) + .sessionTimeoutMs(sessionTimeout) + .connectionTimeoutMs(connectionTimeout) + .aclProvider(zooKeeperAclProvider) + .namespace(namespace) + .retryPolicy(new ExponentialBackoffRetry(baseSleepTime, maxRetries)) + .build(); } private static List<ACL> createSecureAcls() { // Read all to the world - List<ACL> nodeAcls = new ArrayList<ACL>(ZooDefs.Ids.READ_ACL_UNSAFE); + List<ACL> nodeAcls = new ArrayList<>(ZooDefs.Ids.READ_ACL_UNSAFE); // Create/Delete/Write/Admin to creator nodeAcls.addAll(ZooDefs.Ids.CREATOR_ALL_ACL); return nodeAcls; @@ -211,9 +244,9 @@ public abstract class ZkRegistryBase<InstanceType extends ServiceInstance> { * Get the ensemble server addresses from the configuration. The format is: host1:port, * host2:port.. * - * @param conf + * @param conf configuration **/ - private String getQuorumServers(Configuration conf) { + private static String getQuorumServers(Configuration conf) { String[] hosts = conf.getTrimmedStrings(ConfVars.HIVE_ZOOKEEPER_QUORUM.varname); String port = conf.get(ConfVars.HIVE_ZOOKEEPER_CLIENT_PORT.varname, ConfVars.HIVE_ZOOKEEPER_CLIENT_PORT.getDefaultValue()); @@ -238,7 +271,7 @@ public abstract class ZkRegistryBase<InstanceType extends ServiceInstance> { protected final String registerServiceRecord(ServiceRecord srv) throws IOException { // restart sensitive instance id - srv.set(UNIQUE_IDENTIFIER, uniq.toString()); + srv.set(UNIQUE_IDENTIFIER, UNIQUE_ID.toString()); // Create a znode under the rootNamespace parent for this instance of the server try { @@ -275,11 +308,28 @@ public abstract class ZkRegistryBase<InstanceType extends ServiceInstance> { CloseableUtils.closeQuietly(znode); throw (e instanceof IOException) ? 
(IOException)e : new IOException(e); } - return uniq.toString(); + return UNIQUE_ID.toString(); } + protected final void updateServiceRecord(ServiceRecord srv) throws IOException { + try { + znode.setData(encoder.toBytes(srv)); + + if (doCheckAcls) { + try { + checkAndSetAcls(); + } catch (Exception ex) { + throw new IOException("Error validating or setting ACLs. " + disableMessage, ex); + } + } + } catch (Exception e) { + LOG.error("Unable to update znode with new service record", e); + CloseableUtils.closeQuietly(znode); + throw (e instanceof IOException) ? (IOException) e : new IOException(e); + } + } - protected final void initializeWithoutRegisteringInternal() throws IOException { + final void initializeWithoutRegisteringInternal() throws IOException { // Create a znode under the rootNamespace parent for this instance of the server try { try { @@ -345,8 +395,8 @@ public abstract class ZkRegistryBase<InstanceType extends ServiceInstance> { private void addToCache(String path, String host, InstanceType instance) { instanceCacheLock.lock(); try { - putInCache(path, pathToInstanceCache, instance); - putInCache(host, nodeToInstanceCache, instance); + putInInstanceCache(path, pathToInstanceCache, instance); + putInNodeCache(host, nodeToInstanceCache, instance); } finally { instanceCacheLock.unlock(); } @@ -368,14 +418,19 @@ public abstract class ZkRegistryBase<InstanceType extends ServiceInstance> { path, host, pathToInstanceCache.size(), nodeToInstanceCache.size()); } - private void putInCache(String key, Map<String, Set<InstanceType>> cache, + private void putInInstanceCache(String key, Map<String, InstanceType> cache, InstanceType instance) { + cache.put(key, instance); + } + + private void putInNodeCache(String key, Map<String, Set<InstanceType>> cache, + InstanceType instance) { Set<InstanceType> instanceSet = cache.get(key); if (instanceSet == null) { - instanceSet = Sets.newHashSet(); - cache.put(key, instanceSet); + instanceSet = new HashSet<>(); + 
instanceSet.add(instance); } - instanceSet.add(instance); + cache.put(key, instanceSet); } protected final void populateCache(PathChildrenCache instancesCache, boolean doInvokeListeners) { @@ -403,7 +458,7 @@ public abstract class ZkRegistryBase<InstanceType extends ServiceInstance> { protected abstract InstanceType createServiceInstance(ServiceRecord srv) throws IOException; - protected static final byte[] getWorkerData(ChildData childData, String workerNodePrefix) { + protected static byte[] getWorkerData(ChildData childData, String workerNodePrefix) { if (childData == null) return null; byte[] data = childData.getData(); if (data == null) return null; @@ -415,8 +470,7 @@ public abstract class ZkRegistryBase<InstanceType extends ServiceInstance> { private final Logger LOG = LoggerFactory.getLogger(InstanceStateChangeListener.class); @Override - public void childEvent(final CuratorFramework client, final PathChildrenCacheEvent event) - throws Exception { + public void childEvent(final CuratorFramework client, final PathChildrenCacheEvent event) { Preconditions.checkArgument(client != null && client.getState() == CuratorFrameworkState.STARTED, "client is not started"); @@ -427,28 +481,32 @@ public abstract class ZkRegistryBase<InstanceType extends ServiceInstance> { if (!nodeName.startsWith(workerNodePrefix)) return; LOG.info("{} for zknode {}", event.getType(), childData.getPath()); InstanceType instance = extractServiceInstance(event, childData); - int ephSeqVersion = extractSeqNum(nodeName); - switch (event.getType()) { - case CHILD_ADDED: - addToCache(childData.getPath(), instance.getHost(), instance); - for (ServiceInstanceStateChangeListener<InstanceType> listener : stateChangeListeners) { - listener.onCreate(instance, ephSeqVersion); - } - break; - case CHILD_UPDATED: - addToCache(childData.getPath(), instance.getHost(), instance); - for (ServiceInstanceStateChangeListener<InstanceType> listener : stateChangeListeners) { - listener.onUpdate(instance, 
ephSeqVersion); - } - break; - case CHILD_REMOVED: - removeFromCache(childData.getPath(), instance.getHost()); - for (ServiceInstanceStateChangeListener<InstanceType> listener : stateChangeListeners) { - listener.onRemove(instance, ephSeqVersion); + if (instance != null) { + int ephSeqVersion = extractSeqNum(nodeName); + switch (event.getType()) { + case CHILD_ADDED: + addToCache(childData.getPath(), instance.getHost(), instance); + for (ServiceInstanceStateChangeListener<InstanceType> listener : stateChangeListeners) { + listener.onCreate(instance, ephSeqVersion); + } + break; + case CHILD_UPDATED: + addToCache(childData.getPath(), instance.getHost(), instance); + for (ServiceInstanceStateChangeListener<InstanceType> listener : stateChangeListeners) { + listener.onUpdate(instance, ephSeqVersion); + } + break; + case CHILD_REMOVED: + removeFromCache(childData.getPath(), instance.getHost()); + for (ServiceInstanceStateChangeListener<InstanceType> listener : stateChangeListeners) { + listener.onRemove(instance, ephSeqVersion); + } + break; + default: + // Ignore all the other events; logged above. } - break; - default: - // Ignore all the other events; logged above. + } else { + LOG.info("instance is null for event: {} childData: {}", event.getType(), childData); } } } @@ -464,7 +522,7 @@ public abstract class ZkRegistryBase<InstanceType extends ServiceInstance> { protected final Set<InstanceType> getByHostInternal(String host) { Set<InstanceType> byHost = nodeToInstanceCache.get(host); - byHost = (byHost == null) ? Sets.<InstanceType>newHashSet() : byHost; + byHost = (byHost == null) ? 
Sets.newHashSet() : byHost; if (LOG.isDebugEnabled()) { LOG.debug("Returning " + byHost.size() + " hosts for locality allocation on " + host); } @@ -472,11 +530,7 @@ public abstract class ZkRegistryBase<InstanceType extends ServiceInstance> { } protected final Collection<InstanceType> getAllInternal() { - Set<InstanceType> instances = new HashSet<>(); - for(Set<InstanceType> instanceSet : pathToInstanceCache.values()) { - instances.addAll(instanceSet); - } - return instances; + return new HashSet<>(pathToInstanceCache.values()); } private static String extractNodeName(ChildData childData) { @@ -564,13 +618,17 @@ public abstract class ZkRegistryBase<InstanceType extends ServiceInstance> { CloseableUtils.class.getName(); } + protected void unregisterInternal() { + CloseableUtils.closeQuietly(znode); + } + public void stop() { CloseableUtils.closeQuietly(znode); CloseableUtils.closeQuietly(instancesCache); CloseableUtils.closeQuietly(zooKeeperClient); } - protected final Set<InstanceType> getInstancesByPath(String path) { + protected final InstanceType getInstanceByPath(String path) { return pathToInstanceCache.get(path); } @@ -588,4 +646,12 @@ public abstract class ZkRegistryBase<InstanceType extends ServiceInstance> { throw e; } } + + // for debugging + private class ZkConnectionStateListener implements ConnectionStateListener { + @Override + public void stateChanged(final CuratorFramework curatorFramework, final ConnectionState connectionState) { + LOG.info("Connection state change notification received. 
State: {}", connectionState); + } + } } http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java ---------------------------------------------------------------------- diff --git a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java index 0120639..3aec46b 100644 --- a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java +++ b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java @@ -53,6 +53,7 @@ import org.apache.hadoop.hive.llap.registry.LlapServiceInstanceSet; import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService; import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier; import org.apache.hadoop.hive.llap.tez.Converters; +import org.apache.hadoop.hive.registry.ServiceInstanceSet; import org.apache.hadoop.io.DataInputBuffer; import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.NullWritable; @@ -343,7 +344,7 @@ public class LlapBaseInputFormat<V extends WritableComparable<?>> private LlapServiceInstance getServiceInstanceForHost(LlapRegistryService registryService, String host) throws IOException { InetAddress address = InetAddress.getByName(host); - LlapServiceInstanceSet instanceSet = registryService.getInstances(); + ServiceInstanceSet<LlapServiceInstance> instanceSet = registryService.getInstances(); LlapServiceInstance serviceInstance = null; // The name used in the service registry may not match the host name we're using. 
@@ -375,7 +376,7 @@ public class LlapBaseInputFormat<V extends WritableComparable<?>> private LlapServiceInstance getServiceInstanceRandom(LlapRegistryService registryService) throws IOException { - LlapServiceInstanceSet instanceSet = registryService.getInstances(); + ServiceInstanceSet<LlapServiceInstance> instanceSet = registryService.getInstances(); LlapServiceInstance serviceInstance = null; LOG.info("Finding random live service instance"); http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java index 58bf8dc..b944fad 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java @@ -34,8 +34,10 @@ import org.apache.hadoop.hive.common.classification.InterfaceAudience; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.llap.registry.LlapServiceInstance; +import org.apache.hadoop.hive.llap.registry.LlapServiceInstanceSet; import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService; import org.apache.hadoop.hive.llap.registry.impl.LlapZookeeperRegistryImpl; +import org.apache.hadoop.hive.registry.ServiceInstanceSet; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.service.CompositeService; @@ -230,7 +232,8 @@ public class LlapWebServices extends AbstractService { } jg.writeStringField("identity", registry.getWorkerIdentity()); jg.writeArrayFieldStart("peers"); - for (LlapServiceInstance s : 
registry.getInstances().getAllInstancesOrdered(false)) { + ServiceInstanceSet<LlapServiceInstance> instanceSet = registry.getInstances(); + for (LlapServiceInstance s : ((LlapServiceInstanceSet) instanceSet).getAllInstancesOrdered(false)) { jg.writeStartObject(); jg.writeStringField("identity", s.getWorkerIdentity()); jg.writeStringField("host", s.getHost()); http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java ---------------------------------------------------------------------- diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java index 66de3b8..6ddecca 100644 --- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java +++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java @@ -14,6 +14,16 @@ package org.apache.hadoop.hive.llap.tezplugins; +import com.google.common.io.ByteArrayDataOutput; + +import org.apache.hadoop.hive.registry.ServiceInstanceSet; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.yarn.api.records.ApplicationId; + +import org.apache.hadoop.hive.registry.impl.TezAmRegistryImpl; + +import org.apache.hadoop.hive.registry.ServiceInstanceStateChangeListener; + import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java index 46cfe56..a051f90 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java +++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java @@ -98,13 +98,18 @@ public class TezSessionPoolManager extends TezSessionPoolSession.AbstractTrigger protected TezSessionPoolManager() { } - public void startPool() throws Exception { + public void startPool(HiveConf conf, final WMFullResourcePlan resourcePlan) throws Exception { if (defaultSessionPool != null) { defaultSessionPool.start(); } if (expirationTracker != null) { expirationTracker.start(); } + initTriggers(conf); + if (resourcePlan != null) { + updateTriggers(resourcePlan); + LOG.info("Updated tez session pool manager with active resource plan: {}", resourcePlan.getPlan().getName()); + } } public void setupPool(HiveConf conf) throws Exception { @@ -157,8 +162,6 @@ public class TezSessionPoolManager extends TezSessionPoolSession.AbstractTrigger numConcurrentLlapQueries = conf.getIntVar(ConfVars.HIVE_SERVER2_LLAP_CONCURRENT_QUERIES); llapQueue = new Semaphore(numConcurrentLlapQueries, true); - initTriggers(conf); - String queueAllowedStr = HiveConf.getVar(initConf, ConfVars.HIVE_SERVER2_TEZ_SESSION_CUSTOM_QUEUE_ALLOWED); try { http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java index d1b3fec..d3748ed 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java @@ -18,25 +18,20 @@ package org.apache.hadoop.hive.ql.exec.tez; -import com.google.common.util.concurrent.SettableFuture; -import org.apache.hadoop.hive.registry.impl.TezAmInstance; -import org.apache.hadoop.conf.Configuration; import java.io.IOException; import java.net.URISyntaxException; -import 
java.util.Collection; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; + import javax.security.auth.login.LoginException; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; + import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; -import org.apache.hadoop.hive.ql.wm.SessionTriggerProvider; -import org.apache.hadoop.hive.ql.wm.TriggerActionHandler; import org.apache.hadoop.hive.registry.impl.TezAmInstance; import org.apache.tez.dag.api.TezException; + import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.ThreadFactoryBuilder; http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java index b98fb58..046ea19 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java @@ -738,7 +738,10 @@ public class TezSessionState { private Path createTezDir(String sessionId, String suffix) throws IOException { // tez needs its own scratch dir (per session) // TODO: De-link from SessionState. A TezSession can be linked to different Hive Sessions via the pool. - Path tezDir = new Path(SessionState.get().getHdfsScratchDirURIString(), TEZ_DIR); + SessionState sessionState = SessionState.get(); + String hdfsScratchDir = sessionState == null ? HiveConf.getVar(conf, HiveConf.ConfVars.SCRATCHDIR) : sessionState + .getHdfsScratchDirURIString(); + Path tezDir = new Path(hdfsScratchDir, TEZ_DIR); tezDir = new Path(tezDir, sessionId + ((suffix == null) ? 
"" : ("-" + suffix))); FileSystem fs = tezDir.getFileSystem(conf); FsPermission fsPermission = new FsPermission(HiveConf.getVar(conf, HiveConf.ConfVars.SCRATCHDIRPERMISSION)); http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/Utils.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/Utils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/Utils.java index bc438bb..1b7321b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/Utils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/Utils.java @@ -26,6 +26,7 @@ import org.apache.commons.lang.ArrayUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.llap.registry.LlapServiceInstance; +import org.apache.hadoop.hive.llap.registry.LlapServiceInstanceSet; import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.split.SplitLocationProvider; @@ -52,7 +53,7 @@ public class Utils { LOG.info("Using LLAP instance " + serviceRegistry.getApplicationId()); Collection<LlapServiceInstance> serviceInstances = - serviceRegistry.getInstances().getAllInstancesOrdered(true); + serviceRegistry.getInstances().getAllInstancesOrdered(true); Preconditions.checkArgument(!serviceInstances.isEmpty(), "No running LLAP daemons! 
Please check LLAP service status and zookeeper configuration"); ArrayList<String> locations = new ArrayList<>(serviceInstances.size()); http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapClusterStateForCompile.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapClusterStateForCompile.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapClusterStateForCompile.java index a8d729d..0d1990a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapClusterStateForCompile.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapClusterStateForCompile.java @@ -33,6 +33,7 @@ import org.apache.hadoop.hive.llap.registry.LlapServiceInstance; import org.apache.hadoop.hive.llap.registry.LlapServiceInstanceSet; import org.apache.hadoop.hive.llap.registry.impl.InactiveServiceInstance; import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService; +import org.apache.hadoop.hive.registry.ServiceInstanceSet; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -112,7 +113,7 @@ public class LlapClusterStateForCompile { return false; // Don't fail; this is best-effort. 
} } - LlapServiceInstanceSet instances; + ServiceInstanceSet<LlapServiceInstance> instances; try { instances = svc.getInstances(10); } catch (IOException e) { http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java index d261623..d5b683f 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java @@ -18,11 +18,14 @@ package org.apache.hadoop.hive.ql.exec.tez; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; import java.util.Random; + import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.junit.Before; @@ -90,7 +93,7 @@ public class TestTezSessionPool { poolManager = new TestTezSessionPoolManager(); poolManager.setupPool(conf); - poolManager.startPool(); + poolManager.startPool(conf, null); // this is now a LIFO operation // draw 1 and replace @@ -153,7 +156,7 @@ public class TestTezSessionPool { poolManager = new TestTezSessionPoolManager(); poolManager.setupPool(conf); - poolManager.startPool(); + poolManager.startPool(conf, null); TezSessionState[] sessions = new TezSessionState[12]; int[] queueCounts = new int[3]; for (int i = 0; i < sessions.length; ++i) { @@ -234,7 +237,7 @@ public class TestTezSessionPool { conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_LLAP_CONCURRENT_QUERIES, 2); poolManager = new TestTezSessionPoolManager(); poolManager.setupPool(conf); - poolManager.startPool(); + poolManager.startPool(conf, null); } catch (Exception e) { 
LOG.error("Initialization error", e); fail(); @@ -295,7 +298,7 @@ public class TestTezSessionPool { try { poolManager = new TestTezSessionPoolManager(); poolManager.setupPool(conf); - poolManager.startPool(); + poolManager.startPool(conf, null); } catch (Exception e) { e.printStackTrace(); fail(); http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/service/pom.xml ---------------------------------------------------------------------- diff --git a/service/pom.xml b/service/pom.xml index 9ad7555..e3774df 100644 --- a/service/pom.xml +++ b/service/pom.xml @@ -264,7 +264,48 @@ <version>${junit.version}</version> <scope>test</scope> </dependency> - + <dependency> + <groupId>org.apache.tez</groupId> + <artifactId>tez-api</artifactId> + <version>${tez.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.tez</groupId> + <artifactId>tez-runtime-library</artifactId> + <version>${tez.version}</version> + <scope>test</scope> + <exclusions> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </exclusion> + <exclusion> + <groupId>commmons-logging</groupId> + <artifactId>commons-logging</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.tez</groupId> + <artifactId>tez-dag</artifactId> + <version>${tez.version}</version> + <scope>test</scope> + <exclusions> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </exclusion> + <exclusion> + <groupId>commmons-logging</groupId> + <artifactId>commons-logging</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-server-web-proxy</artifactId> + </exclusion> + </exclusions> + </dependency> <dependency> <groupId>org.mockito</groupId> <artifactId>mockito-all</artifactId>