Repository: hive
Updated Branches:
  refs/heads/master 8f881f87b -> 215c5770c


HIVE-12256. Move LLAP registry into llap-client module. (Siddharth Seth, 
reviewed by Gopal V)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/215c5770
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/215c5770
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/215c5770

Branch: refs/heads/master
Commit: 215c5770c7ef5e0d1428f9288354205eedcee399
Parents: 8f881f8
Author: Siddharth Seth <ss...@apache.org>
Authored: Thu Oct 29 21:56:38 2015 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Thu Oct 29 21:56:38 2015 -0700

----------------------------------------------------------------------
 llap-client/pom.xml                             |   6 +
 .../hive/llap/registry/ServiceInstance.java     |  73 ++++
 .../hive/llap/registry/ServiceInstanceSet.java  |  57 +++
 .../hive/llap/registry/ServiceRegistry.java     |  59 +++
 .../registry/impl/LlapFixedRegistryImpl.java    | 223 +++++++++++
 .../llap/registry/impl/LlapRegistryService.java |  87 +++++
 .../registry/impl/LlapYarnRegistryImpl.java     | 383 ++++++++++++++++++
 llap-server/pom.xml                             |   6 -
 .../hive/llap/daemon/impl/LlapDaemon.java       |   2 +-
 .../llap/daemon/registry/ServiceInstance.java   |  73 ----
 .../daemon/registry/ServiceInstanceSet.java     |  57 ---
 .../llap/daemon/registry/ServiceRegistry.java   |  59 ---
 .../registry/impl/LlapFixedRegistryImpl.java    | 223 -----------
 .../registry/impl/LlapRegistryService.java      |  87 -----
 .../registry/impl/LlapYarnRegistryImpl.java     | 384 -------------------
 .../dag/app/rm/LlapTaskSchedulerService.java    |   6 +-
 .../app/rm/TestLlapTaskSchedulerService.java    |   2 +-
 17 files changed, 893 insertions(+), 894 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/215c5770/llap-client/pom.xml
----------------------------------------------------------------------
diff --git a/llap-client/pom.xml b/llap-client/pom.xml
index 02243f8..0cd2ec9 100644
--- a/llap-client/pom.xml
+++ b/llap-client/pom.xml
@@ -72,6 +72,12 @@
       <version>${hadoop.version}</version>
       <optional>true</optional>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-registry</artifactId>
+      <version>${hadoop.version}</version>
+      <optional>true</optional>
+    </dependency>
     <!-- test inter-project -->
     <dependency>
       <groupId>junit</groupId>

http://git-wip-us.apache.org/repos/asf/hive/blob/215c5770/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstance.java
----------------------------------------------------------------------
diff --git 
a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstance.java
 
