Author: olga
Date: Wed Dec  9 16:39:18 2009
New Revision: 888868

URL: http://svn.apache.org/viewvc?rev=888868&view=rev
Log:
PIG-1135: Skewed join partitioner returns negative partition index (yinghe via olgan)

Modified:
    hadoop/pig/branches/branch-0.6/CHANGES.txt
    hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java
    hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java
    hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartitionRearrange.java
    hadoop/pig/branches/branch-0.6/src/org/apache/pig/impl/io/NullablePartitionWritable.java
    hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/TestSkewedJoin.java

Modified: hadoop/pig/branches/branch-0.6/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/CHANGES.txt?rev=888868&r1=888867&r2=888868&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.6/CHANGES.txt (original)
+++ hadoop/pig/branches/branch-0.6/CHANGES.txt Wed Dec  9 16:39:18 2009
@@ -135,8 +135,9 @@
 
 BUG FIXES
 
-PIG-1134: Skewed Join sampling job overwhelms the name node (sriranjan via
-olgan)
+PIG-1135: skewed join partitioner returns negative partition index  (yinghe 
via olgan)
+
+PIG-1134: Skewed Join sampling job overwhelms the name node (sriranjan via 
olgan)
 
 PIG-1105: COUNT_STAR accumulate interface implementation cases 
failure(sriranjan via olgan)
 

Modified: 
hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java?rev=888868&r1=888867&r2=888868&view=diff
==============================================================================
--- 
hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java
 (original)
+++ 
hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java
 Wed Dec  9 16:39:18 2009
@@ -157,7 +157,7 @@
                        Byte tupleValIdx = 3;
 
             Byte index = (Byte)tuple.get(0);
-                       Byte partitionIndex = -1;
+                       Integer partitionIndex = -1;
                // for partitioning table, the partition index isn't present
                        if (tuple.size() == 3) {
                                //super.collect(oc, tuple);
@@ -165,7 +165,7 @@
                                tupleKeyIdx--;
                                tupleValIdx--;
                        } else {
-                               partitionIndex = (Byte)tuple.get(1);
+                               partitionIndex = (Integer)tuple.get(1);
                        }
 
             PigNullableWritable key =

Modified: 
hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java?rev=888868&r1=888867&r2=888868&view=diff
==============================================================================
--- 
hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java
 (original)
+++ 
hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java
 Wed Dec  9 16:39:18 2009
