This is an automated email from the ASF dual-hosted git repository. pvary pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push: new 5676788 HIVE-21988: Do not consider nodes with 0 capacity when calculating host affinity (Peter Vary reviewed by Oliver Draese and Adam Szita) 5676788 is described below commit 5676788893f264e6f42435100f6f25ba2b6d28b7 Author: Peter Vary <pv...@cloudera.com> AuthorDate: Tue Jul 16 10:51:28 2019 +0200 HIVE-21988: Do not consider nodes with 0 capacity when calculating host affinity (Peter Vary reviewed by Oliver Draese and Adam Szita) --- .../registry/impl/InactiveServiceInstance.java | 3 +- .../llap/registry/impl/LlapFixedRegistryImpl.java | 8 +- .../registry/impl/LlapZookeeperRegistryImpl.java | 8 +- .../tez/HostAffinitySplitLocationProvider.java | 4 +- .../org/apache/hadoop/hive/ql/exec/tez/Utils.java | 49 ++++--- .../tez/TestHostAffinitySplitLocationProvider.java | 4 +- .../apache/hadoop/hive/ql/exec/tez/TestUtils.java | 157 +++++++++++++++++++++ 7 files changed, 210 insertions(+), 23 deletions(-) diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/InactiveServiceInstance.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/InactiveServiceInstance.java index 1d6b716..d9c2364 100644 --- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/InactiveServiceInstance.java +++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/InactiveServiceInstance.java @@ -14,6 +14,7 @@ package org.apache.hadoop.hive.llap.registry.impl; +import java.util.Collections; import java.util.Map; import org.apache.hadoop.hive.llap.registry.LlapServiceInstance; @@ -62,7 +63,7 @@ public class InactiveServiceInstance implements LlapServiceInstance { @Override public Map<String, String> getProperties() { - throw new UnsupportedOperationException(); + return Collections.emptyMap(); } @Override diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java 
b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java index 344eba7..2bedb32 100644 --- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java +++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java @@ -29,6 +29,8 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; + +import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; @@ -122,7 +124,11 @@ public class LlapFixedRegistryImpl implements ServiceRegistry<LlapServiceInstanc return "host-" + host; } - private final class FixedServiceInstance implements LlapServiceInstance { + /** + * A single instance in an Llap Service. + */ + @VisibleForTesting + public final class FixedServiceInstance implements LlapServiceInstance { private final String host; private final String serviceAddress; diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java index 58a99f4..bf7b76b 100644 --- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java +++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java @@ -13,6 +13,7 @@ */ package org.apache.hadoop.hive.llap.registry.impl; +import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.registry.client.binding.RegistryUtils; import com.google.common.collect.Sets; @@ -186,7 +187,12 @@ public class LlapZookeeperRegistryImpl // Nothing for the zkCreate models } - private class DynamicServiceInstance + /** + * A dynamically changing instance in an Llap Service. 
Can become inactive if failing or can be + * blacklisted (set to 0 capacity) if too slow (See: BlacklistingLlapMetricsListener). + */ + @VisibleForTesting + public class DynamicServiceInstance extends ServiceInstanceBase implements LlapServiceInstance { private final int mngPort; private final int shufflePort; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HostAffinitySplitLocationProvider.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HostAffinitySplitLocationProvider.java index c5d96e5..a1d422b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HostAffinitySplitLocationProvider.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HostAffinitySplitLocationProvider.java @@ -44,9 +44,9 @@ public class HostAffinitySplitLocationProvider implements SplitLocationProvider private final static Logger LOG = LoggerFactory.getLogger( HostAffinitySplitLocationProvider.class); - private final boolean isDebugEnabled = LOG.isDebugEnabled(); - private final List<String> locations; + @VisibleForTesting + final List<String> locations; public HostAffinitySplitLocationProvider(List<String> knownLocations) { Preconditions.checkState(knownLocations != null && !knownLocations.isEmpty(), diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/Utils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/Utils.java index 1b7321b..db1a0e6 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/Utils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/Utils.java @@ -21,12 +21,12 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collection; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import org.apache.commons.lang.ArrayUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.llap.registry.LlapServiceInstance; -import org.apache.hadoop.hive.llap.registry.LlapServiceInstanceSet; import 
org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.split.SplitLocationProvider; @@ -50,21 +50,7 @@ public class Utils { LOG.info("SplitGenerator using llap affinitized locations: " + useCustomLocations); if (useCustomLocations) { LlapRegistryService serviceRegistry = LlapRegistryService.getClient(conf); - LOG.info("Using LLAP instance " + serviceRegistry.getApplicationId()); - - Collection<LlapServiceInstance> serviceInstances = - serviceRegistry.getInstances().getAllInstancesOrdered(true); - Preconditions.checkArgument(!serviceInstances.isEmpty(), - "No running LLAP daemons! Please check LLAP service status and zookeeper configuration"); - ArrayList<String> locations = new ArrayList<>(serviceInstances.size()); - for (LlapServiceInstance serviceInstance : serviceInstances) { - if (LOG.isDebugEnabled()) { - LOG.debug("Adding " + serviceInstance.getWorkerIdentity() + " with hostname=" + - serviceInstance.getHost() + " to list for split locations"); - } - locations.add(serviceInstance.getHost()); - } - splitLocationProvider = new HostAffinitySplitLocationProvider(locations); + return getCustomSplitLocationProvider(serviceRegistry, LOG); } else { splitLocationProvider = new SplitLocationProvider() { @Override @@ -84,4 +70,35 @@ public class Utils { } return splitLocationProvider; } + + @VisibleForTesting + static SplitLocationProvider getCustomSplitLocationProvider(LlapRegistryService serviceRegistry, Logger LOG) throws + IOException { + LOG.info("Using LLAP instance " + serviceRegistry.getApplicationId()); + + Collection<LlapServiceInstance> serviceInstances = + serviceRegistry.getInstances().getAllInstancesOrdered(true); + Preconditions.checkArgument(!serviceInstances.isEmpty(), + "No running LLAP daemons! 
Please check LLAP service status and zookeeper configuration"); + ArrayList<String> locations = new ArrayList<>(serviceInstances.size()); + for (LlapServiceInstance serviceInstance : serviceInstances) { + String executors = + serviceInstance.getProperties().get(LlapRegistryService.LLAP_DAEMON_NUM_ENABLED_EXECUTORS); + if (executors != null && Integer.parseInt(executors) == 0) { + // If the number of enabled executors is 0, do not consider this location for affinity + locations.add(null); + if (LOG.isDebugEnabled()) { + LOG.debug("Not adding " + serviceInstance.getWorkerIdentity() + " with hostname=" + + serviceInstance.getHost() + " since executor number is 0"); + } + } else { + locations.add(serviceInstance.getHost()); + if (LOG.isDebugEnabled()) { + LOG.debug("Adding " + serviceInstance.getWorkerIdentity() + " with hostname=" + + serviceInstance.getHost() + " to list for split locations"); + } + } + } + return new HostAffinitySplitLocationProvider(locations); + } } diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestHostAffinitySplitLocationProvider.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestHostAffinitySplitLocationProvider.java index 13f4676..61c98b7 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestHostAffinitySplitLocationProvider.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestHostAffinitySplitLocationProvider.java @@ -306,8 +306,8 @@ public class TestHostAffinitySplitLocationProvider { return inputSplit; } - private FileSplit createMockFileSplit(boolean createOrcSplit, String fakePathString, long start, - long length, String[] locations) throws IOException { + static FileSplit createMockFileSplit(boolean createOrcSplit, String fakePathString, long start, + long length, String[] locations) throws IOException { FileSplit fileSplit; if (createOrcSplit) { fileSplit = mock(OrcSplit.class); diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestUtils.java 
b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestUtils.java new file mode 100644 index 0000000..b001d02 --- /dev/null +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestUtils.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.exec.tez; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.llap.registry.LlapServiceInstance; +import org.apache.hadoop.hive.llap.registry.LlapServiceInstanceSet; +import org.apache.hadoop.hive.llap.registry.impl.InactiveServiceInstance; +import org.apache.hadoop.hive.llap.registry.impl.LlapFixedRegistryImpl; +import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService; +import org.apache.hadoop.hive.llap.registry.impl.LlapZookeeperRegistryImpl; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.split.SplitLocationProvider; +import org.apache.hadoop.registry.client.binding.RegistryTypeUtils; +import org.apache.hadoop.registry.client.types.Endpoint; +import org.apache.hadoop.registry.client.types.ServiceRecord; +import org.junit.Before; +import org.junit.Test; + +import org.mockito.Mock; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import static org.junit.Assert.assertFalse; +import static org.mockito.Matchers.anyBoolean; +import static org.mockito.Mockito.when; +import static org.mockito.MockitoAnnotations.initMocks; + +/** + * Test class for Utils methods. 
+ */ +public class TestUtils { + private static final Logger LOG = LoggerFactory.getLogger(TestUtils.class); + + private static final String INACTIVE = "inactive"; + private static final String ACTIVE = "dynamic"; + private static final String DISABLED = "disabled"; + private static final String FIXED = "fix"; + + + @Mock + private LlapRegistryService mockRegistry; + + @Mock + private LlapServiceInstanceSet mockInstanceSet; + + @Before + public void setUp() { + initMocks(this); + } + + @Test + public void testGetSplitLocationProvider() throws IOException, URISyntaxException { + // Create test LlapServiceInstances to make sure that we can handle all of the instance types + List<LlapServiceInstance> instances = new ArrayList<>(3); + + // Set 1 inactive instance to make sure that this does not cause problem for us + LlapServiceInstance inactive = new InactiveServiceInstance(INACTIVE); + instances.add(inactive); + + LlapZookeeperRegistryImpl dynRegistry = new LlapZookeeperRegistryImpl("dyn", new HiveConf()); + Endpoint rpcEndpoint = RegistryTypeUtils.ipcEndpoint("llap", new InetSocketAddress(ACTIVE, 4000)); + Endpoint shuffle = RegistryTypeUtils.ipcEndpoint("shuffle", new InetSocketAddress(ACTIVE, 4000)); + Endpoint mng = RegistryTypeUtils.ipcEndpoint("llapmng", new InetSocketAddress(ACTIVE, 4000)); + Endpoint outputFormat = RegistryTypeUtils.ipcEndpoint("llapoutputformat", new InetSocketAddress(ACTIVE, 4000)); + Endpoint services = RegistryTypeUtils.webEndpoint("services", new URI(ACTIVE + ":4000")); + + // Set 1 active instance + ServiceRecord enabledSrv = new ServiceRecord(); + enabledSrv.addInternalEndpoint(rpcEndpoint); + enabledSrv.addInternalEndpoint(shuffle); + enabledSrv.addInternalEndpoint(mng); + enabledSrv.addInternalEndpoint(outputFormat); + enabledSrv.addExternalEndpoint(services); + + enabledSrv.set(LlapRegistryService.LLAP_DAEMON_NUM_ENABLED_EXECUTORS, 10); + enabledSrv.set(HiveConf.ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB.varname, 100); + 
LlapZookeeperRegistryImpl.DynamicServiceInstance dynamic = + dynRegistry.new DynamicServiceInstance(enabledSrv); + instances.add(dynamic); + + // Set 1 instance with 0 executors + ServiceRecord disabledSrv = new ServiceRecord(enabledSrv); + disabledSrv.set(LlapRegistryService.LLAP_DAEMON_NUM_ENABLED_EXECUTORS, 0); + LlapZookeeperRegistryImpl.DynamicServiceInstance disabled = + dynRegistry.new DynamicServiceInstance(disabledSrv); + disabled.setHost(DISABLED); + instances.add(disabled); + + when(mockRegistry.getInstances()).thenReturn(mockInstanceSet); + when(mockInstanceSet.getAllInstancesOrdered(anyBoolean())).thenReturn(instances); + SplitLocationProvider provider = Utils.getCustomSplitLocationProvider(mockRegistry, LOG); + + assertLocations((HostAffinitySplitLocationProvider)provider, new String[] {ACTIVE}); + + // Check that instances from a fixed registry are handled as well + LlapFixedRegistryImpl fixRegistry = new LlapFixedRegistryImpl("llap", new HiveConf()); + + // Instance for testing fixed registry instances + LlapServiceInstance fixed = fixRegistry.new FixedServiceInstance(FIXED); + instances.remove(dynamic); + instances.add(fixed); + + provider = Utils.getCustomSplitLocationProvider(mockRegistry, LOG); + + assertLocations((HostAffinitySplitLocationProvider)provider, new String[] {FIXED}); + } + + private void assertLocations(HostAffinitySplitLocationProvider provider, String[] expectedLocations) + throws IOException { + InputSplit inputSplit1 = + TestHostAffinitySplitLocationProvider.createMockFileSplit( + true, "path2", 0, 1000, new String[] {"HOST-1", "HOST-2"}); + + // Check that the provider does not return disabled/inactive instances and returns only 1 location + List<String> result = new ArrayList<>(Arrays.asList(provider.getLocations(inputSplit1))); + assertEquals(1, result.size()); + assertFalse(result.contains(INACTIVE)); + assertFalse(result.contains(DISABLED)); + + // Since we cannot check the results for every input, dig into the provider's internal data to 
make sure that we have only the available host name in the location list + // Remove nulls + Set<String> knownLocations = new HashSet<>(); + knownLocations.addAll(provider.locations); + knownLocations.remove(null); + assertArrayEquals(expectedLocations, knownLocations.toArray(new String[] {})); + } +}