Author: olga
Date: Thu Dec 10 15:48:16 2009
New Revision: 889305
URL: http://svn.apache.org/viewvc?rev=889305&view=rev
Log:
PIG-1135: skewed join partitioner returns negative partition index (yinghe via
olgan)
Modified:
hadoop/pig/trunk/CHANGES.txt
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartitionRearrange.java
hadoop/pig/trunk/src/org/apache/pig/impl/io/NullablePartitionWritable.java
hadoop/pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java
Modified: hadoop/pig/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=889305&r1=889304&r2=889305&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Thu Dec 10 15:48:16 2009
@@ -47,6 +47,9 @@
BUG FIXES
+PIG-1135: skewed join partitioner returns negative partition index (yinghe
+via olgan)
+
PIG-1134: Skewed Join sampling job overwhelms the name node (sriranjan via
olgan)
Modified:
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java?rev=889305&r1=889304&r2=889305&view=diff
==============================================================================
---
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java
(original)
+++
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java
Thu Dec 10 15:48:16 2009
@@ -157,7 +157,7 @@
Byte tupleValIdx = 3;
Byte index = (Byte)tuple.get(0);
- Byte partitionIndex = -1;
+ Integer partitionIndex = -1;
// for partitioning table, the partition index isn't present
if (tuple.size() == 3) {
//super.collect(oc, tuple);
@@ -165,7 +165,7 @@
tupleKeyIdx--;
tupleValIdx--;
} else {
- partitionIndex = (Byte)tuple.get(1);
+ partitionIndex = (Integer)tuple.get(1);
}
PigNullableWritable key =
Modified:
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java?rev=889305&r1=889304&r2=889305&view=diff
==============================================================================
---
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java
(original)
+++
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java
Thu Dec 10 15:48:16 2009
@@ -70,8 +70,8 @@
public int getPartition(PigNullableWritable wrappedKey, Writable value,
int numPartitions) {
// for streaming tables, return the partition index blindly
- if (wrappedKey instanceof NullablePartitionWritable && ((int)((NullablePartitionWritable)wrappedKey).getPartition()) != -1) {
- return (int) ((NullablePartitionWritable)wrappedKey).getPartition();
+ if (wrappedKey instanceof NullablePartitionWritable && (((NullablePartitionWritable)wrappedKey).getPartition()) != -1) {
+ return ((NullablePartitionWritable)wrappedKey).getPartition();
}
// for partition table, compute the index based on the sampler output
Modified:
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartitionRearrange.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartitionRearrange.java?rev=889305&r1=889304&r2=889305&view=diff
==============================================================================
---
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartitionRearrange.java
(original)
+++
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartitionRearrange.java
Thu Dec 10 15:48:16 2009
@@ -223,7 +223,7 @@
Tuple opTuple = mTupleFactory.newTuple(4);
opTuple.set(0, t.get(0));
// set the partition index
- opTuple.set(1, reducerIdx.byteValue());
+ opTuple.set(1, reducerIdx.intValue());
opTuple.set(2, key);
opTuple.set(3, t.get(2));
Modified:
hadoop/pig/trunk/src/org/apache/pig/impl/io/NullablePartitionWritable.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/io/NullablePartitionWritable.java?rev=889305&r1=889304&r2=889305&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/io/NullablePartitionWritable.java
(original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/io/NullablePartitionWritable.java
Thu Dec 10 15:48:16 2009
@@ -26,7 +26,7 @@
* index to the class.
*/
public class NullablePartitionWritable extends PigNullableWritable{
- private byte partitionIndex;
+ private int partitionIndex;
private PigNullableWritable key;
public NullablePartitionWritable() {
@@ -45,11 +45,11 @@
return key;
}
- public void setPartition(byte n) {
+ public void setPartition(int n) {
partitionIndex = n;
}
- public byte getPartition() {
+ public int getPartition() {
return partitionIndex;
}
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java?rev=889305&r1=889304&r2=889305&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java Thu Dec 10
15:48:16 2009
@@ -42,6 +42,8 @@
private static final String INPUT_FILE3 = "SkewedJoinInput3.txt";
private static final String INPUT_FILE4 = "SkewedJoinInput4.txt";
private static final String INPUT_FILE5 = "SkewedJoinInput5.txt";
+ private static final String INPUT_FILE6 = "SkewedJoinInput6.txt";
+ private static final String INPUT_FILE7 = "SkewedJoinInput7.txt";
private PigServer pigServer;
private MiniCluster cluster = MiniCluster.buildCluster();
@@ -114,12 +116,31 @@
w5.println("100\t");
w5.close();
+ PrintWriter w6 = new PrintWriter(new FileWriter(INPUT_FILE6));
+
+ for(int i=0; i<300; i++) {
+ for(int j=0; j<5; j++) {
+ w6.println(""+i+"\t"+j);
+ }
+ }
+ w6.close();
+
+ PrintWriter w7 = new PrintWriter(new FileWriter(INPUT_FILE7));
+
+ for(int i=0; i<300; i = i+3) {
+ for(int j=0; j<2; j++) {
+ w7.println(""+i+"\t"+j);
+ }
+ }
+ w7.close();
+
Util.copyFromLocalToCluster(cluster, INPUT_FILE1, INPUT_FILE1);
Util.copyFromLocalToCluster(cluster, INPUT_FILE2, INPUT_FILE2);
Util.copyFromLocalToCluster(cluster, INPUT_FILE3, INPUT_FILE3);
Util.copyFromLocalToCluster(cluster, INPUT_FILE4, INPUT_FILE4);
Util.copyFromLocalToCluster(cluster, INPUT_FILE5, INPUT_FILE5);
-
+ Util.copyFromLocalToCluster(cluster, INPUT_FILE6, INPUT_FILE6);
+ Util.copyFromLocalToCluster(cluster, INPUT_FILE7, INPUT_FILE7);
}
@After
@@ -128,6 +149,9 @@
new File(INPUT_FILE2).delete();
new File(INPUT_FILE3).delete();
new File(INPUT_FILE4).delete();
+ new File(INPUT_FILE5).delete();
+ new File(INPUT_FILE6).delete();
+ new File(INPUT_FILE7).delete();
Util.deleteDirectory(new File("skewedjoin"));
Util.deleteFile(cluster, INPUT_FILE1);
@@ -135,7 +159,8 @@
Util.deleteFile(cluster, INPUT_FILE3);
Util.deleteFile(cluster, INPUT_FILE4);
Util.deleteFile(cluster, INPUT_FILE5);
-
+ Util.deleteFile(cluster, INPUT_FILE6);
+ Util.deleteFile(cluster, INPUT_FILE7);
}
public void testSkewedJoinWithGroup() throws IOException{
@@ -392,4 +417,31 @@
Assert.assertEquals(true, TestHelper.compareBags(dbfrj, dbrj));
}
+
+ public void testSkewedJoinManyReducers() throws IOException {
+ pigServer.getPigContext().getProperties().setProperty("pig.skewedjoin.reduce.maxtuple", "2");
+ pigServer.registerQuery("A = LOAD '" + INPUT_FILE6 + "' as (id,name);");
+ pigServer.registerQuery("B = LOAD '" + INPUT_FILE7 + "' as (id,name);");
+
+ DataBag dbfrj = BagFactory.getInstance().newDefaultBag(), dbrj = BagFactory.getInstance().newDefaultBag();
+ {
+ pigServer.registerQuery("E = join A by id, B by id using \"skewed\" parallel 300;");
+ Iterator<Tuple> iter = pigServer.openIterator("E");
+
+ while(iter.hasNext()) {
+ dbfrj.add(iter.next());
+ }
+ }
+ {
+ pigServer.registerQuery("E = join A by id, B by id;");
+ Iterator<Tuple> iter = pigServer.openIterator("E");
+
+ while(iter.hasNext()) {
+ dbrj.add(iter.next());
+ }
+ }
+ Assert.assertEquals(dbfrj.size(), dbrj.size());
+ Assert.assertEquals(true, TestHelper.compareBags(dbfrj, dbrj));
+
+ }
}