Author: sseth
Date: Thu Aug 15 18:32:54 2013
New Revision: 1514430

URL: http://svn.apache.org/r1514430
Log: merge MAPREDUCE-5352 from branch-2.1-beta. Optimize node local splits generated by CombineFileInputFormat. (sseth)
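For context, the split sizing this patch optimizes is driven by the configuration keys touched below. The snippet is an illustrative sketch of job setup only, not part of this change; in a real job a concrete subclass of CombineFileInputFormat would then be set as the input format:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;

    public class SplitTuningExample {
      public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration());
        Configuration conf = job.getConfiguration();
        // Upper bound for a combined split. Left at its default of 0, a
        // single split is generated per node (per the comment added below).
        conf.setLong("mapreduce.input.fileinputformat.split.maxsize",
            256L * 1024 * 1024);
        // Leftover blocks on a node whose combined size is below this bound
        // fall through to rack-level grouping instead of forming a tiny
        // node-local split.
        conf.setLong(CombineFileInputFormat.SPLIT_MINSIZE_PERNODE,
            64L * 1024 * 1024);
        conf.setLong(CombineFileInputFormat.SPLIT_MINSIZE_PERRACK,
            64L * 1024 * 1024);
      }
    }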
Modified:
    hadoop/common/branches/branch-2.1.0-beta/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/branches/branch-2.1.0-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java
    hadoop/common/branches/branch-2.1.0-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java

Modified: hadoop/common/branches/branch-2.1.0-beta/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1.0-beta/hadoop-mapreduce-project/CHANGES.txt?rev=1514430&r1=1514429&r2=1514430&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1.0-beta/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2.1.0-beta/hadoop-mapreduce-project/CHANGES.txt Thu Aug 15 18:32:54 2013
@@ -184,6 +184,9 @@ Release 2.1.0-beta - 2013-08-06
     MAPREDUCE-5268. Improve history server startup performance (Karthik
     Kambatla via jlowe)
 
+    MAPREDUCE-5352. Optimize node local splits generated by
+    CombineFileInputFormat. (sseth)
+
   BUG FIXES
 
     MAPREDUCE-4671. AM does not tell the RM about container requests which are

Modified: hadoop/common/branches/branch-2.1.0-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1.0-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java?rev=1514430&r1=1514429&r2=1514430&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1.0-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java (original)
+++ hadoop/common/branches/branch-2.1.0-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java Thu Aug 15 18:32:54 2013
@@ -22,13 +22,18 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.LinkedList;
+import java.util.Collections;
+import java.util.LinkedHashSet;
 import java.util.HashSet;
 import java.util.List;
 import java.util.HashMap;
 import java.util.Set;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.Map.Entry;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
@@ -50,6 +55,8 @@ import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.net.NetworkTopology;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.HashMultiset;
+import com.google.common.collect.Multiset;
 
 /**
  * An abstract {@link InputFormat} that returns {@link CombineFileSplit}'s in
@@ -79,6 +86,8 @@ import com.google.common.annotations.Vis
 public abstract class CombineFileInputFormat<K, V>
   extends FileInputFormat<K, V> {
 
+  private static final Log LOG = LogFactory.getLog(CombineFileInputFormat.class);
+
   public static final String
     SPLIT_MINSIZE_PERNODE = "mapreduce.input.fileinputformat.split.minsize.per.node";
   public static final String SPLIT_MINSIZE_PERRACK =
@@ -186,6 +195,8 @@ public abstract class CombineFileInputFo
       maxSize = maxSplitSize;
     } else {
       maxSize = conf.getLong("mapreduce.input.fileinputformat.split.maxsize", 0);
+      // If maxSize is not configured, a single split will be generated per
+      // node.
     }
     if (minSizeNode != 0 && maxSize != 0 && minSizeNode > maxSize) {
       throw new IOException("Minimum split size pernode " + minSizeNode +
@@ -271,8 +282,8 @@ public abstract class CombineFileInputFo
       new HashMap<OneBlockInfo, String[]>();
 
     // mapping from a node to the list of blocks that it contains
-    HashMap<String, List<OneBlockInfo>> nodeToBlocks =
-      new HashMap<String, List<OneBlockInfo>>();
+    HashMap<String, Set<OneBlockInfo>> nodeToBlocks =
+      new HashMap<String, Set<OneBlockInfo>>();
 
     files = new OneFileInfo[paths.length];
     if (paths.length == 0) {
@@ -292,9 +303,9 @@ public abstract class CombineFileInputFo
   }
 
   @VisibleForTesting
-  void createSplits(HashMap<String, List<OneBlockInfo>> nodeToBlocks,
-                    HashMap<OneBlockInfo, String[]> blockToNodes,
-                    HashMap<String, List<OneBlockInfo>> rackToBlocks,
+  void createSplits(Map<String, Set<OneBlockInfo>> nodeToBlocks,
+                    Map<OneBlockInfo, String[]> blockToNodes,
+                    Map<String, List<OneBlockInfo>> rackToBlocks,
                     long totLength,
                     long maxSize,
                     long minSizeNode,
@@ -302,83 +313,118 @@ public abstract class CombineFileInputFo
                     List<InputSplit> splits
                    ) {
     ArrayList<OneBlockInfo> validBlocks = new ArrayList<OneBlockInfo>();
-    Set<String> nodes = new HashSet<String>();
     long curSplitSize = 0;
 
-    int numNodes = nodeToBlocks.size();
+    int totalNodes = nodeToBlocks.size();
     long totalLength = totLength;
 
+    Multiset<String> splitsPerNode = HashMultiset.create();
+    Set<String> completedNodes = new HashSet<String>();
+
     while(true) {
       // it is allowed for maxSize to be 0. Disable smoothing load for such cases
-      int avgSplitsPerNode = maxSize > 0 && numNodes > 0 ?
-                             ((int) (totalLength/maxSize))/numNodes
-                             : Integer.MAX_VALUE;
-      int maxSplitsByNodeOnly = (avgSplitsPerNode > 0) ? avgSplitsPerNode : 1;
-      numNodes = 0;
 
-      // process all nodes and create splits that are local to a node.
-      for (Iterator<Map.Entry<String, List<OneBlockInfo>>> iter = nodeToBlocks
+      // process all nodes and create splits that are local to a node. Generate
+      // one split per node iteration, and walk over nodes multiple times to
+      // distribute the splits across nodes.
+      for (Iterator<Map.Entry<String, Set<OneBlockInfo>>> iter = nodeToBlocks
          .entrySet().iterator(); iter.hasNext();) {
-        Map.Entry<String, List<OneBlockInfo>> one = iter.next();
-        nodes.add(one.getKey());
-        List<OneBlockInfo> blocksInNode = one.getValue();
+        Map.Entry<String, Set<OneBlockInfo>> one = iter.next();
+
+        String node = one.getKey();
+
+        // Skip the node if it has previously been marked as completed.
+        if (completedNodes.contains(node)) {
+          continue;
+        }
+
+        Set<OneBlockInfo> blocksInCurrentNode = one.getValue();
 
         // for each block, copy it into validBlocks. Delete it from
         // blockToNodes so that the same block does not appear in
        // two different splits.
-        int splitsInNode = 0;
-        for (OneBlockInfo oneblock : blocksInNode) {
-          if (blockToNodes.containsKey(oneblock)) {
-            validBlocks.add(oneblock);
-            blockToNodes.remove(oneblock);
-            curSplitSize += oneblock.length;
-
-            // if the accumulated split size exceeds the maximum, then
-            // create this split.
-            if (maxSize != 0 && curSplitSize >= maxSize) {
-              // create an input split and add it to the splits array
-              addCreatedSplit(splits, nodes, validBlocks);
-              totalLength -= curSplitSize;
-              curSplitSize = 0;
-              validBlocks.clear();
-              splitsInNode++;
-              if (splitsInNode == maxSplitsByNodeOnly) {
-                // stop grouping on a node so as not to create
-                // disproportionately more splits on a node because it happens
-                // to have many blocks
-                // consider only these nodes in next round of grouping because
-                // they have leftover blocks that may need to be grouped
-                numNodes++;
-                break;
-              }
-            }
+        Iterator<OneBlockInfo> oneBlockIter = blocksInCurrentNode.iterator();
+        while (oneBlockIter.hasNext()) {
+          OneBlockInfo oneblock = oneBlockIter.next();
+
+          // Remove all blocks which may already have been assigned to other
+          // splits.
+          if(!blockToNodes.containsKey(oneblock)) {
+            oneBlockIter.remove();
+            continue;
+          }
+
+          validBlocks.add(oneblock);
+          blockToNodes.remove(oneblock);
+          curSplitSize += oneblock.length;
+
+          // if the accumulated split size exceeds the maximum, then
+          // create this split.
+          if (maxSize != 0 && curSplitSize >= maxSize) {
+            // create an input split and add it to the splits array
+            addCreatedSplit(splits, Collections.singleton(node), validBlocks);
+            totalLength -= curSplitSize;
+            curSplitSize = 0;
+
+            splitsPerNode.add(node);
+
+            // Remove entries from blocksInNode so that we don't walk these
+            // again.
+            blocksInCurrentNode.removeAll(validBlocks);
+            validBlocks.clear();
+
+            // Done creating a single split for this node. Move on to the next
+            // node so that splits are distributed across nodes.
+            break;
           }
+        }
 
-        // if there were any blocks left over and their combined size is
-        // larger than minSplitNode, then combine them into one split.
-        // Otherwise add them back to the unprocessed pool. It is likely
-        // that they will be combined with other blocks from the
-        // same rack later on.
-        if (minSizeNode != 0 && curSplitSize >= minSizeNode
-            && splitsInNode == 0) {
-          // haven't created any split on this machine. so its ok to add a
-          // smaller
-          // one for parallelism. Otherwise group it in the rack for balanced
-          // size
-          // create an input split and add it to the splits array
-          addCreatedSplit(splits, nodes, validBlocks);
-          totalLength -= curSplitSize;
-        } else {
-          for (OneBlockInfo oneblock : validBlocks) {
-            blockToNodes.put(oneblock, oneblock.hosts);
+        if (validBlocks.size() != 0) {
+          // This implies that the last few blocks (or all in case maxSize=0)
+          // were not part of a split. The node is complete.
+
+          // if there were any blocks left over and their combined size is
+          // larger than minSplitNode, then combine them into one split.
+          // Otherwise add them back to the unprocessed pool. It is likely
+          // that they will be combined with other blocks from the
+          // same rack later on.
+          // This condition also kicks in when max split size is not set. All
+          // blocks on a node will be grouped together into a single split.
+          if (minSizeNode != 0 && curSplitSize >= minSizeNode
+              && splitsPerNode.count(node) == 0) {
+            // haven't created any split on this machine. so its ok to add a
+            // smaller one for parallelism. Otherwise group it in the rack for
+            // balanced size create an input split and add it to the splits
+            // array
+            addCreatedSplit(splits, Collections.singleton(node), validBlocks);
+            totalLength -= curSplitSize;
+            splitsPerNode.add(node);
+            // Remove entries from blocksInNode so that we don't walk this again.
+            blocksInCurrentNode.removeAll(validBlocks);
+            // The node is done. This was the last set of blocks for this node.
+          } else {
+            // Put the unplaced blocks back into the pool for later rack-allocation.
+            for (OneBlockInfo oneblock : validBlocks) {
+              blockToNodes.put(oneblock, oneblock.hosts);
+            }
           }
+          validBlocks.clear();
+          curSplitSize = 0;
+          completedNodes.add(node);
+        } else { // No in-flight blocks.
+          if (blocksInCurrentNode.size() == 0) {
+            // Node is done. All blocks were fit into node-local splits.
+            completedNodes.add(node);
+          } // else Run through the node again.
         }
-        validBlocks.clear();
-        nodes.clear();
-        curSplitSize = 0;
       }
-
-      if(!(numNodes>0 && totalLength>0)) {
+
+      // Check if node-local assignments are complete.
+      if (completedNodes.size() == totalNodes || totalLength == 0) {
+        // All nodes have been walked over and marked as completed or all blocks
+        // have been assigned. The rest should be handled via rackLock assignment.
+        LOG.info("DEBUG: Terminated node allocation with : CompletedNodes: "
+            + completedNodes.size() + ", size left: " + totalLength);
         break;
       }
     }
@@ -527,7 +573,7 @@ public abstract class CombineFileInputFo
                 boolean isSplitable,
                 HashMap<String, List<OneBlockInfo>> rackToBlocks,
                 HashMap<OneBlockInfo, String[]> blockToNodes,
-                HashMap<String, List<OneBlockInfo>> nodeToBlocks,
+                HashMap<String, Set<OneBlockInfo>> nodeToBlocks,
                 HashMap<String, Set<String>> rackToNodes,
                 long maxSize)
                 throws IOException {
@@ -598,10 +644,10 @@ public abstract class CombineFileInputFo
 
     @VisibleForTesting
     static void populateBlockInfo(OneBlockInfo[] blocks,
-        HashMap<String, List<OneBlockInfo>> rackToBlocks,
-        HashMap<OneBlockInfo, String[]> blockToNodes,
-        HashMap<String, List<OneBlockInfo>> nodeToBlocks,
-        HashMap<String, Set<String>> rackToNodes) {
+        Map<String, List<OneBlockInfo>> rackToBlocks,
+        Map<OneBlockInfo, String[]> blockToNodes,
+        Map<String, Set<OneBlockInfo>> nodeToBlocks,
+        Map<String, Set<String>> rackToNodes) {
       for (OneBlockInfo oneblock : blocks) {
         // add this block to the block --> node locations map
         blockToNodes.put(oneblock, oneblock.hosts);
@@ -633,9 +679,9 @@ public abstract class CombineFileInputFo
         // add this block to the node --> block map
         for (int j = 0; j < oneblock.hosts.length; j++) {
           String node = oneblock.hosts[j];
-          List<OneBlockInfo> blklist = nodeToBlocks.get(node);
+          Set<OneBlockInfo> blklist = nodeToBlocks.get(node);
           if (blklist == null) {
-            blklist = new ArrayList<OneBlockInfo>();
+            blklist = new LinkedHashSet<OneBlockInfo>();
             nodeToBlocks.put(node, blklist);
           }
           blklist.add(oneblock);
@@ -696,7 +742,7 @@ public abstract class CombineFileInputFo
     return fs.getFileBlockLocations(stat, 0, stat.getLen());
   }
 
-  private static void addHostToRack(HashMap<String, Set<String>> rackToNodes,
+  private static void addHostToRack(Map<String, Set<String>> rackToNodes,
                                     String rack, String host) {
     Set<String> hosts = rackToNodes.get(rack);
     if (hosts == null) {
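To make the reworked control flow easier to follow outside the diff, here is a minimal, self-contained Java sketch of the same idea: walk the nodes round-robin, emit at most one node-local split per node per pass, and release leftovers that are too small (which the real code then handles via minSizeNode and rack-level grouping). Everything here (Block ids as Longs, makeSplits, the sample data in main) is an illustrative assumption, not Hadoop API, and it assumes maxSize > 0:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Iterator;
    import java.util.LinkedHashMap;
    import java.util.LinkedHashSet;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;

    public class RoundRobinSplitSketch {

      // Walk nodes repeatedly, emitting at most one split per node per pass,
      // so splits spread across nodes instead of piling up on whichever node
      // happens to be iterated first -- the behavior this patch introduces.
      static List<List<Long>> makeSplits(Map<String, Set<Long>> nodeToBlocks,
          Map<Long, Long> blockLength, long maxSize) {
        List<List<Long>> splits = new ArrayList<>();
        Set<Long> assigned = new HashSet<>();    // blocks already placed
        Set<String> completed = new HashSet<>(); // nodes with nothing left

        while (completed.size() < nodeToBlocks.size()) {
          for (Map.Entry<String, Set<Long>> e : nodeToBlocks.entrySet()) {
            String node = e.getKey();
            if (completed.contains(node)) {
              continue;
            }
            List<Long> current = new ArrayList<>();
            long size = 0;
            Iterator<Long> it = e.getValue().iterator();
            while (it.hasNext()) {
              Long block = it.next();
              if (assigned.contains(block)) {
                it.remove(); // a replica was already used via another node
                continue;
              }
              current.add(block);
              assigned.add(block);
              size += blockLength.get(block);
              if (size >= maxSize) {
                break; // at most one full split per node per pass
              }
            }
            if (size >= maxSize) {
              splits.add(current); // a node-local split for 'node'
              e.getValue().removeAll(current);
            } else {
              // Leftovers too small for a full split: the real code may still
              // emit a small split (minSizeNode) or defer to rack grouping;
              // this sketch releases the blocks and retires the node.
              assigned.removeAll(current);
              completed.add(node);
            }
            if (e.getValue().isEmpty()) {
              completed.add(node);
            }
          }
        }
        return splits;
      }

      public static void main(String[] args) {
        // Three 100-byte blocks, each replicated on both of two nodes.
        Map<Long, Long> len = new HashMap<>();
        Map<String, Set<Long>> nodeToBlocks = new LinkedHashMap<>();
        for (long b = 0; b < 3; b++) {
          len.put(b, 100L);
        }
        nodeToBlocks.put("h0", new LinkedHashSet<>(Arrays.asList(0L, 1L, 2L)));
        nodeToBlocks.put("h1", new LinkedHashSet<>(Arrays.asList(0L, 1L, 2L)));
        System.out.println(makeSplits(nodeToBlocks, len, 200)); // [[0, 1]]
      }
    }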
Modified: hadoop/common/branches/branch-2.1.0-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1.0-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java?rev=1514430&r1=1514429&r2=1514430&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1.0-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java (original)
+++ hadoop/common/branches/branch-2.1.0-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java Thu Aug 15 18:32:54 2013
@@ -20,23 +20,31 @@ package org.apache.hadoop.mapreduce.lib.
 
 import java.io.IOException;
 import java.io.OutputStream;
 import java.net.URI;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
-import java.util.ArrayList;
+import java.util.Map;
 import java.util.Set;
-import java.util.zip.GZIPOutputStream;
+import java.util.TreeMap;
 import java.util.concurrent.TimeoutException;
+import java.util.zip.GZIPOutputStream;
 
 import junit.framework.Assert;
 import junit.framework.TestCase;
 
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.DFSTestUtil;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.HdfsBlockLocation;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
@@ -715,6 +723,69 @@ public class TestCombineFileInputFormat
     out.close();
     DFSTestUtil.waitReplication(fileSys, name, replication);
   }
+
+  public void testNodeDistribution() throws IOException, InterruptedException {
+    DummyInputFormat inFormat = new DummyInputFormat();
+    int numBlocks = 60;
+    long totLength = 0;
+    long blockSize = 100;
+    int numNodes = 10;
+
+    long minSizeNode = 50;
+    long minSizeRack = 50;
+    int maxSplitSize = 200; // 2 blocks per split.
+
+    String[] locations = new String[numNodes];
+    for (int i = 0; i < numNodes; i++) {
+      locations[i] = "h" + i;
+    }
+    String[] racks = new String[0];
+    Path path = new Path("hdfs://file");
+
+    OneBlockInfo[] blocks = new OneBlockInfo[numBlocks];
+
+    int hostCountBase = 0;
+    // Generate block list. Replication 3 per block.
+    for (int i = 0; i < numBlocks; i++) {
+      int localHostCount = hostCountBase;
+      String[] blockHosts = new String[3];
+      for (int j = 0; j < 3; j++) {
+        int hostNum = localHostCount % numNodes;
+        blockHosts[j] = "h" + hostNum;
+        localHostCount++;
+      }
+      hostCountBase++;
+      blocks[i] = new OneBlockInfo(path, i * blockSize, blockSize, blockHosts,
+          racks);
+      totLength += blockSize;
+    }
+
+    List<InputSplit> splits = new ArrayList<InputSplit>();
+    HashMap<String, Set<String>> rackToNodes = new HashMap<String, Set<String>>();
+    HashMap<String, List<OneBlockInfo>> rackToBlocks = new HashMap<String, List<OneBlockInfo>>();
+    HashMap<OneBlockInfo, String[]> blockToNodes = new HashMap<OneBlockInfo, String[]>();
+    Map<String, Set<OneBlockInfo>> nodeToBlocks = new TreeMap<String, Set<OneBlockInfo>>();
+
+    OneFileInfo.populateBlockInfo(blocks, rackToBlocks, blockToNodes,
+        nodeToBlocks, rackToNodes);
+
+    inFormat.createSplits(nodeToBlocks, blockToNodes, rackToBlocks, totLength,
+        maxSplitSize, minSizeNode, minSizeRack, splits);
+
+    int expectedSplitCount = (int) (totLength / maxSplitSize);
+    Assert.assertEquals(expectedSplitCount, splits.size());
+
+    // Ensure 90+% of the splits have node local blocks.
+    // 100% locality may not always be achieved.
+    int numLocalSplits = 0;
+    for (InputSplit inputSplit : splits) {
+      Assert.assertEquals(maxSplitSize, inputSplit.getLength());
+      if (inputSplit.getLocations().length == 1) {
+        numLocalSplits++;
+      }
+    }
+    Assert.assertTrue(numLocalSplits >= 0.9 * splits.size());
+  }
 
   public void testNodeInputSplit() throws IOException, InterruptedException {
     // Regression test for MAPREDUCE-4892. There are 2 nodes with all blocks on
@@ -744,8 +815,8 @@ public class TestCombineFileInputFormat
       new HashMap<String, List<OneBlockInfo>>();
     HashMap<OneBlockInfo, String[]> blockToNodes =
       new HashMap<OneBlockInfo, String[]>();
-    HashMap<String, List<OneBlockInfo>> nodeToBlocks =
-      new HashMap<String, List<OneBlockInfo>>();
+    HashMap<String, Set<OneBlockInfo>> nodeToBlocks =
+      new HashMap<String, Set<OneBlockInfo>>();
 
     OneFileInfo.populateBlockInfo(blocks, rackToBlocks, blockToNodes,
         nodeToBlocks, rackToNodes);
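As a quick sanity check on the assertions in testNodeDistribution, the expected numbers fall out directly from the test's constants (a worked restatement, nothing new): totLength = numBlocks * blockSize = 60 * 100 = 6000 bytes; expectedSplitCount = totLength / maxSplitSize = 6000 / 200 = 30 splits of exactly 200 bytes each; and the locality assertion numLocalSplits >= 0.9 * splits.size() therefore requires at least 27 of the 30 splits to report a single (node-local) location.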