[17/20] hive git commit: HIVE-16013: Fragments without locality can stack up on nodes (Prasanth Jayachandran reviewed by Siddharth Seth)

2017-02-27 Thread sershe
HIVE-16013: Fragments without locality can stack up on nodes (Prasanth Jayachandran reviewed by Siddharth Seth)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/fc970f6f
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/fc970f6f
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/fc970f6f

Branch: refs/heads/hive-14535
Commit: fc970f6f67a80f272d93f4e779d6792e4133a46f
Parents: 7f1c29e
Author: Prasanth Jayachandran 
Authored: Mon Feb 27 11:54:45 2017 -0800
Committer: Prasanth Jayachandran 
Committed: Mon Feb 27 11:54:45 2017 -0800

--
 .../tezplugins/LlapTaskSchedulerService.java| 110 ++
 .../TestLlapTaskSchedulerService.java   | 199 +--
 2 files changed, 260 insertions(+), 49 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/hive/blob/fc970f6f/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
--
diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
index 97191f8..fe73ff1 100644
--- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
+++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
@@ -64,6 +64,8 @@ import org.apache.hadoop.hive.llap.metrics.MetricsUtils;
 import org.apache.hadoop.hive.llap.registry.ServiceInstance;
 import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet;
 import org.apache.hadoop.hive.llap.registry.ServiceInstanceStateChangeListener;
+import org.apache.hadoop.hive.llap.registry.ServiceRegistry;
+import org.apache.hadoop.hive.llap.registry.impl.InactiveServiceInstance;
 import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
 import org.apache.hadoop.hive.llap.tezplugins.helpers.MonotonicClock;
 import org.apache.hadoop.hive.llap.tezplugins.scheduler.LoggingFutureCallback;
@@ -193,6 +195,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
 
   private final LlapTaskSchedulerMetrics metrics;
   private final JvmPauseMonitor pauseMonitor;
+  private final Random random = new Random();
 
   public LlapTaskSchedulerService(TaskSchedulerContext taskSchedulerContext) {
 this(taskSchedulerContext, new MonotonicClock(), true);
@@ -330,6 +333,11 @@ public class LlapTaskSchedulerService extends TaskScheduler {
 }
   }
 
+  @VisibleForTesting
+  public void setServiceInstanceSet(ServiceInstanceSet serviceInstanceSet) {
+this.activeInstances = serviceInstanceSet;
+  }
+
   private class NodeStateChangeListener implements 
