HIVE-12470. Allow splits to provide custom consistent locations, instead of being tied to data locality. (Siddharth Seth, reviewed by Prasanth Jayachandran) (cherry picked from commit c89b4b12e4d8fc03e64493e6c821b3bffee6f236)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/5c071544 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/5c071544 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/5c071544 Branch: refs/heads/branch-2.0 Commit: 5c071544deead530452ed1c044bc86878802a296 Parents: 4f76d46 Author: Siddharth Seth <ss...@apache.org> Authored: Sun Jan 24 15:25:54 2016 -0800 Committer: Siddharth Seth <ss...@apache.org> Committed: Sun Jan 24 15:30:24 2016 -0800 ---------------------------------------------------------------------- .../org/apache/hadoop/hive/conf/HiveConf.java | 7 +- .../hive/llap/registry/ServiceInstanceSet.java | 8 + .../registry/impl/LlapFixedRegistryImpl.java | 21 ++- .../llap/registry/impl/LlapRegistryService.java | 38 ++++- .../registry/impl/LlapYarnRegistryImpl.java | 75 +++++++-- .../hive/ql/exec/tez/CustomPartitionVertex.java | 9 +- .../hive/ql/exec/tez/HiveSplitGenerator.java | 10 +- .../tez/HostAffinitySplitLocationProvider.java | 86 ++++++++++ .../hadoop/hive/ql/exec/tez/SplitGrouper.java | 24 +-- .../apache/hadoop/hive/ql/exec/tez/Utils.java | 58 +++++++ .../TestHostAffinitySplitLocationProvider.java | 163 +++++++++++++++++++ 11 files changed, 464 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/5c071544/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 26ba4f0..14b86e3 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -2270,7 +2270,6 @@ public class HiveConf extends Configuration { "Whether to generate the splits locally or in the AM (tez only)"), 
HIVE_TEZ_GENERATE_CONSISTENT_SPLITS("hive.tez.input.generate.consistent.splits", true, "Whether to generate consistent split locations when generating splits in the AM"), - HIVE_PREWARM_ENABLED("hive.prewarm.enabled", false, "Enables container prewarm for Tez/Spark (Hadoop 2 only)"), HIVE_PREWARM_NUM_CONTAINERS("hive.prewarm.numcontainers", 10, "Controls the number of containers to prewarm for Tez/Spark (Hadoop 2 only)"), @@ -2483,7 +2482,7 @@ public class HiveConf extends Configuration { new TimeValidator(TimeUnit.SECONDS), "How long to delay before cleaning up query files in LLAP (in seconds, for debugging).", "llap.file.cleanup.delay-seconds"), - LLAP_DAEMON_SERVICE_HOSTS("hive.llap.daemon.service.hosts", "", + LLAP_DAEMON_SERVICE_HOSTS("hive.llap.daemon.service.hosts", null, "Explicitly specified hosts to use for LLAP scheduling. Useful for testing. By default,\n" + "YARN registry is used.", "llap.daemon.service.hosts"), LLAP_DAEMON_SERVICE_REFRESH_INTERVAL("hive.llap.daemon.service.refresh.interval.sec", "60s", @@ -2550,6 +2549,10 @@ public class HiveConf extends Configuration { "llap.daemon.service.port"), LLAP_DAEMON_WEB_SSL("hive.llap.daemon.web.ssl", false, "Whether LLAP daemon web UI should use SSL.", "llap.daemon.service.ssl"), + LLAP_CLIENT_CONSISTENT_SPLITS("hive.llap.client.consistent.splits", + false, + "Whether to setup split locations to match nodes on which llap daemons are running," + + " instead of using the locations provided by the split itself"), SPARK_CLIENT_FUTURE_TIMEOUT("hive.spark.client.future.timeout", "60s", new TimeValidator(TimeUnit.SECONDS), http://git-wip-us.apache.org/repos/asf/hive/blob/5c071544/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstanceSet.java ---------------------------------------------------------------------- diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstanceSet.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstanceSet.java index 
388b5f3..be811eb 100644 --- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstanceSet.java +++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstanceSet.java @@ -14,6 +14,7 @@ package org.apache.hadoop.hive.llap.registry; import java.io.IOException; +import java.util.List; import java.util.Map; import java.util.Set; @@ -30,6 +31,13 @@ public interface ServiceInstanceSet { public Map<String, ServiceInstance> getAll(); /** + * Gets a list containing all the instances. This list has the same iteration order across + * different processes, assuming the list of registry entries is the same. + * @return + */ + public List<ServiceInstance> getAllInstancesOrdered(); + + /** * Get an instance by worker identity. * * @param name http://git-wip-us.apache.org/repos/asf/hive/blob/5c071544/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java ---------------------------------------------------------------------- diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java index ef9de32..92044bb 100644 --- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java +++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java @@ -17,8 +17,13 @@ import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.UnknownHostException; +import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.LinkedList; +import java.util.List; import java.util.Map; import java.util.Set; @@ -176,7 +181,8 @@ public class LlapFixedRegistryImpl implements ServiceRegistry { private final class FixedServiceInstanceSet implements ServiceInstanceSet { - private final Map<String, 
ServiceInstance> instances = new HashMap<String, ServiceInstance>(); + // LinkedHashMap has a repeatable iteration order. + private final Map<String, ServiceInstance> instances = new LinkedHashMap<>(); public FixedServiceInstanceSet() { for (String host : hosts) { @@ -191,6 +197,19 @@ public class LlapFixedRegistryImpl implements ServiceRegistry { } @Override + public List<ServiceInstance> getAllInstancesOrdered() { + List<ServiceInstance> list = new LinkedList<>(); + list.addAll(instances.values()); + Collections.sort(list, new Comparator<ServiceInstance>() { + @Override + public int compare(ServiceInstance o1, ServiceInstance o2) { + return o1.getWorkerIdentity().compareTo(o2.getWorkerIdentity()); + } + }); + return list; + } + + @Override public ServiceInstance getInstance(String name) { return instances.get(name); } http://git-wip-us.apache.org/repos/asf/hive/blob/5c071544/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java ---------------------------------------------------------------------- diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java index 740f373..907faed 100644 --- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java +++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java @@ -14,11 +14,13 @@ package org.apache.hadoop.hive.llap.registry.impl; import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import com.google.common.base.Preconditions; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; -import org.apache.hadoop.hive.llap.configuration.LlapConfiguration; import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet; import org.apache.hadoop.hive.llap.registry.ServiceRegistry; import
org.apache.hadoop.service.AbstractService; @@ -32,11 +34,45 @@ public class LlapRegistryService extends AbstractService { private ServiceRegistry registry = null; private final boolean isDaemon; + private static final Map<String, LlapRegistryService> yarnRegistries = new HashMap<>(); + public LlapRegistryService(boolean isDaemon) { super("LlapRegistryService"); this.isDaemon = isDaemon; } + /** + * Helper method to get a ServiceRegistry instance to read from the registry. + * This should not be used by LLAP daemons. + * + * @param conf {@link Configuration} instance which contains service registry information. + * @return an initialized and started client registry; for a YARN registry (hosts value starting with '@') the instance is cached and shared by registry name + */ + public static synchronized LlapRegistryService getClient(Configuration conf) { + String hosts = HiveConf.getTrimmedVar(conf, HiveConf.ConfVars.LLAP_DAEMON_SERVICE_HOSTS); + Preconditions.checkNotNull(hosts, ConfVars.LLAP_DAEMON_SERVICE_HOSTS.toString() + " must be defined"); + LlapRegistryService registry; + if (hosts.startsWith("@")) { + // Caching instances only in case of the YARN registry. Each host based list will get its own copy.
+ String name = hosts.substring(1); + if (yarnRegistries.containsKey(name)) { + registry = yarnRegistries.get(name); + } else { + registry = new LlapRegistryService(false); + registry.init(conf); + registry.start(); + yarnRegistries.put(name, registry); + } + } else { + registry = new LlapRegistryService(false); + registry.init(conf); + registry.start(); + } + LOG.info("Using LLAP registry (client) type: " + registry); + return registry; + } + + @Override public void serviceInit(Configuration conf) { String hosts = HiveConf.getTrimmedVar(conf, ConfVars.LLAP_DAEMON_SERVICE_HOSTS); http://git-wip-us.apache.org/repos/asf/hive/blob/5c071544/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapYarnRegistryImpl.java ---------------------------------------------------------------------- diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapYarnRegistryImpl.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapYarnRegistryImpl.java index fc2ebf2..efe31cc 100644 --- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapYarnRegistryImpl.java +++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapYarnRegistryImpl.java @@ -20,15 +20,20 @@ import java.net.MalformedURLException; import java.net.URISyntaxException; import java.net.URL; import java.net.UnknownHostException; +import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedHashMap; +import java.util.LinkedList; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantReadWriteLock; import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.hadoop.conf.Configuration; @@ -269,16 +274,47 @@ public class LlapYarnRegistryImpl 
implements ServiceRegistry { // LinkedHashMap to retain iteration order. private final Map<String, ServiceInstance> instances = new LinkedHashMap<>(); + private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + private final ReentrantReadWriteLock.ReadLock readLock = lock.readLock(); + private final ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock(); @Override - public synchronized Map<String, ServiceInstance> getAll() { + public Map<String, ServiceInstance> getAll() { // Return a copy. Instances may be modified during a refresh. - return new LinkedHashMap<>(instances); + readLock.lock(); + try { + return new LinkedHashMap<>(instances); + } finally { + readLock.unlock(); + } } @Override - public synchronized ServiceInstance getInstance(String name) { - return instances.get(name); + public List<ServiceInstance> getAllInstancesOrdered() { + List<ServiceInstance> list = new LinkedList<>(); + readLock.lock(); + try { + list.addAll(instances.values()); + } finally { + readLock.unlock(); + } + Collections.sort(list, new Comparator<ServiceInstance>() { + @Override + public int compare(ServiceInstance o1, ServiceInstance o2) { + return o1.getWorkerIdentity().compareTo(o2.getWorkerIdentity()); + } + }); + return list; + } + + @Override + public ServiceInstance getInstance(String name) { + readLock.lock(); + try { + return instances.get(name); + } finally { + readLock.unlock(); + } } @Override @@ -290,7 +326,8 @@ public class LlapYarnRegistryImpl implements ServiceRegistry { Map<String, ServiceRecord> records = RegistryUtils.listServiceRecords(client, RegistryPathUtils.parentOf(path)); // Synchronize after reading the service records from the external service (ZK) - synchronized (this) { + writeLock.lock(); + try { Set<String> latestKeys = new HashSet<String>(); LOG.info("Starting to refresh ServiceInstanceSet " + System.identityHashCode(this)); for (ServiceRecord rec : records.values()) { @@ -333,28 +370,34 @@ public class LlapYarnRegistryImpl
implements ServiceRegistry { } else { this.instances.putAll(freshInstances); } + } finally { + writeLock.unlock(); } } @Override - public synchronized Set<ServiceInstance> getByHost(String host) { + public Set<ServiceInstance> getByHost(String host) { // TODO Maybe store this as a map which is populated during construction, to avoid walking // the map on each request. + readLock.lock(); Set<ServiceInstance> byHost = new HashSet<ServiceInstance>(); - - for (ServiceInstance i : instances.values()) { - if (host.equals(i.getHost())) { - // all hosts in instances should be alive in this impl - byHost.add(i); + try { + for (ServiceInstance i : instances.values()) { + if (host.equals(i.getHost())) { + // all hosts in instances should be alive in this impl + byHost.add(i); + } + if (LOG.isDebugEnabled()) { + LOG.debug("Locality comparing " + host + " to " + i.getHost()); + } } if (LOG.isDebugEnabled()) { - LOG.debug("Locality comparing " + host + " to " + i.getHost()); + LOG.debug("Returning " + byHost.size() + " hosts for locality allocation on " + host); } + return byHost; + } finally { + readLock.unlock(); } - if (LOG.isDebugEnabled()) { - LOG.debug("Returning " + byHost.size() + " hosts for locality allocation on " + host); - } - return byHost; } } http://git-wip-us.apache.org/repos/asf/hive/blob/5c071544/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java index e9c14b1..45d3cd1 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java @@ -33,6 +33,7 @@ import java.util.TreeMap; import java.util.TreeSet; import com.google.common.collect.LinkedListMultimap; +import 
org.apache.hadoop.mapred.split.SplitLocationProvider; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -271,25 +272,27 @@ public class CustomPartitionVertex extends VertexManagerPlugin { HashMultimap.<Integer, InputSplit> create(); boolean secondLevelGroupingDone = false; if ((mainWorkName.isEmpty()) || (inputName.compareTo(mainWorkName) == 0)) { + SplitLocationProvider splitLocationProvider = Utils.getSplitLocationProvider(conf, LOG); for (Integer key : bucketToInitialSplitMap.keySet()) { InputSplit[] inputSplitArray = (bucketToInitialSplitMap.get(key).toArray(new InputSplit[0])); Multimap<Integer, InputSplit> groupedSplit = grouper.generateGroupedSplits(jobConf, conf, inputSplitArray, waves, - availableSlots, inputName, mainWorkName.isEmpty()); + availableSlots, inputName, mainWorkName.isEmpty(), splitLocationProvider); if (mainWorkName.isEmpty() == false) { Multimap<Integer, InputSplit> singleBucketToGroupedSplit = HashMultimap.<Integer, InputSplit> create(); singleBucketToGroupedSplit.putAll(key, groupedSplit.values()); groupedSplit = grouper.group(jobConf, singleBucketToGroupedSplit, availableSlots, - HiveConf.getFloatVar(conf, HiveConf.ConfVars.TEZ_SMB_NUMBER_WAVES)); + HiveConf.getFloatVar(conf, HiveConf.ConfVars.TEZ_SMB_NUMBER_WAVES), null); secondLevelGroupingDone = true; } bucketToGroupedSplitMap.putAll(key, groupedSplit.values()); } processAllEvents(inputName, bucketToGroupedSplitMap, secondLevelGroupingDone); } else { + SplitLocationProvider splitLocationProvider = Utils.getSplitLocationProvider(conf, LOG); // do not group across files in case of side work because there is only 1 KV reader per // grouped split. This would affect SMB joins where we want to find the smallest key in // all the bucket files. 
@@ -298,7 +301,7 @@ public class CustomPartitionVertex extends VertexManagerPlugin { (bucketToInitialSplitMap.get(key).toArray(new InputSplit[0])); Multimap<Integer, InputSplit> groupedSplit = grouper.generateGroupedSplits(jobConf, conf, inputSplitArray, waves, - availableSlots, inputName, false); + availableSlots, inputName, false, splitLocationProvider); bucketToGroupedSplitMap.putAll(key, groupedSplit.values()); } /* http://git-wip-us.apache.org/repos/asf/hive/blob/5c071544/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java index 8ebfe69..8e48c2e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java @@ -41,6 +41,7 @@ import org.apache.hadoop.mapred.FileSplit; import org.apache.hadoop.mapred.InputFormat; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.split.SplitLocationProvider; import org.apache.hadoop.mapreduce.split.TezMapReduceSplitsGrouper; import org.apache.hadoop.util.ReflectionUtils; import org.apache.tez.common.TezUtils; @@ -79,6 +80,7 @@ public class HiveSplitGenerator extends InputInitializer { private final MRInputUserPayloadProto userPayloadProto; private final MapWork work; private final SplitGrouper splitGrouper = new SplitGrouper(); + private final SplitLocationProvider splitLocationProvider; public HiveSplitGenerator(InputInitializerContext initializerContext) throws IOException, SerDeException { @@ -91,6 +93,9 @@ public class HiveSplitGenerator extends InputInitializer { this.jobConf = new JobConf(conf); + this.splitLocationProvider = Utils.getSplitLocationProvider(conf, LOG); + LOG.info("SplitLocationProvider: " + 
splitLocationProvider); + // Read all credentials into the credentials instance stored in JobConf. ShimLoader.getHadoopShims().getMergedCredentials(jobConf); @@ -149,6 +154,7 @@ public class HiveSplitGenerator extends InputInitializer { conf.getFloat(TezMapReduceSplitsGrouper.TEZ_GROUPING_SPLIT_WAVES, TezMapReduceSplitsGrouper.TEZ_GROUPING_SPLIT_WAVES_DEFAULT); + // Raw splits InputSplit[] splits = inputFormat.getSplits(jobConf, (int) (availableSlots * waves)); // Sort the splits, so that subsequent grouping is consistent. Arrays.sort(splits, new InputSplitComparator()); @@ -160,10 +166,10 @@ public class HiveSplitGenerator extends InputInitializer { } Multimap<Integer, InputSplit> groupedSplits = - splitGrouper.generateGroupedSplits(jobConf, conf, splits, waves, availableSlots); + splitGrouper.generateGroupedSplits(jobConf, conf, splits, waves, availableSlots, splitLocationProvider); // And finally return them in a flat array InputSplit[] flatSplits = groupedSplits.values().toArray(new InputSplit[0]); - LOG.info("Number of grouped splits: " + flatSplits.length); + LOG.info("Number of split groups: " + flatSplits.length); List<TaskLocationHint> locationHints = splitGrouper.createTaskLocationHints(flatSplits, generateConsistentSplits); http://git-wip-us.apache.org/repos/asf/hive/blob/5c071544/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HostAffinitySplitLocationProvider.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HostAffinitySplitLocationProvider.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HostAffinitySplitLocationProvider.java new file mode 100644 index 0000000..c06499e --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HostAffinitySplitLocationProvider.java @@ -0,0 +1,86 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.tez; + +import java.io.IOException; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.split.SplitLocationProvider; +import org.apache.hive.common.util.Murmur3; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This maps a split (path + offset) to an index based on the number of locations provided. + * + * If locations do not change across jobs, the intention is to map the same split to the same node. + * + * A big problem is when nodes change (added, removed, temporarily removed and re-added) etc. That changes + * the number of locations / position of locations - and will cause the cache to be almost completely invalidated. + * + * TODO: Support for consistent hashing when combining the split location generator and the ServiceRegistry. 
+ * + */ +public class HostAffinitySplitLocationProvider implements SplitLocationProvider { + + private static final Logger LOG = LoggerFactory.getLogger(HostAffinitySplitLocationProvider.class); + private final boolean isDebugEnabled = LOG.isDebugEnabled(); + + private final String[] knownLocations; + + public HostAffinitySplitLocationProvider(String[] knownLocations) { + Preconditions.checkState(knownLocations != null && knownLocations.length != 0, + HostAffinitySplitLocationProvider.class.getName() + + " needs at least 1 location to function"); + this.knownLocations = knownLocations.clone(); + } + + @Override + public String[] getLocations(InputSplit split) throws IOException { + if (split instanceof FileSplit) { + FileSplit fsplit = (FileSplit) split; + long hash = generateHash(fsplit.getPath().toString(), fsplit.getStart()); + int indexRaw = (int) (hash % knownLocations.length); + int index = Math.abs(indexRaw); + if (isDebugEnabled) { + LOG.debug( + "Split at " + fsplit.getPath() + " with offset= " + fsplit.getStart() + ", length=" + + fsplit.getLength() + " mapped to index=" + index + ", location=" + + knownLocations[index]); + } + return new String[]{knownLocations[index]}; + } else { + if (isDebugEnabled) { + LOG.debug("Split: " + split + " is not a FileSplit. Using default locations"); + } + return split.getLocations(); + } + } + + private long generateHash(String path, long startOffset) throws IOException { + // Explicitly using only the start offset of a split, and not the length. + // Splits generated on block boundaries and stripe boundaries can vary slightly. Try hashing both to the same node. + // There is the drawback of potentially hashing the same data on multiple nodes though, when a large split + // is sent to 1 node, and a second invocation uses smaller chunks of the previous large split and sends them + // to different nodes.
+ DataOutputBuffer dob = new DataOutputBuffer(); + dob.writeLong(startOffset); + dob.writeUTF(path); + return Murmur3.hash64(dob.getData(), 0, dob.getLength()); + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/5c071544/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java index aaaa6a5..f4496df 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java @@ -23,7 +23,6 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; @@ -42,6 +41,7 @@ import org.apache.hadoop.hive.ql.plan.PartitionDesc; import org.apache.hadoop.mapred.FileSplit; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.split.SplitLocationProvider; import org.apache.hadoop.mapred.split.TezGroupedSplit; import org.apache.hadoop.mapred.split.TezMapredSplitsGrouper; import org.apache.tez.dag.api.TaskLocationHint; @@ -65,14 +65,13 @@ public class SplitGrouper { private final TezMapredSplitsGrouper tezGrouper = new TezMapredSplitsGrouper(); - - /** * group splits for each bucket separately - while evenly filling all the * available slots with tasks */ public Multimap<Integer, InputSplit> group(Configuration conf, - Multimap<Integer, InputSplit> bucketSplitMultimap, int availableSlots, float waves) + Multimap<Integer, InputSplit> bucketSplitMultimap, int availableSlots, float waves, + SplitLocationProvider splitLocationProvider) throws IOException { // figure out how many tasks we want for each bucket @@ -90,9 +89,9 @@ public class SplitGrouper { InputSplit[] 
rawSplits = inputSplitCollection.toArray(new InputSplit[0]); InputSplit[] groupedSplits = tezGrouper.getGroupedSplits(conf, rawSplits, bucketTaskMap.get(bucketId), - HiveInputFormat.class.getName(), new ColumnarSplitSizeEstimator()); + HiveInputFormat.class.getName(), new ColumnarSplitSizeEstimator(), splitLocationProvider); - LOG.info("Original split size is " + rawSplits.length + " grouped split size is " + LOG.info("Original split count is " + rawSplits.length + " grouped split count is " + groupedSplits.length + ", for bucket: " + bucketId); for (InputSplit inSplit : groupedSplits) { @@ -155,9 +154,10 @@ public class SplitGrouper { public Multimap<Integer, InputSplit> generateGroupedSplits(JobConf jobConf, Configuration conf, InputSplit[] splits, - float waves, int availableSlots) + float waves, int availableSlots, + SplitLocationProvider locationProvider) throws Exception { - return generateGroupedSplits(jobConf, conf, splits, waves, availableSlots, null, true); + return generateGroupedSplits(jobConf, conf, splits, waves, availableSlots, null, true, locationProvider); } /** Generate groups of splits, separated by schema evolution boundaries */ @@ -166,10 +166,12 @@ public class SplitGrouper { InputSplit[] splits, float waves, int availableSlots, String inputName, - boolean groupAcrossFiles) throws + boolean groupAcrossFiles, + SplitLocationProvider locationProvider) throws Exception { MapWork work = populateMapWork(jobConf, inputName); + // ArrayListMultimap is important here to retain the ordering for the splits. 
Multimap<Integer, InputSplit> bucketSplitMultiMap = ArrayListMultimap.<Integer, InputSplit> create(); @@ -188,7 +190,7 @@ public class SplitGrouper { // group them into the chunks we want Multimap<Integer, InputSplit> groupedSplits = - this.group(jobConf, bucketSplitMultiMap, availableSlots, waves); + this.group(jobConf, bucketSplitMultiMap, availableSlots, waves, locationProvider); return groupedSplits; } @@ -207,6 +209,8 @@ public class SplitGrouper { // mapping of bucket id to number of required tasks to run Map<Integer, Integer> bucketTaskMap = new HashMap<Integer, Integer>(); + // TODO HIVE-12255. Make use of SplitSizeEstimator. + // The actual task computation needs to be looked at as well. // compute the total size per bucket long totalSize = 0; boolean earlyExit = false; http://git-wip-us.apache.org/repos/asf/hive/blob/5c071544/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/Utils.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/Utils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/Utils.java new file mode 100644 index 0000000..3eb858b --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/Utils.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.exec.tez; + +import java.io.IOException; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.llap.registry.ServiceInstance; +import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService; +import org.apache.hadoop.mapred.split.SplitLocationProvider; +import org.slf4j.Logger; + +public class Utils { + public static SplitLocationProvider getSplitLocationProvider(Configuration conf, Logger LOG) throws + IOException { + boolean useCustomLocations = + HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_CLIENT_CONSISTENT_SPLITS); + SplitLocationProvider splitLocationProvider; + LOG.info("SplitGenerator using llap affinitized locations: " + useCustomLocations); + if (useCustomLocations) { + LlapRegistryService serviceRegistry; + serviceRegistry = LlapRegistryService.getClient(conf); + + List<ServiceInstance> serviceInstances = + serviceRegistry.getInstances().getAllInstancesOrdered(); + String[] locations = new String[serviceInstances.size()]; + int i = 0; + for (ServiceInstance serviceInstance : serviceInstances) { + if (LOG.isDebugEnabled()) { + LOG.debug("Adding " + serviceInstance.getWorkerIdentity() + " with hostname=" + + serviceInstance.getHost() + " to list for split locations"); + } + locations[i++] = serviceInstance.getHost(); + } + splitLocationProvider = new HostAffinitySplitLocationProvider(locations); + } else { + splitLocationProvider = null; + } + return splitLocationProvider; + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/5c071544/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestHostAffinitySplitLocationProvider.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestHostAffinitySplitLocationProvider.java 
b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestHostAffinitySplitLocationProvider.java
new file mode 100644
index 0000000..d98a5ff
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestHostAffinitySplitLocationProvider.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.tez;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.orc.OrcSplit;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.junit.Test;
+
+/**
+ * Tests for {@link HostAffinitySplitLocationProvider}. Non-file splits keep their own locations;
+ * ORC file splits are mapped to exactly one of the known executor locations, and splits of the
+ * same file at the same offset map to the same executor even when their lengths differ.
+ */
+public class TestHostAffinitySplitLocationProvider {
+
+  // Hosts reported by the mocked splits themselves.
+  private static final String[] locations = new String[5];
+  private static final Set<String> locationsSet = new HashSet<>();
+  // Hosts handed to the provider as the known executor locations.
+  private static final String[] executorLocations = new String[9];
+  private static final Set<String> executorLocationsSet = new HashSet<>();
+
+  static {
+    for (int i = 0; i < locations.length; i++) {
+      locations[i] = "location" + i;
+      locationsSet.add(locations[i]);
+    }
+
+    for (int i = 0; i < executorLocations.length; i++) {
+      executorLocations[i] = "execLocation" + i;
+      executorLocationsSet.add(executorLocations[i]);
+    }
+  }
+
+  @Test (timeout = 5000)
+  public void testNonFileSplits() throws IOException {
+    HostAffinitySplitLocationProvider locationProvider = new HostAffinitySplitLocationProvider(executorLocations);
+
+    InputSplit inputSplit1 = createMockInputSplit(new String[] {locations[0], locations[1]});
+    InputSplit inputSplit2 = createMockInputSplit(new String[] {locations[2], locations[3]});
+
+    assertArrayEquals(new String[] {locations[0], locations[1]}, locationProvider.getLocations(inputSplit1));
+    assertArrayEquals(new String[] {locations[2], locations[3]}, locationProvider.getLocations(inputSplit2));
+  }
+
+  @Test (timeout = 5000)
+  public void testOrcSplitsBasic() throws IOException {
+    HostAffinitySplitLocationProvider locationProvider = new HostAffinitySplitLocationProvider(executorLocations);
+
+    InputSplit os1 = createMockFileSplit(true, "path1", 0, 1000, new String[] {locations[0], locations[1]});
+    InputSplit os2 = createMockFileSplit(true, "path2", 0, 2000, new String[] {locations[2], locations[3]});
+    InputSplit os3 = createMockFileSplit(true, "path3", 1000, 2000, new String[] {locations[0], locations[3]});
+
+    String[] retLoc1 = locationProvider.getLocations(os1);
+    String[] retLoc2 = locationProvider.getLocations(os2);
+    String[] retLoc3 = locationProvider.getLocations(os3);
+
+    assertEquals(1, retLoc1.length);
+    assertFalse(locationsSet.contains(retLoc1[0]));
+    assertTrue(executorLocationsSet.contains(retLoc1[0]));
+
+    assertEquals(1, retLoc2.length);
+    assertFalse(locationsSet.contains(retLoc2[0]));
+    assertTrue(executorLocationsSet.contains(retLoc2[0]));
+
+    assertEquals(1, retLoc3.length);
+    assertFalse(locationsSet.contains(retLoc3[0]));
+    assertTrue(executorLocationsSet.contains(retLoc3[0]));
+  }
+
+  @Test (timeout = 5000)
+  public void testOrcSplitsLocationAffinity() throws IOException {
+    HostAffinitySplitLocationProvider locationProvider = new HostAffinitySplitLocationProvider(executorLocations);
+
+    // Same file, offset, different lengths
+    InputSplit os11 = createMockFileSplit(true, "path1", 0, 15000, new String[] {locations[0], locations[1]});
+    InputSplit os12 = createMockFileSplit(true, "path1", 0, 30000, new String[] {locations[0], locations[1]});
+    // Same file, different offset
+    InputSplit os13 = createMockFileSplit(true, "path1", 15000, 30000, new String[] {locations[0], locations[1]});
+
+    String[] retLoc11 = locationProvider.getLocations(os11);
+    String[] retLoc12 = locationProvider.getLocations(os12);
+    String[] retLoc13 = locationProvider.getLocations(os13);
+
+    assertEquals(1, retLoc11.length);
+    assertFalse(locationsSet.contains(retLoc11[0]));
+    assertTrue(executorLocationsSet.contains(retLoc11[0]));
+
+    assertEquals(1, retLoc12.length);
+    assertFalse(locationsSet.contains(retLoc12[0]));
+    assertTrue(executorLocationsSet.contains(retLoc12[0]));
+
+    assertEquals(1, retLoc13.length);
+    assertFalse(locationsSet.contains(retLoc13[0]));
+    assertTrue(executorLocationsSet.contains(retLoc13[0]));
+
+    // Verify the actual locations returned.
+    // os13 should be on a different location. Splits are supposed to be consistent across JVMs;
+    // the test is set up to verify a different host (make sure not to hash to the same host as os11,os12).
+    // If the test were to fail because the host is the same - the assumption about consistency across JVM
+    // instances is likely incorrect.
+    assertEquals(retLoc11[0], retLoc12[0]);
+    assertNotEquals(retLoc11[0], retLoc13[0]);
+
+    // Get locations again, and make sure they're the same.
+    String[] retLoc112 = locationProvider.getLocations(os11);
+    String[] retLoc122 = locationProvider.getLocations(os12);
+    String[] retLoc132 = locationProvider.getLocations(os13);
+    assertArrayEquals(retLoc11, retLoc112);
+    assertArrayEquals(retLoc12, retLoc122);
+    assertArrayEquals(retLoc13, retLoc132);
+  }
+
+  /** Mocks a plain (non-file) {@link InputSplit} whose getLocations() returns splitLocations. */
+  private InputSplit createMockInputSplit(String[] splitLocations) throws IOException {
+    InputSplit inputSplit = mock(InputSplit.class);
+    doReturn(splitLocations).when(inputSplit).getLocations();
+    return inputSplit;
+  }
+
+  /**
+   * Mocks a {@link FileSplit} (an {@link OrcSplit} when {@code createOrcSplit} is true) whose
+   * getPath/getStart/getLength/getLocations return the given values.
+   */
+  private InputSplit createMockFileSplit(boolean createOrcSplit, String fakePathString, long start,
+      long length, String[] splitLocations) throws IOException {
+    FileSplit fileSplit;
+    if (createOrcSplit) {
+      fileSplit = mock(OrcSplit.class);
+    } else {
+      fileSplit = mock(FileSplit.class);
+    }
+
+    doReturn(start).when(fileSplit).getStart();
+    doReturn(length).when(fileSplit).getLength();
+    doReturn(new Path(fakePathString)).when(fileSplit).getPath();
+    doReturn(splitLocations).when(fileSplit).getLocations();
+    return fileSplit;
+  }
+}