Author: rding
Date: Thu Mar  4 21:35:13 2010
New Revision: 919189

URL: http://svn.apache.org/viewvc?rev=919189&view=rev
Log:
PIG-1273: Skewed join throws error

Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java
    hadoop/pig/trunk/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java
    hadoop/pig/trunk/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=919189&r1=919188&r2=919189&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Thu Mar  4 21:35:13 2010
@@ -139,6 +139,8 @@
 
 BUG FIXES
 
+PIG-1273: Skewed join throws error (rding)
+
 PIG-1267: Problems with partition filter optimizer (rding)
 
 PIG-1079: Modify merge join to use distributed cache to maintain the index

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=919189&r1=919188&r2=919189&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Thu Mar  4 21:35:13 2010
@@ -1854,7 +1854,7 @@
                String inputFile = lFile.getFileName();
 
                return getSamplingJob(sort, prevJob, transformPlans, lFile, sampleFile, rp, null, 
-                                                       PartitionSkewedKeys.class.getName(), new String[]{per, mc, inputFile}, RandomSampleLoader.class.getName());
+                                                       PartitionSkewedKeys.class.getName(), new String[]{per, mc, inputFile}, PoissonSampleLoader.class.getName());
        }catch(Exception e) {
                throw new PlanException(e);
        }

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java?rev=919189&r1=919188&r2=919189&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java Thu Mar  4 21:35:13 2010
@@ -31,6 +31,7 @@
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.DefaultTupleFactory;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.builtin.PartitionSkewedKeys;
 import org.apache.pig.impl.io.ReadToEndLoader;
 import org.apache.pig.impl.util.Pair;
 import org.apache.pig.impl.util.UDFContext;
@@ -72,8 +73,8 @@
         // The keydist file is structured as (key, min, max)
         // min, max being the index of the reducers
         Map<String, Object > distMap = (Map<String, Object>) t.get (0);
-        partitionList = (DataBag) distMap.get("partition.list");
-        totalReducers[0] = Integer.valueOf(""+distMap.get("totalreducers"));
+        partitionList = (DataBag) distMap.get(PartitionSkewedKeys.PARTITION_LIST);
+        totalReducers[0] = Integer.valueOf(""+distMap.get(PartitionSkewedKeys.TOTAL_REDUCERS));
         Iterator<Tuple> it = partitionList.iterator();
         while (it.hasNext()) {
             Tuple idxTuple = it.next();

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java?rev=919189&r1=919188&r2=919189&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java Thu Mar  4 21:35:13 2010
@@ -248,7 +248,7 @@
            }
 
            // this is not a skewed key
-           if (redCount == 1) {
+           if (redCount <= 1) {
                return new Pair<Tuple, Integer>(null, 1);
            }
 

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java?rev=919189&r1=919188&r2=919189&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java Thu Mar  4 21:35:13 2010
@@ -153,15 +153,18 @@
      * @throws ExecException
      */
     private Tuple createNumRowTuple(Tuple sample) throws ExecException {
-        if(rowNum == 0 || sample == null)
-            return null;
-        TupleFactory factory = TupleFactory.getInstance();
-        Tuple t = factory.newTuple(sample.size() + 2);
-        for(int i=0; i<sample.size(); i++){
-            t.set(i, sample.get(i));
+        int sz = (sample == null) ? 0 : sample.size();
+        TupleFactory factory = TupleFactory.getInstance();       
+        Tuple t = factory.newTuple(sz + 2);
+ 
+        if (sample != null) {
+            for(int i=0; i<sample.size(); i++){
+                t.set(i, sample.get(i));
+            }
         }
-        t.set(sample.size(), NUMROWS_TUPLE_MARKER);
-        t.set(sample.size() + 1, rowNum);
+        
+        t.set(sz, NUMROWS_TUPLE_MARKER);
+        t.set(sz + 1, rowNum);
         numRowSplTupleReturned = true;
         return t;
     }

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java?rev=919189&r1=919188&r2=919189&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java Thu Mar  4 21:35:13 2010
@@ -444,4 +444,41 @@
         Assert.assertEquals(true, TestHelper.compareBags(dbfrj, dbrj));       
        
     }
+    
+    public void testSkewedJoinEmptyInput() throws IOException {
+        String LEFT_INPUT_FILE = "left.dat";
+        String RIGHT_INPUT_FILE = "right.dat";
+        
+        PrintWriter w = new PrintWriter(new FileWriter(LEFT_INPUT_FILE));
+        w.println("1");
+        w.println("2");
+        w.println("3");
+        w.println("5");
+        w.close();
+        
+        Util.copyFromLocalToCluster(cluster, LEFT_INPUT_FILE, LEFT_INPUT_FILE);
+        
+        PrintWriter w2 = new PrintWriter(new FileWriter(RIGHT_INPUT_FILE));
+        w2.println("1\tone");
+        w2.println("2\ttwo");
+        w2.println("3\tthree");
+
+        w2.close();
+
+        Util.copyFromLocalToCluster(cluster, RIGHT_INPUT_FILE, RIGHT_INPUT_FILE);
+        
+        pigServer.registerQuery("a = load 'left.dat' as (nums:chararray);");
+        pigServer.registerQuery("b = load 'right.dat' as (number:chararray,text:chararray);");
+        pigServer.registerQuery("c = filter a by nums == '7';");
+        pigServer.registerQuery("d = join c by nums LEFT OUTER, b by number USING 'skewed';");
+        
+        Iterator<Tuple> iter = pigServer.openIterator("d");
+        
+        Assert.assertFalse(iter.hasNext());
+        
+        new File(LEFT_INPUT_FILE).delete();
+        Util.deleteFile(cluster, LEFT_INPUT_FILE);
+        new File(RIGHT_INPUT_FILE).delete();
+        Util.deleteFile(cluster, RIGHT_INPUT_FILE);            
+    }
 }


Reply via email to