Author: olga
Date: Mon Sep 14 17:59:53 2009
New Revision: 814754
URL: http://svn.apache.org/viewvc?rev=814754&view=rev
Log:
PIG-955: Skewed join produces invalid results (yinghe via olgan)
Modified:
hadoop/pig/branches/branch-0.4/CHANGES.txt
hadoop/pig/branches/branch-0.4/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java
hadoop/pig/branches/branch-0.4/test/org/apache/pig/test/TestSkewedJoin.java
hadoop/pig/branches/branch-0.4/test/org/apache/pig/test/Util.java
Modified: hadoop/pig/branches/branch-0.4/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.4/CHANGES.txt?rev=814754&r1=814753&r2=814754&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.4/CHANGES.txt (original)
+++ hadoop/pig/branches/branch-0.4/CHANGES.txt Mon Sep 14 17:59:53 2009
@@ -73,6 +73,8 @@
BUG FIXES
+ PIG-955: Skewed join produces invalid results (yinghe via olgan)
+
PIG-954: Skewed join fails when pig.skewedjoin.reduce.memusage is not
configured(yinghe via olgan)
Modified: hadoop/pig/branches/branch-0.4/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.4/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java?rev=814754&r1=814753&r2=814754&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.4/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java (original)
+++ hadoop/pig/branches/branch-0.4/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java Mon Sep 14 17:59:53 2009
@@ -87,7 +87,7 @@
} else {
keyTuple =
DefaultTupleFactory.getInstance().newTuple(1);
try {
- keyTuple.set(0, key);
+ keyTuple.set(0, key.getValueAsPigType());
} catch (ExecException e) {
return -1;
}
Modified: hadoop/pig/branches/branch-0.4/test/org/apache/pig/test/TestSkewedJoin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.4/test/org/apache/pig/test/TestSkewedJoin.java?rev=814754&r1=814753&r2=814754&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.4/test/org/apache/pig/test/TestSkewedJoin.java (original)
+++ hadoop/pig/branches/branch-0.4/test/org/apache/pig/test/TestSkewedJoin.java Mon Sep 14 17:59:53 2009
@@ -112,6 +112,7 @@
new File(INPUT_FILE2).delete();
new File(INPUT_FILE3).delete();
new File(INPUT_FILE4).delete();
+ Util.deleteDirectory(new File("skewedjoin"));
Util.deleteFile(cluster, INPUT_FILE1);
Util.deleteFile(cluster, INPUT_FILE2);
@@ -241,4 +242,48 @@
return;
}
+
+
+ public void testSkewedJoinKeyPartition() throws IOException {
+ try{
+ Util.deleteFile(cluster, "skewedjoin");
+ }catch(Exception e){
+ // it is ok if directory not exist
+ }
+
+ pigServer.registerQuery("A = LOAD '" + INPUT_FILE1 + "' as (id, name, n);");
+ pigServer.registerQuery("B = LOAD '" + INPUT_FILE2 + "' as (id, name);");
+
+
+ pigServer.registerQuery("E = join A by id, B by id using \"skewed\" parallel 7;");
+ pigServer.store("E", "skewedjoin");
+
+ int[][] lineCount = new int[3][7];
+
+ new File("skewedjoin").mkdir();
+ // check how many times a key appear in each part- file
+ for(int i=0; i<7; i++) {
+ Util.copyFromClusterToLocal(cluster, "skewedjoin/part-0000"+i, "skewedjoin/part-0000"+i);
+
+ BufferedReader reader = new BufferedReader(new FileReader("skewedjoin/part-0000"+i));
+ String line = null;
+ while((line = reader.readLine()) != null) {
+ String[] cols = line.split("\t");
+ int key = Integer.parseInt(cols[0])/100 -1;
+ lineCount[key][i] ++;
+ }
+ }
+ for(int i=0; i<3; i++) {
+ int fc = 0;
+ for(int j=0; j<7; j++) {
+ if (lineCount[i][j] > 0) {
+ fc ++;
+ }
+ }
+ // all three keys are skewed keys,
+ // check each key should appear in more than 1 part- file
+ assertTrue(fc > 1);
+ }
+ }
+
}
Modified: hadoop/pig/branches/branch-0.4/test/org/apache/pig/test/Util.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.4/test/org/apache/pig/test/Util.java?rev=814754&r1=814753&r2=814754&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.4/test/org/apache/pig/test/Util.java (original)
+++ hadoop/pig/branches/branch-0.4/test/org/apache/pig/test/Util.java Mon Sep 14 17:59:53 2009
@@ -20,11 +20,14 @@
import static java.util.regex.Matcher.quoteReplacement;
import java.io.BufferedReader;
+import java.io.BufferedWriter;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.FileReader;
+import java.io.FileWriter;
import java.io.IOException;
+import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
@@ -36,6 +39,7 @@
import junit.framework.Assert;
+import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -306,6 +310,26 @@
Util.createInputFile(cluster, fileNameOnCluster,
contents.toArray(new String[0]));
}
+ static public void copyFromClusterToLocal(MiniCluster cluster, String fileNameOnCluster, String localFileName) throws IOException {
+ PrintWriter writer = new PrintWriter(new FileWriter(localFileName));
+
+ FileSystem fs = cluster.getFileSystem();
+ if(!fs.exists(new Path(fileNameOnCluster))) {
+ throw new IOException("File " + fileNameOnCluster + " does not exists on the minicluster");
+ }
+
+ String line = null;
+
+ FSDataInputStream stream = fs.open(new Path(fileNameOnCluster));
+ BufferedReader reader = new BufferedReader(new InputStreamReader(stream));
+ while( (line = reader.readLine()) != null) {
+ writer.println(line);
+ }
+
+ reader.close();
+ writer.close();
+ }
+
static public void printQueryOutput(Iterator<Tuple> actualResults,
Tuple[] expectedResults) {