Author: daijy
Date: Fri Dec 25 00:16:08 2009
New Revision: 893825
URL: http://svn.apache.org/viewvc?rev=893825&view=rev
Log:
PIG-761: ERROR 2086 on simple JOIN
Modified:
hadoop/pig/trunk/CHANGES.txt
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/POPackageAnnotator.java
hadoop/pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java
Modified: hadoop/pig/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=893825&r1=893824&r2=893825&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Fri Dec 25 00:16:08 2009
@@ -319,6 +319,8 @@
PIG-1165: Signature of loader does not set correctly for order by (daijy)
+PIG-761: ERROR 2086 on simple JOIN (daijy)
+
Release 0.5.0
INCOMPATIBLE CHANGES
Modified:
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=893825&r1=893824&r2=893825&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Fri Dec 25 00:16:08 2009
@@ -2418,6 +2418,7 @@
throw new MRCompilerException(msg, errCode,
PigException.BUG);
}
FileSpec oldSpec = ((POStore)mpLeaf).getSFile();
+ boolean oldIsTmpStore = ((POStore)mpLeaf).isTmpStore();
FileSpec fSpec = getTempFileSpec();
((POStore)mpLeaf).setSFile(fSpec);
@@ -2439,9 +2440,10 @@
limitAdjustMROp.reducePlan.addAsLeaf(pLimit2);
POStore st = getStore();
st.setSFile(oldSpec);
- st.setIsTmpStore(false);
+ st.setIsTmpStore(oldIsTmpStore);
limitAdjustMROp.reducePlan.addAsLeaf(st);
limitAdjustMROp.requestedParallelism = 1;
+ limitAdjustMROp.setLimitOnly(true);
// If the operator we're following has global sort set, we
// need to indicate that this is a limit after a sort.
// This will assure that we get the right sort comparator
Modified:
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java?rev=893825&r1=893824&r2=893825&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java Fri Dec 25 00:16:08 2009
@@ -83,6 +83,10 @@
// Indicates if this is a limit after a sort
boolean limitAfterSort = false;
+
+    // Indicates whether the entire purpose of this map reduce job is doing limit and it does not
+    // change anything else. This is to help POPackageAnnotator find the right POPackage to annotate
+    boolean limitOnly = false;
// If true, putting an identity combine in this
// mapreduce job will speed things up.
@@ -284,6 +288,14 @@
public void setLimitAfterSort(boolean las) {
limitAfterSort = las;
}
+
+ public boolean isLimitOnly() {
+ return limitOnly;
+ }
+
+ public void setLimitOnly(boolean limitOnly) {
+ this.limitOnly = limitOnly;
+ }
public boolean needsDistinctCombiner() {
return needsDistinctCombiner;
Modified:
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/POPackageAnnotator.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/POPackageAnnotator.java?rev=893825&r1=893824&r2=893825&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/POPackageAnnotator.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/POPackageAnnotator.java Fri Dec 25 00:16:08 2009
@@ -105,6 +105,8 @@
List<MapReduceOper> preds = this.mPlan.getPredecessors(mr);
for (Iterator<MapReduceOper> it = preds.iterator(); it.hasNext();) {
MapReduceOper mrOper = it.next();
+ if (mrOper.isLimitOnly())
+ mrOper = this.mPlan.getPredecessors(mrOper).get(0);
lrFound += patchPackage(mrOper.reducePlan, pkg);
if(lrFound == pkg.getNumInps()) {
break;
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java?rev=893825&r1=893824&r2=893825&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java Fri Dec 25 00:16:08 2009
@@ -418,4 +418,39 @@
assertTrue(iter.hasNext()==false);
}
+
+ // See PIG-761
+ @Test
+ public void testLimitPOPackageAnnotator() throws Exception{
+ File tmpFile1 = File.createTempFile("test1", "txt");
+ PrintStream ps1 = new PrintStream(new FileOutputStream(tmpFile1));
+ ps1.println("1\t2\t3");
+ ps1.println("2\t5\t2");
+ ps1.close();
+
+ File tmpFile2 = File.createTempFile("test2", "txt");
+ PrintStream ps2 = new PrintStream(new FileOutputStream(tmpFile2));
+ ps2.println("1\t1");
+ ps2.println("2\t2");
+ ps2.close();
+
+        pigServer.registerQuery("A = LOAD '" + Util.generateURI(tmpFile1.toString()) + "' AS (a0, a1, a2);");
+        pigServer.registerQuery("B = LOAD '" + Util.generateURI(tmpFile2.toString()) + "' AS (b0, b1);");
+ pigServer.registerQuery("C = LIMIT B 100;");
+ pigServer.registerQuery("D = COGROUP C BY b0, A BY a0 PARALLEL 2;");
+ Iterator<Tuple> iter = pigServer.openIterator("D");
+
+ assertTrue(iter.hasNext());
+ Tuple t = iter.next();
+
+ assertTrue(t.toString().equals("(1,{(1,1)},{(1,2,3)})"));
+
+ assertTrue(iter.hasNext());
+ t = iter.next();
+
+ assertTrue(t.toString().equals("(2,{(2,2)},{(2,5,2)})"));
+
+ assertFalse(iter.hasNext());
+ }
+
}