Author: daijy Date: Fri Dec 25 00:16:08 2009 New Revision: 893825 URL: http://svn.apache.org/viewvc?rev=893825&view=rev Log: PIG-761: ERROR 2086 on simple JOIN
Modified: hadoop/pig/trunk/CHANGES.txt hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/POPackageAnnotator.java hadoop/pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java Modified: hadoop/pig/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=893825&r1=893824&r2=893825&view=diff ============================================================================== --- hadoop/pig/trunk/CHANGES.txt (original) +++ hadoop/pig/trunk/CHANGES.txt Fri Dec 25 00:16:08 2009 @@ -319,6 +319,8 @@ PIG-1165: Signature of loader does not set correctly for order by (daijy) +PIG-761: ERROR 2086 on simple JOIN (daijy) + Release 0.5.0 INCOMPATIBLE CHANGES Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=893825&r1=893824&r2=893825&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Fri Dec 25 00:16:08 2009 @@ -2418,6 +2418,7 @@ throw new MRCompilerException(msg, errCode, PigException.BUG); } FileSpec oldSpec = ((POStore)mpLeaf).getSFile(); + boolean oldIsTmpStore = ((POStore)mpLeaf).isTmpStore(); FileSpec fSpec = getTempFileSpec(); ((POStore)mpLeaf).setSFile(fSpec); @@ -2439,9 +2440,10 @@ limitAdjustMROp.reducePlan.addAsLeaf(pLimit2); POStore st = getStore(); st.setSFile(oldSpec); - st.setIsTmpStore(false); + st.setIsTmpStore(oldIsTmpStore); limitAdjustMROp.reducePlan.addAsLeaf(st); 
limitAdjustMROp.requestedParallelism = 1; + limitAdjustMROp.setLimitOnly(true); // If the operator we're following has global sort set, we // need to indicate that this is a limit after a sort. // This will assure that we get the right sort comparator Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java?rev=893825&r1=893824&r2=893825&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java Fri Dec 25 00:16:08 2009 @@ -83,6 +83,10 @@ // Indicates if this is a limit after a sort boolean limitAfterSort = false; + + // Indicates whether the entire purpose of this map-reduce job is to apply a limit, without + // changing anything else. This helps POPackageAnnotator find the right POPackage to annotate + boolean limitOnly = false; // If true, putting an identity combine in this // mapreduce job will speed things up. 
@@ -284,6 +288,14 @@ public void setLimitAfterSort(boolean las) { limitAfterSort = las; } + + public boolean isLimitOnly() { + return limitOnly; + } + + public void setLimitOnly(boolean limitOnly) { + this.limitOnly = limitOnly; + } public boolean needsDistinctCombiner() { return needsDistinctCombiner; Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/POPackageAnnotator.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/POPackageAnnotator.java?rev=893825&r1=893824&r2=893825&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/POPackageAnnotator.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/POPackageAnnotator.java Fri Dec 25 00:16:08 2009 @@ -105,6 +105,8 @@ List<MapReduceOper> preds = this.mPlan.getPredecessors(mr); for (Iterator<MapReduceOper> it = preds.iterator(); it.hasNext();) { MapReduceOper mrOper = it.next(); + if (mrOper.isLimitOnly()) + mrOper = this.mPlan.getPredecessors(mrOper).get(0); lrFound += patchPackage(mrOper.reducePlan, pkg); if(lrFound == pkg.getNumInps()) { break; Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java?rev=893825&r1=893824&r2=893825&view=diff ============================================================================== --- hadoop/pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java (original) +++ hadoop/pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java Fri Dec 25 00:16:08 2009 @@ -418,4 +418,39 @@ assertTrue(iter.hasNext()==false); } + + // See PIG-761 + @Test + public void testLimitPOPackageAnnotator() throws Exception{ + File tmpFile1 = File.createTempFile("test1", "txt"); + PrintStream 
ps1 = new PrintStream(new FileOutputStream(tmpFile1)); + ps1.println("1\t2\t3"); + ps1.println("2\t5\t2"); + ps1.close(); + + File tmpFile2 = File.createTempFile("test2", "txt"); + PrintStream ps2 = new PrintStream(new FileOutputStream(tmpFile2)); + ps2.println("1\t1"); + ps2.println("2\t2"); + ps2.close(); + + pigServer.registerQuery("A = LOAD '" + Util.generateURI(tmpFile1.toString()) + "' AS (a0, a1, a2);"); + pigServer.registerQuery("B = LOAD '" + Util.generateURI(tmpFile2.toString()) + "' AS (b0, b1);"); + pigServer.registerQuery("C = LIMIT B 100;"); + pigServer.registerQuery("D = COGROUP C BY b0, A BY a0 PARALLEL 2;"); + Iterator<Tuple> iter = pigServer.openIterator("D"); + + assertTrue(iter.hasNext()); + Tuple t = iter.next(); + + assertTrue(t.toString().equals("(1,{(1,1)},{(1,2,3)})")); + + assertTrue(iter.hasNext()); + t = iter.next(); + + assertTrue(t.toString().equals("(2,{(2,2)},{(2,5,2)})")); + + assertFalse(iter.hasNext()); + } + }