Author: olga
Date: Mon Sep 14 17:59:53 2009
New Revision: 814754

URL: http://svn.apache.org/viewvc?rev=814754&view=rev
Log:
PIG-955: Skewed join produces invalid results (yinghe via olgan)

Modified: hadoop/pig/branches/branch-0.4/CHANGES.txt hadoop/pig/branches/branch-0.4/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java hadoop/pig/branches/branch-0.4/test/org/apache/pig/test/TestSkewedJoin.java hadoop/pig/branches/branch-0.4/test/org/apache/pig/test/Util.java Modified: hadoop/pig/branches/branch-0.4/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.4/CHANGES.txt?rev=814754&r1=814753&r2=814754&view=diff ============================================================================== --- hadoop/pig/branches/branch-0.4/CHANGES.txt (original) +++ hadoop/pig/branches/branch-0.4/CHANGES.txt Mon Sep 14 17:59:53 2009 @@ -73,6 +73,8 @@ BUG FIXES + PIG-955: Skewed join produces invalid results (yinghe via olgan) + PIG-954: Skewed join fails when pig.skewedjoin.reduce.memusage is not configured(yinghe via olgan) Modified: hadoop/pig/branches/branch-0.4/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.4/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java?rev=814754&r1=814753&r2=814754&view=diff ============================================================================== --- hadoop/pig/branches/branch-0.4/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java (original) +++ hadoop/pig/branches/branch-0.4/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java Mon Sep 14 17:59:53 2009 @@ -87,7 +87,7 @@ } else { keyTuple = DefaultTupleFactory.getInstance().newTuple(1); try { - keyTuple.set(0, key); + keyTuple.set(0, key.getValueAsPigType()); } catch (ExecException e) { return -1; } Modified: hadoop/pig/branches/branch-0.4/test/org/apache/pig/test/TestSkewedJoin.java URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.4/test/org/apache/pig/test/TestSkewedJoin.java?rev=814754&r1=814753&r2=814754&view=diff ============================================================================== --- hadoop/pig/branches/branch-0.4/test/org/apache/pig/test/TestSkewedJoin.java (original) +++ hadoop/pig/branches/branch-0.4/test/org/apache/pig/test/TestSkewedJoin.java Mon Sep 14 17:59:53 2009 @@ -112,6 +112,7 @@ new File(INPUT_FILE2).delete(); new File(INPUT_FILE3).delete(); new File(INPUT_FILE4).delete(); + Util.deleteDirectory(new File("skewedjoin")); Util.deleteFile(cluster, INPUT_FILE1); Util.deleteFile(cluster, INPUT_FILE2); @@ -241,4 +242,48 @@ return; } + + + public void testSkewedJoinKeyPartition() throws IOException { + try{ + Util.deleteFile(cluster, "skewedjoin"); + }catch(Exception e){ + // it is ok if directory not exist + } + + pigServer.registerQuery("A = LOAD '" + INPUT_FILE1 + "' as (id, name, n);"); + pigServer.registerQuery("B = LOAD '" + INPUT_FILE2 + "' as (id, name);"); + + + pigServer.registerQuery("E = join A by id, B by id using \"skewed\" parallel 7;"); + pigServer.store("E", "skewedjoin"); + + int[][] lineCount = new int[3][7]; + + new File("skewedjoin").mkdir(); + // check how many times a key appear in each part- file + for(int i=0; i<7; i++) { + Util.copyFromClusterToLocal(cluster, "skewedjoin/part-0000"+i, "skewedjoin/part-0000"+i); + + BufferedReader reader = new BufferedReader(new FileReader("skewedjoin/part-0000"+i)); + String line = null; + while((line = reader.readLine()) != null) { + String[] cols = line.split("\t"); + int key = Integer.parseInt(cols[0])/100 -1; + lineCount[key][i] ++; + } + } + for(int i=0; i<3; i++) { + int fc = 0; + for(int j=0; j<7; j++) { + if (lineCount[i][j] > 0) { + fc ++; + } + } + // all three keys are skewed keys, + // check each key should appear in more than 1 part- file + assertTrue(fc > 1); + } + } + } Modified: 
hadoop/pig/branches/branch-0.4/test/org/apache/pig/test/Util.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.4/test/org/apache/pig/test/Util.java?rev=814754&r1=814753&r2=814754&view=diff ============================================================================== --- hadoop/pig/branches/branch-0.4/test/org/apache/pig/test/Util.java (original) +++ hadoop/pig/branches/branch-0.4/test/org/apache/pig/test/Util.java Mon Sep 14 17:59:53 2009 @@ -20,11 +20,14 @@ import static java.util.regex.Matcher.quoteReplacement; import java.io.BufferedReader; +import java.io.BufferedWriter; import java.io.ByteArrayInputStream; import java.io.File; import java.io.FileOutputStream; import java.io.FileReader; +import java.io.FileWriter; import java.io.IOException; +import java.io.InputStreamReader; import java.io.OutputStream; import java.io.OutputStreamWriter; import java.io.PrintWriter; @@ -36,6 +39,7 @@ import junit.framework.Assert; +import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -306,6 +310,26 @@ Util.createInputFile(cluster, fileNameOnCluster, contents.toArray(new String[0])); } + static public void copyFromClusterToLocal(MiniCluster cluster, String fileNameOnCluster, String localFileName) throws IOException { + PrintWriter writer = new PrintWriter(new FileWriter(localFileName)); + + FileSystem fs = cluster.getFileSystem(); + if(!fs.exists(new Path(fileNameOnCluster))) { + throw new IOException("File " + fileNameOnCluster + " does not exists on the minicluster"); + } + + String line = null; + + FSDataInputStream stream = fs.open(new Path(fileNameOnCluster)); + BufferedReader reader = new BufferedReader(new InputStreamReader(stream)); + while( (line = reader.readLine()) != null) { + writer.println(line); + } + + reader.close(); + writer.close(); + } + static public void printQueryOutput(Iterator<Tuple> actualResults, Tuple[] 
expectedResults) {