Author: daijy
Date: Fri Dec 25 00:16:08 2009
New Revision: 893825

URL: http://svn.apache.org/viewvc?rev=893825&view=rev
Log:
PIG-761: ERROR 2086 on simple JOIN

Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/POPackageAnnotator.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=893825&r1=893824&r2=893825&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Fri Dec 25 00:16:08 2009
@@ -319,6 +319,8 @@
 
 PIG-1165: Signature of loader does not set correctly for order by (daijy)
 
+PIG-761: ERROR 2086 on simple JOIN (daijy)
+
 Release 0.5.0
 
 INCOMPATIBLE CHANGES

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=893825&r1=893824&r2=893825&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Fri Dec 25 00:16:08 2009
@@ -2418,6 +2418,7 @@
                     throw new MRCompilerException(msg, errCode, PigException.BUG);
                 }
                 FileSpec oldSpec = ((POStore)mpLeaf).getSFile();
+                boolean oldIsTmpStore = ((POStore)mpLeaf).isTmpStore();
                 
                 FileSpec fSpec = getTempFileSpec();
                 ((POStore)mpLeaf).setSFile(fSpec);
@@ -2439,9 +2440,10 @@
                 limitAdjustMROp.reducePlan.addAsLeaf(pLimit2);
                 POStore st = getStore();
                 st.setSFile(oldSpec);
-                st.setIsTmpStore(false);
+                st.setIsTmpStore(oldIsTmpStore);
                 limitAdjustMROp.reducePlan.addAsLeaf(st);
                 limitAdjustMROp.requestedParallelism = 1;
+                limitAdjustMROp.setLimitOnly(true);
                 // If the operator we're following has global sort set, we
                 // need to indicate that this is a limit after a sort.
                 // This will assure that we get the right sort comparator

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java?rev=893825&r1=893824&r2=893825&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java Fri Dec 25 00:16:08 2009
@@ -83,6 +83,10 @@
 
     // Indicates if this is a limit after a sort
     boolean limitAfterSort = false;
+    
+    // Indicate if the entire purpose for this map reduce job is doing limit, does not change
+    // anything else. This is to help POPackageAnnotator to find the right POPackage to annotate
+    boolean limitOnly = false;
 
     // If true, putting an identity combine in this
     // mapreduce job will speed things up.
@@ -284,6 +288,14 @@
     public void setLimitAfterSort(boolean las) {
         limitAfterSort = las;
     }
+    
+    public boolean isLimitOnly() {
+        return limitOnly;
+    }
+    
+    public void setLimitOnly(boolean limitOnly) {
+        this.limitOnly = limitOnly;
+    }
 
     public boolean needsDistinctCombiner() { 
         return needsDistinctCombiner;

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/POPackageAnnotator.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/POPackageAnnotator.java?rev=893825&r1=893824&r2=893825&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/POPackageAnnotator.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/POPackageAnnotator.java Fri Dec 25 00:16:08 2009
@@ -105,6 +105,8 @@
             List<MapReduceOper> preds = this.mPlan.getPredecessors(mr);
             for (Iterator<MapReduceOper> it = preds.iterator(); it.hasNext();) {
                 MapReduceOper mrOper = it.next();
+                if (mrOper.isLimitOnly())
+                    mrOper = this.mPlan.getPredecessors(mrOper).get(0);
                 lrFound += patchPackage(mrOper.reducePlan, pkg);
                 if(lrFound == pkg.getNumInps()) {
                     break;

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java?rev=893825&r1=893824&r2=893825&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java Fri Dec 25 00:16:08 2009
@@ -418,4 +418,39 @@
         
         assertTrue(iter.hasNext()==false);
     }
+    
+    // See PIG-761
+    @Test
+    public void testLimitPOPackageAnnotator() throws Exception{
+        File tmpFile1 = File.createTempFile("test1", "txt");
+        PrintStream ps1 = new PrintStream(new FileOutputStream(tmpFile1));
+        ps1.println("1\t2\t3");
+        ps1.println("2\t5\t2");
+        ps1.close();
+        
+        File tmpFile2 = File.createTempFile("test2", "txt");
+        PrintStream ps2 = new PrintStream(new FileOutputStream(tmpFile2));
+        ps2.println("1\t1");
+        ps2.println("2\t2");
+        ps2.close();
+
+        pigServer.registerQuery("A = LOAD '" + Util.generateURI(tmpFile1.toString()) + "' AS (a0, a1, a2);");
+        pigServer.registerQuery("B = LOAD '" + Util.generateURI(tmpFile2.toString()) + "' AS (b0, b1);");
+        pigServer.registerQuery("C = LIMIT B 100;");
+        pigServer.registerQuery("D = COGROUP C BY b0, A BY a0 PARALLEL 2;");
+        Iterator<Tuple> iter = pigServer.openIterator("D");
+        
+        assertTrue(iter.hasNext());
+        Tuple t = iter.next();
+        
+        assertTrue(t.toString().equals("(1,{(1,1)},{(1,2,3)})"));
+        
+        assertTrue(iter.hasNext());
+        t = iter.next();
+        
+        assertTrue(t.toString().equals("(2,{(2,2)},{(2,5,2)})"));
+        
+        assertFalse(iter.hasNext());
+    }
+
 }


Reply via email to