Author: olga
Date: Wed Dec 9 16:39:18 2009
New Revision: 888868

URL: http://svn.apache.org/viewvc?rev=888868&view=rev
Log:
PIG-1135: skewed join partitioner returns negative partition index (yinghe via olgan)

Modified: hadoop/pig/branches/branch-0.6/CHANGES.txt hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartitionRearrange.java hadoop/pig/branches/branch-0.6/src/org/apache/pig/impl/io/NullablePartitionWritable.java hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/TestSkewedJoin.java Modified: hadoop/pig/branches/branch-0.6/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/CHANGES.txt?rev=888868&r1=888867&r2=888868&view=diff ============================================================================== --- hadoop/pig/branches/branch-0.6/CHANGES.txt (original) +++ hadoop/pig/branches/branch-0.6/CHANGES.txt Wed Dec 9 16:39:18 2009 @@ -135,8 +135,9 @@ BUG FIXES -PIG-1134: Skewed Join sampling job overwhelms the name node (sriranjan via -olgan) +PIG-1135: skewed join partitioner returns negative partition index (yinghe via olgan) + +PIG-1134: Skewed Join sampling job overwhelms the name node (sriranjan via olgan) PIG-1105: COUNT_STAR accumulate interface implementation cases failure(sriranjan via olgan) Modified: hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java?rev=888868&r1=888867&r2=888868&view=diff ============================================================================== --- hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java (original) +++ 
hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java Wed Dec 9 16:39:18 2009 @@ -157,7 +157,7 @@ Byte tupleValIdx = 3; Byte index = (Byte)tuple.get(0); - Byte partitionIndex = -1; + Integer partitionIndex = -1; // for partitioning table, the partition index isn't present if (tuple.size() == 3) { //super.collect(oc, tuple); @@ -165,7 +165,7 @@ tupleKeyIdx--; tupleValIdx--; } else { - partitionIndex = (Byte)tuple.get(1); + partitionIndex = (Integer)tuple.get(1); } PigNullableWritable key = Modified: hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java?rev=888868&r1=888867&r2=888868&view=diff ============================================================================== --- hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java (original) +++ hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java Wed Dec 9 16:39:18 2009 @@ -70,8 +70,8 @@ public int getPartition(PigNullableWritable wrappedKey, Writable value, int numPartitions) { // for streaming tables, return the partition index blindly - if (wrappedKey instanceof NullablePartitionWritable && ((int)((NullablePartitionWritable)wrappedKey).getPartition()) != -1) { - return (int) ((NullablePartitionWritable)wrappedKey).getPartition(); + if (wrappedKey instanceof NullablePartitionWritable && (((NullablePartitionWritable)wrappedKey).getPartition()) != -1) { + return ((NullablePartitionWritable)wrappedKey).getPartition(); } // for partition table, compute the index based on the sampler output Modified: 
hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartitionRearrange.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartitionRearrange.java?rev=888868&r1=888867&r2=888868&view=diff ============================================================================== --- hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartitionRearrange.java (original) +++ hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartitionRearrange.java Wed Dec 9 16:39:18 2009 @@ -223,7 +223,7 @@ Tuple opTuple = mTupleFactory.newTuple(4); opTuple.set(0, t.get(0)); // set the partition index - opTuple.set(1, reducerIdx.byteValue()); + opTuple.set(1, reducerIdx.intValue()); opTuple.set(2, key); opTuple.set(3, t.get(2)); Modified: hadoop/pig/branches/branch-0.6/src/org/apache/pig/impl/io/NullablePartitionWritable.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/src/org/apache/pig/impl/io/NullablePartitionWritable.java?rev=888868&r1=888867&r2=888868&view=diff ============================================================================== --- hadoop/pig/branches/branch-0.6/src/org/apache/pig/impl/io/NullablePartitionWritable.java (original) +++ hadoop/pig/branches/branch-0.6/src/org/apache/pig/impl/io/NullablePartitionWritable.java Wed Dec 9 16:39:18 2009 @@ -26,7 +26,7 @@ * index to the class. 
*/ public class NullablePartitionWritable extends PigNullableWritable{ - private byte partitionIndex; + private int partitionIndex; private PigNullableWritable key; public NullablePartitionWritable() { @@ -45,11 +45,11 @@ return key; } - public void setPartition(byte n) { + public void setPartition(int n) { partitionIndex = n; } - public byte getPartition() { + public int getPartition() { return partitionIndex; } Modified: hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/TestSkewedJoin.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/TestSkewedJoin.java?rev=888868&r1=888867&r2=888868&view=diff ============================================================================== --- hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/TestSkewedJoin.java (original) +++ hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/TestSkewedJoin.java Wed Dec 9 16:39:18 2009 @@ -42,6 +42,8 @@ private static final String INPUT_FILE3 = "SkewedJoinInput3.txt"; private static final String INPUT_FILE4 = "SkewedJoinInput4.txt"; private static final String INPUT_FILE5 = "SkewedJoinInput5.txt"; + private static final String INPUT_FILE6 = "SkewedJoinInput6.txt"; + private static final String INPUT_FILE7 = "SkewedJoinInput7.txt"; private PigServer pigServer; private MiniCluster cluster = MiniCluster.buildCluster(); @@ -114,12 +116,31 @@ w5.println("100\t"); w5.close(); + PrintWriter w6 = new PrintWriter(new FileWriter(INPUT_FILE6)); + + for(int i=0; i<300; i++) { + for(int j=0; j<5; j++) { + w6.println(""+i+"\t"+j); + } + } + w6.close(); + + PrintWriter w7 = new PrintWriter(new FileWriter(INPUT_FILE7)); + + for(int i=0; i<300; i = i+3) { + for(int j=0; j<2; j++) { + w7.println(""+i+"\t"+j); + } + } + w7.close(); + Util.copyFromLocalToCluster(cluster, INPUT_FILE1, INPUT_FILE1); Util.copyFromLocalToCluster(cluster, INPUT_FILE2, INPUT_FILE2); Util.copyFromLocalToCluster(cluster, INPUT_FILE3, INPUT_FILE3); 
Util.copyFromLocalToCluster(cluster, INPUT_FILE4, INPUT_FILE4); Util.copyFromLocalToCluster(cluster, INPUT_FILE5, INPUT_FILE5); - + Util.copyFromLocalToCluster(cluster, INPUT_FILE6, INPUT_FILE6); + Util.copyFromLocalToCluster(cluster, INPUT_FILE7, INPUT_FILE7); } @After @@ -128,6 +149,9 @@ new File(INPUT_FILE2).delete(); new File(INPUT_FILE3).delete(); new File(INPUT_FILE4).delete(); + new File(INPUT_FILE5).delete(); + new File(INPUT_FILE6).delete(); + new File(INPUT_FILE7).delete(); Util.deleteDirectory(new File("skewedjoin")); Util.deleteFile(cluster, INPUT_FILE1); @@ -135,7 +159,8 @@ Util.deleteFile(cluster, INPUT_FILE3); Util.deleteFile(cluster, INPUT_FILE4); Util.deleteFile(cluster, INPUT_FILE5); - + Util.deleteFile(cluster, INPUT_FILE6); + Util.deleteFile(cluster, INPUT_FILE7); } public void testSkewedJoinWithGroup() throws IOException{ @@ -392,4 +417,31 @@ Assert.assertEquals(true, TestHelper.compareBags(dbfrj, dbrj)); } + + public void testSkewedJoinManyReducers() throws IOException { + pigServer.getPigContext().getProperties().setProperty("pig.skewedjoin.reduce.maxtuple", "2"); + pigServer.registerQuery("A = LOAD '" + INPUT_FILE6 + "' as (id,name);"); + pigServer.registerQuery("B = LOAD '" + INPUT_FILE7 + "' as (id,name);"); + + DataBag dbfrj = BagFactory.getInstance().newDefaultBag(), dbrj = BagFactory.getInstance().newDefaultBag(); + { + pigServer.registerQuery("E = join A by id, B by id using \"skewed\" parallel 300;"); + Iterator<Tuple> iter = pigServer.openIterator("E"); + + while(iter.hasNext()) { + dbfrj.add(iter.next()); + } + } + { + pigServer.registerQuery("E = join A by id, B by id;"); + Iterator<Tuple> iter = pigServer.openIterator("E"); + + while(iter.hasNext()) { + dbrj.add(iter.next()); + } + } + Assert.assertEquals(dbfrj.size(), dbrj.size()); + Assert.assertEquals(true, TestHelper.compareBags(dbfrj, dbrj)); + + } }