Author: yanz Date: Tue Sep 28 19:27:02 2010 New Revision: 1002333 URL: http://svn.apache.org/viewvc?rev=1002333&view=rev Log: PIG-1648: Split combination may return too many block locations to map/reduce framework (yanz)
Modified: hadoop/pig/branches/branch-0.8/CHANGES.txt hadoop/pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/TestSplitCombine.java Modified: hadoop/pig/branches/branch-0.8/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/CHANGES.txt?rev=1002333&r1=1002332&r2=1002333&view=diff ============================================================================== --- hadoop/pig/branches/branch-0.8/CHANGES.txt (original) +++ hadoop/pig/branches/branch-0.8/CHANGES.txt Tue Sep 28 19:27:02 2010 @@ -198,6 +198,8 @@ PIG-1309: Map-side Cogroup (ashutoshc) BUG FIXES +PIG-1648: Split combination may return too many block locations to map/reduce framework (yanz) + PIG-1641: Incorrect counters in local mode (rding) PIG-1647: Logical simplifier throws a NPE (yanz) Modified: hadoop/pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java?rev=1002333&r1=1002332&r2=1002333&view=diff ============================================================================== --- hadoop/pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java (original) +++ hadoop/pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java Tue Sep 28 19:27:02 2010 @@ -28,8 +28,14 @@ import java.io.ObjectOutputStream; import java.io.OutputStream; import java.io.Serializable; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; import java.util.List; import java.util.HashSet; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.Map.Entry; import java.lang.StringBuilder; import org.apache.hadoop.conf.Configurable; @@ -136,19 +142,41 @@ public class PigSplit extends InputSplit } @Override + @SuppressWarnings("unchecked") public String[] getLocations() throws IOException, InterruptedException { if (locations == null) { - HashSet<String> locSet = new HashSet<String>(); - for (int i = 0; i < wrappedSplits.length; i++) + HashMap<String, Long> locMap = new HashMap<String, Long>(); + Long lenInMap; + for (InputSplit split : wrappedSplits) { - String[] locs = wrappedSplits[i].getLocations(); - for (int j = 0; j < locs.length; j++) - locSet.add(locs[j]); + String[] locs = split.getLocations(); + for (String loc : locs) + { + if ((lenInMap = locMap.get(loc)) == null) + locMap.put(loc, split.getLength()); + else + locMap.put(loc, lenInMap + split.getLength()); + } + } + Set<Map.Entry<String, Long>> entrySet = locMap.entrySet(); + Map.Entry<String, Long>[] hostSize = + entrySet.toArray(new Map.Entry[entrySet.size()]); + Arrays.sort(hostSize, new Comparator<Map.Entry<String, Long>>() { + + @Override + public int compare(Entry<String, Long> o1, Entry<String, Long> o2) { + long diff = o1.getValue() - o2.getValue(); + if (diff < 0) return 1; + if (diff > 0) return -1; + return 0; + } + }); + // maximum 5 locations are in list: refer to PIG-1648 for more details + int nHost = Math.min(hostSize.length, 5); + locations = new String[nHost]; + for (int i = 0; i < nHost; ++i) { + locations[i] = hostSize[i].getKey(); } - locations = new String[locSet.size()]; - int i = 0; - for (String loc : locSet) - locations[i++] = loc; } return locations; } Modified: hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/TestSplitCombine.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/TestSplitCombine.java?rev=1002333&r1=1002332&r2=1002333&view=diff ============================================================================== --- hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/TestSplitCombine.java (original) +++ hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/TestSplitCombine.java Tue Sep 28 19:27:02 2010 @@ -427,6 +427,38 @@ public class TestSplitCombine { } } + @Test + public void test9() throws IOException, InterruptedException { + // verify locations in order + ArrayList<InputSplit> rawSplits = new ArrayList<InputSplit>(); + rawSplits.add(new DummyInputSplit(100, new String[] { + "l1", "l2", "l3" + })); + rawSplits.add(new DummyInputSplit(200, new String[] { + "l3", "l4", "l5" + })); + rawSplits.add(new DummyInputSplit(400, new String[] { + "l5", "l6", "l1" + })); + List<InputSplit> result = pigInputFormat.getPigSplits(rawSplits, 0, ok, + null, true, conf); + Assert.assertEquals(result.size(), 1); + int index = 0; + for (InputSplit split : result) { + PigSplit pigSplit = (PigSplit) split; + int len = pigSplit.getNumPaths(); + Assert.assertEquals(3, len); + // only 5 locations are in list: refer to PIG-1648 for more details + checkLocationOrdering(pigSplit.getLocations(), new String[] { + "l5", "l1", "l6", "l3", "l4" + }); + Assert.assertEquals(400, pigSplit.getLength(0)); + Assert.assertEquals(200, pigSplit.getLength(1)); + Assert.assertEquals(100, pigSplit.getLength(2)); + index++; + } + } + private void checkLocations(String[] actual, String[] expected) { HashSet<String> expectedSet = new HashSet<String>(); for (String str : expected) @@ -438,4 +470,9 @@ public class TestSplitCombine { Assert.assertEquals(count, expected.length); } + private void checkLocationOrdering(String[] actual, String[] expected) { + Assert.assertEquals(expected.length, actual.length); + for (int i = 0; i < actual.length; i++) + Assert.assertEquals(expected[i], actual[i]); + } }