Author: olga
Date: Mon Sep 14 17:59:53 2009
New Revision: 814754

URL: http://svn.apache.org/viewvc?rev=814754&view=rev
Log:
PIG-955: Skewed join produces invalid results  (yinghe via olgan)

Modified:
    hadoop/pig/branches/branch-0.4/CHANGES.txt
    hadoop/pig/branches/branch-0.4/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java
    hadoop/pig/branches/branch-0.4/test/org/apache/pig/test/TestSkewedJoin.java
    hadoop/pig/branches/branch-0.4/test/org/apache/pig/test/Util.java

Modified: hadoop/pig/branches/branch-0.4/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.4/CHANGES.txt?rev=814754&r1=814753&r2=814754&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.4/CHANGES.txt (original)
+++ hadoop/pig/branches/branch-0.4/CHANGES.txt Mon Sep 14 17:59:53 2009
@@ -73,6 +73,8 @@
 
 BUG FIXES
 
+    PIG-955: Skewed join produces invalid results  (yinghe via olgan)
+
     PIG-954: Skewed join fails when pig.skewedjoin.reduce.memusage is not
     configured(yinghe via olgan)
 

Modified: hadoop/pig/branches/branch-0.4/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.4/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java?rev=814754&r1=814753&r2=814754&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.4/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java (original)
+++ hadoop/pig/branches/branch-0.4/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java Mon Sep 14 17:59:53 2009
@@ -87,7 +87,7 @@
                } else {
                        keyTuple = 
DefaultTupleFactory.getInstance().newTuple(1);
                        try {
-                               keyTuple.set(0, key);
+                               keyTuple.set(0, key.getValueAsPigType());
                        } catch (ExecException e) {
                                return -1;
                        }

Modified: hadoop/pig/branches/branch-0.4/test/org/apache/pig/test/TestSkewedJoin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.4/test/org/apache/pig/test/TestSkewedJoin.java?rev=814754&r1=814753&r2=814754&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.4/test/org/apache/pig/test/TestSkewedJoin.java (original)
+++ hadoop/pig/branches/branch-0.4/test/org/apache/pig/test/TestSkewedJoin.java Mon Sep 14 17:59:53 2009
@@ -112,6 +112,7 @@
        new File(INPUT_FILE2).delete();
        new File(INPUT_FILE3).delete();
         new File(INPUT_FILE4).delete();
+        Util.deleteDirectory(new File("skewedjoin"));
        
         Util.deleteFile(cluster, INPUT_FILE1);
         Util.deleteFile(cluster, INPUT_FILE2);
@@ -241,4 +242,48 @@
         
                return;
        }
+
+
+    public void testSkewedJoinKeyPartition() throws IOException {
+       try{
+            Util.deleteFile(cluster, "skewedjoin");
+       }catch(Exception e){
+               // it is ok if directory not exist
+       }
+        
+        pigServer.registerQuery("A = LOAD '" + INPUT_FILE1 + "' as (id, name, 
n);");
+         pigServer.registerQuery("B = LOAD '" + INPUT_FILE2 + "' as (id, 
name);");
+           
+        
+         pigServer.registerQuery("E = join A by id, B by id using \"skewed\" 
parallel 7;");
+         pigServer.store("E", "skewedjoin");
+         
+         int[][] lineCount = new int[3][7];
+         
+         new File("skewedjoin").mkdir();
+         // check how many times a key appear in each part- file
+         for(int i=0; i<7; i++) {
+                Util.copyFromClusterToLocal(cluster, "skewedjoin/part-0000"+i, 
"skewedjoin/part-0000"+i);
+                
+                BufferedReader reader = new BufferedReader(new 
FileReader("skewedjoin/part-0000"+i));
+            String line = null;             
+            while((line = reader.readLine()) != null) {
+               String[] cols = line.split("\t");
+               int key = Integer.parseInt(cols[0])/100 -1;
+               lineCount[key][i] ++;
+           }
+         }
+         for(int i=0; i<3; i++) {
+                int fc = 0;
+                for(int j=0; j<7; j++) {
+                        if (lineCount[i][j] > 0) {
+                                fc ++;
+                        }
+                }
+                // all three keys are skewed keys,
+                // check each key should appear in more than 1 part- file
+                assertTrue(fc > 1);
+         }
+    }
+    
 }

Modified: hadoop/pig/branches/branch-0.4/test/org/apache/pig/test/Util.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.4/test/org/apache/pig/test/Util.java?rev=814754&r1=814753&r2=814754&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.4/test/org/apache/pig/test/Util.java (original)
+++ hadoop/pig/branches/branch-0.4/test/org/apache/pig/test/Util.java Mon Sep 14 17:59:53 2009
@@ -20,11 +20,14 @@
 import static java.util.regex.Matcher.quoteReplacement;
 
 import java.io.BufferedReader;
+import java.io.BufferedWriter;
 import java.io.ByteArrayInputStream;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.FileReader;
+import java.io.FileWriter;
 import java.io.IOException;
+import java.io.InputStreamReader;
 import java.io.OutputStream;
 import java.io.OutputStreamWriter;
 import java.io.PrintWriter;
@@ -36,6 +39,7 @@
 
 import junit.framework.Assert;
 
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -306,6 +310,26 @@
            Util.createInputFile(cluster, fileNameOnCluster, 
contents.toArray(new String[0]));
        }
        
+       static public void copyFromClusterToLocal(MiniCluster cluster, String 
fileNameOnCluster, String localFileName) throws IOException {
+           PrintWriter writer = new PrintWriter(new FileWriter(localFileName));
+           
+           FileSystem fs = cluster.getFileSystem();
+        if(!fs.exists(new Path(fileNameOnCluster))) {
+            throw new IOException("File " + fileNameOnCluster + " does not 
exists on the minicluster");
+        }
+        
+        String line = null;
+          
+        FSDataInputStream stream = fs.open(new Path(fileNameOnCluster));
+        BufferedReader reader = new BufferedReader(new 
InputStreamReader(stream));
+        while( (line = reader.readLine()) != null) {
+               writer.println(line);           
+        }
+    
+        reader.close();
+        writer.close();
+       }
+       
        static public void printQueryOutput(Iterator<Tuple> actualResults, 
                Tuple[] expectedResults) {
 


Reply via email to