Repository: hive Updated Branches: refs/heads/master 8f881f87b -> 215c5770c
HIVE-12256. Move LLAP registry into llap-client module. (Siddharth Seth, reviewed by Gopal V) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/215c5770 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/215c5770 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/215c5770 Branch: refs/heads/master Commit: 215c5770c7ef5e0d1428f9288354205eedcee399 Parents: 8f881f8 Author: Siddharth Seth <ss...@apache.org> Authored: Thu Oct 29 21:56:38 2015 -0700 Committer: Siddharth Seth <ss...@apache.org> Committed: Thu Oct 29 21:56:38 2015 -0700 ---------------------------------------------------------------------- llap-client/pom.xml | 6 + .../hive/llap/registry/ServiceInstance.java | 73 ++++ .../hive/llap/registry/ServiceInstanceSet.java | 57 +++ .../hive/llap/registry/ServiceRegistry.java | 59 +++ .../registry/impl/LlapFixedRegistryImpl.java | 223 +++++++++++ .../llap/registry/impl/LlapRegistryService.java | 87 +++++ .../registry/impl/LlapYarnRegistryImpl.java | 383 ++++++++++++++++++ llap-server/pom.xml | 6 - .../hive/llap/daemon/impl/LlapDaemon.java | 2 +- .../llap/daemon/registry/ServiceInstance.java | 73 ---- .../daemon/registry/ServiceInstanceSet.java | 57 --- .../llap/daemon/registry/ServiceRegistry.java | 59 --- .../registry/impl/LlapFixedRegistryImpl.java | 223 ----------- .../registry/impl/LlapRegistryService.java | 87 ----- .../registry/impl/LlapYarnRegistryImpl.java | 384 ------------------- .../dag/app/rm/LlapTaskSchedulerService.java | 6 +- .../app/rm/TestLlapTaskSchedulerService.java | 2 +- 17 files changed, 893 insertions(+), 894 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/215c5770/llap-client/pom.xml ---------------------------------------------------------------------- diff --git a/llap-client/pom.xml b/llap-client/pom.xml index 02243f8..0cd2ec9 100644 --- a/llap-client/pom.xml +++ 
b/llap-client/pom.xml @@ -72,6 +72,12 @@ <version>${hadoop.version}</version> <optional>true</optional> </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-registry</artifactId> + <version>${hadoop.version}</version> + <optional>true</optional> + </dependency> <!-- test inter-project --> <dependency> <groupId>junit</groupId> http://git-wip-us.apache.org/repos/asf/hive/blob/215c5770/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstance.java ---------------------------------------------------------------------- diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstance.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstance.java new file mode 100644 index 0000000..f116de4 --- /dev/null +++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstance.java @@ -0,0 +1,73 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.llap.registry; + +import java.util.Map; + +import org.apache.hadoop.yarn.api.records.Resource; + +/** + * A single service instance (LLAP daemon) as exposed by a registry: its identity, endpoints, + * last known liveness, published configuration and the resources it offers to tasks. + */ +public interface ServiceInstance { + + /** + * Worker identity distinguishes incarnations of a daemon, so that a node which died & was brought + * back on the same host/port can be told apart. The YARN registry implementation publishes a + * random UUID held in a static field; the fixed-host implementation derives it from the host name. + * + * @return the worker identity of this instance + */ + public String getWorkerIdentity(); + + /** + * Hostname of the service instance + * + * @return the host name this instance was registered with + */ + public String getHost(); + + /** + * RPC Endpoint for service instance + * + * @return the RPC port of this instance + */ + public int getRpcPort(); + + /** + * Shuffle Endpoint for service instance + * + * @return the shuffle port of this instance + */ + public int getShufflePort(); + + /** + * Return the last known state (without refreshing) + * + * @return true if the instance was considered alive at the last refresh + */ + + public boolean isAlive(); + + /** + * Config properties of the Service Instance (LLAP-related keys such as llap.daemon.* and + * hive.llap.*) + * + * @return a map from property name to property value + */ + + public Map<String, String> getProperties(); + + /** + * Memory and Executors available for the LLAP tasks + * + * This does not include the size of the cache or the actual vCores allocated via Slider. + * + * @return a Resource whose memory is the per-instance memory setting and whose vCores value is + * the number of executors + */ + public Resource getResource(); +} http://git-wip-us.apache.org/repos/asf/hive/blob/215c5770/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstanceSet.java ---------------------------------------------------------------------- diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstanceSet.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstanceSet.java new file mode 100644 index 0000000..388b5f3 --- /dev/null +++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstanceSet.java @@ -0,0 +1,57 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.llap.registry; + +import java.io.IOException; +import java.util.Map; +import java.util.Set; + +/** + * A view of the service instances known to a registry, keyed by worker identity. + */ +public interface ServiceInstanceSet { + + /** + * Get an instance mapping which maps worker identity to each instance. + * + * The worker identity does not collide between restarts, so each restart will have a unique id, + * while having the same host/ip pair. (The fixed-host registry is an exception: its identity is + * derived from the host name.) + * + * @return a map from worker identity to instance + */ + public Map<String, ServiceInstance> getAll(); + + /** + * Get an instance by worker identity. + * + * @param name the worker identity to look up + * @return the matching instance, or null if no instance with that identity is known + */ + public ServiceInstance getInstance(String name); + + /** + * Get the set of service instances for a given host. + * + * The set could include dead and alive instances. + * + * @param host the host name to match + * @return the instances on that host; an empty set (not null) if there are none + */ + public Set<ServiceInstance> getByHost(String host); + + /** + * Refresh the instance set from registry backing store.
+ * + * @throws IOException if reading the registry backing store fails + */ + public void refresh() throws IOException; + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/215c5770/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceRegistry.java ---------------------------------------------------------------------- diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceRegistry.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceRegistry.java new file mode 100644 index 0000000..d3fb517 --- /dev/null +++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceRegistry.java @@ -0,0 +1,59 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.llap.registry; + +import java.io.IOException; + +/** + * ServiceRegistry interface for switching between fixed host and dynamic registry implementations. + */ +public interface ServiceRegistry { + + /** + * Start the service registry + * + * @throws InterruptedException if the calling thread is interrupted + */ + public void start() throws InterruptedException; + + /** + * Stop the service registry + * + * @throws InterruptedException if the calling thread is interrupted + */ + public void stop() throws InterruptedException; + + /** + * Register the current instance - the implementation takes care of the endpoints to register.
+ * + * @throws IOException if writing the registration fails + */ + public void register() throws IOException; + + /** + * Remove the current registration cleanly (implementation defined cleanup) + * + * @throws IOException if the cleanup fails + */ + public void unregister() throws IOException; + + /** + * Client API to get the list of instances registered via the current registry key. + * + * @param component the component to look up; callers in this module pass {@code "LLAP"} + * @return the set of instances registered for that component + * @throws IOException if the instances cannot be read from the registry + */ + public ServiceInstanceSet getInstances(String component) throws IOException; +} http://git-wip-us.apache.org/repos/asf/hive/blob/215c5770/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java ---------------------------------------------------------------------- diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java new file mode 100644 index 0000000..34e0682 --- /dev/null +++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java @@ -0,0 +1,223 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.hadoop.hive.llap.registry.impl; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.UnknownHostException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.llap.configuration.LlapConfiguration; +import org.apache.hadoop.hive.llap.registry.ServiceInstance; +import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet; +import org.apache.hadoop.hive.llap.registry.ServiceRegistry; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.api.records.Resource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * ServiceRegistry backed by a fixed, comma-separated list of hosts. Every host shares the RPC + * port, shuffle port, memory and executor count read from the configuration; nothing is written + * to an external store and every instance always reports itself as alive. + */ +public class LlapFixedRegistryImpl implements ServiceRegistry { + + private static final Logger LOG = LoggerFactory.getLogger(LlapFixedRegistryImpl.class); + + @InterfaceAudience.Private + // This is primarily for testing to avoid the host lookup + public static final String FIXED_REGISTRY_RESOLVE_HOST_NAMES = "fixed.registry.resolve.host.names"; + + private final int port; + private final int shuffle; + private final String[] hosts; + private final int memory; + private final int vcores; + private final boolean resolveHosts; + + private final Map<String, String> srv = new HashMap<String, String>(); + + /** + * @param hosts comma-separated host list; whitespace around each entry is ignored + * @param conf configuration supplying ports, resources and the published LLAP properties + */ + public LlapFixedRegistryImpl(String hosts, Configuration conf) { + // Trim each entry so that a list written as "h1, h2" does not yield " h2", which would not + // resolve and would never match a locality lookup by host name. + String[] hostList = hosts.split(","); + for (int i = 0; i < hostList.length; i++) { + hostList[i] = hostList[i].trim(); + } + this.hosts = hostList; + this.port = + conf.getInt(LlapConfiguration.LLAP_DAEMON_RPC_PORT, + LlapConfiguration.LLAP_DAEMON_RPC_PORT_DEFAULT); + this.shuffle = + conf.getInt(LlapConfiguration.LLAP_DAEMON_YARN_SHUFFLE_PORT, + LlapConfiguration.LLAP_DAEMON_YARN_SHUFFLE_PORT_DEFAULT); + this.resolveHosts = conf.getBoolean(FIXED_REGISTRY_RESOLVE_HOST_NAMES, true); + + for (Map.Entry<String, String> kv : conf) { + if
(kv.getKey().startsWith(LlapConfiguration.LLAP_DAEMON_PREFIX) + || kv.getKey().startsWith("hive.llap.") + || kv.getKey().startsWith(LlapConfiguration.LLAP_PREFIX)) { + // TODO: read this somewhere useful, like the task scheduler + srv.put(kv.getKey(), kv.getValue()); + } + } + + this.memory = + conf.getInt(LlapConfiguration.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB, + LlapConfiguration.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB_DEFAULT); + this.vcores = + conf.getInt(LlapConfiguration.LLAP_DAEMON_NUM_EXECUTORS, + LlapConfiguration.LLAP_DAEMON_NUM_EXECUTORS_DEFAULT); + } + + @Override + public void start() throws InterruptedException { + // nothing to start + } + + @Override + public void stop() throws InterruptedException { + // nothing to stop + } + + @Override + public void register() throws IOException { + // nothing to register + } + + @Override + public void unregister() throws IOException { + // nothing to unregister + } + + public static String getWorkerIdentity(String host) { + // trigger clean errors for anyone who mixes up identity with hosts + return "host-" + host; + } + + private final class FixedServiceInstance implements ServiceInstance { + + private final String host; + + public FixedServiceInstance(String host) { + if (resolveHosts) { + try { + InetAddress inetAddress = InetAddress.getByName(host); + if (NetUtils.isLocalAddress(inetAddress)) { + InetSocketAddress socketAddress = new InetSocketAddress(0); + socketAddress = NetUtils.getConnectAddress(socketAddress); + LOG.info("Adding host identified as local: " + host + " as " + + socketAddress.getHostName()); + host = socketAddress.getHostName(); + } + } catch (UnknownHostException e) { + LOG.warn("Ignoring resolution issues for host: " + host, e); + } + } + this.host = host; + } + + @Override + public String getWorkerIdentity() { + return LlapFixedRegistryImpl.getWorkerIdentity(host); + } + + @Override + public String getHost() { + return host; + } + + @Override + public int getRpcPort() { + // TODO: allow >1
port per host? + return LlapFixedRegistryImpl.this.port; + } + + @Override + public int getShufflePort() { + return LlapFixedRegistryImpl.this.shuffle; + } + + @Override + public boolean isAlive() { + return true; + } + + @Override + public Map<String, String> getProperties() { + Map<String, String> properties = new HashMap<>(srv); + // no worker identity + return properties; + } + + @Override + public Resource getResource() { + return Resource.newInstance(memory, vcores); + } + + @Override + public String toString() { + return "FixedServiceInstance{" + + "host=" + host + + ", memory=" + memory + + ", vcores=" + vcores + + '}'; + } + } + + private final class FixedServiceInstanceSet implements ServiceInstanceSet { + + private final Map<String, ServiceInstance> instances = new HashMap<String, ServiceInstance>(); + + public FixedServiceInstanceSet() { + for (String host : hosts) { + // trigger bugs in anyone who uses this as a hostname + ServiceInstance instance = new FixedServiceInstance(host); + // Key by the instance's own identity: FixedServiceInstance may replace a local host name + // with the resolved local name, and getInstance()/getByHost() must agree with + // instance.getWorkerIdentity() and instance.getHost(). + instances.put(instance.getWorkerIdentity(), instance); + } + } + + @Override + public Map<String, ServiceInstance> getAll() { + return instances; + } + + @Override + public ServiceInstance getInstance(String name) { + return instances.get(name); + } + + @Override + public Set<ServiceInstance> getByHost(String host) { + Set<ServiceInstance> byHost = new HashSet<ServiceInstance>(); + ServiceInstance inst = getInstance(getWorkerIdentity(host)); + if (inst != null) { + byHost.add(inst); + } + return byHost; + } + + @Override + public void refresh() throws IOException { + // I will do no such thing + } + + } + + @Override + public ServiceInstanceSet getInstances(String component) throws IOException { + return new FixedServiceInstanceSet(); + } + + @Override + public String toString() { + return String.format("FixedRegistry hosts=%s", StringUtils.join(",", this.hosts)); + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/215c5770/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java
---------------------------------------------------------------------- diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java new file mode 100644 index 0000000..a8e1465 --- /dev/null +++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java @@ -0,0 +1,87 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.llap.registry.impl; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.llap.configuration.LlapConfiguration; +import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet; +import org.apache.hadoop.hive.llap.registry.ServiceRegistry; +import org.apache.hadoop.service.AbstractService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Hadoop service wrapper that picks the registry implementation from configuration: a hosts + * value starting with "@" selects the YARN registry (the remainder is the instance name), any + * other value is treated as a fixed host list. When constructed for a daemon, the service also + * registers the worker on start and unregisters it on stop. + */ +public class LlapRegistryService extends AbstractService { + + private static final Logger LOG = LoggerFactory.getLogger(LlapRegistryService.class); + + private ServiceRegistry registry = null; + private final boolean isDaemon; + + public LlapRegistryService(boolean isDaemon) { + super("LlapRegistryService"); + this.isDaemon = isDaemon; + } + + @Override + public void serviceInit(Configuration conf) { + String hosts = conf.getTrimmed(LlapConfiguration.LLAP_DAEMON_SERVICE_HOSTS); + if (hosts == null) { + // Fail with a descriptive message instead of an NPE when the hosts setting is missing. + throw new IllegalArgumentException("Configuration " + + LlapConfiguration.LLAP_DAEMON_SERVICE_HOSTS + " must be set"); + } + if (hosts.startsWith("@")) { + registry = new
LlapYarnRegistryImpl(hosts.substring(1), conf, isDaemon); + } else { + registry = new LlapFixedRegistryImpl(hosts, conf); + } + LOG.info("Using LLAP registry type " + registry); + } + + + @Override + public void serviceStart() throws Exception { + if (this.registry != null) { + this.registry.start(); + } + if (isDaemon) { + registerWorker(); + } + } + + @Override + public void serviceStop() throws Exception { + if (isDaemon) { + unregisterWorker(); + } + if (this.registry != null) { + this.registry.stop(); + } else { + LOG.warn("Stopping non-existent registry service"); + } + } + + private void registerWorker() throws IOException { + if (this.registry != null) { + this.registry.register(); + } + } + + private void unregisterWorker() throws IOException { + if (this.registry != null) { + this.registry.unregister(); + } + } + + /** + * @return the LLAP instances known to the underlying registry + * @throws IOException if the registry cannot be read + * @throws IllegalStateException if the service has not been initialized + */ + public ServiceInstanceSet getInstances() throws IOException { + if (this.registry == null) { + throw new IllegalStateException("LlapRegistryService has not been initialized"); + } + return this.registry.getInstances("LLAP"); + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/215c5770/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapYarnRegistryImpl.java ---------------------------------------------------------------------- diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapYarnRegistryImpl.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapYarnRegistryImpl.java new file mode 100644 index 0000000..d474b6f --- /dev/null +++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapYarnRegistryImpl.java @@ -0,0 +1,383 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.llap.registry.impl; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.MalformedURLException; +import java.net.URISyntaxException; +import java.net.URL; +import java.net.UnknownHostException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.llap.configuration.LlapConfiguration; +import org.apache.hadoop.hive.llap.registry.ServiceInstance; +import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet; +import org.apache.hadoop.hive.llap.registry.ServiceRegistry; +import org.apache.hadoop.registry.client.api.RegistryOperationsFactory; +import org.apache.hadoop.registry.client.binding.RegistryPathUtils; +import org.apache.hadoop.registry.client.binding.RegistryTypeUtils; +import org.apache.hadoop.registry.client.binding.RegistryUtils; +import org.apache.hadoop.registry.client.binding.RegistryUtils.ServiceRecordMarshal; +import org.apache.hadoop.registry.client.impl.zk.RegistryOperationsService; +import org.apache.hadoop.registry.client.types.AddressTypes; +import org.apache.hadoop.registry.client.types.Endpoint; +import org.apache.hadoop.registry.client.types.ProtocolTypes; +import org.apache.hadoop.registry.client.types.ServiceRecord; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.zookeeper.CreateMode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import 
com.google.common.base.Preconditions; + +/** + * ServiceRegistry backed by the YARN registry. A daemon registers an ephemeral, sequential + * ZooKeeper node under a per-user, per-instance "workers" path; clients list the records under + * that path to build the instance set and, when not running as a daemon, refresh it periodically. + */ +public class LlapYarnRegistryImpl implements ServiceRegistry { + + private static final Logger LOG = LoggerFactory.getLogger(LlapYarnRegistryImpl.class); + + private final RegistryOperationsService client; + private final Configuration conf; + private final ServiceRecordMarshal encoder; + private final String path; + + private final DynamicServiceInstanceSet instances = new DynamicServiceInstanceSet(); + + private static final UUID uniq = UUID.randomUUID(); + private static final String hostname; + + private static final String UNIQUE_IDENTIFIER = "llap.unique.id"; + + private final static String SERVICE_CLASS = "org-apache-hive"; + + final ScheduledExecutorService refresher = Executors.newScheduledThreadPool(1, + new ThreadFactoryBuilder().setDaemon(true).setNameFormat("LlapYarnRegistryRefresher").build()); + final long refreshDelay; + private final boolean isDaemon; + + static { + String localhost = "localhost"; + try { + localhost = InetAddress.getLocalHost().getCanonicalHostName(); + } catch (UnknownHostException uhe) { + // ignore + } + hostname = localhost; + } + + public LlapYarnRegistryImpl(String instanceName, Configuration conf, boolean isDaemon) { + + LOG.info("Llap Registry is enabled with registryid: " + instanceName); + this.conf = new Configuration(conf); + // Add yarn-site to the private copy only; the caller's Configuration must not be mutated. + this.conf.addResource(YarnConfiguration.YARN_SITE_CONFIGURATION_FILE); + // registry reference + client = (RegistryOperationsService) RegistryOperationsFactory.createInstance(this.conf); + encoder = new RegistryUtils.ServiceRecordMarshal(); + this.path = RegistryPathUtils.join(RegistryUtils.componentPath(RegistryUtils.currentUser(), + SERVICE_CLASS, instanceName, "workers"), "worker-"); + refreshDelay = + this.conf.getInt(LlapConfiguration.LLAP_DAEMON_SERVICE_REFRESH_INTERVAL, + LlapConfiguration.LLAP_DAEMON_SERVICE_REFRESH_INTERVAL_DEFAULT); + this.isDaemon = isDaemon; + Preconditions.checkArgument(refreshDelay > 0, + "Refresh delay for registry has to be positive = %d",
refreshDelay); + } + + public Endpoint getRpcEndpoint() { + final int rpcPort = + conf.getInt(LlapConfiguration.LLAP_DAEMON_RPC_PORT, + LlapConfiguration.LLAP_DAEMON_RPC_PORT_DEFAULT); + return RegistryTypeUtils.ipcEndpoint("llap", new InetSocketAddress(hostname, rpcPort)); + } + + public Endpoint getShuffleEndpoint() { + final int shufflePort = + conf.getInt(LlapConfiguration.LLAP_DAEMON_YARN_SHUFFLE_PORT, + LlapConfiguration.LLAP_DAEMON_YARN_SHUFFLE_PORT_DEFAULT); + // HTTP today, but might not be + return RegistryTypeUtils.inetAddrEndpoint("shuffle", ProtocolTypes.PROTOCOL_TCP, hostname, + shufflePort); + } + + public Endpoint getServicesEndpoint() { + final int servicePort = + conf.getInt(LlapConfiguration.LLAP_DAEMON_SERVICE_PORT, + LlapConfiguration.LLAP_DAEMON_SERVICE_PORT_DEFAULT); + final boolean isSSL = + conf.getBoolean(LlapConfiguration.LLAP_DAEMON_SERVICE_SSL, + LlapConfiguration.LLAP_DAEMON_SERVICE_SSL_DEFAULT); + final String scheme = isSSL ? "https" : "http"; + final URL serviceURL; + try { + serviceURL = new URL(scheme, hostname, servicePort, ""); + return RegistryTypeUtils.webEndpoint("services", serviceURL.toURI()); + } catch (MalformedURLException e) { + throw new RuntimeException(e); + } catch (URISyntaxException e) { + throw new RuntimeException("llap service URI for " + hostname + " is invalid", e); + } + } + + private final String getPath() { + return this.path; + } + + @Override + public void register() throws IOException { + String path = getPath(); + ServiceRecord srv = new ServiceRecord(); + srv.addInternalEndpoint(getRpcEndpoint()); + srv.addInternalEndpoint(getShuffleEndpoint()); + srv.addExternalEndpoint(getServicesEndpoint()); + + for (Map.Entry<String, String> kv : this.conf) { + if (kv.getKey().startsWith(LlapConfiguration.LLAP_DAEMON_PREFIX) + || kv.getKey().startsWith("hive.llap.")) { + // TODO: read this somewhere useful, like the task scheduler + srv.set(kv.getKey(), kv.getValue()); + } + } + + // restart sensitive instance id
+ srv.set(UNIQUE_IDENTIFIER, uniq.toString()); + + client.mknode(RegistryPathUtils.parentOf(path), true); + + // FIXME: YARN registry needs to expose Ephemeral_Seq nodes & return the paths + client.zkCreate(path, CreateMode.EPHEMERAL_SEQUENTIAL, encoder.toBytes(srv), + client.getClientAcls()); + } + + @Override + public void unregister() throws IOException { + // Nothing for the zkCreate models + } + + private class DynamicServiceInstance implements ServiceInstance { + + private final ServiceRecord srv; + // volatile: kill() runs on the refresh thread while isAlive() is read by whoever holds this + // instance. + private volatile boolean alive = true; + private final String host; + private final int rpcPort; + private final int shufflePort; + + public DynamicServiceInstance(ServiceRecord srv) throws IOException { + this.srv = srv; + + final Endpoint shuffle = srv.getInternalEndpoint("shuffle"); + final Endpoint rpc = srv.getInternalEndpoint("llap"); + + this.host = + RegistryTypeUtils.getAddressField(rpc.addresses.get(0), + AddressTypes.ADDRESS_HOSTNAME_FIELD); + this.rpcPort = + Integer.valueOf(RegistryTypeUtils.getAddressField(rpc.addresses.get(0), + AddressTypes.ADDRESS_PORT_FIELD)); + this.shufflePort = + Integer.valueOf(RegistryTypeUtils.getAddressField(shuffle.addresses.get(0), + AddressTypes.ADDRESS_PORT_FIELD)); + } + + @Override + public String getWorkerIdentity() { + return srv.get(UNIQUE_IDENTIFIER); + } + + @Override + public String getHost() { + return host; + } + + @Override + public int getRpcPort() { + return rpcPort; + } + + @Override + public int getShufflePort() { + return shufflePort; + } + + @Override + public boolean isAlive() { + return alive ; + } + + public void kill() { + // May be possible to generate a notification back to the scheduler from here.
+ LOG.info("Killing service instance: " + this); + this.alive = false; + } + + @Override + public Map<String, String> getProperties() { + return srv.attributes(); + } + + @Override + public Resource getResource() { + // register() only publishes keys present in the daemon's configuration, so fall back to the + // same defaults the fixed registry uses instead of failing on Integer.valueOf(null). + int memory = getIntAttribute(LlapConfiguration.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB, + LlapConfiguration.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB_DEFAULT); + int vCores = getIntAttribute(LlapConfiguration.LLAP_DAEMON_NUM_EXECUTORS, + LlapConfiguration.LLAP_DAEMON_NUM_EXECUTORS_DEFAULT); + return Resource.newInstance(memory, vCores); + } + + /** Reads an integer attribute from the service record, or returns defaultValue if absent. */ + private int getIntAttribute(String key, int defaultValue) { + String value = srv.get(key); + return value == null ? defaultValue : Integer.parseInt(value.trim()); + } + + @Override + public String toString() { + return "DynamicServiceInstance [alive=" + alive + ", host=" + host + ":" + rpcPort + " with resources=" + getResource() +"]"; + } + + // Relying on the identity hashCode and equality, since refreshing instances retains the old copy + // of an already known instance. + } + + private class DynamicServiceInstanceSet implements ServiceInstanceSet { + + // LinkedHashMap to retain iteration order. + private final Map<String, ServiceInstance> instances = new LinkedHashMap<>(); + + @Override + public synchronized Map<String, ServiceInstance> getAll() { + // Return a copy. Instances may be modified during a refresh.
+ return new LinkedHashMap<>(instances); + } + + @Override + public synchronized ServiceInstance getInstance(String name) { + return instances.get(name); + } + + @Override + public void refresh() throws IOException { + /* call this from wherever */ + Map<String, ServiceInstance> freshInstances = new HashMap<String, ServiceInstance>(); + + String path = getPath(); + Map<String, ServiceRecord> records = + RegistryUtils.listServiceRecords(client, RegistryPathUtils.parentOf(path)); + // Synchronize after reading the service records from the external service (ZK) + synchronized (this) { + Set<String> latestKeys = new HashSet<String>(); + LOG.info("Starting to refresh ServiceInstanceSet " + System.identityHashCode(this)); + for (ServiceRecord rec : records.values()) { + ServiceInstance instance = new DynamicServiceInstance(rec); + String workerIdentity = instance.getWorkerIdentity(); + if (!instances.containsKey(workerIdentity)) { + // add a new object + freshInstances.put(workerIdentity, instance); + if (LOG.isInfoEnabled()) { + LOG.info("Adding new worker " + workerIdentity + " which mapped to " + + instance); + } + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Retaining running worker " + workerIdentity + + " which mapped to " + instance); + } + } + latestKeys.add(workerIdentity); + } + + // deep-copy before modifying; after removeAll only the records which have not checked in + // (and are effectively dead) remain. + Set<String> deadKeys = new HashSet<>(instances.keySet()); + deadKeys.removeAll(latestKeys); + // Kill unconditionally: removeAll() returns false when no old key is still live (e.g. a + // complete turnover of workers), and those instances must still be marked dead.
+ for (String k : deadKeys) { + // this is so that people can hold onto ServiceInstance references as placeholders for tasks + final DynamicServiceInstance dead = (DynamicServiceInstance) instances.get(k); + dead.kill(); + if (LOG.isInfoEnabled()) { + LOG.info("Deleting dead worker " + k + " which mapped to " + dead); + } + } + this.instances.keySet().removeAll(deadKeys); + this.instances.putAll(freshInstances); + } + } + + @Override + public synchronized Set<ServiceInstance> getByHost(String host) { + // TODO Maybe store this as a map which is populated during construction, to avoid walking + // the map on each request. + Set<ServiceInstance> byHost = new HashSet<ServiceInstance>(); + + for (ServiceInstance i : instances.values()) { + if (host.equals(i.getHost())) { + // all hosts in instances should be alive in this impl + byHost.add(i); + } + if (LOG.isDebugEnabled()) { + LOG.debug("Locality comparing " + host + " to " + i.getHost()); + } + } + if (LOG.isDebugEnabled()) { + LOG.debug("Returning " + byHost.size() + " hosts for locality allocation on " + host); + } + return byHost; + } + } + + @Override + public ServiceInstanceSet getInstances(String component) throws IOException { + Preconditions.checkArgument("LLAP".equals(component)); // right now there is only 1 component + if (this.client != null) { + instances.refresh(); + return instances; + } else { + Preconditions.checkNotNull(this.client, "Yarn registry client is not intialized"); + return null; + } + } + + @Override + public void start() { + if (client == null) return; + client.start(); + if (isDaemon) return; + refresher.scheduleWithFixedDelay(new Runnable() { + @Override + public void run() { + try { + instances.refresh(); + } catch (IOException ioe) { + LOG.warn("Could not refresh hosts during scheduled refresh", ioe); + } + } + }, 0, refreshDelay, TimeUnit.SECONDS); + }
+ @Override + public void stop() { + // Cancel the periodic refresh so it does not keep polling a stopped client; under the default + // ScheduledThreadPoolExecutor policy, shutdown() cancels periodic tasks. + refresher.shutdown(); + if (client != null) { + client.stop(); + } + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/215c5770/llap-server/pom.xml ---------------------------------------------------------------------- diff --git a/llap-server/pom.xml b/llap-server/pom.xml index 4be45a5..a733e2c 100644 --- a/llap-server/pom.xml +++ b/llap-server/pom.xml @@ -98,12 +98,6 @@ <optional>true</optional> </dependency> <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-yarn-registry</artifactId> - <version>${hadoop.version}</version> - <optional>true</optional> - </dependency> - <dependency> <groupId>org.apache.tez</groupId> <artifactId>tez-runtime-internals</artifactId> <version>${tez.version}</version> http://git-wip-us.apache.org/repos/asf/hive/blob/215c5770/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java index 6f75001..98b1ccd 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java @@ -31,7 +31,7 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.llap.configuration.LlapConfiguration; import org.apache.hadoop.hive.llap.daemon.ContainerRunner; import org.apache.hadoop.hive.llap.daemon.QueryFailedHandler; -import org.apache.hadoop.hive.llap.daemon.registry.impl.LlapRegistryService; +import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteRequestProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto; import
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto; http://git-wip-us.apache.org/repos/asf/hive/blob/215c5770/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/ServiceInstance.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/ServiceInstance.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/ServiceInstance.java deleted file mode 100644 index f0f22aa..0000000 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/ServiceInstance.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hive.llap.daemon.registry; - -import java.util.Map; - -import org.apache.hadoop.yarn.api.records.Resource; - -public interface ServiceInstance { - - /** - * Worker identity is a UUID (unique across restarts), to identify a node which died & was brought - * back on the same host/port - */ - public String getWorkerIdentity(); - - /** - * Hostname of the service instance - * - * @return - */ - public String getHost(); - - /** - * RPC Endpoint for service instance - * - * @return - */ - public int getRpcPort(); - - /** - * Shuffle Endpoint for service instance - * - * @return - */ - public int getShufflePort(); - - /** - * Return the last known state (without refreshing) - * - * @return - */ - - public boolean isAlive(); - - /** - * Config properties of the Service Instance (llap.daemon.*) - * - * @return - */ - - public Map<String, String> getProperties(); - - /** - * Memory and Executors available for the LLAP tasks - * - * This does not include the size of the cache or the actual vCores allocated via Slider. - * - * @return - */ - public Resource getResource(); -} http://git-wip-us.apache.org/repos/asf/hive/blob/215c5770/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/ServiceInstanceSet.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/ServiceInstanceSet.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/ServiceInstanceSet.java deleted file mode 100644 index 7ab36d4..0000000 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/ServiceInstanceSet.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hive.llap.daemon.registry; - -import java.io.IOException; -import java.util.Map; -import java.util.Set; - -public interface ServiceInstanceSet { - - /** - * Get an instance mapping which map worker identity to each instance. - * - * The worker identity does not collide between restarts, so each restart will have a unique id, - * while having the same host/ip pair. - * - * @return - */ - public Map<String, ServiceInstance> getAll(); - - /** - * Get an instance by worker identity. - * - * @param name - * @return - */ - public ServiceInstance getInstance(String name); - - /** - * Get a list of service instances for a given host. - * - * The list could include dead and alive instances. - * - * @param host - * @return - */ - public Set<ServiceInstance> getByHost(String host); - - /** - * Refresh the instance set from registry backing store. 
- * - * @throws IOException - */ - public void refresh() throws IOException; - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/215c5770/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/ServiceRegistry.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/ServiceRegistry.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/ServiceRegistry.java deleted file mode 100644 index a0f9aac..0000000 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/ServiceRegistry.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hive.llap.daemon.registry; - -import java.io.IOException; - -/** - * ServiceRegistry interface for switching between fixed host and dynamic registry implementations. - */ -public interface ServiceRegistry { - - /** - * Start the service registry - * - * @throws InterruptedException - */ - public void start() throws InterruptedException; - - /** - * Stop the service registry - * - * @throws InterruptedException - */ - public void stop() throws InterruptedException; - - /** - * Register the current instance - the implementation takes care of the endpoints to register. 
- * - * @throws IOException - */ - public void register() throws IOException; - - /** - * Remove the current registration cleanly (implementation defined cleanup) - * - * @throws IOException - */ - public void unregister() throws IOException; - - /** - * Client API to get the list of instances registered via the current registry key. - * - * @param component - * @return - * @throws IOException - */ - public ServiceInstanceSet getInstances(String component) throws IOException; -} http://git-wip-us.apache.org/repos/asf/hive/blob/215c5770/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/impl/LlapFixedRegistryImpl.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/impl/LlapFixedRegistryImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/impl/LlapFixedRegistryImpl.java deleted file mode 100644 index 621a6a6..0000000 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/impl/LlapFixedRegistryImpl.java +++ /dev/null @@ -1,223 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hive.llap.daemon.registry.impl; - -import java.io.IOException; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.UnknownHostException; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.llap.configuration.LlapConfiguration; -import org.apache.hadoop.hive.llap.daemon.registry.ServiceInstance; -import org.apache.hadoop.hive.llap.daemon.registry.ServiceInstanceSet; -import org.apache.hadoop.hive.llap.daemon.registry.ServiceRegistry; -import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.util.StringUtils; -import org.apache.hadoop.yarn.api.records.Resource; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class LlapFixedRegistryImpl implements ServiceRegistry { - - private static final Logger LOG = LoggerFactory.getLogger(LlapFixedRegistryImpl.class); - - @InterfaceAudience.Private - // This is primarily for testing to avoid the host lookup - public static final String FIXED_REGISTRY_RESOLVE_HOST_NAMES = "fixed.registry.resolve.host.names"; - - private final int port; - private final int shuffle; - private final String[] hosts; - private final int memory; - private final int vcores; - private final boolean resolveHosts; - - private final Map<String, String> srv = new HashMap<String, String>(); - - public LlapFixedRegistryImpl(String hosts, Configuration conf) { - this.hosts = hosts.split(","); - this.port = - conf.getInt(LlapConfiguration.LLAP_DAEMON_RPC_PORT, - LlapConfiguration.LLAP_DAEMON_RPC_PORT_DEFAULT); - this.shuffle = - conf.getInt(LlapConfiguration.LLAP_DAEMON_YARN_SHUFFLE_PORT, - LlapConfiguration.LLAP_DAEMON_YARN_SHUFFLE_PORT_DEFAULT); - this.resolveHosts = conf.getBoolean(FIXED_REGISTRY_RESOLVE_HOST_NAMES, true); - - for (Map.Entry<String, String> kv : conf) { - if 
(kv.getKey().startsWith(LlapConfiguration.LLAP_DAEMON_PREFIX) - || kv.getKey().startsWith("hive.llap.") - || kv.getKey().startsWith(LlapConfiguration.LLAP_PREFIX)) { - // TODO: read this somewhere useful, like the task scheduler - srv.put(kv.getKey(), kv.getValue()); - } - } - - this.memory = - conf.getInt(LlapConfiguration.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB, - LlapConfiguration.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB_DEFAULT); - this.vcores = - conf.getInt(LlapConfiguration.LLAP_DAEMON_NUM_EXECUTORS, - LlapConfiguration.LLAP_DAEMON_NUM_EXECUTORS_DEFAULT); - } - - @Override - public void start() throws InterruptedException { - // nothing to start - } - - @Override - public void stop() throws InterruptedException { - // nothing to stop - } - - @Override - public void register() throws IOException { - // nothing to register - } - - @Override - public void unregister() throws IOException { - // nothing to unregister - } - - public static String getWorkerIdentity(String host) { - // trigger clean errors for anyone who mixes up identity with hosts - return "host-" + host; - } - - private final class FixedServiceInstance implements ServiceInstance { - - private final String host; - - public FixedServiceInstance(String host) { - if (resolveHosts) { - try { - InetAddress inetAddress = InetAddress.getByName(host); - if (NetUtils.isLocalAddress(inetAddress)) { - InetSocketAddress socketAddress = new InetSocketAddress(0); - socketAddress = NetUtils.getConnectAddress(socketAddress); - LOG.info("Adding host identified as local: " + host + " as " - + socketAddress.getHostName()); - host = socketAddress.getHostName(); - } - } catch (UnknownHostException e) { - LOG.warn("Ignoring resolution issues for host: " + host, e); - } - } - this.host = host; - } - - @Override - public String getWorkerIdentity() { - return LlapFixedRegistryImpl.getWorkerIdentity(host); - } - - @Override - public String getHost() { - return host; - } - - @Override - public int getRpcPort() { - // TODO: allow >1 
port per host? - return LlapFixedRegistryImpl.this.port; - } - - @Override - public int getShufflePort() { - return LlapFixedRegistryImpl.this.shuffle; - } - - @Override - public boolean isAlive() { - return true; - } - - @Override - public Map<String, String> getProperties() { - Map<String, String> properties = new HashMap<>(srv); - // no worker identity - return properties; - } - - @Override - public Resource getResource() { - return Resource.newInstance(memory, vcores); - } - - @Override - public String toString() { - return "FixedServiceInstance{" + - "host=" + host + - ", memory=" + memory + - ", vcores=" + vcores + - '}'; - } - } - - private final class FixedServiceInstanceSet implements ServiceInstanceSet { - - private final Map<String, ServiceInstance> instances = new HashMap<String, ServiceInstance>(); - - public FixedServiceInstanceSet() { - for (String host : hosts) { - // trigger bugs in anyone who uses this as a hostname - instances.put(getWorkerIdentity(host), new FixedServiceInstance(host)); - } - } - - @Override - public Map<String, ServiceInstance> getAll() { - return instances; - } - - @Override - public ServiceInstance getInstance(String name) { - return instances.get(name); - } - - @Override - public Set<ServiceInstance> getByHost(String host) { - Set<ServiceInstance> byHost = new HashSet<ServiceInstance>(); - ServiceInstance inst = getInstance(getWorkerIdentity(host)); - if (inst != null) { - byHost.add(inst); - } - return byHost; - } - - @Override - public void refresh() throws IOException { - // I will do no such thing - } - - } - - @Override - public ServiceInstanceSet getInstances(String component) throws IOException { - return new FixedServiceInstanceSet(); - } - - @Override - public String toString() { - return String.format("FixedRegistry hosts=%s", StringUtils.join(",", this.hosts)); - } -} 
http://git-wip-us.apache.org/repos/asf/hive/blob/215c5770/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/impl/LlapRegistryService.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/impl/LlapRegistryService.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/impl/LlapRegistryService.java deleted file mode 100644 index 6550940..0000000 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/impl/LlapRegistryService.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hive.llap.daemon.registry.impl; - -import java.io.IOException; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.llap.configuration.LlapConfiguration; -import org.apache.hadoop.hive.llap.daemon.registry.ServiceInstanceSet; -import org.apache.hadoop.hive.llap.daemon.registry.ServiceRegistry; -import org.apache.hadoop.service.AbstractService; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class LlapRegistryService extends AbstractService { - - private static final Logger LOG = LoggerFactory.getLogger(LlapRegistryService.class); - - private ServiceRegistry registry = null; - private final boolean isDaemon; - - public LlapRegistryService(boolean isDaemon) { - super("LlapRegistryService"); - this.isDaemon = isDaemon; - } - - @Override - public void serviceInit(Configuration conf) { - String hosts = conf.getTrimmed(LlapConfiguration.LLAP_DAEMON_SERVICE_HOSTS); - if (hosts.startsWith("@")) { - registry = new LlapYarnRegistryImpl(hosts.substring(1), conf, isDaemon); - } else { - registry = new LlapFixedRegistryImpl(hosts, conf); - } - LOG.info("Using LLAP registry type " + registry); - } - - - @Override - public void serviceStart() throws Exception { - if (this.registry != null) { - this.registry.start(); - } - if (isDaemon) { - registerWorker(); - } - } - - @Override - public void serviceStop() throws Exception { - if (isDaemon) { - unregisterWorker(); - } - if (this.registry != null) { - this.registry.stop(); - } else { - LOG.warn("Stopping non-existent registry service"); - } - } - - private void registerWorker() throws IOException { - if (this.registry != null) { - this.registry.register(); - } - } - - private void unregisterWorker() throws IOException { - if (this.registry != null) { - this.registry.unregister(); - } - } - - public ServiceInstanceSet getInstances() throws IOException { - return this.registry.getInstances("LLAP"); - } -} 
http://git-wip-us.apache.org/repos/asf/hive/blob/215c5770/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/impl/LlapYarnRegistryImpl.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/impl/LlapYarnRegistryImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/impl/LlapYarnRegistryImpl.java deleted file mode 100644 index 599da13..0000000 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/impl/LlapYarnRegistryImpl.java +++ /dev/null @@ -1,384 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hive.llap.daemon.registry.impl; - -import java.io.IOException; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.MalformedURLException; -import java.net.URISyntaxException; -import java.net.URL; -import java.net.UnknownHostException; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedHashMap; -import java.util.Map; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - -import com.google.common.util.concurrent.ThreadFactoryBuilder; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.llap.configuration.LlapConfiguration; -import org.apache.hadoop.hive.llap.daemon.registry.ServiceInstance; -import org.apache.hadoop.hive.llap.daemon.registry.ServiceInstanceSet; -import org.apache.hadoop.hive.llap.daemon.registry.ServiceRegistry; -import org.apache.hadoop.registry.client.api.RegistryOperationsFactory; -import org.apache.hadoop.registry.client.binding.RegistryPathUtils; -import org.apache.hadoop.registry.client.binding.RegistryTypeUtils; -import org.apache.hadoop.registry.client.binding.RegistryUtils; -import org.apache.hadoop.registry.client.binding.RegistryUtils.ServiceRecordMarshal; -import org.apache.hadoop.registry.client.impl.zk.RegistryOperationsService; -import org.apache.hadoop.registry.client.types.AddressTypes; -import org.apache.hadoop.registry.client.types.Endpoint; -import org.apache.hadoop.registry.client.types.ProtocolTypes; -import org.apache.hadoop.registry.client.types.ServiceRecord; -import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.tez.dag.api.TezUncheckedException; -import org.apache.zookeeper.CreateMode; - -import com.google.common.base.Preconditions; - -public class 
LlapYarnRegistryImpl implements ServiceRegistry { - - private static final Logger LOG = LoggerFactory.getLogger(LlapYarnRegistryImpl.class); - - private final RegistryOperationsService client; - private final Configuration conf; - private final ServiceRecordMarshal encoder; - private final String path; - - private final DynamicServiceInstanceSet instances = new DynamicServiceInstanceSet(); - - private static final UUID uniq = UUID.randomUUID(); - private static final String hostname; - - private static final String UNIQUE_IDENTIFIER = "llap.unique.id"; - - private final static String SERVICE_CLASS = "org-apache-hive"; - - final ScheduledExecutorService refresher = Executors.newScheduledThreadPool(1, - new ThreadFactoryBuilder().setDaemon(true).setNameFormat("LlapYarnRegistryRefresher").build()); - final long refreshDelay; - private final boolean isDaemon; - - static { - String localhost = "localhost"; - try { - localhost = InetAddress.getLocalHost().getCanonicalHostName(); - } catch (UnknownHostException uhe) { - // ignore - } - hostname = localhost; - } - - public LlapYarnRegistryImpl(String instanceName, Configuration conf, boolean isDaemon) { - - LOG.info("Llap Registry is enabled with registryid: " + instanceName); - this.conf = new Configuration(conf); - conf.addResource(YarnConfiguration.YARN_SITE_CONFIGURATION_FILE); - // registry reference - client = (RegistryOperationsService) RegistryOperationsFactory.createInstance(conf); - encoder = new RegistryUtils.ServiceRecordMarshal(); - this.path = RegistryPathUtils.join(RegistryUtils.componentPath(RegistryUtils.currentUser(), - SERVICE_CLASS, instanceName, "workers"), "worker-"); - refreshDelay = - conf.getInt(LlapConfiguration.LLAP_DAEMON_SERVICE_REFRESH_INTERVAL, - LlapConfiguration.LLAP_DAEMON_SERVICE_REFRESH_INTERVAL_DEFAULT); - this.isDaemon = isDaemon; - Preconditions.checkArgument(refreshDelay > 0, - "Refresh delay for registry has to be positive = %d", refreshDelay); - } - - public Endpoint 
getRpcEndpoint() { - final int rpcPort = - conf.getInt(LlapConfiguration.LLAP_DAEMON_RPC_PORT, - LlapConfiguration.LLAP_DAEMON_RPC_PORT_DEFAULT); - return RegistryTypeUtils.ipcEndpoint("llap", new InetSocketAddress(hostname, rpcPort)); - } - - public Endpoint getShuffleEndpoint() { - final int shufflePort = - conf.getInt(LlapConfiguration.LLAP_DAEMON_YARN_SHUFFLE_PORT, - LlapConfiguration.LLAP_DAEMON_YARN_SHUFFLE_PORT_DEFAULT); - // HTTP today, but might not be - return RegistryTypeUtils.inetAddrEndpoint("shuffle", ProtocolTypes.PROTOCOL_TCP, hostname, - shufflePort); - } - - public Endpoint getServicesEndpoint() { - final int servicePort = - conf.getInt(LlapConfiguration.LLAP_DAEMON_SERVICE_PORT, - LlapConfiguration.LLAP_DAEMON_SERVICE_PORT_DEFAULT); - final boolean isSSL = - conf.getBoolean(LlapConfiguration.LLAP_DAEMON_SERVICE_SSL, - LlapConfiguration.LLAP_DAEMON_SERVICE_SSL_DEFAULT); - final String scheme = isSSL ? "https" : "http"; - final URL serviceURL; - try { - serviceURL = new URL(scheme, hostname, servicePort, ""); - return RegistryTypeUtils.webEndpoint("services", serviceURL.toURI()); - } catch (MalformedURLException e) { - throw new TezUncheckedException(e); - } catch (URISyntaxException e) { - throw new TezUncheckedException("llap service URI for " + hostname + " is invalid", e); - } - } - - private final String getPath() { - return this.path; - } - - @Override - public void register() throws IOException { - String path = getPath(); - ServiceRecord srv = new ServiceRecord(); - srv.addInternalEndpoint(getRpcEndpoint()); - srv.addInternalEndpoint(getShuffleEndpoint()); - srv.addExternalEndpoint(getServicesEndpoint()); - - for (Map.Entry<String, String> kv : this.conf) { - if (kv.getKey().startsWith(LlapConfiguration.LLAP_DAEMON_PREFIX) - || kv.getKey().startsWith("hive.llap.")) { - // TODO: read this somewhere useful, like the task scheduler - srv.set(kv.getKey(), kv.getValue()); - } - } - - // restart sensitive instance id - srv.set(UNIQUE_IDENTIFIER, 
uniq.toString()); - - client.mknode(RegistryPathUtils.parentOf(path), true); - - // FIXME: YARN registry needs to expose Ephemeral_Seq nodes & return the paths - client.zkCreate(path, CreateMode.EPHEMERAL_SEQUENTIAL, encoder.toBytes(srv), - client.getClientAcls()); - } - - @Override - public void unregister() throws IOException { - // Nothing for the zkCreate models - } - - private class DynamicServiceInstance implements ServiceInstance { - - private final ServiceRecord srv; - private boolean alive = true; - private final String host; - private final int rpcPort; - private final int shufflePort; - - public DynamicServiceInstance(ServiceRecord srv) throws IOException { - this.srv = srv; - - final Endpoint shuffle = srv.getInternalEndpoint("shuffle"); - final Endpoint rpc = srv.getInternalEndpoint("llap"); - - this.host = - RegistryTypeUtils.getAddressField(rpc.addresses.get(0), - AddressTypes.ADDRESS_HOSTNAME_FIELD); - this.rpcPort = - Integer.valueOf(RegistryTypeUtils.getAddressField(rpc.addresses.get(0), - AddressTypes.ADDRESS_PORT_FIELD)); - this.shufflePort = - Integer.valueOf(RegistryTypeUtils.getAddressField(shuffle.addresses.get(0), - AddressTypes.ADDRESS_PORT_FIELD)); - } - - @Override - public String getWorkerIdentity() { - return srv.get(UNIQUE_IDENTIFIER); - } - - @Override - public String getHost() { - return host; - } - - @Override - public int getRpcPort() { - return rpcPort; - } - - @Override - public int getShufflePort() { - return shufflePort; - } - - @Override - public boolean isAlive() { - return alive ; - } - - public void kill() { - // May be possible to generate a notification back to the scheduler from here. 
- LOG.info("Killing service instance: " + this); - this.alive = false; - } - - @Override - public Map<String, String> getProperties() { - return srv.attributes(); - } - - @Override - public Resource getResource() { - int memory = Integer.valueOf(srv.get(LlapConfiguration.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB)); - int vCores = Integer.valueOf(srv.get(LlapConfiguration.LLAP_DAEMON_NUM_EXECUTORS)); - return Resource.newInstance(memory, vCores); - } - - @Override - public String toString() { - return "DynamicServiceInstance [alive=" + alive + ", host=" + host + ":" + rpcPort + " with resources=" + getResource() +"]"; - } - - // Relying on the identity hashCode and equality, since refreshing instances retains the old copy - // of an already known instance. - } - - private class DynamicServiceInstanceSet implements ServiceInstanceSet { - - // LinkedHashMap to retain iteration order. - private final Map<String, ServiceInstance> instances = new LinkedHashMap<>(); - - @Override - public synchronized Map<String, ServiceInstance> getAll() { - // Return a copy. Instances may be modified during a refresh. 
- return new LinkedHashMap<>(instances); - } - - @Override - public synchronized ServiceInstance getInstance(String name) { - return instances.get(name); - } - - @Override - public void refresh() throws IOException { - /* call this from wherever */ - Map<String, ServiceInstance> freshInstances = new HashMap<String, ServiceInstance>(); - - String path = getPath(); - Map<String, ServiceRecord> records = - RegistryUtils.listServiceRecords(client, RegistryPathUtils.parentOf(path)); - // Synchronize after reading the service records from the external service (ZK) - synchronized (this) { - Set<String> latestKeys = new HashSet<String>(); - LOG.info("Starting to refresh ServiceInstanceSet " + System.identityHashCode(this)); - for (ServiceRecord rec : records.values()) { - ServiceInstance instance = new DynamicServiceInstance(rec); - if (instance != null) { - if (instances != null && instances.containsKey(instance.getWorkerIdentity()) == false) { - // add a new object - freshInstances.put(instance.getWorkerIdentity(), instance); - if (LOG.isInfoEnabled()) { - LOG.info("Adding new worker " + instance.getWorkerIdentity() + " which mapped to " - + instance); - } - } else { - if (LOG.isDebugEnabled()) { - LOG.debug("Retaining running worker " + instance.getWorkerIdentity() + - " which mapped to " + instance); - } - } - } - latestKeys.add(instance.getWorkerIdentity()); - } - - if (instances != null) { - // deep-copy before modifying - Set<String> oldKeys = new HashSet<>(instances.keySet()); - if (oldKeys.removeAll(latestKeys)) { - // This is all the records which have not checked in, and are effectively dead. 
- for (String k : oldKeys) { - // this is so that people can hold onto ServiceInstance references as placeholders for tasks - final DynamicServiceInstance dead = (DynamicServiceInstance) instances.get(k); - dead.kill(); - if (LOG.isInfoEnabled()) { - LOG.info("Deleting dead worker " + k + " which mapped to " + dead); - } - } - } - // oldKeys contains the set of dead instances at this point. - this.instances.keySet().removeAll(oldKeys); - this.instances.putAll(freshInstances); - } else { - this.instances.putAll(freshInstances); - } - } - } - - @Override - public synchronized Set<ServiceInstance> getByHost(String host) { - // TODO Maybe store this as a map which is populated during construction, to avoid walking - // the map on each request. - Set<ServiceInstance> byHost = new HashSet<ServiceInstance>(); - - for (ServiceInstance i : instances.values()) { - if (host.equals(i.getHost())) { - // all hosts in instances should be alive in this impl - byHost.add(i); - } - if (LOG.isDebugEnabled()) { - LOG.debug("Locality comparing " + host + " to " + i.getHost()); - } - } - if (LOG.isDebugEnabled()) { - LOG.debug("Returning " + byHost.size() + " hosts for locality allocation on " + host); - } - return byHost; - } - } - - @Override - public ServiceInstanceSet getInstances(String component) throws IOException { - Preconditions.checkArgument("LLAP".equals(component)); // right now there is only 1 component - if (this.client != null) { - instances.refresh(); - return instances; - } else { - Preconditions.checkNotNull(this.client, "Yarn registry client is not intialized"); - return null; - } - } - - @Override - public void start() { - if (client == null) return; - client.start(); - if (isDaemon) return; - refresher.scheduleWithFixedDelay(new Runnable() { - @Override - public void run() { - try { - instances.refresh(); - } catch (IOException ioe) { - LOG.warn("Could not refresh hosts during scheduled refresh", ioe); - } - } - }, 0, refreshDelay, TimeUnit.SECONDS); - } - - 
@Override - public void stop() { - if (client != null) { - client.stop(); - } - } -} http://git-wip-us.apache.org/repos/asf/hive/blob/215c5770/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java b/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java index 7fb9a99..6fd01f9 100644 --- a/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java +++ b/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java @@ -51,9 +51,9 @@ import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.llap.configuration.LlapConfiguration; -import org.apache.hadoop.hive.llap.daemon.registry.ServiceInstance; -import org.apache.hadoop.hive.llap.daemon.registry.ServiceInstanceSet; -import org.apache.hadoop.hive.llap.daemon.registry.impl.LlapRegistryService; +import org.apache.hadoop.hive.llap.registry.ServiceInstance; +import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet; +import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; http://git-wip-us.apache.org/repos/asf/hive/blob/215c5770/llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java ---------------------------------------------------------------------- diff --git a/llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java b/llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java index 2f93266..23724a4 100644 --- a/llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java +++ 
b/llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java @@ -34,7 +34,7 @@ import java.util.concurrent.locks.ReentrantLock; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.llap.configuration.LlapConfiguration; -import org.apache.hadoop.hive.llap.daemon.registry.impl.LlapFixedRegistryImpl; +import org.apache.hadoop.hive.llap.registry.impl.LlapFixedRegistryImpl; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container;