ServiceInstanceStateChangeListener {
 private final Logger LOG = 
LoggerFactory.getLogger(NodeStateChangeListener.class);
 
@@ -804,54 +812,73 @@ public class LlapTaskSchedulerService extends TaskScheduler {
 
   /* fall through - miss in locality or no locality-requested */
   Collection instances = 
activeInstances.getAllInstancesOrdered(true);
-  ArrayList allNodes = new ArrayList<>(instances.size());
+  List allNodes = new ArrayList<>(instances.size());
+  List activeNodesWithFreeSlots = new ArrayList<>();
   for (ServiceInstance inst : instances) {
-NodeInfo nodeInfo = instanceToNodeMap.get(inst.getWorkerIdentity());
-if (nodeInfo != null) {
-  allNodes.add(nodeInfo);
+if (inst instanceof InactiveServiceInstance) {
+  allNodes.add(null);
+} else {
+  NodeInfo nodeInfo = instanceToNodeMap.get(inst.getWorkerIdentity());
+  if (nodeInfo == null) {
+allNodes.add(null);
+  } else {
+allNodes.add(nodeInfo);
+if (nodeInfo.canAcceptTask()) {
+  activeNodesWithFreeSlots.add(nodeInfo);
+}
+  }
 }
   }
 
+  if (allNodes.isEmpty()) {
+return SELECT_HOST_RESULT_DELAYED_RESOURCES;
+  }
+
+  // no locality-requested, randomly pick a node containing free slots
   if (requestedHosts == null || requestedHosts.length == 0) {
-// no locality-requested, iterate the available hosts in consistent 
order from the beginning
-if (LOG.isDebugEnabled()) {
-  LOG.debug("No-locality requested. Attempting to allocate next 
available host for task={}", request.task);
-}
-for (NodeInfo nodeInfo : allNodes) {
-  if (nodeInfo.canAcceptTask()) {
-LOG.info("Assigning {} when looking for any host, from #hosts={}, 
requestedHosts={}",
-nodeInfo.toShortString(), allNodes.size(), ((requestedHosts == 
null || requestedHosts.length == 0)
-? "null" : 

hive git commit: HIVE-16013: Fragments without locality can stack up on nodes (Prasanth Jayachandran reviewed by Siddharth Seth)

2017-02-27 Thread prasanthj
Repository: hive
Updated Branches:
  refs/heads/master 7f1c29ebe -> fc970f6f6


HIVE-16013: Fragments without locality can stack up on nodes (Prasanth Jayachandran reviewed by Siddharth Seth)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/fc970f6f
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/fc970f6f
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/fc970f6f

Branch: refs/heads/master
Commit: fc970f6f67a80f272d93f4e779d6792e4133a46f
Parents: 7f1c29e
Author: Prasanth Jayachandran 
Authored: Mon Feb 27 11:54:45 2017 -0800
Committer: Prasanth Jayachandran 
Committed: Mon Feb 27 11:54:45 2017 -0800

--
 .../tezplugins/LlapTaskSchedulerService.java| 110 ++
 .../TestLlapTaskSchedulerService.java   | 199 +--
 2 files changed, 260 insertions(+), 49 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/hive/blob/fc970f6f/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
--
diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
index 97191f8..fe73ff1 100644
--- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
+++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
@@ -64,6 +64,8 @@ import org.apache.hadoop.hive.llap.metrics.MetricsUtils;
 import org.apache.hadoop.hive.llap.registry.ServiceInstance;
 import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet;
 import org.apache.hadoop.hive.llap.registry.ServiceInstanceStateChangeListener;
+import org.apache.hadoop.hive.llap.registry.ServiceRegistry;
+import org.apache.hadoop.hive.llap.registry.impl.InactiveServiceInstance;
 import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
 import org.apache.hadoop.hive.llap.tezplugins.helpers.MonotonicClock;
 import org.apache.hadoop.hive.llap.tezplugins.scheduler.LoggingFutureCallback;
@@ -193,6 +195,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
 
   private final LlapTaskSchedulerMetrics metrics;
   private final JvmPauseMonitor pauseMonitor;
+  private final Random random = new Random();
 
   public LlapTaskSchedulerService(TaskSchedulerContext taskSchedulerContext) {
 this(taskSchedulerContext, new MonotonicClock(), true);
@@ -330,6 +333,11 @@ public class LlapTaskSchedulerService extends TaskScheduler {
 }
   }
 
+  @VisibleForTesting
+  public void setServiceInstanceSet(ServiceInstanceSet serviceInstanceSet) {
+this.activeInstances = serviceInstanceSet;
+  }
+
   private class NodeStateChangeListener implements 
ServiceInstanceStateChangeListener {
 private final Logger LOG = 
LoggerFactory.getLogger(NodeStateChangeListener.class);
 
@@ -804,54 +812,73 @@ public class LlapTaskSchedulerService extends TaskScheduler {
 
   /* fall through - miss in locality or no locality-requested */
   Collection instances = 
activeInstances.getAllInstancesOrdered(true);
-  ArrayList allNodes = new ArrayList<>(instances.size());
+  List allNodes = new ArrayList<>(instances.size());
+  List activeNodesWithFreeSlots = new ArrayList<>();
   for (ServiceInstance inst : instances) {
-NodeInfo nodeInfo = instanceToNodeMap.get(inst.getWorkerIdentity());
-if (nodeInfo != null) {
-  allNodes.add(nodeInfo);
+if (inst instanceof InactiveServiceInstance) {
+  allNodes.add(null);
+} else {
+  NodeInfo nodeInfo = instanceToNodeMap.get(inst.getWorkerIdentity());
+  if (nodeInfo == null) {
+allNodes.add(null);
+  } else {
+allNodes.add(nodeInfo);
+if (nodeInfo.canAcceptTask()) {
+  activeNodesWithFreeSlots.add(nodeInfo);
+}
+  }
 }
   }
 
+  if (allNodes.isEmpty()) {
+return SELECT_HOST_RESULT_DELAYED_RESOURCES;
+  }
+
+  // no locality-requested, randomly pick a node containing free slots
   if (requestedHosts == null || requestedHosts.length == 0) {
-// no locality-requested, iterate the available hosts in consistent 
order from the beginning
-if (LOG.isDebugEnabled()) {
-  LOG.debug("No-locality requested. Attempting to allocate next 
available host for task={}", request.task);
-}
-for (NodeInfo nodeInfo : allNodes) {
-  if (nodeInfo.canAcceptTask()) {
-LOG.info("Assigning {} when looking for any host, from #hosts={}, 
requestedHosts={}",
-nodeInfo.toShortString(), allNodes.size(), ((requestedHosts ==