Author: gates
Date: Fri Oct 30 22:49:22 2009
New Revision: 831481

URL: http://svn.apache.org/viewvc?rev=831481&view=rev
Log:
PIG-1048: inner join using 'skewed' produces multiple rows for keys with single 
row in both input relations.

Modified:
    hadoop/pig/trunk/CHANGES.txt
    
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java
    hadoop/pig/trunk/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=831481&r1=831480&r2=831481&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Fri Oct 30 22:49:22 2009
@@ -109,6 +109,9 @@
 
 BUG FIXES
 
+PIG-1048: inner join using 'skewed' produces multiple rows for keys with
+          single row in both input relations (sriranjan via gates)
+
 PIG-1063: Pig does not call checkOutSpecs() on OutputFormat provided by
 StoreFunc in the multistore case (pradeepkth)
 

Modified: 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java?rev=831481&r1=831480&r2=831481&view=diff
==============================================================================
--- 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java
 (original)
+++ 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java
 Fri Oct 30 22:49:22 2009
@@ -147,13 +147,7 @@
                                }
                        }
                        // number of reducers
-                       Integer cnt = 0;
-                       if (minIndex < maxIndex) {
-                               cnt = maxIndex - minIndex;
-                       } else {
-                               cnt = totalReducers[0] + maxIndex - minIndex;
-                       }
-
+                       Integer cnt = maxIndex - minIndex;
                        reducerMap.put(keyT, new Pair(minIndex, cnt));// 1 is 
added to account for the 0 index
                }               
                return reducerMap;

Modified: 
hadoop/pig/trunk/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java?rev=831481&r1=831480&r2=831481&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java 
(original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java 
Fri Oct 30 22:49:22 2009
@@ -236,7 +236,7 @@
                // get the number of total tuples for this key
                long tupleCount = (long) (((double) count) / totalSampleCount_
                                * inputFileSize_ / avgD);       
-
+               tupleCount = Math.max(tupleCount, 1);
 
                int redCount = (int) Math.round(Math.ceil((double) tupleCount
                                / tupleMCount));
@@ -252,7 +252,7 @@
                }
 
                // this is not a skewed key
-               if (redCount == 1) {
+               if (redCount <= 1) {
                        return new Pair<Tuple, Integer>(null, 1);
                }
 

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java?rev=831481&r1=831480&r2=831481&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java Fri Oct 30 
22:49:22 2009
@@ -138,7 +138,6 @@
 
     }
     
-    
     public void testSkewedJoinWithGroup() throws IOException{
         pigServer.registerQuery("A = LOAD '" + INPUT_FILE1 + "' as (id, name, 
n);");
         pigServer.registerQuery("B = LOAD '" + INPUT_FILE2 + "' as (id, 
name);");
@@ -290,17 +289,18 @@
                lineCount[key][i] ++;
            }
          }
+         
+         int fc = 0;
          for(int i=0; i<3; i++) {
-                int fc = 0;
                 for(int j=0; j<7; j++) {
-                        if (lineCount[i][j] > 0) {
+                    if (lineCount[i][j] > 0) {
                                 fc ++;
                         }
                 }
-                // all three keys are skewed keys,
-                // check each key should appear in more than 1 part- file
-                assertTrue(fc > 1);
          }
+         // at least one key should be a skewed key
+         // check that at least one key appears in more than 1 part-file
+         assertTrue(fc > 3);
     }
     
     public void testSkewedJoinNullKeys() throws IOException {
@@ -324,4 +324,35 @@
         return;
     }
     
+    // PIG-1048
+    public void testSkewedJoinOneValue() throws IOException {
+        pigServer.registerQuery("A = LOAD '" + INPUT_FILE3 + "' as 
(id,name);");
+        pigServer.registerQuery("B = LOAD '" + INPUT_FILE3 + "' as 
(id,name);");
+        // Filter key with a single value
+
+        pigServer.registerQuery("C = FILTER A by id == 400;");
+        pigServer.registerQuery("D = FILTER B by id == 400;");
+
+        
+        DataBag dbfrj = BagFactory.getInstance().newDefaultBag(), dbrj = 
BagFactory.getInstance().newDefaultBag();
+        {
+            pigServer.registerQuery("E = join C by id, D by id using 
\"skewed\";");
+            Iterator<Tuple> iter = pigServer.openIterator("E");
+                
+            while(iter.hasNext()) {
+                dbfrj.add(iter.next());
+            }
+        }
+        {
+               pigServer.registerQuery("E = join C by id, D by id;");
+               Iterator<Tuple> iter = pigServer.openIterator("E");
+        
+               while(iter.hasNext()) {
+                       dbrj.add(iter.next());
+               }
+        }
+        Assert.assertEquals(dbfrj.size(), dbrj.size());
+        Assert.assertEquals(true, TestHelper.compareBags(dbfrj, dbrj));       
+       
+    }
 }


Reply via email to