[17/20] hive git commit: HIVE-16013: Fragments without locality can stack up on nodes (Prasanth Jayachandran reviewed by Siddharth Seth)
HIVE-16013: Fragments without locality can stack up on nodes (Prasanth Jayachandran reviewed by Siddharth Seth) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/fc970f6f Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/fc970f6f Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/fc970f6f Branch: refs/heads/hive-14535 Commit: fc970f6f67a80f272d93f4e779d6792e4133a46f Parents: 7f1c29e Author: Prasanth JayachandranAuthored: Mon Feb 27 11:54:45 2017 -0800 Committer: Prasanth Jayachandran Committed: Mon Feb 27 11:54:45 2017 -0800 -- .../tezplugins/LlapTaskSchedulerService.java| 110 ++ .../TestLlapTaskSchedulerService.java | 199 +-- 2 files changed, 260 insertions(+), 49 deletions(-) -- http://git-wip-us.apache.org/repos/asf/hive/blob/fc970f6f/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java -- diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java index 97191f8..fe73ff1 100644 --- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java +++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java @@ -64,6 +64,8 @@ import org.apache.hadoop.hive.llap.metrics.MetricsUtils; import org.apache.hadoop.hive.llap.registry.ServiceInstance; import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet; import org.apache.hadoop.hive.llap.registry.ServiceInstanceStateChangeListener; +import org.apache.hadoop.hive.llap.registry.ServiceRegistry; +import org.apache.hadoop.hive.llap.registry.impl.InactiveServiceInstance; import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService; import org.apache.hadoop.hive.llap.tezplugins.helpers.MonotonicClock; import org.apache.hadoop.hive.llap.tezplugins.scheduler.LoggingFutureCallback; @@ -193,6 +195,7 @@ public class LlapTaskSchedulerService extends TaskScheduler { private final LlapTaskSchedulerMetrics metrics; private final JvmPauseMonitor pauseMonitor; + private final Random random = new Random(); public LlapTaskSchedulerService(TaskSchedulerContext taskSchedulerContext) { this(taskSchedulerContext, new MonotonicClock(), true); @@ -330,6 +333,11 @@ public class LlapTaskSchedulerService extends TaskScheduler { } } + @VisibleForTesting + public void setServiceInstanceSet(ServiceInstanceSet serviceInstanceSet) { +this.activeInstances = serviceInstanceSet; + } + private class NodeStateChangeListener implements ServiceInstanceStateChangeListener { private final Logger LOG = LoggerFactory.getLogger(NodeStateChangeListener.class); @@ -804,54 +812,73 @@ public class LlapTaskSchedulerService extends TaskScheduler { /* fall through - miss in locality or no locality-requested */ Collection instances = activeInstances.getAllInstancesOrdered(true); - ArrayList allNodes = new ArrayList<>(instances.size()); + List allNodes = new ArrayList<>(instances.size()); + List activeNodesWithFreeSlots = new ArrayList<>(); for (ServiceInstance inst : instances) { -NodeInfo nodeInfo = instanceToNodeMap.get(inst.getWorkerIdentity()); -if (nodeInfo != null) { - allNodes.add(nodeInfo); +if (inst instanceof InactiveServiceInstance) { + allNodes.add(null); +} else { + NodeInfo nodeInfo = instanceToNodeMap.get(inst.getWorkerIdentity()); + if (nodeInfo == null) { +allNodes.add(null); + } else { +allNodes.add(nodeInfo); +if (nodeInfo.canAcceptTask()) { + activeNodesWithFreeSlots.add(nodeInfo); +} + } } } + if (allNodes.isEmpty()) { +return SELECT_HOST_RESULT_DELAYED_RESOURCES; + } + + // no locality-requested, randomly pick a node containing free slots if (requestedHosts == null || requestedHosts.length == 0) { -// no locality-requested, iterate the available hosts in consistent order from the beginning -if (LOG.isDebugEnabled()) { - LOG.debug("No-locality requested. Attempting to allocate next available host for task={}", request.task); -} -for (NodeInfo nodeInfo : allNodes) { - if (nodeInfo.canAcceptTask()) { -LOG.info("Assigning {} when looking for any host, from #hosts={}, requestedHosts={}", -nodeInfo.toShortString(), allNodes.size(), ((requestedHosts == null || requestedHosts.length == 0) -? "null" :
hive git commit: HIVE-16013: Fragments without locality can stack up on nodes (Prasanth Jayachandran reviewed by Siddharth Seth)
Repository: hive Updated Branches: refs/heads/master 7f1c29ebe -> fc970f6f6 HIVE-16013: Fragments without locality can stack up on nodes (Prasanth Jayachandran reviewed by Siddharth Seth) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/fc970f6f Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/fc970f6f Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/fc970f6f Branch: refs/heads/master Commit: fc970f6f67a80f272d93f4e779d6792e4133a46f Parents: 7f1c29e Author: Prasanth JayachandranAuthored: Mon Feb 27 11:54:45 2017 -0800 Committer: Prasanth Jayachandran Committed: Mon Feb 27 11:54:45 2017 -0800 -- .../tezplugins/LlapTaskSchedulerService.java| 110 ++ .../TestLlapTaskSchedulerService.java | 199 +-- 2 files changed, 260 insertions(+), 49 deletions(-) -- http://git-wip-us.apache.org/repos/asf/hive/blob/fc970f6f/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java -- diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java index 97191f8..fe73ff1 100644 --- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java +++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java @@ -64,6 +64,8 @@ import org.apache.hadoop.hive.llap.metrics.MetricsUtils; import org.apache.hadoop.hive.llap.registry.ServiceInstance; import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet; import org.apache.hadoop.hive.llap.registry.ServiceInstanceStateChangeListener; +import org.apache.hadoop.hive.llap.registry.ServiceRegistry; +import org.apache.hadoop.hive.llap.registry.impl.InactiveServiceInstance; import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService; import org.apache.hadoop.hive.llap.tezplugins.helpers.MonotonicClock; import org.apache.hadoop.hive.llap.tezplugins.scheduler.LoggingFutureCallback; @@ -193,6 +195,7 @@ public class LlapTaskSchedulerService extends TaskScheduler { private final LlapTaskSchedulerMetrics metrics; private final JvmPauseMonitor pauseMonitor; + private final Random random = new Random(); public LlapTaskSchedulerService(TaskSchedulerContext taskSchedulerContext) { this(taskSchedulerContext, new MonotonicClock(), true); @@ -330,6 +333,11 @@ public class LlapTaskSchedulerService extends TaskScheduler { } } + @VisibleForTesting + public void setServiceInstanceSet(ServiceInstanceSet serviceInstanceSet) { +this.activeInstances = serviceInstanceSet; + } + private class NodeStateChangeListener implements ServiceInstanceStateChangeListener { private final Logger LOG = LoggerFactory.getLogger(NodeStateChangeListener.class); @@ -804,54 +812,73 @@ public class LlapTaskSchedulerService extends TaskScheduler { /* fall through - miss in locality or no locality-requested */ Collection instances = activeInstances.getAllInstancesOrdered(true); - ArrayList allNodes = new ArrayList<>(instances.size()); + List allNodes = new ArrayList<>(instances.size()); + List activeNodesWithFreeSlots = new ArrayList<>(); for (ServiceInstance inst : instances) { -NodeInfo nodeInfo = instanceToNodeMap.get(inst.getWorkerIdentity()); -if (nodeInfo != null) { - allNodes.add(nodeInfo); +if (inst instanceof InactiveServiceInstance) { + allNodes.add(null); +} else { + NodeInfo nodeInfo = instanceToNodeMap.get(inst.getWorkerIdentity()); + if (nodeInfo == null) { +allNodes.add(null); + } else { +allNodes.add(nodeInfo); +if (nodeInfo.canAcceptTask()) { + activeNodesWithFreeSlots.add(nodeInfo); +} + } } } + if (allNodes.isEmpty()) { +return SELECT_HOST_RESULT_DELAYED_RESOURCES; + } + + // no locality-requested, randomly pick a node containing free slots if (requestedHosts == null || requestedHosts.length == 0) { -// no locality-requested, iterate the available hosts in consistent order from the beginning -if (LOG.isDebugEnabled()) { - LOG.debug("No-locality requested. Attempting to allocate next available host for task={}", request.task); -} -for (NodeInfo nodeInfo : allNodes) { - if (nodeInfo.canAcceptTask()) { -LOG.info("Assigning {} when looking for any host, from #hosts={}, requestedHosts={}", -nodeInfo.toShortString(), allNodes.size(), ((requestedHosts ==