@@ -70,8 +70,8 @@
     public int getPartition(PigNullableWritable wrappedKey, Writable value,
             int numPartitions) {
                // for streaming tables, return the partition index blindly
-               if (wrappedKey instanceof NullablePartitionWritable && 
((int)((NullablePartitionWritable)wrappedKey).getPartition()) != -1) {
-                       return (int) 
((NullablePartitionWritable)wrappedKey).getPartition();
+               if (wrappedKey instanceof NullablePartitionWritable && 
(((NullablePartitionWritable)wrappedKey).getPartition()) != -1) {
+                       return 
((NullablePartitionWritable)wrappedKey).getPartition();
                }
 
                // for partition table, compute the index based on the sampler 
output

Modified: 
hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartitionRearrange.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartitionRearrange.java?rev=888868&r1=888867&r2=888868&view=diff
==============================================================================
--- 
hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartitionRearrange.java
 (original)
+++ 
hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartitionRearrange.java
 Wed Dec  9 16:39:18 2009
@@ -223,7 +223,7 @@
                        Tuple opTuple = mTupleFactory.newTuple(4);
                        opTuple.set(0, t.get(0));
                        // set the partition index
-                       opTuple.set(1, reducerIdx.byteValue());
+                       opTuple.set(1, reducerIdx.intValue());
                        opTuple.set(2, key);
                        opTuple.set(3, t.get(2));
                        

Modified: 
hadoop/pig/branches/branch-0.6/src/org/apache/pig/impl/io/NullablePartitionWritable.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/src/org/apache/pig/impl/io/NullablePartitionWritable.java?rev=888868&r1=888867&r2=888868&view=diff
==============================================================================
--- 
hadoop/pig/branches/branch-0.6/src/org/apache/pig/impl/io/NullablePartitionWritable.java
 (original)
+++ 
hadoop/pig/branches/branch-0.6/src/org/apache/pig/impl/io/NullablePartitionWritable.java
 Wed Dec  9 16:39:18 2009
@@ -26,7 +26,7 @@
  * index to the class.
  */
 public class NullablePartitionWritable extends PigNullableWritable{
-       private byte partitionIndex;
+       private int partitionIndex;
        private PigNullableWritable key;
 
        public NullablePartitionWritable() {
@@ -45,11 +45,11 @@
                return key;
        }
 
-       public void setPartition(byte n) {
+       public void setPartition(int n) {
                partitionIndex = n;
        }
 
-       public byte getPartition() {
+       public int getPartition() {
                return partitionIndex;
        }
 

Modified: 
hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/TestSkewedJoin.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/TestSkewedJoin.java?rev=888868&r1=888867&r2=888868&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/TestSkewedJoin.java 
(original)
+++ hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/TestSkewedJoin.java 
Wed Dec  9 16:39:18 2009
@@ -42,6 +42,8 @@
     private static final String INPUT_FILE3 = "SkewedJoinInput3.txt";
     private static final String INPUT_FILE4 = "SkewedJoinInput4.txt";
     private static final String INPUT_FILE5 = "SkewedJoinInput5.txt";
+    private static final String INPUT_FILE6 = "SkewedJoinInput6.txt";
+    private static final String INPUT_FILE7 = "SkewedJoinInput7.txt";
     
     private PigServer pigServer;
     private MiniCluster cluster = MiniCluster.buildCluster();
@@ -114,12 +116,31 @@
         w5.println("100\t");
         w5.close();
         
+        PrintWriter w6 = new PrintWriter(new FileWriter(INPUT_FILE6));
+        
+        for(int i=0; i<300; i++) {
+            for(int j=0; j<5; j++) {
+                w6.println(""+i+"\t"+j);       
+            }           
+        }
+        w6.close();   
+        
+        PrintWriter w7 = new PrintWriter(new FileWriter(INPUT_FILE7));
+        
+        for(int i=0; i<300; i = i+3) {
+            for(int j=0; j<2; j++) {
+                w7.println(""+i+"\t"+j);       
+            }           
+        }
+        w7.close();   
+
        Util.copyFromLocalToCluster(cluster, INPUT_FILE1, INPUT_FILE1);
        Util.copyFromLocalToCluster(cluster, INPUT_FILE2, INPUT_FILE2);
        Util.copyFromLocalToCluster(cluster, INPUT_FILE3, INPUT_FILE3);
        Util.copyFromLocalToCluster(cluster, INPUT_FILE4, INPUT_FILE4);
        Util.copyFromLocalToCluster(cluster, INPUT_FILE5, INPUT_FILE5);
-
+        Util.copyFromLocalToCluster(cluster, INPUT_FILE6, INPUT_FILE6);      
+        Util.copyFromLocalToCluster(cluster, INPUT_FILE7, INPUT_FILE7);
     }
     
     @After
@@ -128,6 +149,9 @@
        new File(INPUT_FILE2).delete();
        new File(INPUT_FILE3).delete();
         new File(INPUT_FILE4).delete();
+        new File(INPUT_FILE5).delete();
+        new File(INPUT_FILE6).delete();
+        new File(INPUT_FILE7).delete();
         Util.deleteDirectory(new File("skewedjoin"));
        
         Util.deleteFile(cluster, INPUT_FILE1);
@@ -135,7 +159,8 @@
         Util.deleteFile(cluster, INPUT_FILE3);
         Util.deleteFile(cluster, INPUT_FILE4);
         Util.deleteFile(cluster, INPUT_FILE5);
-
+        Util.deleteFile(cluster, INPUT_FILE6);
+        Util.deleteFile(cluster, INPUT_FILE7);
     }
     
     public void testSkewedJoinWithGroup() throws IOException{
@@ -392,4 +417,31 @@
         Assert.assertEquals(true, TestHelper.compareBags(dbfrj, dbrj));       
        
     }
+
+    public void testSkewedJoinManyReducers() throws IOException {
+        
pigServer.getPigContext().getProperties().setProperty("pig.skewedjoin.reduce.maxtuple",
 "2");
+        pigServer.registerQuery("A = LOAD '" + INPUT_FILE6 + "' as 
(id,name);");
+        pigServer.registerQuery("B = LOAD '" + INPUT_FILE7 + "' as 
(id,name);");
+           
+        DataBag dbfrj = BagFactory.getInstance().newDefaultBag(), dbrj = 
BagFactory.getInstance().newDefaultBag();
+        {
+            pigServer.registerQuery("E = join A by id, B by id using 
\"skewed\" parallel 300;");
+            Iterator<Tuple> iter = pigServer.openIterator("E");
+                
+            while(iter.hasNext()) {
+                dbfrj.add(iter.next());
+            }
+        }
+        {
+            pigServer.registerQuery("E = join A by id, B by id;");
+            Iterator<Tuple> iter = pigServer.openIterator("E");
+        
+            while(iter.hasNext()) {
+                dbrj.add(iter.next());
+            }
+        }
+        Assert.assertEquals(dbfrj.size(), dbrj.size());
+        Assert.assertEquals(true, TestHelper.compareBags(dbfrj, dbrj));       
+       
+    }
 }


Reply via email to