Author: rding
Date: Thu Mar 4 21:35:13 2010
New Revision: 919189
URL: http://svn.apache.org/viewvc?rev=919189&view=rev
Log:
PIG-1273: Skewed join throws error
Modified:
hadoop/pig/trunk/CHANGES.txt
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java
hadoop/pig/trunk/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java
hadoop/pig/trunk/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java
hadoop/pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java
Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=919189&r1=919188&r2=919189&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Thu Mar 4 21:35:13 2010
@@ -139,6 +139,8 @@
BUG FIXES
+PIG-1273: Skewed join throws error (rding)
+
PIG-1267: Problems with partition filter optimizer (rding)
PIG-1079: Modify merge join to use distributed cache to maintain the index
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=919189&r1=919188&r2=919189&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Thu Mar 4 21:35:13 2010
@@ -1854,7 +1854,7 @@
String inputFile = lFile.getFileName();
return getSamplingJob(sort, prevJob, transformPlans, lFile,
sampleFile, rp, null,
- PartitionSkewedKeys.class.getName(), new String[]{per, mc, inputFile}, RandomSampleLoader.class.getName());
+ PartitionSkewedKeys.class.getName(), new String[]{per, mc, inputFile}, PoissonSampleLoader.class.getName());
}catch(Exception e) {
throw new PlanException(e);
}
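The only functional change in this hunk is the last argument to getSamplingJob, which selects the loader used by the skewed-join sampling job. As a loose illustration of the kind of interval-based sampling such a loader performs (a simplified sketch, not Pig's actual PoissonSampleLoader logic; the class and method names here are made up):

    import java.util.ArrayList;
    import java.util.List;

    // Illustrative only: keep every k-th row, then append a marker row carrying
    // the total row count, in the spirit of the marker tuple emitted by
    // createNumRowTuple in the PoissonSampleLoader hunk further below.
    class IntervalSamplerSketch {
        static List<String> sample(List<String> rows, int k) {
            List<String> out = new ArrayList<String>();
            for (int i = 0; i < rows.size(); i++) {
                if (i % k == 0) {
                    out.add(rows.get(i));
                }
            }
            out.add("NUMROWS=" + rows.size()); // marker row with the row count
            return out;
        }
    }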
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java?rev=919189&r1=919188&r2=919189&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java Thu Mar 4 21:35:13 2010
@@ -31,6 +31,7 @@
import org.apache.pig.data.DataType;
import org.apache.pig.data.DefaultTupleFactory;
import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.builtin.PartitionSkewedKeys;
import org.apache.pig.impl.io.ReadToEndLoader;
import org.apache.pig.impl.util.Pair;
import org.apache.pig.impl.util.UDFContext;
@@ -72,8 +73,8 @@
// The keydist file is structured as (key, min, max)
// min, max being the index of the reducers
Map<String, Object > distMap = (Map<String, Object>) t.get (0);
- partitionList = (DataBag) distMap.get("partition.list");
- totalReducers[0] = Integer.valueOf(""+distMap.get("totalreducers"));
+ partitionList = (DataBag) distMap.get(PartitionSkewedKeys.PARTITION_LIST);
+ totalReducers[0] = Integer.valueOf(""+distMap.get(PartitionSkewedKeys.TOTAL_REDUCERS));
Iterator<Tuple> it = partitionList.iterator();
while (it.hasNext()) {
Tuple idxTuple = it.next();
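Per the comment in this hunk, the key-distribution map carries a partition list of (key, min, max) tuples plus a total reducer count, now looked up through the PartitionSkewedKeys constants instead of bare string literals. A minimal sketch of how a consumer of that map might turn it into per-key reducer ranges, assuming each tuple keeps the key in front and the min/max reducer indexes as its last two fields (the helper class and the exact tuple layout are assumptions):

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.pig.backend.executionengine.ExecException;
    import org.apache.pig.data.DataBag;
    import org.apache.pig.data.Tuple;
    import org.apache.pig.impl.builtin.PartitionSkewedKeys;
    import org.apache.pig.impl.util.Pair;

    // Hypothetical helper: key -> (minReducerIndex, maxReducerIndex).
    class KeyDistSketch {
        static Map<Object, Pair<Integer, Integer>> reducerRanges(Map<String, Object> distMap)
                throws ExecException {
            DataBag partitionList = (DataBag) distMap.get(PartitionSkewedKeys.PARTITION_LIST);
            Map<Object, Pair<Integer, Integer>> ranges =
                new HashMap<Object, Pair<Integer, Integer>>();
            for (Tuple idxTuple : partitionList) {
                // Assumed layout from the comment above: (key, min, max),
                // with min/max taken as the last two fields of the tuple.
                Object key = idxTuple.get(0);
                Integer min = (Integer) idxTuple.get(idxTuple.size() - 2);
                Integer max = (Integer) idxTuple.get(idxTuple.size() - 1);
                ranges.put(key, new Pair<Integer, Integer>(min, max));
            }
            return ranges;
        }
    }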
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java?rev=919189&r1=919188&r2=919189&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java Thu Mar 4 21:35:13 2010
@@ -248,7 +248,7 @@
}
// this is not a skewed key
- if (redCount == 1) {
+ if (redCount <= 1) {
return new Pair<Tuple, Integer>(null, 1);
}
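The tightened comparison also treats a computed reducer count of zero or less as "not a skewed key" rather than falling through and emitting a partition entry for it; presumably this is what the empty-input case exercised by the new test below produces. A minimal illustration of the guard in isolation (the helper class and boolean wrapper are assumptions; only the comparison mirrors the hunk):

    // Hypothetical helper mirroring the tightened guard: a key gets an explicit
    // reducer range only if the sampler decided it needs more than one reducer.
    class SkewGuardSketch {
        static boolean isSkewed(long redCount) {
            // Previously only redCount == 1 was excluded, so a count of 0
            // slipped through to the skewed-key path.
            return redCount > 1;
        }
    }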
Modified:
hadoop/pig/trunk/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java?rev=919189&r1=919188&r2=919189&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java Thu Mar 4 21:35:13 2010
@@ -153,15 +153,18 @@
* @throws ExecException
*/
private Tuple createNumRowTuple(Tuple sample) throws ExecException {
- if(rowNum == 0 || sample == null)
- return null;
- TupleFactory factory = TupleFactory.getInstance();
- Tuple t = factory.newTuple(sample.size() + 2);
- for(int i=0; i<sample.size(); i++){
- t.set(i, sample.get(i));
+ int sz = (sample == null) ? 0 : sample.size();
+ TupleFactory factory = TupleFactory.getInstance();
+ Tuple t = factory.newTuple(sz + 2);
+
+ if (sample != null) {
+ for(int i=0; i<sample.size(); i++){
+ t.set(i, sample.get(i));
+ }
}
- t.set(sample.size(), NUMROWS_TUPLE_MARKER);
- t.set(sample.size() + 1, rowNum);
+
+ t.set(sz, NUMROWS_TUPLE_MARKER);
+ t.set(sz + 1, rowNum);
numRowSplTupleReturned = true;
return t;
}
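With this change createNumRowTuple no longer bails out when no sample row was seen; it still emits the marker tuple carrying the row count, so the downstream partition computation gets a record even for an empty input. A self-contained sketch of the reworked method, with the loader's NUMROWS_TUPLE_MARKER constant and rowNum field stubbed in as placeholders for illustration:

    import org.apache.pig.backend.executionengine.ExecException;
    import org.apache.pig.data.Tuple;
    import org.apache.pig.data.TupleFactory;

    class NumRowTupleSketch {
        // Placeholders standing in for PoissonSampleLoader's own fields.
        private static final String NUMROWS_TUPLE_MARKER = "NUMROWS_MARKER";
        private long rowNum = 0;

        Tuple createNumRowTuple(Tuple sample) throws ExecException {
            int sz = (sample == null) ? 0 : sample.size();
            TupleFactory factory = TupleFactory.getInstance();
            Tuple t = factory.newTuple(sz + 2);

            // Copy the sampled fields, if any; for an empty input there are none.
            if (sample != null) {
                for (int i = 0; i < sample.size(); i++) {
                    t.set(i, sample.get(i));
                }
            }

            // Always append the marker and the row count, even when sample == null.
            t.set(sz, NUMROWS_TUPLE_MARKER);
            t.set(sz + 1, rowNum);
            return t;
        }
    }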
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java?rev=919189&r1=919188&r2=919189&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java Thu Mar 4 21:35:13 2010
@@ -444,4 +444,41 @@
Assert.assertEquals(true, TestHelper.compareBags(dbfrj, dbrj));
}
+
+ public void testSkewedJoinEmptyInput() throws IOException {
+ String LEFT_INPUT_FILE = "left.dat";
+ String RIGHT_INPUT_FILE = "right.dat";
+
+ PrintWriter w = new PrintWriter(new FileWriter(LEFT_INPUT_FILE));
+ w.println("1");
+ w.println("2");
+ w.println("3");
+ w.println("5");
+ w.close();
+
+ Util.copyFromLocalToCluster(cluster, LEFT_INPUT_FILE, LEFT_INPUT_FILE);
+
+ PrintWriter w2 = new PrintWriter(new FileWriter(RIGHT_INPUT_FILE));
+ w2.println("1\tone");
+ w2.println("2\ttwo");
+ w2.println("3\tthree");
+
+ w2.close();
+
+ Util.copyFromLocalToCluster(cluster, RIGHT_INPUT_FILE, RIGHT_INPUT_FILE);
+
+ pigServer.registerQuery("a = load 'left.dat' as (nums:chararray);");
+ pigServer.registerQuery("b = load 'right.dat' as
(number:chararray,text:chararray);");
+ pigServer.registerQuery("c = filter a by nums == '7';");
+ pigServer.registerQuery("d = join c by nums LEFT OUTER, b by number
USING 'skewed';");
+
+ Iterator<Tuple> iter = pigServer.openIterator("d");
+
+ Assert.assertFalse(iter.hasNext());
+
+ new File(LEFT_INPUT_FILE).delete();
+ Util.deleteFile(cluster, LEFT_INPUT_FILE);
+ new File(RIGHT_INPUT_FILE).delete();
+ Util.deleteFile(cluster, RIGHT_INPUT_FILE);
+ }
}