http://git-wip-us.apache.org/repos/asf/hive/blob/9886414b/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java
----------------------------------------------------------------------
diff --git 
a/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java 
b/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java
deleted file mode 100644
index 6beb4f8..0000000
--- 
a/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java
+++ /dev/null
@@ -1,1512 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.dag.app.rm;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Comparator;
-import java.util.EnumSet;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.NavigableMap;
-import java.util.Random;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.TreeSet;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.DelayQueue;
-import java.util.concurrent.Delayed;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import org.apache.commons.lang.mutable.MutableInt;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.llap.registry.ServiceInstance;
-import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet;
-import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.util.Clock;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import org.apache.hadoop.yarn.util.SystemClock;
-import org.apache.tez.common.TezUtils;
-import org.apache.tez.dag.api.TezUncheckedException;
-import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
-import org.apache.tez.serviceplugins.api.TaskScheduler;
-import org.apache.tez.serviceplugins.api.TaskSchedulerContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class LlapTaskSchedulerService extends TaskScheduler {
-
-  private static final Logger LOG = 
LoggerFactory.getLogger(LlapTaskSchedulerService.class);
-
-  private final Configuration conf;
-
-  // interface into the registry service
-  private ServiceInstanceSet activeInstances;
-
-  // Tracks all instances, including ones which have been disabled in the past.
-  // LinkedHashMap to provide the same iteration order when selecting a random 
host.
-  @VisibleForTesting
-  final Map<ServiceInstance, NodeInfo> instanceToNodeMap = new 
LinkedHashMap<>();
-  // TODO Ideally, remove elements from this once it's known that no tasks are 
linked to the instance (all deallocated)
-
-  // Tracks tasks which could not be allocated immediately.
-  @VisibleForTesting
-  final TreeMap<Priority, List<TaskInfo>> pendingTasks = new TreeMap<>(new 
Comparator<Priority>() {
-    @Override
-    public int compare(Priority o1, Priority o2) {
-      return o1.getPriority() - o2.getPriority();
-    }
-  });
-
-  // Tracks running and queued tasks. Cleared after a task completes.
-  private final ConcurrentMap<Object, TaskInfo> knownTasks = new 
ConcurrentHashMap<>();
-  private final TreeMap<Integer, TreeSet<TaskInfo>> runningTasks = new 
TreeMap<>();
-  private static final TaskStartComparator TASK_INFO_COMPARATOR = new 
TaskStartComparator();
-
-  // Queue for disabled nodes. Nodes make it out of this queue when their 
expiration timeout is hit.
-  @VisibleForTesting
-  final DelayQueue<NodeInfo> disabledNodesQueue = new DelayQueue<>();
-
-  private final boolean forceLocation;
-
-  private final ContainerFactory containerFactory;
-  private final Random random = new Random();
-  private final Clock clock;
-
-  private final ListeningExecutorService nodeEnabledExecutor;
-  private final NodeEnablerCallable nodeEnablerCallable =
-      new NodeEnablerCallable();
-
-  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
-  private final ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
-  private final ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
-
-  private final Lock scheduleLock = new ReentrantLock();
-  private final Condition scheduleCondition = scheduleLock.newCondition();
-  private final AtomicBoolean pendingScheduleInvodations = new 
AtomicBoolean(false);
-  private final ListeningExecutorService schedulerExecutor;
-  private final SchedulerCallable schedulerCallable = new SchedulerCallable();
-
-  private final AtomicBoolean isStopped = new AtomicBoolean(false);
-  // Tracks total pending preemptions.
-  private final AtomicInteger pendingPreemptions = new AtomicInteger(0);
-  // Tracks pending preemptions per host, using the hostname || Always to be 
accessed inside a lock
-  private final Map<String, MutableInt> pendingPreemptionsPerHost = new 
HashMap<>();
-
-  private final NodeBlacklistConf nodeBlacklistConf;
-
-  // Per daemon
-  private final int memoryPerInstance;
-  private final int coresPerInstance;
-  private final int executorsPerInstance;
-
-  private final int numSchedulableTasksPerNode;
-
-  // Per Executor Thread
-  private final Resource resourcePerExecutor;
-
-  private final LlapRegistryService registry = new LlapRegistryService(false);
-
-  private volatile ListenableFuture<Void> nodeEnablerFuture;
-  private volatile ListenableFuture<Void> schedulerFuture;
-
-  @VisibleForTesting
-  private final AtomicInteger dagCounter = new AtomicInteger(1);
-  // Statistics to track allocations
-  // All of stats variables are visible for testing.
-  @VisibleForTesting
-  StatsPerDag dagStats = new StatsPerDag();
-
-  public LlapTaskSchedulerService(TaskSchedulerContext taskSchedulerContext) {
-    this(taskSchedulerContext, new SystemClock());
-  }
-
-  @VisibleForTesting
-  public LlapTaskSchedulerService(TaskSchedulerContext taskSchedulerContext, 
Clock clock) {
-    super(taskSchedulerContext);
-    this.clock = clock;
-    try {
-      this.conf = 
TezUtils.createConfFromUserPayload(taskSchedulerContext.getInitialUserPayload());
-    } catch (IOException e) {
-      throw new TezUncheckedException(
-          "Failed to parse user payload for " + 
LlapTaskSchedulerService.class.getSimpleName(), e);
-    }
-    this.containerFactory = new 
ContainerFactory(taskSchedulerContext.getApplicationAttemptId(),
-        taskSchedulerContext.getCustomClusterIdentifier());
-    this.memoryPerInstance = HiveConf.getIntVar(conf, 
ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB);
-    this.coresPerInstance = HiveConf.getIntVar(conf, 
ConfVars.LLAP_DAEMON_VCPUS_PER_INSTANCE);
-    this.executorsPerInstance = HiveConf.getIntVar(conf, 
ConfVars.LLAP_DAEMON_NUM_EXECUTORS);
-    this.nodeBlacklistConf = new NodeBlacklistConf(
-        HiveConf.getTimeVar(conf, 
ConfVars.LLAP_TASK_SCHEDULER_NODE_REENABLE_MIN_TIMEOUT_MS,
-            TimeUnit.MILLISECONDS),
-        HiveConf.getTimeVar(conf, 
ConfVars.LLAP_TASK_SCHEDULER_NODE_REENABLE_MAX_TIMEOUT_MS,
-            TimeUnit.MILLISECONDS),
-        HiveConf.getFloatVar(conf, 
ConfVars.LLAP_TASK_SCHEDULER_NODE_DISABLE_BACK_OFF_FACTOR));
-
-    this.numSchedulableTasksPerNode =
-        HiveConf.getIntVar(conf, 
ConfVars.LLAP_TASK_SCHEDULER_NUM_SCHEDULABLE_TASKS_PER_NODE);
-
-    long localityDelayMs = HiveConf
-        .getTimeVar(conf, ConfVars.LLAP_TASK_SCHEDULER_LOCALITY_DELAY, 
TimeUnit.MILLISECONDS);
-    if (localityDelayMs == -1) {
-      this.forceLocation = true;
-    } else {
-      this.forceLocation = false;
-    }
-
-    int memoryPerExecutor = (int) (memoryPerInstance / (float) 
executorsPerInstance);
-    int coresPerExecutor = (int) (coresPerInstance / (float) 
executorsPerInstance);
-    this.resourcePerExecutor = Resource.newInstance(memoryPerExecutor, 
coresPerExecutor);
-
-    String instanceId = HiveConf.getTrimmedVar(conf, 
ConfVars.LLAP_DAEMON_SERVICE_HOSTS);
-
-    Preconditions.checkNotNull(instanceId, 
ConfVars.LLAP_DAEMON_SERVICE_HOSTS.varname
-        + " must be defined");
-
-    ExecutorService executorServiceRaw =
-        Executors.newFixedThreadPool(1,
-            new 
ThreadFactoryBuilder().setDaemon(true).setNameFormat("LlapSchedulerNodeEnabler").build());
-    nodeEnabledExecutor = MoreExecutors.listeningDecorator(executorServiceRaw);
-
-    ExecutorService schedulerExecutorServiceRaw = 
Executors.newFixedThreadPool(1,
-        new 
ThreadFactoryBuilder().setDaemon(true).setNameFormat("LlapScheduler").build());
-    schedulerExecutor = 
MoreExecutors.listeningDecorator(schedulerExecutorServiceRaw);
-
-    LOG.info("Running with configuration: " + "memoryPerInstance=" + 
memoryPerInstance
-        + ", vCoresPerInstance=" + coresPerInstance + ", executorsPerInstance="
-        + executorsPerInstance + ", resourcePerInstanceInferred=" + 
resourcePerExecutor
-        + ", nodeBlacklistConf=" + nodeBlacklistConf
-        + ", forceLocation=" + forceLocation);
-  }
-
-  @Override
-  public void initialize() {
-    registry.init(conf);
-  }
-
-  @Override
-  public void start() throws IOException {
-    writeLock.lock();
-    try {
-      nodeEnablerFuture = nodeEnabledExecutor.submit(nodeEnablerCallable);
-      Futures.addCallback(nodeEnablerFuture, new FutureCallback<Void>() {
-        @Override
-        public void onSuccess(Void result) {
-          LOG.info("NodeEnabledThread exited");
-        }
-
-        @Override
-        public void onFailure(Throwable t) {
-          LOG.warn("NodeEnabledThread exited with error", t);
-        }
-      });
-      schedulerFuture = schedulerExecutor.submit(schedulerCallable);
-      Futures.addCallback(schedulerFuture, new FutureCallback<Void>() {
-        @Override
-        public void onSuccess(Void result) {
-          LOG.info("SchedulerThread exited");
-        }
-
-        @Override
-        public void onFailure(Throwable t) {
-          LOG.warn("SchedulerThread exited with error", t);
-        }
-      });
-      registry.start();
-      activeInstances = registry.getInstances();
-      for (ServiceInstance inst : activeInstances.getAll().values()) {
-        addNode(inst, new NodeInfo(inst, nodeBlacklistConf, clock, 
numSchedulableTasksPerNode));
-      }
-    } finally {
-      writeLock.unlock();
-    }
-  }
-
-  @Override
-  public void shutdown() {
-    writeLock.lock();
-    try {
-      if (!this.isStopped.getAndSet(true)) {
-        nodeEnablerCallable.shutdown();
-        if (nodeEnablerFuture != null) {
-          nodeEnablerFuture.cancel(true);
-        }
-        nodeEnabledExecutor.shutdownNow();
-
-        schedulerCallable.shutdown();
-        if (schedulerFuture != null) {
-          schedulerFuture.cancel(true);
-        }
-        schedulerExecutor.shutdownNow();
-
-        if (registry != null) {
-          registry.stop();
-        }
-      }
-    } finally {
-      writeLock.unlock();
-    }
-  }
-
-  @Override
-  public Resource getTotalResources() {
-    int memory = 0;
-    int vcores = 0;
-    readLock.lock();
-    try {
-      for (ServiceInstance inst : activeInstances.getAll().values()) {
-        if (inst.isAlive()) {
-          Resource r = inst.getResource();
-          LOG.info("Found instance " + inst);
-          memory += r.getMemory();
-          vcores += r.getVirtualCores();
-        } else {
-          LOG.info("Ignoring dead instance " + inst);
-        }
-      }
-    } finally {
-      readLock.unlock();
-    }
-
-    return Resource.newInstance(memory, vcores);
-  }
-
-  /**
-   * The difference between this and getTotalResources() is that this only 
gives currently free
-   * resource instances, while the other lists all the instances that may 
become available in a
-   * while.
-   */
-  @Override
-  public Resource getAvailableResources() {
-    // need a state store eventually for current state & measure backoffs
-    int memory = 0;
-    int vcores = 0;
-    readLock.lock();
-    try {
-      for (Entry<ServiceInstance, NodeInfo> entry : 
instanceToNodeMap.entrySet()) {
-        if (entry.getKey().isAlive() && !entry.getValue().isDisabled()) {
-          Resource r = entry.getKey().getResource();
-          memory += r.getMemory();
-          vcores += r.getVirtualCores();
-        }
-      }
-    } finally {
-      readLock.unlock();
-    }
-
-    return Resource.newInstance(memory, vcores);
-  }
-
-  @Override
-  public int getClusterNodeCount() {
-    readLock.lock();
-    try {
-      int n = 0;
-      for (ServiceInstance inst : activeInstances.getAll().values()) {
-        if (inst.isAlive()) {
-          n++;
-        }
-      }
-      return n;
-    } finally {
-      readLock.unlock();
-    }
-  }
-
-  @Override
-  public void dagComplete() {
-    // This is effectively DAG completed, and can be used to reset statistics 
being tracked.
-    LOG.info("DAG: " + dagCounter.get() + " completed. Scheduling stats: " + 
dagStats);
-    dagCounter.incrementAndGet();
-    dagStats = new StatsPerDag();
-  }
-
-  @Override
-  public void blacklistNode(NodeId nodeId) {
-    LOG.info("BlacklistNode not supported");
-  }
-
-  @Override
-  public void unblacklistNode(NodeId nodeId) {
-    LOG.info("unBlacklistNode not supported");
-  }
-
-  @Override
-  public void allocateTask(Object task, Resource capability, String[] hosts, 
String[] racks,
-      Priority priority, Object containerSignature, Object clientCookie) {
-    TaskInfo taskInfo =
-        new TaskInfo(task, clientCookie, priority, capability, hosts, racks, 
clock.getTime());
-    writeLock.lock();
-    try {
-      dagStats.registerTaskRequest(hosts, racks);
-    } finally {
-      writeLock.unlock();
-    }
-    addPendingTask(taskInfo);
-    trySchedulingPendingTasks();
-  }
-
-  @Override
-  public void allocateTask(Object task, Resource capability, ContainerId 
containerId,
-      Priority priority, Object containerSignature, Object clientCookie) {
-    // Container affinity can be implemented as Host affinity for LLAP. Not 
required until
-    // 1:1 edges are used in Hive.
-    TaskInfo taskInfo =
-        new TaskInfo(task, clientCookie, priority, capability, null, null, 
clock.getTime());
-    writeLock.lock();
-    try {
-      dagStats.registerTaskRequest(null, null);
-    } finally {
-      writeLock.unlock();
-    }
-    addPendingTask(taskInfo);
-    trySchedulingPendingTasks();
-  }
-
-
-  // This may be invoked before a container is ever assigned to a task. 
allocateTask... app decides
-  // the task is no longer required, and asks for a de-allocation.
-  @Override
-  public boolean deallocateTask(Object task, boolean taskSucceeded, 
TaskAttemptEndReason endReason, String diagnostics) {
-    writeLock.lock(); // Updating several local structures
-    TaskInfo taskInfo;
-    try {
-      taskInfo = unregisterTask(task);
-      if (taskInfo == null) {
-        LOG.error("Could not determine ContainerId for task: "
-            + task
-            + " . Could have hit a race condition. Ignoring."
-            + " The query may hang since this \"unknown\" container is now 
taking up a slot permanently");
-        return false;
-      }
-      if (taskInfo.containerId == null) {
-        if (taskInfo.assigned) {
-          LOG.error("Task: "
-              + task
-              + " assigned, but could not find the corresponding containerId."
-              + " The query may hang since this \"unknown\" container is now 
taking up a slot permanently");
-        } else {
-          LOG.info("Ignoring deallocate request for task " + task
-              + " which hasn't been assigned to a container");
-          removePendingTask(taskInfo);
-        }
-        return false;
-      }
-      ServiceInstance assignedInstance = taskInfo.assignedInstance;
-      assert assignedInstance != null;
-
-      NodeInfo nodeInfo = instanceToNodeMap.get(assignedInstance);
-      assert nodeInfo != null;
-
-      // Re-enable the node if preempted
-      if (taskInfo.preempted) {
-        LOG.info("Processing deallocateTask for {} which was preempted, 
EndReason={}", task, endReason);
-        unregisterPendingPreemption(taskInfo.assignedInstance.getHost());
-        nodeInfo.registerUnsuccessfulTaskEnd(true);
-        if (nodeInfo.isDisabled()) {
-          // Re-enable the node. If a task succeeded, a slot may have become 
available.
-          // Also reset commFailures since a task was able to communicate back 
and indicate success.
-          nodeInfo.enableNode();
-          // Re-insert into the queue to force the poll thread to remove the 
element.
-          if ( disabledNodesQueue.remove(nodeInfo)) {
-            disabledNodesQueue.add(nodeInfo);
-          }
-        }
-        // In case of success, trigger a scheduling run for pending tasks.
-        trySchedulingPendingTasks();
-      } else {
-        if (taskSucceeded) {
-          // The node may have been blacklisted at this point - which means it 
may not be in the
-          // activeNodeList.
-
-          nodeInfo.registerTaskSuccess();
-
-          if (nodeInfo.isDisabled()) {
-            // Re-enable the node. If a task succeeded, a slot may have become 
available.
-            // Also reset commFailures since a task was able to communicate 
back and indicate success.
-            nodeInfo.enableNode();
-            // Re-insert into the queue to force the poll thread to remove the 
element.
-            if (disabledNodesQueue.remove(nodeInfo)) {
-              disabledNodesQueue.add(nodeInfo);
-            }
-          }
-          // In case of success, trigger a scheduling run for pending tasks.
-          trySchedulingPendingTasks();
-
-        } else if (!taskSucceeded) {
-          nodeInfo.registerUnsuccessfulTaskEnd(false);
-          if (endReason != null && EnumSet
-              .of(TaskAttemptEndReason.EXECUTOR_BUSY, 
TaskAttemptEndReason.COMMUNICATION_ERROR)
-              .contains(endReason)) {
-            if (endReason == TaskAttemptEndReason.COMMUNICATION_ERROR) {
-              
dagStats.registerCommFailure(taskInfo.assignedInstance.getHost());
-            } else if (endReason == TaskAttemptEndReason.EXECUTOR_BUSY) {
-              
dagStats.registerTaskRejected(taskInfo.assignedInstance.getHost());
-            }
-          }
-          boolean commFailure =
-              endReason != null && endReason == 
TaskAttemptEndReason.COMMUNICATION_ERROR;
-          disableInstance(assignedInstance, commFailure);
-        }
-      }
-    } finally {
-      writeLock.unlock();
-    }
-    getContext().containerBeingReleased(taskInfo.containerId);
-    return true;
-  }
-
-  @Override
-  public Object deallocateContainer(ContainerId containerId) {
-    LOG.debug("Ignoring deallocateContainer for containerId: " + containerId);
-    // Containers are not being tracked for re-use.
-    // This is safe to ignore since a deallocate task will come in.
-    return null;
-  }
-
-  @Override
-  public void setShouldUnregister() {
-
-  }
-
-  @Override
-  public boolean hasUnregistered() {
-    // Nothing to do. No registration involved.
-    return true;
-  }
-
-  private ExecutorService createAppCallbackExecutorService() {
-    return Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
-        .setNameFormat("TaskSchedulerAppCaller #%d").setDaemon(true).build());
-  }
-
-  /**
-   * @param request the list of preferred hosts. null implies any host
-   * @return
-   */
-  private SelectHostResult selectHost(TaskInfo request) {
-    String[] requestedHosts = request.requestedHosts;
-    readLock.lock(); // Read-lock. Not updating any stats at the moment.
-    try {
-      // Check if any hosts are active.
-      if (getAvailableResources().getMemory() <= 0) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Refreshing instances since total memory is 0");
-        }
-        refreshInstances();
-      }
-
-      // If there's no memory available, fail
-      if (getTotalResources().getMemory() <= 0) {
-        return SELECT_HOST_RESULT_INADEQUATE_TOTAL_CAPACITY;
-      }
-
-      if (requestedHosts != null && requestedHosts.length > 0) {
-        int prefHostCount = -1;
-        boolean requestedHostExists = false;
-        for (String host : requestedHosts) {
-          prefHostCount++;
-          // Pick the first host always. Weak attempt at cache affinity.
-          Set<ServiceInstance> instances = activeInstances.getByHost(host);
-          if (!instances.isEmpty()) {
-            requestedHostExists = true;
-            for (ServiceInstance inst : instances) {
-              NodeInfo nodeInfo = instanceToNodeMap.get(inst);
-              if (nodeInfo != null && nodeInfo.canAcceptTask()) {
-                LOG.info("Assigning " + inst + " when looking for " + host + 
"." +
-                    " FirstRequestedHost=" + (prefHostCount == 0) +
-                    (requestedHosts.length > 1 ? "#prefLocations=" + 
requestedHosts.length : ""));
-                return new SelectHostResult(inst, nodeInfo);
-              }
-            }
-          }
-        }
-        // Check if forcing the location is required.
-        if (forceLocation) {
-          if (requestedHostExists) {
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Skipping non-local location allocation for [" + 
request.task +
-                  "] when trying to allocate on [" + 
Arrays.toString(requestedHosts) + "]");
-            }
-            return SELECT_HOST_RESULT_DELAYED_LOCALITY;
-          } else {
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Not skipping non-local location allocation for [" + 
request.task +
-                  "] when trying to allocate on [" + 
Arrays.toString(requestedHosts) +
-                  "] since none of these hosts are part of the known list");
-            }
-          }
-        }
-      }
-      /* fall through - miss in locality (random scheduling) */
-      Entry<ServiceInstance, NodeInfo>[] all =
-          instanceToNodeMap.entrySet().toArray(new 
Entry[instanceToNodeMap.size()]);
-      // Check again
-      if (all.length > 0) {
-        int n = random.nextInt(all.length);
-        // start at random offset and iterate whole list
-        for (int i = 0; i < all.length; i++) {
-          Entry<ServiceInstance, NodeInfo> inst = all[(i + n) % all.length];
-          if (inst.getValue().canAcceptTask()) {
-            LOG.info("Assigning " + inst + " when looking for any host, from 
#hosts=" + all.length +
-                ", requestedHosts=" +
-                ((requestedHosts == null || requestedHosts.length == 0) ? 
"null" :
-                    Arrays.toString(requestedHosts)));
-            return new SelectHostResult(inst.getKey(), inst.getValue());
-          }
-        }
-      }
-      return SELECT_HOST_RESULT_DELAYED_RESOURCES;
-    } finally {
-      readLock.unlock();
-    }
-  }
-
-  // TODO Each refresh operation should addNodes if they don't already exist.
-  // Even better would be to get notifications from the service impl when a 
node gets added or removed.
-  // Instead of having to walk through the entire list. The computation of a 
node getting added or
-  // removed already exists in the DynamicRegistry implementation.
-  private void refreshInstances() {
-    try {
-      activeInstances.refresh(); // handles its own sync
-    } catch (IOException ioe) {
-      LOG.warn("Could not refresh list of active instances", ioe);
-    }
-  }
-
-  private void scanForNodeChanges() {
-    /* check again whether nodes are disabled or just missing */
-    writeLock.lock();
-    try {
-      for (ServiceInstance inst : activeInstances.getAll().values()) {
-        if (inst.isAlive() && instanceToNodeMap.containsKey(inst) == false) {
-          /* that's a good node, not added to the allocations yet */
-          LOG.info("Found a new node: " + inst + ".");
-          addNode(inst, new NodeInfo(inst, nodeBlacklistConf, clock, 
numSchedulableTasksPerNode));
-        }
-      }
-    } finally {
-      writeLock.unlock();
-    }
-  }
-
-  private void addNode(ServiceInstance inst, NodeInfo node) {
-    LOG.info("Adding node: " + inst);
-    instanceToNodeMap.put(inst, node);
-    // Trigger scheduling since a new node became available.
-    trySchedulingPendingTasks();
-  }
-
-  private void reenableDisabledNode(NodeInfo nodeInfo) {
-    writeLock.lock();
-    try {
-      if (nodeInfo.hadCommFailure()) {
-        // If the node being re-enabled was not marked busy previously, then 
it was disabled due to
-        // some other failure. Refresh the service list to see if it's been 
removed permanently.
-        refreshInstances();
-      }
-      LOG.info("Attempting to re-enable node: " + 
nodeInfo.getServiceInstance());
-      if (nodeInfo.getServiceInstance().isAlive()) {
-        nodeInfo.enableNode();
-      } else {
-        if (LOG.isInfoEnabled()) {
-          LOG.info("Removing dead node " + nodeInfo);
-        }
-      }
-    } finally {
-      writeLock.unlock();
-    }
-  }
-
-  private void disableInstance(ServiceInstance instance, boolean 
isCommFailure) {
-    writeLock.lock();
-    try {
-      NodeInfo nodeInfo = instanceToNodeMap.get(instance);
-      if (nodeInfo == null || nodeInfo.isDisabled()) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Node: " + instance + " already disabled, or invalid. Not 
doing anything.");
-        }
-      } else {
-        nodeInfo.disableNode(isCommFailure);
-        // TODO: handle task to container map events in case of hard failures
-        disabledNodesQueue.add(nodeInfo);
-      }
-    } finally {
-      writeLock.unlock();
-    }
-  }
-
-  private void addPendingTask(TaskInfo taskInfo) {
-    writeLock.lock();
-    try {
-      List<TaskInfo> tasksAtPriority = pendingTasks.get(taskInfo.priority);
-      if (tasksAtPriority == null) {
-        tasksAtPriority = new LinkedList<>();
-        pendingTasks.put(taskInfo.priority, tasksAtPriority);
-      }
-      tasksAtPriority.add(taskInfo);
-      knownTasks.putIfAbsent(taskInfo.task, taskInfo);
-    } finally {
-      writeLock.unlock();
-    }
-  }
-
-  /* Remove a task from the pending list */
-  private void removePendingTask(TaskInfo taskInfo) {
-    writeLock.lock();
-    try {
-      Priority priority = taskInfo.priority;
-      List<TaskInfo> taskInfoList = pendingTasks.get(priority);
-      if (taskInfoList == null || taskInfoList.isEmpty() || 
!taskInfoList.remove(taskInfo)) {
-        LOG.warn("Could not find task: " + taskInfo.task + " in pending list, 
at priority: "
-            + priority);
-      }
-    } finally {
-      writeLock.unlock();
-    }
-  }
-
-  /* Register a running task into the runningTasks structure */
-  private void registerRunningTask(TaskInfo taskInfo) {
-    writeLock.lock();
-    try {
-      int priority = taskInfo.priority.getPriority();
-      TreeSet<TaskInfo> tasksAtpriority = runningTasks.get(priority);
-      if (tasksAtpriority == null) {
-        tasksAtpriority = new TreeSet<>(TASK_INFO_COMPARATOR);
-        runningTasks.put(priority, tasksAtpriority);
-      }
-      tasksAtpriority.add(taskInfo);
-    } finally {
-      writeLock.unlock();
-    }
-  }
-
-  /* Unregister a task from the known and running structures */
-  private TaskInfo unregisterTask(Object task) {
-    writeLock.lock();
-    try {
-      TaskInfo taskInfo = knownTasks.remove(task);
-      if (taskInfo != null) {
-        if (taskInfo.assigned) {
-          // Remove from the running list.
-          int priority = taskInfo.priority.getPriority();
-          Set<TaskInfo> tasksAtPriority = runningTasks.get(priority);
-          Preconditions.checkState(tasksAtPriority != null,
-              "runningTasks should contain an entry if the task was in running 
state. Caused by task: {}", task);
-          tasksAtPriority.remove(taskInfo);
-          if (tasksAtPriority.isEmpty()) {
-            runningTasks.remove(priority);
-          }
-        }
-      } else {
-        LOG.warn("Could not find TaskInfo for task: {}. Not removing it from 
the running set", task);
-      }
-      return taskInfo;
-    } finally {
-      writeLock.unlock();
-    }
-  }
-
-  private enum ScheduleResult {
-    // Successfully scheduled
-    SCHEDULED,
-
-    // Delayed to find a local match
-    DELAYED_LOCALITY,
-
-    // Delayed due to temporary resource availability
-    DELAYED_RESOURCES,
-
-    // Inadequate total resources - will never succeed / wait for new 
executors to become available
-    INADEQUATE_TOTAL_RESOURCES,
-  }
-
-  @VisibleForTesting
-  protected void schedulePendingTasks() {
-    writeLock.lock();
-    try {
-      Iterator<Entry<Priority, List<TaskInfo>>> pendingIterator =
-          pendingTasks.entrySet().iterator();
-      while (pendingIterator.hasNext()) {
-        Entry<Priority, List<TaskInfo>> entry = pendingIterator.next();
-        List<TaskInfo> taskListAtPriority = entry.getValue();
-        Iterator<TaskInfo> taskIter = taskListAtPriority.iterator();
-        boolean scheduledAllAtPriority = true;
-        while (taskIter.hasNext()) {
-
-          // TODO Optimization: Add a check to see if there's any capacity 
available. No point in
-          // walking through all active nodes, if they don't have potential 
capacity.
-
-          TaskInfo taskInfo = taskIter.next();
-          if (taskInfo.getNumPreviousAssignAttempts() == 1) {
-            dagStats.registerDelayedAllocation();
-          }
-          taskInfo.triedAssigningTask();
-          ScheduleResult scheduleResult = scheduleTask(taskInfo);
-          if (scheduleResult == ScheduleResult.SCHEDULED) {
-            taskIter.remove();
-          } else {
-            // TODO Handle INADEQUATE_TOTAL_RESOURCES eventually - either by 
throwin an error immediately,
-            // or waiting for some timeout for new executors and then throwing 
an error
-
-            // Try pre-empting a task so that a higher priority task can take 
it's place.
-            // Preempt only if there's no pending preemptions to avoid 
preempting twice for a task.
-            String[] potentialHosts;
-            if (scheduleResult == ScheduleResult.DELAYED_LOCALITY) {
-              // preempt only on specific hosts, if no preemptions already 
exist on those.
-              potentialHosts = taskInfo.requestedHosts;
-              //Protect against a bad location being requested.
-              if (potentialHosts == null || potentialHosts.length == 0) {
-                potentialHosts = null;
-              }
-            } else {
-              // preempt on any host.
-              potentialHosts = null;
-            }
-
-            if (potentialHosts != null) {
-              // Preempt on specific host
-              boolean shouldPreempt = true;
-              for (String host : potentialHosts) {
-                // Preempt only if there are not pending preemptions on the 
same host
-                // When the premption registers, the request at the highest 
priority will be given the slot,
-                // even if the initial request was for some other task.
-                // TODO Maybe register which task the preemption was for, to 
avoid a bad non-local allocation.
-                MutableInt pendingHostPreemptions = 
pendingPreemptionsPerHost.get(host);
-                if (pendingHostPreemptions != null && 
pendingHostPreemptions.intValue() > 0) {
-                  shouldPreempt = false;
-                  break;
-                }
-              }
-              if (shouldPreempt) {
-                LOG.info("Attempting to preempt for {}, pendingPreemptions={} 
on hosts={}",
-                    taskInfo.task, pendingPreemptions.get(), 
Arrays.toString(potentialHosts));
-                preemptTasks(entry.getKey().getPriority(), 1, potentialHosts);
-              }
-            } else {
-              // Request for a preemption if there's none pending. If a single 
preemption is pending,
-              // and this is the next task to be assigned, it will be assigned 
once that slot becomes available.
-              if (pendingPreemptions.get() == 0) {
-                LOG.info("Attempting to preempt for {}, pendingPreemptions={} 
on any host",
-                    taskInfo.task, pendingPreemptions.get());
-                preemptTasks(entry.getKey().getPriority(), 1, null);
-              }
-            }
-            // Since there was an allocation failure - don't try assigning 
tasks at the next priority.
-
-            scheduledAllAtPriority = false;
-            // Don't break if this allocation failure was a result of a 
LOCALITY_DELAY. Others could still be allocated.
-            if (scheduleResult != ScheduleResult.DELAYED_LOCALITY) {
-              break;
-            }
-          } // end of else - i.e. could not allocate
-        } // end of loop over pending tasks
-        if (taskListAtPriority.isEmpty()) {
-          // Remove the entry, if there's nothing left at the specific 
priority level
-          pendingIterator.remove();
-        }
-        if (!scheduledAllAtPriority) {
-          // Don't attempt scheduling for additional priorities
-          break;
-        }
-      }
-    } finally {
-      writeLock.unlock();
-    }
-  }
-
-
-  private ScheduleResult scheduleTask(TaskInfo taskInfo) {
-    SelectHostResult selectHostResult = selectHost(taskInfo);
-    if (selectHostResult.scheduleResult == ScheduleResult.SCHEDULED) {
-      NodeServiceInstancePair nsPair = 
selectHostResult.nodeServiceInstancePair;
-      Container container =
-          containerFactory.createContainer(resourcePerExecutor, 
taskInfo.priority,
-              nsPair.getServiceInstance().getHost(),
-              nsPair.getServiceInstance().getRpcPort());
-      writeLock.lock(); // While updating local structures
-      try {
-        LOG.info("Assigned task {} to container {}", taskInfo, 
container.getId());
-        dagStats.registerTaskAllocated(taskInfo.requestedHosts, 
taskInfo.requestedRacks,
-            nsPair.getServiceInstance().getHost());
-        taskInfo.setAssignmentInfo(nsPair.getServiceInstance(), 
container.getId(), clock.getTime());
-        registerRunningTask(taskInfo);
-        nsPair.getNodeInfo().registerTaskScheduled();
-      } finally {
-        writeLock.unlock();
-      }
-      getContext().taskAllocated(taskInfo.task, taskInfo.clientCookie, 
container);
-    }
-    return selectHostResult.scheduleResult;
-  }
-
-  // Removes tasks from the runningList and sends out a preempt request to the 
system.
-  // Subsequent tasks will be scheduled again once the de-allocate request for 
the preempted
-  // task is processed.
-  private void preemptTasks(int forPriority, int numTasksToPreempt, String 
[]potentialHosts) {
-    Set<String> preemptHosts;
-    if (potentialHosts == null) {
-      preemptHosts = null;
-    } else {
-      preemptHosts = Sets.newHashSet(potentialHosts);
-    }
-    writeLock.lock();
-    List<TaskInfo> preemptedTaskList = null;
-    try {
-      NavigableMap<Integer, TreeSet<TaskInfo>> orderedMap = 
runningTasks.descendingMap();
-      Iterator<Entry<Integer, TreeSet<TaskInfo>>> iterator = 
orderedMap.entrySet().iterator();
-      int preemptedCount = 0;
-      while (iterator.hasNext() && preemptedCount < numTasksToPreempt) {
-        Entry<Integer, TreeSet<TaskInfo>> entryAtPriority = iterator.next();
-        if (entryAtPriority.getKey() > forPriority) {
-          Iterator<TaskInfo> taskInfoIterator = 
entryAtPriority.getValue().iterator();
-          while (taskInfoIterator.hasNext() && preemptedCount < 
numTasksToPreempt) {
-            TaskInfo taskInfo = taskInfoIterator.next();
-            if (preemptHosts == null || 
preemptHosts.contains(taskInfo.assignedInstance.getHost())) {
-              // Candidate for preemption.
-              preemptedCount++;
-              LOG.info("preempting {} for task at priority {} with 
potentialHosts={}", taskInfo,
-                  forPriority, potentialHosts == null ? "" : 
Arrays.toString(potentialHosts));
-              taskInfo.setPreemptedInfo(clock.getTime());
-              if (preemptedTaskList == null) {
-                preemptedTaskList = new LinkedList<>();
-              }
-              
dagStats.registerTaskPreempted(taskInfo.assignedInstance.getHost());
-              preemptedTaskList.add(taskInfo);
-              registerPendingPreemption(taskInfo.assignedInstance.getHost());
-              // Remove from the runningTaskList
-              taskInfoIterator.remove();
-            }
-          }
-
-          // Remove entire priority level if it's been emptied.
-          if (entryAtPriority.getValue().isEmpty()) {
-            iterator.remove();
-          }
-        } else {
-          // No tasks qualify as preemptable
-          LOG.info("DBG: No tasks qualify as killable to schedule tasks at 
priority {}", forPriority);
-          break;
-        }
-      }
-    } finally {
-      writeLock.unlock();
-    }
-    // Send out the preempted request outside of the lock.
-    if (preemptedTaskList != null) {
-      for (TaskInfo taskInfo : preemptedTaskList) {
-        LOG.info("DBG: Preempting task {}", taskInfo);
-        getContext().preemptContainer(taskInfo.containerId);
-        // Preemption will finally be registered as a deallocateTask as a 
result of preemptContainer
-        // That resets preemption info and allows additional tasks to be 
pre-empted if required.
-      }
-    }
-    // The schedule loop will be triggered again when the deallocateTask 
request comes in for the
-    // preempted task.
-  }
-
-  private void registerPendingPreemption(String host) {
-    writeLock.lock();
-    try {
-      pendingPreemptions.incrementAndGet();
-      MutableInt val = pendingPreemptionsPerHost.get(host);
-      if (val == null) {
-        val = new MutableInt(1);
-        pendingPreemptionsPerHost.put(host, val);
-      }
-      val.increment();
-    } finally {
-      writeLock.unlock();
-    }
-  }
-
-  private void unregisterPendingPreemption(String host) {
-    writeLock.lock();
-    try {
-      pendingPreemptions.decrementAndGet();
-      MutableInt val = pendingPreemptionsPerHost.get(host);
-      Preconditions.checkNotNull(val);
-      val.decrement();
-      // Not bothering with removing the entry. There's a limited number of 
hosts, and a good
-      // chance that the entry will make it back in when the AM is used for a 
long duration.
-    } finally {
-      writeLock.unlock();
-    }
-  }
-
-  private class NodeEnablerCallable implements Callable<Void> {
-
-    private AtomicBoolean isShutdown = new AtomicBoolean(false);
-    private static final long REFRESH_INTERVAL = 10000l;
-    long nextPollInterval = REFRESH_INTERVAL;
-    long lastRefreshTime;
-
-    @Override
-    public Void call() {
-
-      lastRefreshTime = System.currentTimeMillis();
-      while (!isShutdown.get() && !Thread.currentThread().isInterrupted()) {
-        try {
-          while (true) {
-            NodeInfo nodeInfo = disabledNodesQueue.poll(nextPollInterval, 
TimeUnit.MILLISECONDS);
-            if (nodeInfo != null) {
-              long currentTime = System.currentTimeMillis();
-              // A node became available. Enable the node and try scheduling.
-              reenableDisabledNode(nodeInfo);
-              trySchedulingPendingTasks();
-
-              nextPollInterval -= (currentTime - lastRefreshTime);
-            }
-
-            if (nextPollInterval < 0 || nodeInfo == null) {
-              // timeout expired. Reset the poll interval and refresh nodes.
-              nextPollInterval = REFRESH_INTERVAL;
-              lastRefreshTime = System.currentTimeMillis();
-              // TODO Get rid of this polling once we have notificaitons from 
the registry sub-system
-              if (LOG.isDebugEnabled()) {
-                LOG.debug("Refreshing instances based on poll interval");
-              }
-              refreshInstances();
-              scanForNodeChanges();
-            }
-          }
-        } catch (InterruptedException e) {
-          if (isShutdown.get()) {
-            LOG.info("NodeEnabler thread interrupted after shutdown");
-            break;
-          } else {
-            LOG.warn("NodeEnabler thread interrupted without being shutdown");
-            throw new RuntimeException("NodeEnabler thread interrupted without 
being shutdown", e);
-          }
-        }
-      }
-      return null;
-    }
-
-    // Call this first, then send in an interrupt to the thread.
-    public void shutdown() {
-      isShutdown.set(true);
-    }
-  }
-
-  private void trySchedulingPendingTasks() {
-    scheduleLock.lock();
-    try {
-      pendingScheduleInvodations.set(true);
-      scheduleCondition.signal();
-    } finally {
-      scheduleLock.unlock();
-    }
-  }
-
-  private class SchedulerCallable implements Callable<Void> {
-    private AtomicBoolean isShutdown = new AtomicBoolean(false);
-
-    @Override
-    public Void call() {
-      while (!isShutdown.get() && !Thread.currentThread().isInterrupted()) {
-        scheduleLock.lock();
-        try {
-          while (!pendingScheduleInvodations.get()) {
-            scheduleCondition.await();
-          }
-        } catch (InterruptedException e) {
-          if (isShutdown.get()) {
-            LOG.info("Scheduler thread interrupted after shutdown");
-            break;
-          } else {
-            LOG.warn("Scheduler thread interrupted without being shutdown");
-            throw new RuntimeException("Scheduler thread interrupted without 
being shutdown", e);
-          }
-        } finally {
-          scheduleLock.unlock();
-        }
-
-        // Set pending to false since scheduling is about to run. Any triggers 
up to this point
-        // will be handled in the next run.
-        // A new request may come in right after this is set to false, but 
before the actual scheduling.
-        // This will be handled in this run, but will cause an immediate run 
after, which is harmless.
-        pendingScheduleInvodations.set(false);
-        // Schedule outside of the scheduleLock - which should only be used to 
wait on the condition.
-        schedulePendingTasks();
-      }
-      return null;
-    }
-
-    // Call this first, then send in an interrupt to the thread.
-    public void shutdown() {
-      isShutdown.set(true);
-    }
-  }
-
-  @VisibleForTesting
-  static class NodeInfo implements Delayed {
-    private final NodeBlacklistConf blacklistConf;
-    final ServiceInstance serviceInstance;
-    private final Clock clock;
-
-    long expireTimeMillis = -1;
-    private long numSuccessfulTasks = 0;
-    private long numSuccessfulTasksAtLastBlacklist = -1;
-    float cumulativeBackoffFactor = 1.0f;
-
-    // Indicates whether a node had a recent communication failure.
-    private boolean hadCommFailure = false;
-
-    // Indicates whether a node is disabled - for whatever reason - 
commFailure, busy, etc.
-    private boolean disabled = false;
-
-    private int numPreemptedTasks = 0;
-    private int numScheduledTasks = 0;
-    private final int numSchedulableTasks;
-
-
-    /**
-     * Create a NodeInfo bound to a service instance
-     *
-     * @param serviceInstance         the associated serviceInstance
-     * @param blacklistConf           blacklist configuration
-     * @param clock                   clock to use to obtain timing information
-     * @param numSchedulableTasksConf number of schedulable tasks on the node. 
0 represents auto
-     *                                detect based on the serviceInstance, -1 
indicates indicates
-     *                                unlimited capacity
-     */
-    NodeInfo(ServiceInstance serviceInstance, NodeBlacklistConf blacklistConf, 
Clock clock, int numSchedulableTasksConf) {
-      Preconditions.checkArgument(numSchedulableTasksConf >= -1, 
"NumSchedulableTasks must be >=-1");
-      this.serviceInstance = serviceInstance;
-      this.blacklistConf = blacklistConf;
-      this.clock = clock;
-
-      if (numSchedulableTasksConf == 0) {
-        int pendingQueueuCapacity = 0;
-        String pendingQueueCapacityString = serviceInstance.getProperties()
-            .get(ConfVars.LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE.varname);
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Setting up node: " + serviceInstance + ", with available 
capacity=" +
-              serviceInstance.getResource().getVirtualCores() + ", 
pendingQueueCapacity=" +
-              pendingQueueCapacityString);
-        }
-
-        if (pendingQueueCapacityString != null) {
-          pendingQueueuCapacity = Integer.parseInt(pendingQueueCapacityString);
-        }
-        this.numSchedulableTasks = 
serviceInstance.getResource().getVirtualCores() + pendingQueueuCapacity;
-      } else {
-        this.numSchedulableTasks = numSchedulableTasksConf;
-      }
-      LOG.info("Setting up node: " + serviceInstance + " with 
schedulableCapacity=" + this.numSchedulableTasks);
-    }
-
-    ServiceInstance getServiceInstance() {
-      return serviceInstance;
-    }
-
-    void enableNode() {
-      expireTimeMillis = -1;
-      disabled = false;
-      hadCommFailure = false;
-    }
-
-    void disableNode(boolean commFailure) {
-      long duration = blacklistConf.minDelay;
-      long currentTime = clock.getTime();
-      this.hadCommFailure = commFailure;
-      disabled = true;
-      if (numSuccessfulTasksAtLastBlacklist == numSuccessfulTasks) {
-        // Relying on a task succeeding to reset the exponent.
-        // There's no notifications on whether a task gets accepted or not. 
That would be ideal to
-        // reset this.
-        cumulativeBackoffFactor = cumulativeBackoffFactor * 
blacklistConf.backoffFactor;
-      } else {
-        // Was able to execute something before the last blacklist. Reset the 
exponent.
-        cumulativeBackoffFactor = 1.0f;
-      }
-
-      long delayTime = (long) (duration * cumulativeBackoffFactor);
-      if (delayTime > blacklistConf.maxDelay) {
-        delayTime = blacklistConf.maxDelay;
-      }
-      if (LOG.isInfoEnabled()) {
-        LOG.info("Disabling instance " + serviceInstance + " for " + delayTime 
+ " milli-seconds");
-      }
-      expireTimeMillis = currentTime + delayTime;
-      numSuccessfulTasksAtLastBlacklist = numSuccessfulTasks;
-    }
-
-    void registerTaskScheduled() {
-      numScheduledTasks++;
-    }
-
-    void registerTaskSuccess() {
-      numSuccessfulTasks++;
-      numScheduledTasks--;
-    }
-
-    void registerUnsuccessfulTaskEnd(boolean wasPreempted) {
-      numScheduledTasks--;
-      if (wasPreempted) {
-        numPreemptedTasks++;
-      }
-    }
-
-    public boolean isDisabled() {
-      return disabled;
-    }
-
-    public boolean hadCommFailure() {
-      return hadCommFailure;
-    }
-    /* Returning true does not guarantee that the task will run, considering 
other queries
-    may be running in the system. Also depends upon the capacity usage 
configuration
-     */
-    public boolean canAcceptTask() {
-      boolean result = !hadCommFailure && !disabled && 
serviceInstance.isAlive()
-          &&(numSchedulableTasks == -1 || ((numSchedulableTasks - 
numScheduledTasks) > 0));
-      LOG.info("canAcceptTask={}, numScheduledTasks={}, 
numSchedulableTasks={}, hadCommFailure={}, disabled={}, 
serviceInstance.isAlive={}", result, numScheduledTasks, numSchedulableTasks, 
hadCommFailure, disabled, serviceInstance.isAlive());
-      return result;
-    }
-
-    @Override
-    public long getDelay(TimeUnit unit) {
-      return unit.convert(expireTimeMillis - clock.getTime(), 
TimeUnit.MILLISECONDS);
-    }
-
-    @Override
-    public int compareTo(Delayed o) {
-      NodeInfo other = (NodeInfo) o;
-      if (other.expireTimeMillis > this.expireTimeMillis) {
-        return -1;
-      } else if (other.expireTimeMillis < this.expireTimeMillis) {
-        return 1;
-      } else {
-        return 0;
-      }
-    }
-
-    @Override
-    public String toString() {
-      return "NodeInfo{" + "instance=" + serviceInstance
-          + ", expireTimeMillis=" + expireTimeMillis + ", numSuccessfulTasks=" 
+ numSuccessfulTasks
-          + ", numSuccessfulTasksAtLastBlacklist=" + 
numSuccessfulTasksAtLastBlacklist
-          + ", cumulativeBackoffFactor=" + cumulativeBackoffFactor
-          + ", numSchedulableTasks=" + numSchedulableTasks
-          + ", numScheduledTasks=" + numScheduledTasks
-          + ", disabled=" + disabled
-          + ", commFailures=" + hadCommFailure
-          +'}';
-    }
-  }
-
-  @VisibleForTesting
-  static class StatsPerDag {
-    int numRequestedAllocations = 0;
-    int numRequestsWithLocation = 0;
-    int numRequestsWithoutLocation = 0;
-    int numTotalAllocations = 0;
-    int numLocalAllocations = 0;
-    int numNonLocalAllocations = 0;
-    int numAllocationsNoLocalityRequest = 0;
-    int numRejectedTasks = 0;
-    int numCommFailures = 0;
-    int numDelayedAllocations = 0;
-    int numPreemptedTasks = 0;
-    Map<String, AtomicInteger> localityBasedNumAllocationsPerHost = new 
HashMap<>();
-    Map<String, AtomicInteger> numAllocationsPerHost = new HashMap<>();
-
-    @Override
-    public String toString() {
-      StringBuilder sb = new StringBuilder();
-      sb.append("NumPreemptedTasks=").append(numPreemptedTasks).append(", ");
-      
sb.append("NumRequestedAllocations=").append(numRequestedAllocations).append(", 
");
-      
sb.append("NumRequestsWithlocation=").append(numRequestsWithLocation).append(", 
");
-      
sb.append("NumLocalAllocations=").append(numLocalAllocations).append(",");
-      
sb.append("NumNonLocalAllocations=").append(numNonLocalAllocations).append(",");
-      
sb.append("NumTotalAllocations=").append(numTotalAllocations).append(",");
-      
sb.append("NumRequestsWithoutLocation=").append(numRequestsWithoutLocation).append(",
 ");
-      sb.append("NumRejectedTasks=").append(numRejectedTasks).append(", ");
-      sb.append("NumCommFailures=").append(numCommFailures).append(", ");
-      
sb.append("NumDelayedAllocations=").append(numDelayedAllocations).append(", ");
-      
sb.append("LocalityBasedAllocationsPerHost=").append(localityBasedNumAllocationsPerHost)
-          .append(", ");
-      sb.append("NumAllocationsPerHost=").append(numAllocationsPerHost);
-      return sb.toString();
-    }
-
-    void registerTaskRequest(String[] requestedHosts, String[] requestedRacks) 
{
-      numRequestedAllocations++;
-      // TODO Change after HIVE-9987. For now, there's no rack matching.
-      if (requestedHosts != null && requestedHosts.length != 0) {
-        numRequestsWithLocation++;
-      } else {
-        numRequestsWithoutLocation++;
-      }
-    }
-
-    void registerTaskAllocated(String[] requestedHosts, String[] 
requestedRacks,
-        String allocatedHost) {
-      // TODO Change after HIVE-9987. For now, there's no rack matching.
-      if (requestedHosts != null && requestedHosts.length != 0) {
-        Set<String> requestedHostSet = new 
HashSet<>(Arrays.asList(requestedHosts));
-        if (requestedHostSet.contains(allocatedHost)) {
-          numLocalAllocations++;
-          _registerAllocationInHostMap(allocatedHost, 
localityBasedNumAllocationsPerHost);
-        } else {
-          numNonLocalAllocations++;
-        }
-      } else {
-        numAllocationsNoLocalityRequest++;
-      }
-      numTotalAllocations++;
-      _registerAllocationInHostMap(allocatedHost, numAllocationsPerHost);
-    }
-
-    void registerTaskPreempted(String host) {
-      numPreemptedTasks++;
-    }
-
-    void registerCommFailure(String host) {
-      numCommFailures++;
-    }
-
-    void registerTaskRejected(String host) {
-      numRejectedTasks++;
-    }
-
-    void registerDelayedAllocation() {
-      numDelayedAllocations++;
-    }
-
-    private void _registerAllocationInHostMap(String host, Map<String, 
AtomicInteger> hostMap) {
-      AtomicInteger val = hostMap.get(host);
-      if (val == null) {
-        val = new AtomicInteger(0);
-        hostMap.put(host, val);
-      }
-      val.incrementAndGet();
-    }
-  }
-
-  private static class TaskInfo {
-    // IDs used to ensure two TaskInfos are different without using the 
underlying task instance.
-    // Required for insertion into a TreeMap
-    static final AtomicLong ID_GEN = new AtomicLong(0);
-    final long uniqueId;
-    final Object task;
-    final Object clientCookie;
-    final Priority priority;
-    final Resource capability;
-    final String[] requestedHosts;
-    final String[] requestedRacks;
-    final long requestTime;
-    long startTime;
-    long preemptTime;
-    ContainerId containerId;
-    ServiceInstance assignedInstance;
-    private boolean assigned = false;
-    private boolean preempted = false;
-
-    private int numAssignAttempts = 0;
-
-    // TaskInfo instances for two different tasks will not be the same. Only a 
single instance should
-    // ever be created for a taskAttempt
-    public TaskInfo(Object task, Object clientCookie, Priority priority, 
Resource capability,
-        String[] hosts, String[] racks, long requestTime) {
-      this.task = task;
-      this.clientCookie = clientCookie;
-      this.priority = priority;
-      this.capability = capability;
-      this.requestedHosts = hosts;
-      this.requestedRacks = racks;
-      this.requestTime = requestTime;
-      this.uniqueId = ID_GEN.getAndIncrement();
-    }
-
-    void setAssignmentInfo(ServiceInstance instance, ContainerId containerId, 
long startTime) {
-      this.assignedInstance = instance;
-        this.containerId = containerId;
-      this.startTime = startTime;
-      assigned = true;
-    }
-
-    void setPreemptedInfo(long preemptTime) {
-      this.preempted = true;
-      this.assigned = false;
-      this.preemptTime = preemptTime;
-    }
-
-    void triedAssigningTask() {
-      numAssignAttempts++;
-    }
-
-    int getNumPreviousAssignAttempts() {
-      return numAssignAttempts;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (this == o) {
-        return true;
-      }
-      if (o == null || getClass() != o.getClass()) {
-        return false;
-      }
-
-      TaskInfo taskInfo = (TaskInfo) o;
-
-      if (uniqueId != taskInfo.uniqueId) {
-        return false;
-      }
-      return task.equals(taskInfo.task);
-
-    }
-
-    @Override
-    public int hashCode() {
-      int result = (int) (uniqueId ^ (uniqueId >>> 32));
-      result = 31 * result + task.hashCode();
-      return result;
-    }
-
-    @Override
-    public String toString() {
-      return "TaskInfo{" +
-          "task=" + task +
-          ", priority=" + priority +
-          ", startTime=" + startTime +
-          ", containerId=" + containerId +
-          ", assignedInstance=" + assignedInstance +
-          ", uniqueId=" + uniqueId +
-          '}';
-    }
-  }
-
-  // Newer tasks first.
-  private static class TaskStartComparator implements Comparator<TaskInfo> {
-
-    @Override
-    public int compare(TaskInfo o1, TaskInfo o2) {
-      if (o1.startTime > o2.startTime) {
-        return -1;
-      } else if (o1.startTime < o2.startTime) {
-        return 1;
-      } else {
-        // Comparing on time is not sufficient since two may be created at the 
same time,
-        // in which case inserting into a TreeSet/Map would break
-        if (o1.uniqueId > o2.uniqueId) {
-          return -1;
-        } else if (o1.uniqueId < o2.uniqueId) {
-          return 1;
-        } else {
-          return 0;
-        }
-      }
-    }
-  }
-
-  private static class SelectHostResult {
-    final NodeServiceInstancePair nodeServiceInstancePair;
-    final ScheduleResult scheduleResult;
-
-    SelectHostResult(ServiceInstance serviceInstance, NodeInfo nodeInfo) {
-      this.nodeServiceInstancePair = new 
NodeServiceInstancePair(serviceInstance, nodeInfo);
-      this.scheduleResult = ScheduleResult.SCHEDULED;
-    }
-
-    SelectHostResult(ScheduleResult scheduleResult) {
-      this.nodeServiceInstancePair = null;
-      this.scheduleResult = scheduleResult;
-    }
-  }
-
-  private static final SelectHostResult 
SELECT_HOST_RESULT_INADEQUATE_TOTAL_CAPACITY =
-      new SelectHostResult(ScheduleResult.INADEQUATE_TOTAL_RESOURCES);
-  private static final SelectHostResult SELECT_HOST_RESULT_DELAYED_LOCALITY =
-      new SelectHostResult(ScheduleResult.DELAYED_LOCALITY);
-  private static final SelectHostResult SELECT_HOST_RESULT_DELAYED_RESOURCES =
-      new SelectHostResult(ScheduleResult.DELAYED_RESOURCES);
-
-  private static class NodeServiceInstancePair {
-    final NodeInfo nodeInfo;
-    final ServiceInstance serviceInstance;
-
-    private NodeServiceInstancePair(ServiceInstance serviceInstance, NodeInfo 
nodeInfo) {
-      this.serviceInstance = serviceInstance;
-      this.nodeInfo = nodeInfo;
-    }
-
-    public ServiceInstance getServiceInstance() {
-      return serviceInstance;
-    }
-
-    public NodeInfo getNodeInfo() {
-      return nodeInfo;
-    }
-  }
-
-  private static final class NodeBlacklistConf {
-    private final long minDelay;
-    private final long maxDelay;
-    private final float backoffFactor;
-
-    public NodeBlacklistConf(long minDelay, long maxDelay, float 
backoffFactor) {
-      this.minDelay = minDelay;
-      this.maxDelay = maxDelay;
-      this.backoffFactor = backoffFactor;
-    }
-
-    @Override
-    public String toString() {
-      return "NodeBlacklistConf{" +
-          "minDelay=" + minDelay +
-          ", maxDelay=" + maxDelay +
-          ", backoffFactor=" + backoffFactor +
-          '}';
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/9886414b/llap-server/src/protobuf/LlapDaemonProtocol.proto
----------------------------------------------------------------------
diff --git a/llap-server/src/protobuf/LlapDaemonProtocol.proto 
b/llap-server/src/protobuf/LlapDaemonProtocol.proto
deleted file mode 100644
index 944c96c..0000000
--- a/llap-server/src/protobuf/LlapDaemonProtocol.proto
+++ /dev/null
@@ -1,148 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-option java_package = "org.apache.hadoop.hive.llap.daemon.rpc";
-option java_outer_classname = "LlapDaemonProtocolProtos";
-option java_generic_services = true;
-option java_generate_equals_and_hash = true;
-
-// TODO Change this as the interface evolves. Currently using Tez constructs.
-
-message UserPayloadProto {
-  optional bytes user_payload = 1;
-  optional int32 version = 2;
-}
-
-message EntityDescriptorProto {
-  optional string class_name = 1;
-  optional UserPayloadProto user_payload = 2;
-  optional bytes history_text = 3;
-}
-
-message IOSpecProto {
-  optional string connected_vertex_name = 1;
-  optional EntityDescriptorProto io_descriptor = 2;
-  optional int32 physical_edge_count = 3;
-}
-
-message GroupInputSpecProto {
-  optional string group_name = 1;
-  repeated string group_vertices = 2;
-  optional EntityDescriptorProto merged_input_descriptor = 3;
-}
-
-
-message FragmentSpecProto {
-  optional string fragment_identifier_string = 1;
-  optional string dag_name = 2;
-  optional int32 dag_id = 11;
-  optional string vertex_name = 3;
-  optional EntityDescriptorProto processor_descriptor = 4;
-  repeated IOSpecProto input_specs = 5;
-  repeated IOSpecProto output_specs = 6;
-  repeated GroupInputSpecProto grouped_input_specs = 7;
-  optional int32 vertex_parallelism = 8;
-  optional int32 fragment_number =9;
-  optional int32 attempt_number = 10;
-}
-
-message FragmentRuntimeInfo {
-  optional int32 num_self_and_upstream_tasks = 1;
-  optional int32 num_self_and_upstream_completed_tasks = 2;
-  optional int32 within_dag_priority = 3;
-  optional int64 dag_start_time = 4;
-  optional int64 first_attempt_start_time = 5;
-  optional int64 current_attempt_start_time = 6;
-}
-
-enum SourceStateProto {
-  S_SUCCEEDED = 1;
-  S_RUNNING = 2;
-}
-
-message QueryIdentifierProto {
-  optional string app_identifier = 1;
-  optional int32 dag_identifier = 2;
-}
-
-message SubmitWorkRequestProto {
-  optional string container_id_string = 1;
-  optional string am_host = 2;
-  optional int32 am_port = 3;
-  optional string token_identifier = 4;
-  optional bytes credentials_binary = 5;
-  optional string user = 6;
-  optional string application_id_string = 7;
-  optional int32 app_attempt_number = 8;
-  optional FragmentSpecProto fragment_spec = 9;
-  optional FragmentRuntimeInfo fragment_runtime_info = 10;
-}
-
-enum SubmissionStateProto {
-  ACCEPTED = 1;
-  REJECTED = 2;
-  EVICTED_OTHER = 3;
-}
-
-message SubmitWorkResponseProto {
-  optional SubmissionStateProto submission_state = 1;
-}
-
-message SourceStateUpdatedRequestProto {
-  optional QueryIdentifierProto query_identifier = 1;
-  optional string src_name = 2;
-  optional SourceStateProto state = 3;
-}
-
-message SourceStateUpdatedResponseProto {
-}
-
-message QueryCompleteRequestProto {
-  optional string query_id = 1;
-  optional QueryIdentifierProto query_identifier = 2;
-  optional int64 delete_delay = 4 [default = 0];
-}
-
-message QueryCompleteResponseProto {
-}
-
-message TerminateFragmentRequestProto {
-  optional QueryIdentifierProto query_identifier = 1;
-  optional string fragment_identifier_string = 2;
-}
-
-message TerminateFragmentResponseProto {
-}
-
-message GetTokenRequestProto {
-}
-
-message GetTokenResponseProto {
-  optional bytes token = 1;
-}
-
-service LlapDaemonProtocol {
-  rpc submitWork(SubmitWorkRequestProto) returns (SubmitWorkResponseProto);
-  rpc sourceStateUpdated(SourceStateUpdatedRequestProto) returns 
(SourceStateUpdatedResponseProto);
-  rpc queryComplete(QueryCompleteRequestProto) returns 
(QueryCompleteResponseProto);
-  rpc terminateFragment(TerminateFragmentRequestProto) returns 
(TerminateFragmentResponseProto);
-}
-
-service LlapManagementProtocol {
-  rpc getDelegationToken(GetTokenRequestProto) returns (GetTokenResponseProto);
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/9886414b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapDaemonProtocolServerImpl.java
----------------------------------------------------------------------
diff --git 
a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapDaemonProtocolServerImpl.java
 
b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapDaemonProtocolServerImpl.java
index 44c958d..1cef218 100644
--- 
a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapDaemonProtocolServerImpl.java
+++ 
b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapDaemonProtocolServerImpl.java
@@ -27,10 +27,11 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.llap.configuration.LlapConfiguration;
 import org.apache.hadoop.hive.llap.daemon.ContainerRunner;
-import org.apache.hadoop.hive.llap.daemon.LlapDaemonProtocolBlockingPB;
+import org.apache.hadoop.hive.llap.protocol.LlapProtocolBlockingPB;
 import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
 import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto;
 import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmissionStateProto;
+import org.apache.hadoop.hive.llap.impl.LlapProtocolClientImpl;
 import org.junit.Test;
 
 public class TestLlapDaemonProtocolServerImpl {
@@ -42,8 +43,8 @@ public class TestLlapDaemonProtocolServerImpl {
     int rpcPort = HiveConf.getIntVar(daemonConf, 
ConfVars.LLAP_DAEMON_RPC_PORT);
     int numHandlers = HiveConf.getIntVar(daemonConf, 
ConfVars.LLAP_DAEMON_RPC_NUM_HANDLERS);
     ContainerRunner containerRunnerMock = mock(ContainerRunner.class);
-    LlapDaemonProtocolServerImpl server =
-        new LlapDaemonProtocolServerImpl(numHandlers, containerRunnerMock,
+    LlapProtocolServerImpl server =
+        new LlapProtocolServerImpl(numHandlers, containerRunnerMock,
            new AtomicReference<InetSocketAddress>(), new 
AtomicReference<InetSocketAddress>(),
            rpcPort, rpcPort + 1);
     
when(containerRunnerMock.submitWork(any(SubmitWorkRequestProto.class))).thenReturn(
@@ -56,8 +57,8 @@ public class TestLlapDaemonProtocolServerImpl {
       server.start();
       InetSocketAddress serverAddr = server.getBindAddress();
 
-      LlapDaemonProtocolBlockingPB client =
-          new LlapDaemonProtocolClientImpl(new Configuration(), 
serverAddr.getHostName(),
+      LlapProtocolBlockingPB client =
+          new LlapProtocolClientImpl(new Configuration(), 
serverAddr.getHostName(),
               serverAddr.getPort(), null, null);
       SubmitWorkResponseProto responseProto = client.submitWork(null,
           SubmitWorkRequestProto.newBuilder()

http://git-wip-us.apache.org/repos/asf/hive/blob/9886414b/llap-server/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapDaemonProtocolClientProxy.java
----------------------------------------------------------------------
diff --git 
a/llap-server/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapDaemonProtocolClientProxy.java
 
b/llap-server/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapDaemonProtocolClientProxy.java
deleted file mode 100644
index a6af8c2..0000000
--- 
a/llap-server/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapDaemonProtocolClientProxy.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.llap.tezplugins;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import com.google.protobuf.Message;
-import org.apache.commons.lang3.mutable.MutableInt;
-import org.apache.hadoop.hive.llap.LlapNodeId;
-import org.junit.Test;
-
-public class TestLlapDaemonProtocolClientProxy {
-
-  @Test (timeout = 5000)
-  public void testMultipleNodes() {
-    RequestManagerForTest requestManager = new RequestManagerForTest(1);
-
-    LlapNodeId nodeId1 = LlapNodeId.getInstance("host1", 1025);
-    LlapNodeId nodeId2 = LlapNodeId.getInstance("host2", 1025);
-
-    Message mockMessage = mock(Message.class);
-    LlapDaemonProtocolClientProxy.ExecuteRequestCallback 
mockExecuteRequestCallback = mock(
-        LlapDaemonProtocolClientProxy.ExecuteRequestCallback.class);
-
-    // Request two messages
-    requestManager.queueRequest(
-        new CallableRequestForTest(nodeId1, mockMessage, 
mockExecuteRequestCallback));
-    requestManager.queueRequest(
-        new CallableRequestForTest(nodeId2, mockMessage, 
mockExecuteRequestCallback));
-
-    // Should go through in a single process call
-    requestManager.process();
-    assertEquals(2, requestManager.numSubmissionsCounters);
-    assertNotNull(requestManager.numInvocationsPerNode.get(nodeId1));
-    assertNotNull(requestManager.numInvocationsPerNode.get(nodeId2));
-    assertEquals(1, 
requestManager.numInvocationsPerNode.get(nodeId1).getValue().intValue());
-    assertEquals(1, 
requestManager.numInvocationsPerNode.get(nodeId2).getValue().intValue());
-    assertEquals(0, requestManager.currentLoopSkippedRequests.size());
-    assertEquals(0, requestManager.currentLoopSkippedRequests.size());
-    assertEquals(0, requestManager.currentLoopDisabledNodes.size());
-  }
-
-  @Test(timeout = 5000)
-  public void testSingleInvocationPerNode() {
-    RequestManagerForTest requestManager = new RequestManagerForTest(1);
-
-    LlapNodeId nodeId1 = LlapNodeId.getInstance("host1", 1025);
-
-    Message mockMessage = mock(Message.class);
-    LlapDaemonProtocolClientProxy.ExecuteRequestCallback 
mockExecuteRequestCallback = mock(
-        LlapDaemonProtocolClientProxy.ExecuteRequestCallback.class);
-
-    // First request for host.
-    requestManager.queueRequest(
-        new CallableRequestForTest(nodeId1, mockMessage, 
mockExecuteRequestCallback));
-    requestManager.process();
-    assertEquals(1, requestManager.numSubmissionsCounters);
-    assertNotNull(requestManager.numInvocationsPerNode.get(nodeId1));
-    assertEquals(1, 
requestManager.numInvocationsPerNode.get(nodeId1).getValue().intValue());
-    assertEquals(0, requestManager.currentLoopSkippedRequests.size());
-
-    // Second request for host. Single invocation since the last has not 
completed.
-    requestManager.queueRequest(
-        new CallableRequestForTest(nodeId1, mockMessage, 
mockExecuteRequestCallback));
-    requestManager.process();
-    assertEquals(1, requestManager.numSubmissionsCounters);
-    assertNotNull(requestManager.numInvocationsPerNode.get(nodeId1));
-    assertEquals(1, 
requestManager.numInvocationsPerNode.get(nodeId1).getValue().intValue());
-    assertEquals(1, requestManager.currentLoopSkippedRequests.size());
-    assertEquals(1, requestManager.currentLoopDisabledNodes.size());
-    assertTrue(requestManager.currentLoopDisabledNodes.contains(nodeId1));
-
-    // Complete first request. Second pending request should go through.
-    requestManager.requestFinished(nodeId1);
-    requestManager.process();
-    assertEquals(2, requestManager.numSubmissionsCounters);
-    assertNotNull(requestManager.numInvocationsPerNode.get(nodeId1));
-    assertEquals(2, 
requestManager.numInvocationsPerNode.get(nodeId1).getValue().intValue());
-    assertEquals(0, requestManager.currentLoopSkippedRequests.size());
-    assertEquals(0, requestManager.currentLoopDisabledNodes.size());
-    assertFalse(requestManager.currentLoopDisabledNodes.contains(nodeId1));
-  }
-
-
-  static class RequestManagerForTest extends 
LlapDaemonProtocolClientProxy.RequestManager {
-
-    int numSubmissionsCounters = 0;
-    private Map<LlapNodeId, MutableInt> numInvocationsPerNode = new 
HashMap<>();
-
-    public RequestManagerForTest(int numThreads) {
-      super(numThreads);
-    }
-
-    protected void 
submitToExecutor(LlapDaemonProtocolClientProxy.CallableRequest request, 
LlapNodeId nodeId) {
-      numSubmissionsCounters++;
-      MutableInt nodeCount = numInvocationsPerNode.get(nodeId);
-      if (nodeCount == null) {
-        nodeCount = new MutableInt(0);
-        numInvocationsPerNode.put(nodeId, nodeCount);
-      }
-      nodeCount.increment();
-    }
-
-    void reset() {
-      numSubmissionsCounters = 0;
-      numInvocationsPerNode.clear();
-    }
-
-  }
-
-  static class CallableRequestForTest extends 
LlapDaemonProtocolClientProxy.CallableRequest<Message, Message> {
-
-    protected CallableRequestForTest(LlapNodeId nodeId, Message message,
-                                     
LlapDaemonProtocolClientProxy.ExecuteRequestCallback<Message> callback) {
-      super(nodeId, message, callback);
-    }
-
-    @Override
-    public Message call() throws Exception {
-      return null;
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/9886414b/llap-server/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskCommunicator.java
----------------------------------------------------------------------
diff --git 
a/llap-server/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskCommunicator.java
 
b/llap-server/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskCommunicator.java
deleted file mode 100644
index 8f3d104..0000000
--- 
a/llap-server/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskCommunicator.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- *  you may not use this file except in compliance with the License.
- *  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.hadoop.hive.llap.tezplugins;
-
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-
-import org.apache.hadoop.hive.llap.LlapNodeId;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.junit.Test;
-
-public class TestLlapTaskCommunicator {
-
-  @Test (timeout = 5000)
-  public void testEntityTracker1() {
-    LlapTaskCommunicator.EntityTracker entityTracker = new 
LlapTaskCommunicator.EntityTracker();
-
-    String host1 = "host1";
-    String host2 = "host2";
-    String host3 = "host3";
-    int port = 1451;
-
-
-    // Simple container registration and un-registration without any task 
attempt being involved.
-    ContainerId containerId101 = constructContainerId(101);
-    entityTracker.registerContainer(containerId101, host1, port);
-    assertEquals(LlapNodeId.getInstance(host1, port), 
entityTracker.getNodeIdForContainer(containerId101));
-
-    entityTracker.unregisterContainer(containerId101);
-    
assertNull(entityTracker.getContainerAttemptMapForNode(LlapNodeId.getInstance(host1,
 port)));
-    assertNull(entityTracker.getNodeIdForContainer(containerId101));
-    assertEquals(0, entityTracker.nodeMap.size());
-    assertEquals(0, entityTracker.attemptToNodeMap.size());
-    assertEquals(0, entityTracker.containerToNodeMap.size());
-
-
-    // Simple task registration and un-registration.
-    ContainerId containerId1 = constructContainerId(1);
-    TezTaskAttemptID taskAttemptId1 = constructTaskAttemptId(1);
-    entityTracker.registerTaskAttempt(containerId1, taskAttemptId1, host1, 
port);
-    assertEquals(LlapNodeId.getInstance(host1, port), 
entityTracker.getNodeIdForContainer(containerId1));
-    assertEquals(LlapNodeId.getInstance(host1, port), 
entityTracker.getNodeIdForTaskAttempt(taskAttemptId1));
-
-    entityTracker.unregisterTaskAttempt(taskAttemptId1);
-    
assertNull(entityTracker.getContainerAttemptMapForNode(LlapNodeId.getInstance(host1,
 port)));
-    assertNull(entityTracker.getNodeIdForContainer(containerId1));
-    assertNull(entityTracker.getNodeIdForTaskAttempt(taskAttemptId1));
-    assertEquals(0, entityTracker.nodeMap.size());
-    assertEquals(0, entityTracker.attemptToNodeMap.size());
-    assertEquals(0, entityTracker.containerToNodeMap.size());
-
-    // Register taskAttempt, unregister container. TaskAttempt should also be 
unregistered
-    ContainerId containerId201 = constructContainerId(201);
-    TezTaskAttemptID taskAttemptId201 = constructTaskAttemptId(201);
-    entityTracker.registerTaskAttempt(containerId201, taskAttemptId201, host1, 
port);
-    assertEquals(LlapNodeId.getInstance(host1, port), 
entityTracker.getNodeIdForContainer(containerId201));
-    assertEquals(LlapNodeId.getInstance(host1, port), 
entityTracker.getNodeIdForTaskAttempt(taskAttemptId201));
-
-    entityTracker.unregisterContainer(containerId201);
-    
assertNull(entityTracker.getContainerAttemptMapForNode(LlapNodeId.getInstance(host1,
 port)));
-    assertNull(entityTracker.getNodeIdForContainer(containerId201));
-    assertNull(entityTracker.getNodeIdForTaskAttempt(taskAttemptId201));
-    assertEquals(0, entityTracker.nodeMap.size());
-    assertEquals(0, entityTracker.attemptToNodeMap.size());
-    assertEquals(0, entityTracker.containerToNodeMap.size());
-
-    entityTracker.unregisterTaskAttempt(taskAttemptId201); // No errors
-  }
-
-
-  private ContainerId constructContainerId(int id) {
-    ContainerId containerId = mock(ContainerId.class);
-    doReturn(id).when(containerId).getId();
-    doReturn((long)id).when(containerId).getContainerId();
-    return containerId;
-  }
-
-  private TezTaskAttemptID constructTaskAttemptId(int id) {
-    TezTaskAttemptID taskAttemptId = mock(TezTaskAttemptID.class);
-    doReturn(id).when(taskAttemptId).getId();
-    return taskAttemptId;
-  }
-
-}

Reply via email to