Author: yanz
Date: Tue Sep 28 19:27:02 2010
New Revision: 1002333

URL: http://svn.apache.org/viewvc?rev=1002333&view=rev
Log:
PIG-1648: Split combination may return too many block locations to map/reduce framework (yanz)

Modified:
    hadoop/pig/branches/branch-0.8/CHANGES.txt
    hadoop/pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java
    hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/TestSplitCombine.java

Modified: hadoop/pig/branches/branch-0.8/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/CHANGES.txt?rev=1002333&r1=1002332&r2=1002333&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.8/CHANGES.txt (original)
+++ hadoop/pig/branches/branch-0.8/CHANGES.txt Tue Sep 28 19:27:02 2010
@@ -198,6 +198,8 @@ PIG-1309: Map-side Cogroup (ashutoshc)
 
 BUG FIXES
 
+PIG-1648: Split combination may return too many block locations to map/reduce framework (yanz)
+
 PIG-1641: Incorrect counters in local mode (rding)
 
 PIG-1647: Logical simplifier throws a NPE (yanz)

Modified: hadoop/pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java?rev=1002333&r1=1002332&r2=1002333&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java (original)
+++ hadoop/pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java Tue Sep 28 19:27:02 2010
@@ -28,8 +28,14 @@ import java.io.ObjectOutputStream;
 import java.io.OutputStream;
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
 import java.util.List;
 import java.util.HashSet;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.Map.Entry;
 import java.lang.StringBuilder;
 
 import org.apache.hadoop.conf.Configurable;
@@ -136,19 +142,41 @@ public class PigSplit extends InputSplit
     }
     
     @Override
+    @SuppressWarnings("unchecked")
     public String[] getLocations() throws IOException, InterruptedException {
         if (locations == null) {
-            HashSet<String> locSet = new HashSet<String>();
-            for (int i = 0; i < wrappedSplits.length; i++)
+            HashMap<String, Long> locMap = new HashMap<String, Long>();
+            Long lenInMap;
+            for (InputSplit split : wrappedSplits)
             {
-                String[] locs = wrappedSplits[i].getLocations();
-                for (int j = 0; j < locs.length; j++)
-                    locSet.add(locs[j]);
+                String[] locs = split.getLocations();
+                for (String loc : locs)
+                {
+                    if ((lenInMap = locMap.get(loc)) == null)
+                        locMap.put(loc, split.getLength());
+                    else
+                        locMap.put(loc, lenInMap + split.getLength());
+                }
+            }
+            Set<Map.Entry<String, Long>> entrySet = locMap.entrySet();
+            Map.Entry<String, Long>[] hostSize =
+                entrySet.toArray(new Map.Entry[entrySet.size()]);
+            Arrays.sort(hostSize, new Comparator<Map.Entry<String, Long>>() {
+
+              @Override
+              public int compare(Entry<String, Long> o1, Entry<String, Long> o2) {
+                long diff = o1.getValue() - o2.getValue();
+                if (diff < 0) return 1;
+                if (diff > 0) return -1;
+                return 0;
+              }
+            });
+            // maximum 5 locations are in list: refer to PIG-1648 for more details
+            int nHost = Math.min(hostSize.length, 5);
+            locations = new String[nHost];
+            for (int i = 0; i < nHost; ++i) {
+              locations[i] = hostSize[i].getKey();
             }
-            locations = new String[locSet.size()];
-            int i = 0;
-            for (String loc : locSet)
-                locations[i++] = loc;
         }
         return locations;
     }

Modified: hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/TestSplitCombine.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/TestSplitCombine.java?rev=1002333&r1=1002332&r2=1002333&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/TestSplitCombine.java (original)
+++ hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/TestSplitCombine.java Tue Sep 28 19:27:02 2010
@@ -427,6 +427,38 @@ public class TestSplitCombine {
         }
     }
 
+    @Test
+    public void test9() throws IOException, InterruptedException {
+        // verify locations in order
+        ArrayList<InputSplit> rawSplits = new ArrayList<InputSplit>();
+        rawSplits.add(new DummyInputSplit(100, new String[] {
+                        "l1", "l2", "l3"
+        }));
+        rawSplits.add(new DummyInputSplit(200, new String[] {
+                        "l3", "l4", "l5"
+        }));
+        rawSplits.add(new DummyInputSplit(400, new String[] {
+                        "l5", "l6", "l1"
+        }));
+        List<InputSplit> result = pigInputFormat.getPigSplits(rawSplits, 0, ok,
+                        null, true, conf);
+        Assert.assertEquals(result.size(), 1);
+        int index = 0;
+        for (InputSplit split : result) {
+            PigSplit pigSplit = (PigSplit) split;
+            int len = pigSplit.getNumPaths();
+            Assert.assertEquals(3, len);
+            // only 5 locations are in list: refer to PIG-1648 for more details
+            checkLocationOrdering(pigSplit.getLocations(), new String[] {
+                            "l5", "l1", "l6", "l3", "l4"
+            });
+            Assert.assertEquals(400, pigSplit.getLength(0));
+            Assert.assertEquals(200, pigSplit.getLength(1));
+            Assert.assertEquals(100, pigSplit.getLength(2));
+            index++;
+        }
+    }
+    
     private void checkLocations(String[] actual, String[] expected) {
         HashSet<String> expectedSet = new HashSet<String>();
         for (String str : expected)
@@ -438,4 +470,9 @@ public class TestSplitCombine {
         Assert.assertEquals(count, expected.length);
     }
 
+    private void checkLocationOrdering(String[] actual, String[] expected) {
+      Assert.assertEquals(expected.length, actual.length);
+      for (int i = 0; i < actual.length; i++)
+        Assert.assertEquals(expected[i], actual[i]);
+    }
 }


Reply via email to