HIVE-13364. Allow llap to work with dynamic ports for rpc, shuffle, ui. (Siddharth Seth, reviewed by Prasanth Jayachandran)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/547c5cfc Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/547c5cfc Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/547c5cfc Branch: refs/heads/llap Commit: 547c5cfce9587de31a58622589a63eba62a4b120 Parents: 184e0e1 Author: Siddharth Seth <ss...@apache.org> Authored: Thu Mar 31 14:54:53 2016 -0700 Committer: Siddharth Seth <ss...@apache.org> Committed: Thu Mar 31 14:54:53 2016 -0700 ---------------------------------------------------------------------- .../impl/LlapZookeeperRegistryImpl.java | 9 +++--- .../hive/llap/daemon/impl/LlapDaemon.java | 34 ++++++++++++++++---- .../daemon/impl/LlapProtocolServerImpl.java | 7 ++-- .../daemon/services/impl/LlapWebServices.java | 13 ++++++-- .../hive/llap/daemon/MiniLlapCluster.java | 4 ++- 5 files changed, 50 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/547c5cfc/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java ---------------------------------------------------------------------- diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java index c611d1a..ba38fb8 100644 --- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java +++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java @@ -31,7 +31,6 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -68,8 +67,6 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authentication.util.KerberosUtil; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.client.ZooKeeperSaslClient; import org.apache.zookeeper.data.ACL; @@ -285,8 +282,10 @@ public class LlapZookeeperRegistryImpl implements ServiceRegistry { // No node exists, throw exception throw new Exception("Unable to create znode for this LLAP instance on ZooKeeper."); } - LOG.info("Created a znode on ZooKeeper for LLAP instance: {} znodePath: {}", rpcEndpoint, - znodePath); + LOG.info( + "Registered node. Created a znode on ZooKeeper for LLAP instance: rpc: {}, shuffle: {}," + + " webui: {}, mgmt: {}, znodePath: {} ", + rpcEndpoint, getShuffleEndpoint(), getServicesEndpoint(), getMngEndpoint(), znodePath); } catch (Exception e) { LOG.error("Unable to create a znode for this server instance", e); CloseableUtils.closeQuietly(znode); http://git-wip-us.apache.org/repos/asf/hive/blob/547c5cfc/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java index c8734a5..2fe59a2 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java @@ -100,7 +100,7 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla public LlapDaemon(Configuration daemonConf, int numExecutors, long executorMemoryBytes, boolean ioEnabled, boolean isDirectCache, long ioMemoryBytes, String[] localDirs, int srvPort, - int mngPort, int shufflePort) { + int mngPort, int shufflePort, int webPort) { super("LlapDaemon"); initializeLogging(); @@ -140,6 +140,7 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla "numExecutors=" + numExecutors + ", rpcListenerPort=" + srvPort + ", mngListenerPort=" + mngPort + + ", webPort=" + webPort + ", workDirs=" + Arrays.toString(localDirs) + ", shufflePort=" + shufflePort + ", executorMemory=" + executorMemoryBytes + @@ -206,12 +207,11 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla amReporter, executorClassLoader); addIfService(containerRunner); - this.registry = new LlapRegistryService(true); - addIfService(registry); + if (HiveConf.getBoolVar(daemonConf, HiveConf.ConfVars.HIVE_IN_TEST)) { this.webServices = null; } else { - this.webServices = new LlapWebServices(); + this.webServices = new LlapWebServices(webPort); addIfService(webServices); } // Bring up the server only after all other components have started. @@ -219,6 +219,9 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla // AMReporter after the server so that it gets the correct address. It knows how to deal with // requests before it is started. addIfService(amReporter); + + // Not adding the registry as a service, since we need to control when it is initialized - conf used to pickup properties. + this.registry = new LlapRegistryService(true); } private void initializeLogging() { @@ -289,11 +292,29 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla ShuffleHandler.initializeAndStart(shuffleHandlerConf); LOG.info("Setting shuffle port to: " + ShuffleHandler.get().getPort()); this.shufflePort.set(ShuffleHandler.get().getPort()); + getConfig() + .setInt(ConfVars.LLAP_DAEMON_YARN_SHUFFLE_PORT.varname, ShuffleHandler.get().getPort()); super.serviceStart(); - LOG.info("LlapDaemon serviceStart complete"); + + // Setup the actual ports in the configuration. + getConfig().setInt(ConfVars.LLAP_DAEMON_RPC_PORT.varname, server.getBindAddress().getPort()); + getConfig().setInt(ConfVars.LLAP_MANAGEMENT_RPC_PORT.varname, server.getManagementBindAddress().getPort()); + if (webServices != null) { + getConfig().setInt(ConfVars.LLAP_DAEMON_WEB_PORT.varname, webServices.getPort()); + } + + this.registry.init(getConfig()); + this.registry.start(); + LOG.info( + "LlapDaemon serviceStart complete. RPC Port={}, ManagementPort={}, ShuflePort={}, WebPort={}", + server.getBindAddress().getPort(), server.getManagementBindAddress().getPort(), + ShuffleHandler.get().getPort(), (webServices == null ? "" : webServices.getPort())); } public void serviceStop() throws Exception { + if (registry != null) { + this.registry.stop(); + } super.serviceStop(); ShuffleHandler.shutdown(); shutdown(); @@ -341,6 +362,7 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla int mngPort = HiveConf.getIntVar(daemonConf, ConfVars.LLAP_MANAGEMENT_RPC_PORT); int shufflePort = daemonConf .getInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, ShuffleHandler.DEFAULT_SHUFFLE_PORT); + int webPort = HiveConf.getIntVar(daemonConf, ConfVars.LLAP_DAEMON_WEB_PORT); long executorMemoryBytes = HiveConf.getIntVar( daemonConf, ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB) * 1024l * 1024l; @@ -348,7 +370,7 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla boolean isDirectCache = HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_ALLOCATOR_DIRECT); boolean isLlapIo = HiveConf.getBoolVar(daemonConf, HiveConf.ConfVars.LLAP_IO_ENABLED, true); llapDaemon = new LlapDaemon(daemonConf, numExecutors, executorMemoryBytes, isLlapIo, - isDirectCache, ioMemoryBytes, localDirs, rpcPort, mngPort, shufflePort); + isDirectCache, ioMemoryBytes, localDirs, rpcPort, mngPort, shufflePort, webPort); LOG.info("Adding shutdown hook for LlapDaemon"); ShutdownHookManager.addShutdownHook(new CompositeServiceShutdownHook(llapDaemon), 1); http://git-wip-us.apache.org/repos/asf/hive/blob/547c5cfc/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java index 3a25a66..e99e689 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java @@ -19,7 +19,6 @@ import java.net.InetSocketAddress; import java.security.PrivilegedAction; import java.util.concurrent.atomic.AtomicReference; -import com.google.common.annotations.VisibleForTesting; import com.google.common.io.ByteArrayDataOutput; import com.google.common.io.ByteStreams; import com.google.protobuf.BlockingService; @@ -189,11 +188,15 @@ public class LlapProtocolServerImpl extends AbstractService } @InterfaceAudience.Private - @VisibleForTesting InetSocketAddress getBindAddress() { return srvAddress.get(); } + @InterfaceAudience.Private + InetSocketAddress getManagementBindAddress() { + return mngAddress.get(); + } + private RPC.Server createServer(Class<?> pbProtocol, InetSocketAddress addr, Configuration conf, int numHandlers, BlockingService blockingService) throws IOException { http://git-wip-us.apache.org/repos/asf/hive/blob/547c5cfc/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java index afb59c0..e4c622e 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java @@ -20,9 +20,9 @@ package org.apache.hadoop.hive.llap.daemon.services.impl; import java.io.IOException; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.common.classification.InterfaceAudience; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; -import org.apache.hadoop.hive.llap.configuration.LlapDaemonConfiguration; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.service.AbstractService; import org.apache.hive.http.HttpServer; @@ -38,13 +38,14 @@ public class LlapWebServices extends AbstractService { private boolean useSSL = false; private boolean useSPNEGO = false; - public LlapWebServices() { + public LlapWebServices(int port) { super("LlapWebServices"); + this.port = port; } @Override public void serviceInit(Configuration conf) { - this.port = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_WEB_PORT); + this.useSSL = HiveConf.getBoolVar(conf, ConfVars.LLAP_DAEMON_WEB_SSL); this.useSPNEGO = HiveConf.getBoolVar(conf, ConfVars.LLAP_WEB_AUTO_AUTH); String bindAddress = "0.0.0.0"; @@ -69,6 +70,11 @@ public class LlapWebServices extends AbstractService { } } + @InterfaceAudience.Private + public int getPort() { + return this.http.getPort(); + } + @Override public void serviceStart() throws Exception { if (this.http != null) { @@ -76,6 +82,7 @@ public class LlapWebServices extends AbstractService { } } + @Override public void serviceStop() throws Exception { if (this.http != null) { this.http.stop(); http://git-wip-us.apache.org/repos/asf/hive/blob/547c5cfc/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java ---------------------------------------------------------------------- diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java index c920c24..a09c0b2 100644 --- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java +++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java @@ -146,15 +146,17 @@ public class MiniLlapCluster extends AbstractService { int rpcPort = 0; int mngPort = 0; int shufflePort = 0; + int webPort = 0; boolean usePortsFromConf = conf.getBoolean("minillap.usePortsFromConf", false); if (usePortsFromConf) { rpcPort = HiveConf.getIntVar(conf, HiveConf.ConfVars.LLAP_DAEMON_RPC_PORT); mngPort = HiveConf.getIntVar(conf, HiveConf.ConfVars.LLAP_MANAGEMENT_RPC_PORT); shufflePort = conf.getInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, ShuffleHandler.DEFAULT_SHUFFLE_PORT); + webPort = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_WEB_PORT); } llapDaemon = new LlapDaemon(conf, numExecutorsPerService, execBytesPerService, llapIoEnabled, - ioIsDirect, ioBytesPerService, localDirs, rpcPort, mngPort, shufflePort); + ioIsDirect, ioBytesPerService, localDirs, rpcPort, mngPort, shufflePort, webPort); llapDaemon.init(conf); }