Author: sms
Date: Sat Jan 24 06:33:52 2009
New Revision: 737307

URL: http://svn.apache.org/viewvc?rev=737307&view=rev
Log:
PIG-615: Wrong number of jobs with limit (shravanmn via sms)

Modified:
    hadoop/pig/trunk/CHANGES.txt
    
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestMRCompiler.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=737307&r1=737306&r2=737307&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Sat Jan 24 06:33:52 2009
@@ -373,4 +373,6 @@
 
     PIG-628: misc performance improvements (pradeepk via olgan)
     
-    PIG-589: error handling, phase 1-2 (sms via olgan):wq
+    PIG-589: error handling, phase 1-2 (sms via olgan)
+
+    PIG-615: Wrong number of jobs with limit (shravanmn via sms)

Modified: 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=737307&r1=737306&r2=737307&view=diff
==============================================================================
--- 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
 (original)
+++ 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
 Sat Jan 24 06:33:52 2009
@@ -1562,7 +1562,7 @@
             // Look for map reduce operators which contains limit operator.
             // If so and the requestedParallelism > 1, add one additional 
map-reduce
             // operator with 1 reducer into the original plan
-            if (mr.limit!=-1 && mr.requestedParallelism>1)
+            if (mr.limit!=-1 && mr.requestedParallelism!=1)
             {
                 opsToAdjust.add(mr);
             }

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestMRCompiler.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestMRCompiler.java?rev=737307&r1=737306&r2=737307&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestMRCompiler.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestMRCompiler.java Sat Jan 24 
06:33:52 2009
@@ -41,6 +41,7 @@
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompilerException;
+import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
@@ -828,7 +829,54 @@
                assertTrue(mrce.getErrorCode() == 2025);
        }
     }
+
+    /**
+     * Test to ensure that the order by without parallel followed by a limit, 
i.e., top k
+     * always produces the correct number of map reduce jobs
+     */
+    @Test
+    public void testNumReducersInLimit() throws Exception {
+       planTester.buildPlan("a = load 'input';");
+       planTester.buildPlan("b = order a by $0;");
+       planTester.buildPlan("c = limit b 10;");
+       LogicalPlan lp = planTester.buildPlan("store c into '/tmp';");
+       
+       PhysicalPlan pp = Util.buildPhysicalPlan(lp, pc);
+       MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
+       MapReduceOper mrOper = mrPlan.getRoots().get(0);
+       int count = 1;
+       
+       while(mrPlan.getSuccessors(mrOper) != null) {
+               mrOper = mrPlan.getSuccessors(mrOper).get(0);
+               ++count;
+       }        
+       assertTrue(count == 4);
+    }
     
+    /**
+     * Test to ensure that the order by with parallel followed by a limit, 
i.e., top k
+     * always produces the correct number of map reduce jobs
+     */
+    @Test
+    public void testNumReducersInLimitWithParallel() throws Exception {
+       planTester.buildPlan("a = load 'input';");
+       planTester.buildPlan("b = order a by $0 parallel 2;");
+       planTester.buildPlan("c = limit b 10;");
+       LogicalPlan lp = planTester.buildPlan("store c into '/tmp';");
+       
+       PhysicalPlan pp = Util.buildPhysicalPlan(lp, pc);
+       MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
+       MapReduceOper mrOper = mrPlan.getRoots().get(0);
+       int count = 1;
+       
+       while(mrPlan.getSuccessors(mrOper) != null) {
+               mrOper = mrPlan.getSuccessors(mrOper).get(0);
+               ++count;
+       }        
+       assertTrue(count == 4);
+    }
+
+
     public static class WeirdComparator extends ComparisonFunc {
 
         @Override


Reply via email to