b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstance.java
new file mode 100644
index 0000000..f116de4
--- /dev/null
+++ 
b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstance.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.registry;
+
+import java.util.Map;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+
+public interface ServiceInstance {
+
+  /**
+   * Worker identity is a UUID (unique across restarts), used to identify a node which died and
+   * was brought back on the same host/port.
+   *
+   * @return the worker identity of this instance
+   */
+  public String getWorkerIdentity();
+
+  /**
+   * Hostname of the service instance.
+   *
+   * @return the hostname of this instance
+   */
+  public String getHost();
+
+  /**
+   * RPC endpoint for the service instance.
+   *
+   * @return the RPC port of this instance
+   */
+  public int getRpcPort();
+
+  /**
+   * Shuffle endpoint for the service instance.
+   *
+   * @return the shuffle port of this instance
+   */
+  public int getShufflePort();
+
+  /**
+   * Return the last known state (without refreshing).
+   *
+   * @return true if the instance was alive when its state was last recorded
+   */
+
+  public boolean isAlive();
+
+  /**
+   * Config properties of the service instance (llap.daemon.*).
+   *
+   * @return the configuration properties of this instance, keyed by property name
+   */
+
+  public Map<String, String> getProperties();
+
+  /**
+   * Memory and executors available for the LLAP tasks.
+   *
+   * This does not include the size of the cache or the actual vCores allocated via Slider.
+   *
+   * @return the resource describing the memory and executors available to LLAP tasks
+   */
+  public Resource getResource();
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/215c5770/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstanceSet.java
----------------------------------------------------------------------
diff --git 
a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstanceSet.java
 
b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstanceSet.java
new file mode 100644
index 0000000..388b5f3
--- /dev/null
+++ 
b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstanceSet.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.registry;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
+
+public interface ServiceInstanceSet {
+
+  /**
+   * Get a mapping from worker identity to service instance.
+   *
+   * The worker identity does not collide between restarts, so each restart will have a unique id,
+   * while having the same host/ip pair.
+   *
+   * @return the known instances, keyed by worker identity
+   */
+  public Map<String, ServiceInstance> getAll();
+
+  /**
+   * Get an instance by worker identity.
+   *
+   * @param name the worker identity to look up
+   * @return the instance registered under that worker identity
+   */
+  public ServiceInstance getInstance(String name);
+
+  /**
+   * Get the set of service instances for a given host.
+   *
+   * The set could include dead and alive instances.
+   *
+   * @param host the hostname to look up
+   * @return the instances known for that host
+   */
+  public Set<ServiceInstance> getByHost(String host);
+
+  /**
+   * Refresh the instance set from the registry backing store.
+   *
+   * @throws IOException if the backing store could not be read
+   */
+  public void refresh() throws IOException;
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/215c5770/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceRegistry.java
----------------------------------------------------------------------
diff --git 
a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceRegistry.java
 
b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceRegistry.java
new file mode 100644
index 0000000..d3fb517
--- /dev/null
+++ 
b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceRegistry.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.registry;
+
+import java.io.IOException;
+
+/**
+ * ServiceRegistry interface for switching between fixed host and dynamic registry
+ * implementations.
+ */
+public interface ServiceRegistry {
+
+  /**
+   * Start the service registry.
+   *
+   * @throws InterruptedException implementation defined
+   */
+  public void start() throws InterruptedException;
+
+  /**
+   * Stop the service registry.
+   *
+   * @throws InterruptedException implementation defined
+   */
+  public void stop() throws InterruptedException;
+
+  /**
+   * Register the current instance - the implementation takes care of the endpoints to register.
+   *
+   * @throws IOException if the registration could not be written
+   */
+  public void register() throws IOException;
+
+  /**
+   * Remove the current registration cleanly (implementation defined cleanup).
+   *
+   * @throws IOException if the registration could not be removed
+   */
+  public void unregister() throws IOException;
+
+  /**
+   * Client API to get the set of instances registered via the current registry key.
+   *
+   * @param component name of the component whose instances are requested
+   * @return the set of service instances for the component
+   * @throws IOException if the instances could not be read from the registry
+   */
+  public ServiceInstanceSet getInstances(String component) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/215c5770/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
----------------------------------------------------------------------
diff --git 
a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
 
b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
new file mode 100644
index 0000000..34e0682
--- /dev/null
+++ 
b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.registry.impl;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.llap.configuration.LlapConfiguration;
+import org.apache.hadoop.hive.llap.registry.ServiceInstance;
+import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet;
+import org.apache.hadoop.hive.llap.registry.ServiceRegistry;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LlapFixedRegistryImpl implements ServiceRegistry {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(LlapFixedRegistryImpl.class);
+
+  @InterfaceAudience.Private
+  // This is primarily for testing to avoid the host lookup
+  public static final String FIXED_REGISTRY_RESOLVE_HOST_NAMES = 
"fixed.registry.resolve.host.names";
+
+  private final int port;
+  private final int shuffle;
+  private final String[] hosts;
+  private final int memory;
+  private final int vcores;
+  private final boolean resolveHosts;
+
+  private final Map<String, String> srv = new HashMap<String, String>();
+
+  public LlapFixedRegistryImpl(String hosts, Configuration conf) {
+    this.hosts = hosts.split(",");
+    this.port =
+        conf.getInt(LlapConfiguration.LLAP_DAEMON_RPC_PORT,
+            LlapConfiguration.LLAP_DAEMON_RPC_PORT_DEFAULT);
+    this.shuffle =
+        conf.getInt(LlapConfiguration.LLAP_DAEMON_YARN_SHUFFLE_PORT,
+            LlapConfiguration.LLAP_DAEMON_YARN_SHUFFLE_PORT_DEFAULT);
+    this.resolveHosts = conf.getBoolean(FIXED_REGISTRY_RESOLVE_HOST_NAMES, 
true);
+
+    for (Map.Entry<String, String> kv : conf) {
+      if (kv.getKey().startsWith(LlapConfiguration.LLAP_DAEMON_PREFIX)
+          || kv.getKey().startsWith("hive.llap.")
+          || kv.getKey().startsWith(LlapConfiguration.LLAP_PREFIX)) {
+        // TODO: read this somewhere useful, like the task scheduler
+        srv.put(kv.getKey(), kv.getValue());
+      }
+    }
+
+    this.memory =
+        conf.getInt(LlapConfiguration.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB,
+            LlapConfiguration.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB_DEFAULT);
+    this.vcores =
+        conf.getInt(LlapConfiguration.LLAP_DAEMON_NUM_EXECUTORS,
+            LlapConfiguration.LLAP_DAEMON_NUM_EXECUTORS_DEFAULT);
+  }
+
+  @Override
+  public void start() throws InterruptedException {
+    // nothing to start
+  }
+
+  @Override
+  public void stop() throws InterruptedException {
+    // nothing to stop
+  }
+
+  @Override
+  public void register() throws IOException {
+    // nothing to register
+  }
+
+  @Override
+  public void unregister() throws IOException {
+    // nothing to unregister
+  }
+
+  public static String getWorkerIdentity(String host) {
+    // trigger clean errors for anyone who mixes up identity with hosts
+    return "host-" + host;
+  }
+
+  private final class FixedServiceInstance implements ServiceInstance {
+
+    private final String host;
+
+    public FixedServiceInstance(String host) {
+      if (resolveHosts) {
+        try {
+          InetAddress inetAddress = InetAddress.getByName(host);
+          if (NetUtils.isLocalAddress(inetAddress)) {
+            InetSocketAddress socketAddress = new InetSocketAddress(0);
+            socketAddress = NetUtils.getConnectAddress(socketAddress);
+            LOG.info("Adding host identified as local: " + host + " as "
+                + socketAddress.getHostName());
+            host = socketAddress.getHostName();
+          }
+        } catch (UnknownHostException e) {
+          LOG.warn("Ignoring resolution issues for host: " + host, e);
+        }
+      }
+      this.host = host;
+    }
+
+    @Override
+    public String getWorkerIdentity() {
+      return LlapFixedRegistryImpl.getWorkerIdentity(host);
+    }
+
+    @Override
+    public String getHost() {
+      return host;
+    }
+
+    @Override
+    public int getRpcPort() {
+      // TODO: allow >1 port per host?
+      return LlapFixedRegistryImpl.this.port;
+    }
+
+    @Override
+    public int getShufflePort() {
+      return LlapFixedRegistryImpl.this.shuffle;
+    }
+
+    @Override
+    public boolean isAlive() {
+      return true;
+    }
+
+    @Override
+    public Map<String, String> getProperties() {
+      Map<String, String> properties = new HashMap<>(srv);
+      // no worker identity
+      return properties;
+    }
+
+    @Override
+    public Resource getResource() {
+      return Resource.newInstance(memory, vcores);
+    }
+
+    @Override
+    public String toString() {
+      return "FixedServiceInstance{" +
+          "host=" + host +
+          ", memory=" + memory +
+          ", vcores=" + vcores +
+          '}';
+    }
+  }
+
+  private final class FixedServiceInstanceSet implements ServiceInstanceSet {
+
+    private final Map<String, ServiceInstance> instances = new HashMap<String, 
ServiceInstance>();
+
+    public FixedServiceInstanceSet() {
+      for (String host : hosts) {
+        // trigger bugs in anyone who uses this as a hostname
+        instances.put(getWorkerIdentity(host), new FixedServiceInstance(host));
+      }
+    }
+
+    @Override
+    public Map<String, ServiceInstance> getAll() {
+      return instances;
+    }
+
+    @Override
+    public ServiceInstance getInstance(String name) {
+      return instances.get(name);
+    }
+
+    @Override
+    public Set<ServiceInstance> getByHost(String host) {
+      Set<ServiceInstance> byHost = new HashSet<ServiceInstance>();
+      ServiceInstance inst = getInstance(getWorkerIdentity(host));
+      if (inst != null) {
+        byHost.add(inst);
+      }
+      return byHost;
+    }
+
+    @Override
+    public void refresh() throws IOException {
+      // I will do no such thing
+    }
+
+  }
+
+  @Override
+  public ServiceInstanceSet getInstances(String component) throws IOException {
+    return new FixedServiceInstanceSet();
+  }
+
+  @Override
+  public String toString() {
+    return String.format("FixedRegistry hosts=%s", StringUtils.join(",", 
this.hosts));
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/215c5770/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java
----------------------------------------------------------------------
diff --git 
a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java
 
b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java
new file mode 100644
index 0000000..a8e1465
--- /dev/null
+++ 
b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.registry.impl;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.llap.configuration.LlapConfiguration;
+import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet;
+import org.apache.hadoop.hive.llap.registry.ServiceRegistry;
+import org.apache.hadoop.service.AbstractService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LlapRegistryService extends AbstractService {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(LlapRegistryService.class);
+
+  private ServiceRegistry registry = null;
+  private final boolean isDaemon;
+
+  public LlapRegistryService(boolean isDaemon) {
+    super("LlapRegistryService");
+    this.isDaemon = isDaemon;
+  }
+
+  /**
+   * Selects the registry implementation from the configured service hosts value.
+   *
+   * A value starting with "@" selects the YARN registry, using the remainder of the value as the
+   * registry instance name; any other value is handed to the fixed registry as its host list.
+   *
+   * @param conf configuration to read the service hosts value from
+   * @throws IllegalArgumentException if the service hosts value is not configured
+   */
+  @Override
+  public void serviceInit(Configuration conf) {
+    String hosts = conf.getTrimmed(LlapConfiguration.LLAP_DAEMON_SERVICE_HOSTS);
+    if (hosts == null) {
+      // getTrimmed returns null for an unset key; fail with a message naming the key instead of
+      // an anonymous NullPointerException from startsWith below.
+      throw new IllegalArgumentException("LLAP registry hosts are not configured: "
+          + LlapConfiguration.LLAP_DAEMON_SERVICE_HOSTS);
+    }
+    if (hosts.startsWith("@")) {
+      registry = new LlapYarnRegistryImpl(hosts.substring(1), conf, isDaemon);
+    } else {
+      registry = new LlapFixedRegistryImpl(hosts, conf);
+    }
+    LOG.info("Using LLAP registry type " + registry);
+  }
+
+
+  @Override
+  public void serviceStart() throws Exception {
+    if (this.registry != null) {
+      this.registry.start();
+    }
+    if (isDaemon) {
+      registerWorker();
+    }
+  }
+
+  @Override
+  public void serviceStop() throws Exception {
+    if (isDaemon) {
+      unregisterWorker();
+    }
+    if (this.registry != null) {
+      this.registry.stop();
+    } else {
+      LOG.warn("Stopping non-existent registry service");
+    }
+  }
+
+  private void registerWorker() throws IOException {
+    if (this.registry != null) {
+      this.registry.register();
+    }
+  }
+
+  private void unregisterWorker() throws IOException {
+    if (this.registry != null) {
+      this.registry.unregister();
+    }
+  }
+
+  public ServiceInstanceSet getInstances() throws IOException {
+    return this.registry.getInstances("LLAP");
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/215c5770/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapYarnRegistryImpl.java
----------------------------------------------------------------------
diff --git 
a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapYarnRegistryImpl.java
 
b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapYarnRegistryImpl.java
new file mode 100644
index 0000000..d474b6f
--- /dev/null
+++ 
b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapYarnRegistryImpl.java
@@ -0,0 +1,383 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.registry.impl;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.MalformedURLException;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.llap.configuration.LlapConfiguration;
+import org.apache.hadoop.hive.llap.registry.ServiceInstance;
+import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet;
+import org.apache.hadoop.hive.llap.registry.ServiceRegistry;
+import org.apache.hadoop.registry.client.api.RegistryOperationsFactory;
+import org.apache.hadoop.registry.client.binding.RegistryPathUtils;
+import org.apache.hadoop.registry.client.binding.RegistryTypeUtils;
+import org.apache.hadoop.registry.client.binding.RegistryUtils;
+import 
org.apache.hadoop.registry.client.binding.RegistryUtils.ServiceRecordMarshal;
+import org.apache.hadoop.registry.client.impl.zk.RegistryOperationsService;
+import org.apache.hadoop.registry.client.types.AddressTypes;
+import org.apache.hadoop.registry.client.types.Endpoint;
+import org.apache.hadoop.registry.client.types.ProtocolTypes;
+import org.apache.hadoop.registry.client.types.ServiceRecord;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.zookeeper.CreateMode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+public class LlapYarnRegistryImpl implements ServiceRegistry {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(LlapYarnRegistryImpl.class);
+
+  private final RegistryOperationsService client;
+  private final Configuration conf;
+  private final ServiceRecordMarshal encoder;
+  private final String path;
+
+  private final DynamicServiceInstanceSet instances = new 
DynamicServiceInstanceSet();
+
+  private static final UUID uniq = UUID.randomUUID();
+  private static final String hostname;
+
+  private static final String UNIQUE_IDENTIFIER = "llap.unique.id";
+
+  private final static String SERVICE_CLASS = "org-apache-hive";
+
+  final ScheduledExecutorService refresher = 
Executors.newScheduledThreadPool(1,
+      new 
ThreadFactoryBuilder().setDaemon(true).setNameFormat("LlapYarnRegistryRefresher").build());
+  final long refreshDelay;
+  private final boolean isDaemon;
+
+  static {
+    String localhost = "localhost";
+    try {
+      localhost = InetAddress.getLocalHost().getCanonicalHostName();
+    } catch (UnknownHostException uhe) {
+      // ignore
+    }
+    hostname = localhost;
+  }
+
+  /**
+   * Creates a registry bound to the YARN registry path of the given LLAP instance.
+   *
+   * @param instanceName registry id of the LLAP instance; becomes a component of the registry path
+   * @param conf configuration used to create the registry client and to read the refresh
+   *          interval; a private copy is kept for later endpoint/property lookups
+   * @param isDaemon whether this registry runs inside a daemon; start() skips the periodic
+   *          refresh when true
+   * @throws IllegalArgumentException if the configured refresh interval is not positive
+   */
+  public LlapYarnRegistryImpl(String instanceName, Configuration conf, boolean isDaemon) {
+
+    LOG.info("Llap Registry is enabled with registryid: " + instanceName);
+    this.conf = new Configuration(conf);
+    // NOTE(review): the YARN site resource is added to the caller's conf (after the copy above
+    // was taken), so the private copy does not see it - confirm this is intended.
+    conf.addResource(YarnConfiguration.YARN_SITE_CONFIGURATION_FILE);
+    // registry reference
+    client = (RegistryOperationsService) RegistryOperationsFactory.createInstance(conf);
+    encoder = new RegistryUtils.ServiceRecordMarshal();
+    this.path = RegistryPathUtils.join(RegistryUtils.componentPath(RegistryUtils.currentUser(),
+        SERVICE_CLASS, instanceName, "workers"), "worker-");
+    refreshDelay =
+        conf.getInt(LlapConfiguration.LLAP_DAEMON_SERVICE_REFRESH_INTERVAL,
+            LlapConfiguration.LLAP_DAEMON_SERVICE_REFRESH_INTERVAL_DEFAULT);
+    this.isDaemon = isDaemon;
+    // Guava's Preconditions only substitutes %s placeholders; %d would be left verbatim.
+    Preconditions.checkArgument(refreshDelay > 0,
+        "Refresh delay for registry has to be positive = %s", refreshDelay);
+  }
+
+  public Endpoint getRpcEndpoint() {
+    final int rpcPort =
+        conf.getInt(LlapConfiguration.LLAP_DAEMON_RPC_PORT,
+            LlapConfiguration.LLAP_DAEMON_RPC_PORT_DEFAULT);
+    return RegistryTypeUtils.ipcEndpoint("llap", new 
InetSocketAddress(hostname, rpcPort));
+  }
+
+  public Endpoint getShuffleEndpoint() {
+    final int shufflePort =
+        conf.getInt(LlapConfiguration.LLAP_DAEMON_YARN_SHUFFLE_PORT,
+            LlapConfiguration.LLAP_DAEMON_YARN_SHUFFLE_PORT_DEFAULT);
+    // HTTP today, but might not be
+    return RegistryTypeUtils.inetAddrEndpoint("shuffle", 
ProtocolTypes.PROTOCOL_TCP, hostname,
+        shufflePort);
+  }
+
+  public Endpoint getServicesEndpoint() {
+    final int servicePort =
+        conf.getInt(LlapConfiguration.LLAP_DAEMON_SERVICE_PORT,
+            LlapConfiguration.LLAP_DAEMON_SERVICE_PORT_DEFAULT);
+    final boolean isSSL =
+        conf.getBoolean(LlapConfiguration.LLAP_DAEMON_SERVICE_SSL,
+            LlapConfiguration.LLAP_DAEMON_SERVICE_SSL_DEFAULT);
+    final String scheme = isSSL ? "https" : "http";
+    final URL serviceURL;
+    try {
+      serviceURL = new URL(scheme, hostname, servicePort, "");
+      return RegistryTypeUtils.webEndpoint("services", serviceURL.toURI());
+    } catch (MalformedURLException e) {
+      throw new RuntimeException(e);
+    } catch (URISyntaxException e) {
+      throw new RuntimeException("llap service URI for " + hostname + " is 
invalid", e);
+    }
+  }
+
+  private final String getPath() {
+    return this.path;
+  }
+
+  @Override
+  public void register() throws IOException {
+    String path = getPath();
+    ServiceRecord srv = new ServiceRecord();
+    srv.addInternalEndpoint(getRpcEndpoint());
+    srv.addInternalEndpoint(getShuffleEndpoint());
+    srv.addExternalEndpoint(getServicesEndpoint());
+
+    for (Map.Entry<String, String> kv : this.conf) {
+      if (kv.getKey().startsWith(LlapConfiguration.LLAP_DAEMON_PREFIX)
+          || kv.getKey().startsWith("hive.llap.")) {
+        // TODO: read this somewhere useful, like the task scheduler
+        srv.set(kv.getKey(), kv.getValue());
+      }
+    }
+
+    // restart sensitive instance id
+    srv.set(UNIQUE_IDENTIFIER, uniq.toString());
+
+    client.mknode(RegistryPathUtils.parentOf(path), true);
+
+    // FIXME: YARN registry needs to expose Ephemeral_Seq nodes & return the 
paths
+    client.zkCreate(path, CreateMode.EPHEMERAL_SEQUENTIAL, 
encoder.toBytes(srv),
+        client.getClientAcls());
+  }
+
+  @Override
+  public void unregister() throws IOException {
+   // Nothing for the zkCreate models
+  }
+
+  private class DynamicServiceInstance implements ServiceInstance {
+
+    private final ServiceRecord srv;
+    private boolean alive = true;
+    private final String host;
+    private final int rpcPort;
+    private final int shufflePort;
+
+    public DynamicServiceInstance(ServiceRecord srv) throws IOException {
+      this.srv = srv;
+
+      final Endpoint shuffle = srv.getInternalEndpoint("shuffle");
+      final Endpoint rpc = srv.getInternalEndpoint("llap");
+
+      this.host =
+          RegistryTypeUtils.getAddressField(rpc.addresses.get(0),
+              AddressTypes.ADDRESS_HOSTNAME_FIELD);
+      this.rpcPort =
+          
Integer.valueOf(RegistryTypeUtils.getAddressField(rpc.addresses.get(0),
+              AddressTypes.ADDRESS_PORT_FIELD));
+      this.shufflePort =
+          
Integer.valueOf(RegistryTypeUtils.getAddressField(shuffle.addresses.get(0),
+              AddressTypes.ADDRESS_PORT_FIELD));
+    }
+
+    @Override
+    public String getWorkerIdentity() {
+      return srv.get(UNIQUE_IDENTIFIER);
+    }
+
+    @Override
+    public String getHost() {
+      return host;
+    }
+
+    @Override
+    public int getRpcPort() {
+      return rpcPort;
+    }
+
+    @Override
+    public int getShufflePort() {
+      return shufflePort;
+    }
+
+    @Override
+    public boolean isAlive() {
+      return alive ;
+    }
+
+    public void kill() {
+      // May be possible to generate a notification back to the scheduler from 
here.
+      LOG.info("Killing service instance: " + this);
+      this.alive = false;
+    }
+
+    @Override
+    public Map<String, String> getProperties() {
+      return srv.attributes();
+    }
+
+    @Override
+    public Resource getResource() {
+      int memory = 
Integer.valueOf(srv.get(LlapConfiguration.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB));
+      int vCores = 
Integer.valueOf(srv.get(LlapConfiguration.LLAP_DAEMON_NUM_EXECUTORS));
+      return Resource.newInstance(memory, vCores);
+    }
+
+    @Override
+    public String toString() {
+      return "DynamicServiceInstance [alive=" + alive + ", host=" + host + ":" 
+ rpcPort + " with resources=" + getResource() +"]";
+    }
+
+    // Relying on the identity hashCode and equality, since refreshing 
instances retains the old copy
+    // of an already known instance.
+  }
+
+  private class DynamicServiceInstanceSet implements ServiceInstanceSet {
+
+    // LinkedHashMap to retain iteration order.
+    private final Map<String, ServiceInstance> instances = new 
LinkedHashMap<>();
+
+    @Override
+    public synchronized Map<String, ServiceInstance> getAll() {
+      // Return a copy. Instances may be modified during a refresh.
+      return new LinkedHashMap<>(instances);
+    }
+
+    @Override
+    public synchronized ServiceInstance getInstance(String name) {
+      return instances.get(name);
+    }
+
+    /**
+     * Re-reads the worker records from the registry and reconciles them with the known instances.
+     *
+     * Workers seen for the first time are added. Workers which are already known keep their
+     * existing ServiceInstance object. Known workers which are no longer present in the registry
+     * are marked dead via kill() and removed from the set, so that callers holding on to such a
+     * reference observe isAlive() == false.
+     *
+     * @throws IOException if the service records cannot be listed or an instance cannot be built
+     */
+    @Override
+    public void refresh() throws IOException {
+      /* call this from wherever */
+      Map<String, ServiceInstance> freshInstances = new HashMap<String, ServiceInstance>();
+
+      String path = getPath();
+      Map<String, ServiceRecord> records =
+          RegistryUtils.listServiceRecords(client, RegistryPathUtils.parentOf(path));
+      // Synchronize after reading the service records from the external service (ZK)
+      synchronized (this) {
+        Set<String> latestKeys = new HashSet<String>();
+        LOG.info("Starting to refresh ServiceInstanceSet " + System.identityHashCode(this));
+        for (ServiceRecord rec : records.values()) {
+          ServiceInstance instance = new DynamicServiceInstance(rec);
+          String identity = instance.getWorkerIdentity();
+          if (!instances.containsKey(identity)) {
+            // add a new object
+            freshInstances.put(identity, instance);
+            if (LOG.isInfoEnabled()) {
+              LOG.info("Adding new worker " + identity + " which mapped to " + instance);
+            }
+          } else {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Retaining running worker " + identity + " which mapped to " + instance);
+            }
+          }
+          latestKeys.add(identity);
+        }
+
+        // Copy before modifying; whatever is left after removing the latest keys has not
+        // checked in and is effectively dead.
+        Set<String> deadKeys = new HashSet<>(instances.keySet());
+        // The boolean returned by removeAll only says whether the set changed. It is false when
+        // none of the old workers is still registered, which is exactly the case where all of
+        // them are dead - so it must not gate the kill loop.
+        deadKeys.removeAll(latestKeys);
+        for (String k : deadKeys) {
+          // this is so that people can hold onto ServiceInstance references as placeholders
+          // for tasks
+          final DynamicServiceInstance dead = (DynamicServiceInstance) instances.get(k);
+          dead.kill();
+          if (LOG.isInfoEnabled()) {
+            LOG.info("Deleting dead worker " + k + " which mapped to " + dead);
+          }
+        }
+        this.instances.keySet().removeAll(deadKeys);
+        this.instances.putAll(freshInstances);
+      }
+    }
+
+    @Override
+    public synchronized Set<ServiceInstance> getByHost(String host) {
+      // TODO Maybe store this as a map which is populated during 
construction, to avoid walking
+      // the map on each request.
+      Set<ServiceInstance> byHost = new HashSet<ServiceInstance>();
+
+      for (ServiceInstance i : instances.values()) {
+        if (host.equals(i.getHost())) {
+          // all hosts in instances should be alive in this impl
+          byHost.add(i);
+        }
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Locality comparing " + host + " to " + i.getHost());
+        }
+      }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Returning " + byHost.size() + " hosts for locality 
allocation on " + host);
+      }
+      return byHost;
+    }
+  }
+
+  /**
+   * Refreshes the instance set from the registry and returns it.
+   *
+   * @param component must be "LLAP", the only supported component
+   * @return the refreshed, shared instance set of this registry
+   * @throws IOException if refreshing the instances fails
+   * @throws IllegalArgumentException if component is not "LLAP"
+   * @throws NullPointerException if the registry client is not initialized
+   */
+  @Override
+  public ServiceInstanceSet getInstances(String component) throws IOException {
+    // right now there is only 1 component
+    Preconditions.checkArgument("LLAP".equals(component));
+    Preconditions.checkNotNull(this.client, "Yarn registry client is not initialized");
+    instances.refresh();
+    return instances;
+  }
+
+  @Override
+  public void start() {
+    if (client == null) return;
+    client.start();
+    if (isDaemon) return;
+    refresher.scheduleWithFixedDelay(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          instances.refresh();
+        } catch (IOException ioe) {
+          LOG.warn("Could not refresh hosts during scheduled refresh", ioe);
+        }
+      }
+    }, 0, refreshDelay, TimeUnit.SECONDS);
+  }
+
+  /**
+   * Stops the periodic refresh and then the registry client.
+   *
+   * The refresher is shut down first so that no scheduled refresh runs against a stopped client.
+   */
+  @Override
+  public void stop() {
+    // start() schedules a repeating task on this executor; without a shutdown it keeps firing.
+    refresher.shutdownNow();
+    if (client != null) {
+      client.stop();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/215c5770/llap-server/pom.xml
----------------------------------------------------------------------
diff --git a/llap-server/pom.xml b/llap-server/pom.xml
index 4be45a5..a733e2c 100644
--- a/llap-server/pom.xml
+++ b/llap-server/pom.xml
@@ -98,12 +98,6 @@
       <optional>true</optional>
     </dependency>
     <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-yarn-registry</artifactId>
-      <version>${hadoop.version}</version>
-      <optional>true</optional>
-    </dependency>
-    <dependency>
       <groupId>org.apache.tez</groupId>
       <artifactId>tez-runtime-internals</artifactId>
       <version>${tez.version}</version>

http://git-wip-us.apache.org/repos/asf/hive/blob/215c5770/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
index 6f75001..98b1ccd 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
@@ -31,7 +31,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.llap.configuration.LlapConfiguration;
 import org.apache.hadoop.hive.llap.daemon.ContainerRunner;
 import org.apache.hadoop.hive.llap.daemon.QueryFailedHandler;
-import org.apache.hadoop.hive.llap.daemon.registry.impl.LlapRegistryService;
+import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteRequestProto;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;

http://git-wip-us.apache.org/repos/asf/hive/blob/215c5770/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/ServiceInstance.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/ServiceInstance.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/ServiceInstance.java
deleted file mode 100644
index f0f22aa..0000000
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/ServiceInstance.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- *  you may not use this file except in compliance with the License.
- *  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package org.apache.hadoop.hive.llap.daemon.registry;
-
-import java.util.Map;
-
-import org.apache.hadoop.yarn.api.records.Resource;
-
-public interface ServiceInstance {
-
-  /**
-   * Worker identity is a UUID (unique across restarts), to identify a node which died & was brought
-   * back on the same host/port
-   */
-  public String getWorkerIdentity();
-
-  /**
-   * Hostname of the service instance
-   * 
-   * @return
-   */
-  public String getHost();
-
-  /**
-   * RPC Endpoint for service instance
-   * 
-   * @return
-   */
-  public int getRpcPort();
-
-  /**
-   * Shuffle Endpoint for service instance
-   * 
-   * @return
-   */
-  public int getShufflePort();
-
-  /**
-   * Return the last known state (without refreshing)
-   * 
-   * @return
-   */
-
-  public boolean isAlive();
-
-  /**
-   * Config properties of the Service Instance (llap.daemon.*)
-   * 
-   * @return
-   */
-
-  public Map<String, String> getProperties();
-
-  /**
-   * Memory and Executors available for the LLAP tasks
-   * 
-   * This does not include the size of the cache or the actual vCores allocated via Slider.
-   * 
-   * @return
-   */
-  public Resource getResource();
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/215c5770/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/ServiceInstanceSet.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/ServiceInstanceSet.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/ServiceInstanceSet.java
deleted file mode 100644
index 7ab36d4..0000000
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/ServiceInstanceSet.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- *  you may not use this file except in compliance with the License.
- *  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package org.apache.hadoop.hive.llap.daemon.registry;
-
-import java.io.IOException;
-import java.util.Map;
-import java.util.Set;
-
-public interface ServiceInstanceSet {
-
-  /**
-   * Get an instance mapping which map worker identity to each instance.
-   * 
-   * The worker identity does not collide between restarts, so each restart will have a unique id,
-   * while having the same host/ip pair.
-   * 
-   * @return
-   */
-  public Map<String, ServiceInstance> getAll();
-
-  /**
-   * Get an instance by worker identity.
-   * 
-   * @param name
-   * @return
-   */
-  public ServiceInstance getInstance(String name);
-
-  /**
-   * Get a list of service instances for a given host.
-   * 
-   * The list could include dead and alive instances.
-   * 
-   * @param host
-   * @return
-   */
-  public Set<ServiceInstance> getByHost(String host);
-
-  /**
-   * Refresh the instance set from registry backing store.
-   * 
-   * @throws IOException
-   */
-  public void refresh() throws IOException;
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/215c5770/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/ServiceRegistry.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/ServiceRegistry.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/ServiceRegistry.java
deleted file mode 100644
index a0f9aac..0000000
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/ServiceRegistry.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- *  you may not use this file except in compliance with the License.
- *  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package org.apache.hadoop.hive.llap.daemon.registry;
-
-import java.io.IOException;
-
-/**
- * ServiceRegistry interface for switching between fixed host and dynamic registry implementations.
- */
-public interface ServiceRegistry {
-
-  /**
-   * Start the service registry
-   * 
-   * @throws InterruptedException
-   */
-  public void start() throws InterruptedException;
-
-  /**
-   * Stop the service registry
-   * 
-   * @throws InterruptedException
-   */
-  public void stop() throws InterruptedException;
-
-  /**
-   * Register the current instance - the implementation takes care of the endpoints to register.
-   * 
-   * @throws IOException
-   */
-  public void register() throws IOException;
-
-  /**
-   * Remove the current registration cleanly (implementation defined cleanup)
-   * 
-   * @throws IOException
-   */
-  public void unregister() throws IOException;
-
-  /**
-   * Client API to get the list of instances registered via the current registry key.
-   * 
-   * @param component
-   * @return
-   * @throws IOException
-   */
-  public ServiceInstanceSet getInstances(String component) throws IOException;
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/215c5770/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/impl/LlapFixedRegistryImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/impl/LlapFixedRegistryImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/impl/LlapFixedRegistryImpl.java
deleted file mode 100644
index 621a6a6..0000000
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/impl/LlapFixedRegistryImpl.java
+++ /dev/null
@@ -1,223 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- *  you may not use this file except in compliance with the License.
- *  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package org.apache.hadoop.hive.llap.daemon.registry.impl;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.UnknownHostException;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.llap.configuration.LlapConfiguration;
-import org.apache.hadoop.hive.llap.daemon.registry.ServiceInstance;
-import org.apache.hadoop.hive.llap.daemon.registry.ServiceInstanceSet;
-import org.apache.hadoop.hive.llap.daemon.registry.ServiceRegistry;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class LlapFixedRegistryImpl implements ServiceRegistry {
-
-  private static final Logger LOG = LoggerFactory.getLogger(LlapFixedRegistryImpl.class);
-
-  @InterfaceAudience.Private
-  // This is primarily for testing to avoid the host lookup
-  public static final String FIXED_REGISTRY_RESOLVE_HOST_NAMES = "fixed.registry.resolve.host.names";
-
-  private final int port;
-  private final int shuffle;
-  private final String[] hosts;
-  private final int memory;
-  private final int vcores;
-  private final boolean resolveHosts;
-
-  private final Map<String, String> srv = new HashMap<String, String>();
-
-  public LlapFixedRegistryImpl(String hosts, Configuration conf) {
-    this.hosts = hosts.split(",");
-    this.port =
-        conf.getInt(LlapConfiguration.LLAP_DAEMON_RPC_PORT,
-            LlapConfiguration.LLAP_DAEMON_RPC_PORT_DEFAULT);
-    this.shuffle =
-        conf.getInt(LlapConfiguration.LLAP_DAEMON_YARN_SHUFFLE_PORT,
-            LlapConfiguration.LLAP_DAEMON_YARN_SHUFFLE_PORT_DEFAULT);
-    this.resolveHosts = conf.getBoolean(FIXED_REGISTRY_RESOLVE_HOST_NAMES, true);
-
-    for (Map.Entry<String, String> kv : conf) {
-      if (kv.getKey().startsWith(LlapConfiguration.LLAP_DAEMON_PREFIX)
-          || kv.getKey().startsWith("hive.llap.")
-          || kv.getKey().startsWith(LlapConfiguration.LLAP_PREFIX)) {
-        // TODO: read this somewhere useful, like the task scheduler
-        srv.put(kv.getKey(), kv.getValue());
-      }
-    }
-
-    this.memory =
-        conf.getInt(LlapConfiguration.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB,
-            LlapConfiguration.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB_DEFAULT);
-    this.vcores =
-        conf.getInt(LlapConfiguration.LLAP_DAEMON_NUM_EXECUTORS,
-            LlapConfiguration.LLAP_DAEMON_NUM_EXECUTORS_DEFAULT);
-  }
-
-  @Override
-  public void start() throws InterruptedException {
-    // nothing to start
-  }
-
-  @Override
-  public void stop() throws InterruptedException {
-    // nothing to stop
-  }
-
-  @Override
-  public void register() throws IOException {
-    // nothing to register
-  }
-
-  @Override
-  public void unregister() throws IOException {
-    // nothing to unregister
-  }
-
-  public static String getWorkerIdentity(String host) {
-    // trigger clean errors for anyone who mixes up identity with hosts
-    return "host-" + host;
-  }
-
-  private final class FixedServiceInstance implements ServiceInstance {
-
-    private final String host;
-
-    public FixedServiceInstance(String host) {
-      if (resolveHosts) {
-        try {
-          InetAddress inetAddress = InetAddress.getByName(host);
-          if (NetUtils.isLocalAddress(inetAddress)) {
-            InetSocketAddress socketAddress = new InetSocketAddress(0);
-            socketAddress = NetUtils.getConnectAddress(socketAddress);
-            LOG.info("Adding host identified as local: " + host + " as "
-                + socketAddress.getHostName());
-            host = socketAddress.getHostName();
-          }
-        } catch (UnknownHostException e) {
-          LOG.warn("Ignoring resolution issues for host: " + host, e);
-        }
-      }
-      this.host = host;
-    }
-
-    @Override
-    public String getWorkerIdentity() {
-      return LlapFixedRegistryImpl.getWorkerIdentity(host);
-    }
-
-    @Override
-    public String getHost() {
-      return host;
-    }
-
-    @Override
-    public int getRpcPort() {
-      // TODO: allow >1 port per host?
-      return LlapFixedRegistryImpl.this.port;
-    }
-
-    @Override
-    public int getShufflePort() {
-      return LlapFixedRegistryImpl.this.shuffle;
-    }
-
-    @Override
-    public boolean isAlive() {
-      return true;
-    }
-
-    @Override
-    public Map<String, String> getProperties() {
-      Map<String, String> properties = new HashMap<>(srv);
-      // no worker identity
-      return properties;
-    }
-
-    @Override
-    public Resource getResource() {
-      return Resource.newInstance(memory, vcores);
-    }
-
-    @Override
-    public String toString() {
-      return "FixedServiceInstance{" +
-          "host=" + host +
-          ", memory=" + memory +
-          ", vcores=" + vcores +
-          '}';
-    }
-  }
-
-  private final class FixedServiceInstanceSet implements ServiceInstanceSet {
-
-    private final Map<String, ServiceInstance> instances = new HashMap<String, ServiceInstance>();
-
-    public FixedServiceInstanceSet() {
-      for (String host : hosts) {
-        // trigger bugs in anyone who uses this as a hostname
-        instances.put(getWorkerIdentity(host), new FixedServiceInstance(host));
-      }
-    }
-
-    @Override
-    public Map<String, ServiceInstance> getAll() {
-      return instances;
-    }
-
-    @Override
-    public ServiceInstance getInstance(String name) {
-      return instances.get(name);
-    }
-
-    @Override
-    public Set<ServiceInstance> getByHost(String host) {
-      Set<ServiceInstance> byHost = new HashSet<ServiceInstance>();
-      ServiceInstance inst = getInstance(getWorkerIdentity(host));
-      if (inst != null) {
-        byHost.add(inst);
-      }
-      return byHost;
-    }
-
-    @Override
-    public void refresh() throws IOException {
-      // I will do no such thing
-    }
-
-  }
-
-  @Override
-  public ServiceInstanceSet getInstances(String component) throws IOException {
-    return new FixedServiceInstanceSet();
-  }
-
-  @Override
-  public String toString() {
-    return String.format("FixedRegistry hosts=%s", StringUtils.join(",", this.hosts));
-  }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/215c5770/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/impl/LlapRegistryService.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/impl/LlapRegistryService.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/impl/LlapRegistryService.java
deleted file mode 100644
index 6550940..0000000
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/impl/LlapRegistryService.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- *  you may not use this file except in compliance with the License.
- *  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package org.apache.hadoop.hive.llap.daemon.registry.impl;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.llap.configuration.LlapConfiguration;
-import org.apache.hadoop.hive.llap.daemon.registry.ServiceInstanceSet;
-import org.apache.hadoop.hive.llap.daemon.registry.ServiceRegistry;
-import org.apache.hadoop.service.AbstractService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class LlapRegistryService extends AbstractService {
-
-  private static final Logger LOG = LoggerFactory.getLogger(LlapRegistryService.class);
-
-  private ServiceRegistry registry = null;
-  private final boolean isDaemon;
-
-  public LlapRegistryService(boolean isDaemon) {
-    super("LlapRegistryService");
-    this.isDaemon = isDaemon;
-  }
-
-  @Override
-  public void serviceInit(Configuration conf) {
-    String hosts = conf.getTrimmed(LlapConfiguration.LLAP_DAEMON_SERVICE_HOSTS);
-    if (hosts.startsWith("@")) {
-      registry = new LlapYarnRegistryImpl(hosts.substring(1), conf, isDaemon);
-    } else {
-      registry = new LlapFixedRegistryImpl(hosts, conf);
-    }
-    LOG.info("Using LLAP registry type " + registry);
-  }
-
-
-  @Override
-  public void serviceStart() throws Exception {
-    if (this.registry != null) {
-      this.registry.start();
-    }
-    if (isDaemon) {
-      registerWorker();
-    }
-  }
-
-  @Override
-  public void serviceStop() throws Exception {
-    if (isDaemon) {
-      unregisterWorker();
-    }
-    if (this.registry != null) {
-      this.registry.stop();
-    } else {
-      LOG.warn("Stopping non-existent registry service");
-    }
-  }
-
-  private void registerWorker() throws IOException {
-    if (this.registry != null) {
-      this.registry.register();
-    }
-  }
-
-  private void unregisterWorker() throws IOException {
-    if (this.registry != null) {
-      this.registry.unregister();
-    }
-  }
-
-  public ServiceInstanceSet getInstances() throws IOException {
-    return this.registry.getInstances("LLAP");
-  }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/215c5770/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/impl/LlapYarnRegistryImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/impl/LlapYarnRegistryImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/impl/LlapYarnRegistryImpl.java
deleted file mode 100644
index 599da13..0000000
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/impl/LlapYarnRegistryImpl.java
+++ /dev/null
@@ -1,384 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- *  you may not use this file except in compliance with the License.
- *  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package org.apache.hadoop.hive.llap.daemon.registry.impl;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.MalformedURLException;
-import java.net.URISyntaxException;
-import java.net.URL;
-import java.net.UnknownHostException;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.llap.configuration.LlapConfiguration;
-import org.apache.hadoop.hive.llap.daemon.registry.ServiceInstance;
-import org.apache.hadoop.hive.llap.daemon.registry.ServiceInstanceSet;
-import org.apache.hadoop.hive.llap.daemon.registry.ServiceRegistry;
-import org.apache.hadoop.registry.client.api.RegistryOperationsFactory;
-import org.apache.hadoop.registry.client.binding.RegistryPathUtils;
-import org.apache.hadoop.registry.client.binding.RegistryTypeUtils;
-import org.apache.hadoop.registry.client.binding.RegistryUtils;
-import org.apache.hadoop.registry.client.binding.RegistryUtils.ServiceRecordMarshal;
-import org.apache.hadoop.registry.client.impl.zk.RegistryOperationsService;
-import org.apache.hadoop.registry.client.types.AddressTypes;
-import org.apache.hadoop.registry.client.types.Endpoint;
-import org.apache.hadoop.registry.client.types.ProtocolTypes;
-import org.apache.hadoop.registry.client.types.ServiceRecord;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.tez.dag.api.TezUncheckedException;
-import org.apache.zookeeper.CreateMode;
-
-import com.google.common.base.Preconditions;
-
-public class LlapYarnRegistryImpl implements ServiceRegistry {
-
-  private static final Logger LOG = LoggerFactory.getLogger(LlapYarnRegistryImpl.class);
-
-  private final RegistryOperationsService client;
-  private final Configuration conf;
-  private final ServiceRecordMarshal encoder;
-  private final String path;
-
-  private final DynamicServiceInstanceSet instances = new DynamicServiceInstanceSet();
-
-  private static final UUID uniq = UUID.randomUUID();
-  private static final String hostname;
-
-  private static final String UNIQUE_IDENTIFIER = "llap.unique.id";
-
-  private final static String SERVICE_CLASS = "org-apache-hive";
-
-  final ScheduledExecutorService refresher = Executors.newScheduledThreadPool(1,
-      new ThreadFactoryBuilder().setDaemon(true).setNameFormat("LlapYarnRegistryRefresher").build());
-  final long refreshDelay;
-  private final boolean isDaemon;
-
-  static {
-    String localhost = "localhost";
-    try {
-      localhost = InetAddress.getLocalHost().getCanonicalHostName();
-    } catch (UnknownHostException uhe) {
-      // ignore
-    }
-    hostname = localhost;
-  }
-
-  public LlapYarnRegistryImpl(String instanceName, Configuration conf, boolean isDaemon) {
-
-    LOG.info("Llap Registry is enabled with registryid: " + instanceName);
-    this.conf = new Configuration(conf);
-    conf.addResource(YarnConfiguration.YARN_SITE_CONFIGURATION_FILE);
-    // registry reference
-    client = (RegistryOperationsService) RegistryOperationsFactory.createInstance(conf);
-    encoder = new RegistryUtils.ServiceRecordMarshal();
-    this.path = RegistryPathUtils.join(RegistryUtils.componentPath(RegistryUtils.currentUser(),
-        SERVICE_CLASS, instanceName, "workers"), "worker-");
-    refreshDelay =
-        conf.getInt(LlapConfiguration.LLAP_DAEMON_SERVICE_REFRESH_INTERVAL,
-            LlapConfiguration.LLAP_DAEMON_SERVICE_REFRESH_INTERVAL_DEFAULT);
-    this.isDaemon = isDaemon;
-    Preconditions.checkArgument(refreshDelay > 0,
-        "Refresh delay for registry has to be positive = %d", refreshDelay);
-  }
-
-  public Endpoint getRpcEndpoint() {
-    final int rpcPort =
-        conf.getInt(LlapConfiguration.LLAP_DAEMON_RPC_PORT,
-            LlapConfiguration.LLAP_DAEMON_RPC_PORT_DEFAULT);
-    return RegistryTypeUtils.ipcEndpoint("llap", new InetSocketAddress(hostname, rpcPort));
-  }
-
-  public Endpoint getShuffleEndpoint() {
-    final int shufflePort =
-        conf.getInt(LlapConfiguration.LLAP_DAEMON_YARN_SHUFFLE_PORT,
-            LlapConfiguration.LLAP_DAEMON_YARN_SHUFFLE_PORT_DEFAULT);
-    // HTTP today, but might not be
-    return RegistryTypeUtils.inetAddrEndpoint("shuffle", ProtocolTypes.PROTOCOL_TCP, hostname,
-        shufflePort);
-  }
-
-  public Endpoint getServicesEndpoint() {
-    final int servicePort =
-        conf.getInt(LlapConfiguration.LLAP_DAEMON_SERVICE_PORT,
-            LlapConfiguration.LLAP_DAEMON_SERVICE_PORT_DEFAULT);
-    final boolean isSSL =
-        conf.getBoolean(LlapConfiguration.LLAP_DAEMON_SERVICE_SSL,
-            LlapConfiguration.LLAP_DAEMON_SERVICE_SSL_DEFAULT);
-    final String scheme = isSSL ? "https" : "http";
-    final URL serviceURL;
-    try {
-      serviceURL = new URL(scheme, hostname, servicePort, "");
-      return RegistryTypeUtils.webEndpoint("services", serviceURL.toURI());
-    } catch (MalformedURLException e) {
-      throw new TezUncheckedException(e);
-    } catch (URISyntaxException e) {
-      throw new TezUncheckedException("llap service URI for " + hostname + " is invalid", e);
-    }
-  }
-
-  private final String getPath() {
-    return this.path;
-  }
-
-  @Override
-  public void register() throws IOException {
-    String path = getPath();
-    ServiceRecord srv = new ServiceRecord();
-    srv.addInternalEndpoint(getRpcEndpoint());
-    srv.addInternalEndpoint(getShuffleEndpoint());
-    srv.addExternalEndpoint(getServicesEndpoint());
-
-    for (Map.Entry<String, String> kv : this.conf) {
-      if (kv.getKey().startsWith(LlapConfiguration.LLAP_DAEMON_PREFIX)
-          || kv.getKey().startsWith("hive.llap.")) {
-        // TODO: read this somewhere useful, like the task scheduler
-        srv.set(kv.getKey(), kv.getValue());
-      }
-    }
-
-    // restart sensitive instance id
-    srv.set(UNIQUE_IDENTIFIER, uniq.toString());
-
-    client.mknode(RegistryPathUtils.parentOf(path), true);
-
-    // FIXME: YARN registry needs to expose Ephemeral_Seq nodes & return the paths
-    client.zkCreate(path, CreateMode.EPHEMERAL_SEQUENTIAL, encoder.toBytes(srv),
-        client.getClientAcls());
-  }
-
-  @Override
-  public void unregister() throws IOException {
-   // Nothing for the zkCreate models
-  }
-
-  private class DynamicServiceInstance implements ServiceInstance {
-
-    private final ServiceRecord srv;
-    private boolean alive = true;
-    private final String host;
-    private final int rpcPort;
-    private final int shufflePort;
-
-    public DynamicServiceInstance(ServiceRecord srv) throws IOException {
-      this.srv = srv;
-
-      final Endpoint shuffle = srv.getInternalEndpoint("shuffle");
-      final Endpoint rpc = srv.getInternalEndpoint("llap");
-
-      this.host =
-          RegistryTypeUtils.getAddressField(rpc.addresses.get(0),
-              AddressTypes.ADDRESS_HOSTNAME_FIELD);
-      this.rpcPort =
-          Integer.valueOf(RegistryTypeUtils.getAddressField(rpc.addresses.get(0),
-              AddressTypes.ADDRESS_PORT_FIELD));
-      this.shufflePort =
-          Integer.valueOf(RegistryTypeUtils.getAddressField(shuffle.addresses.get(0),
-              AddressTypes.ADDRESS_PORT_FIELD));
-    }
-
-    @Override
-    public String getWorkerIdentity() {
-      return srv.get(UNIQUE_IDENTIFIER);
-    }
-
-    @Override
-    public String getHost() {
-      return host;
-    }
-
-    @Override
-    public int getRpcPort() {
-      return rpcPort;
-    }
-
-    @Override
-    public int getShufflePort() {
-      return shufflePort;
-    }
-
-    @Override
-    public boolean isAlive() {
-      return alive ;
-    }
-
-    public void kill() {
-      // May be possible to generate a notification back to the scheduler from here.
-      LOG.info("Killing service instance: " + this);
-      this.alive = false;
-    }
-
-    @Override
-    public Map<String, String> getProperties() {
-      return srv.attributes();
-    }
-
-    @Override
-    public Resource getResource() {
-      int memory = Integer.valueOf(srv.get(LlapConfiguration.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB));
-      int vCores = Integer.valueOf(srv.get(LlapConfiguration.LLAP_DAEMON_NUM_EXECUTORS));
-      return Resource.newInstance(memory, vCores);
-    }
-
-    @Override
-    public String toString() {
-      return "DynamicServiceInstance [alive=" + alive + ", host=" + host + ":" + rpcPort + " with resources=" + getResource() +"]";
-    }
-
-    // Relying on the identity hashCode and equality, since refreshing instances retains the old copy
-    // of an already known instance.
-  }
-
-  private class DynamicServiceInstanceSet implements ServiceInstanceSet {
-
-    // LinkedHashMap to retain iteration order.
-    private final Map<String, ServiceInstance> instances = new LinkedHashMap<>();
-
-    @Override
-    public synchronized Map<String, ServiceInstance> getAll() {
-      // Return a copy. Instances may be modified during a refresh.
-      return new LinkedHashMap<>(instances);
-    }
-
-    @Override
-    public synchronized ServiceInstance getInstance(String name) {
-      return instances.get(name);
-    }
-
-    @Override
-    public  void refresh() throws IOException {
-      /* call this from wherever */
-      Map<String, ServiceInstance> freshInstances = new HashMap<String, ServiceInstance>();
-
-      String path = getPath();
-      Map<String, ServiceRecord> records =
-          RegistryUtils.listServiceRecords(client, RegistryPathUtils.parentOf(path));
-      // Synchronize after reading the service records from the external service (ZK)
-      synchronized (this) {
-        Set<String> latestKeys = new HashSet<String>();
-        LOG.info("Starting to refresh ServiceInstanceSet " + System.identityHashCode(this));
-        for (ServiceRecord rec : records.values()) {
-          ServiceInstance instance = new DynamicServiceInstance(rec);
-          if (instance != null) {
-            if (instances != null && instances.containsKey(instance.getWorkerIdentity()) == false) {
-              // add a new object
-              freshInstances.put(instance.getWorkerIdentity(), instance);
-              if (LOG.isInfoEnabled()) {
-                LOG.info("Adding new worker " + instance.getWorkerIdentity() + " which mapped to "
-                    + instance);
-              }
-            } else {
-              if (LOG.isDebugEnabled()) {
-                LOG.debug("Retaining running worker " + instance.getWorkerIdentity() +
-                    " which mapped to " + instance);
-              }
-            }
-          }
-          latestKeys.add(instance.getWorkerIdentity());
-        }
-
-        if (instances != null) {
-          // deep-copy before modifying
-          Set<String> oldKeys = new HashSet<>(instances.keySet());
-          if (oldKeys.removeAll(latestKeys)) {
-            // This is all the records which have not checked in, and are effectively dead.
-            for (String k : oldKeys) {
-              // this is so that people can hold onto ServiceInstance references as placeholders for tasks
-              final DynamicServiceInstance dead = (DynamicServiceInstance) instances.get(k);
-              dead.kill();
-              if (LOG.isInfoEnabled()) {
-                LOG.info("Deleting dead worker " + k + " which mapped to " + dead);
-              }
-            }
-          }
-          // oldKeys contains the set of dead instances at this point.
-          this.instances.keySet().removeAll(oldKeys);
-          this.instances.putAll(freshInstances);
-        } else {
-          this.instances.putAll(freshInstances);
-        }
-      }
-    }
-
-    @Override
-    public synchronized Set<ServiceInstance> getByHost(String host) {
-      // TODO Maybe store this as a map which is populated during construction, to avoid walking
-      // the map on each request.
-      Set<ServiceInstance> byHost = new HashSet<ServiceInstance>();
-
-      for (ServiceInstance i : instances.values()) {
-        if (host.equals(i.getHost())) {
-          // all hosts in instances should be alive in this impl
-          byHost.add(i);
-        }
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Locality comparing " + host + " to " + i.getHost());
-        }
-      }
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Returning " + byHost.size() + " hosts for locality allocation on " + host);
-      }
-      return byHost;
-    }
-  }
-
-  @Override
-  public ServiceInstanceSet getInstances(String component) throws IOException {
-    Preconditions.checkArgument("LLAP".equals(component)); // right now there is only 1 component 
-    if (this.client != null) {
-      instances.refresh();
-      return instances;
-    } else {
-      Preconditions.checkNotNull(this.client, "Yarn registry client is not intialized");
-      return null;
-    }
-  }
-
-  @Override
-  public void start() {
-    if (client == null) return;
-    client.start();
-    if (isDaemon) return;
-    refresher.scheduleWithFixedDelay(new Runnable() {
-      @Override
-      public void run() {
-        try {
-          instances.refresh();
-        } catch (IOException ioe) {
-          LOG.warn("Could not refresh hosts during scheduled refresh", ioe);
-        }
-      }
-    }, 0, refreshDelay, TimeUnit.SECONDS);
-  }
-
-  @Override
-  public void stop() {
-    if (client != null) {
-      client.stop();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/215c5770/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java b/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java
index 7fb9a99..6fd01f9 100644
--- a/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java
+++ b/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java
@@ -51,9 +51,9 @@ import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.llap.configuration.LlapConfiguration;
-import org.apache.hadoop.hive.llap.daemon.registry.ServiceInstance;
-import org.apache.hadoop.hive.llap.daemon.registry.ServiceInstanceSet;
-import org.apache.hadoop.hive.llap.daemon.registry.impl.LlapRegistryService;
+import org.apache.hadoop.hive.llap.registry.ServiceInstance;
+import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet;
+import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;

http://git-wip-us.apache.org/repos/asf/hive/blob/215c5770/llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java b/llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java
index 2f93266..23724a4 100644
--- a/llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java
+++ b/llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java
@@ -34,7 +34,7 @@ import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.llap.configuration.LlapConfiguration;
-import org.apache.hadoop.hive.llap.daemon.registry.impl.LlapFixedRegistryImpl;
+import org.apache.hadoop.hive.llap.registry.impl.LlapFixedRegistryImpl;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;

Reply via email to