Author: olga
Date: Fri Feb  6 22:34:59 2009
New Revision: 741763

URL: http://svn.apache.org/viewvc?rev=741763&view=rev
Log:
PIG-637: limit after order by is broken in the local mode

Added:
    hadoop/pig/trunk/test/org/apache/pig/test/data/DotFiles/optlimitplan8.dot
    hadoop/pig/trunk/test/org/apache/pig/test/data/DotFiles/optlimitplan9.dot
Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/src/org/apache/pig/PigServer.java
    
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalOptimizer.java
    
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/OpLimitOptimizer.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestLogicalOptimizer.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=741763&r1=741762&r2=741763&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Fri Feb  6 22:34:59 2009
@@ -406,3 +406,6 @@
     storing strings > 65536 bytes (in UTF8 form) using BinStorage() (sms)
 
     PIG-642: Limit after FRJ causes problems (daijy)
+
+    PIG-637: Limit broken after order by in the local mode (shubhamc via
+    olgan)

Modified: hadoop/pig/trunk/src/org/apache/pig/PigServer.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/PigServer.java?rev=741763&r1=741762&r2=741763&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/PigServer.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/PigServer.java Fri Feb  6 22:34:59 2009
@@ -758,7 +758,8 @@
 
         // optimize
         if (optimize) {
-            LogicalOptimizer optimizer = new LogicalOptimizer(lpClone);
+            //LogicalOptimizer optimizer = new LogicalOptimizer(lpClone);
+            LogicalOptimizer optimizer = new LogicalOptimizer(lpClone, 
pigContext.getExecType());
             optimizer.optimize();
         }
 

Modified: 
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalOptimizer.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalOptimizer.java?rev=741763&r1=741762&r2=741763&view=diff
==============================================================================
--- 
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalOptimizer.java
 (original)
+++ 
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalOptimizer.java
 Fri Feb  6 22:34:59 2009
@@ -22,6 +22,7 @@
 import java.util.List;
 import java.util.Map;
 
+import org.apache.pig.ExecType;
 import org.apache.pig.impl.logicalLayer.LogicalOperator;
 import org.apache.pig.impl.logicalLayer.LogicalPlan;
 import org.apache.pig.impl.plan.optimizer.*;
@@ -29,21 +30,29 @@
 /**
  * An optimizer for logical plans.
  */
-public class LogicalOptimizer extends PlanOptimizer<LogicalOperator, 
LogicalPlan> {
-    
+public class LogicalOptimizer extends
+        PlanOptimizer<LogicalOperator, LogicalPlan> {
+
     public static final String LOLOAD_CLASSNAME = 
"org.apache.pig.impl.logicalLayer.LOLoad";
     public static final String LOSTREAM_CLASSNAME = 
"org.apache.pig.impl.logicalLayer.LOStream";
 
     public LogicalOptimizer(LogicalPlan plan) {
+        this(plan, ExecType.MAPREDUCE);
+    }
+
+    public LogicalOptimizer(LogicalPlan plan, ExecType mode) {
         super(plan);
+        runOptimizations(plan, mode);
+    }
 
+    private void runOptimizations(LogicalPlan plan, ExecType mode) {
         // List of rules for the logical optimizer
-        
+
         // This one has to be first, as the type cast inserter expects the
         // load to only have one output.
         // Find any places in the plan that have an implicit split and make
-        // it explicit.  Since the RuleMatcher doesn't handle trees properly,
-        // we cheat and say that we match any node.  Then we'll do the actual
+        // it explicit. Since the RuleMatcher doesn't handle trees properly,
+        // we cheat and say that we match any node. Then we'll do the actual
         // test in the transformers check method.
         List<String> nodes = new ArrayList<String>(1);
         Map<Integer, Integer> edges = new HashMap<Integer, Integer>();
@@ -51,8 +60,8 @@
         nodes.add("any");
         required.add(true);
         mRules.add(new Rule<LogicalOperator, LogicalPlan>(nodes, edges,
-            required, new ImplicitSplitInserter(plan)));
-        
+                required, new ImplicitSplitInserter(plan)));
+
         // Add type casting to plans where the schema has been declared (by
         // user, data, or data catalog).
         nodes = new ArrayList<String>(1);
@@ -60,9 +69,9 @@
         edges = new HashMap<Integer, Integer>();
         required = new ArrayList<Boolean>(1);
         required.add(true);
-        mRules.add(new Rule<LogicalOperator, LogicalPlan>(nodes, edges, 
required,
-            new TypeCastInserter(plan, LOLOAD_CLASSNAME)));
-        
+        mRules.add(new Rule<LogicalOperator, LogicalPlan>(nodes, edges,
+                required, new TypeCastInserter(plan, LOLOAD_CLASSNAME)));
+
         // Add type casting to plans where the schema has been declared by
         // user in a statement with stream operator.
         nodes = new ArrayList<String>(1);
@@ -70,28 +79,24 @@
         edges = new HashMap<Integer, Integer>();
         required = new ArrayList<Boolean>(1);
         required.add(true);
-        mRules.add(new Rule(nodes, edges, required,
-            new TypeCastInserter(plan, LOSTREAM_CLASSNAME)));
-        
+        mRules.add(new Rule(nodes, edges, required, new TypeCastInserter(plan,
+                LOSTREAM_CLASSNAME)));
+
         // Optimize when LOAD precedes STREAM and the loader class
         // is the same as the serializer for the STREAM.
         // Similarly optimize when STREAM is followed by store and the
         // deserializer class is same as the Storage class.
-        mRules.add(new Rule(nodes, edges, required,
-            new StreamOptimizer(plan, LOSTREAM_CLASSNAME)));
-        
+        mRules.add(new Rule(nodes, edges, required, new StreamOptimizer(plan,
+                LOSTREAM_CLASSNAME)));
+
         // Push up limit where ever possible.
         nodes = new ArrayList<String>(1);
         edges = new HashMap<Integer, Integer>();
         required = new ArrayList<Boolean>(1);
         nodes.add("org.apache.pig.impl.logicalLayer.LOLimit");
         required.add(true);
-        mRules.add(new Rule<LogicalOperator, LogicalPlan>(nodes, edges, 
required,
-            new OpLimitOptimizer(plan)));
-        
-        
+        mRules.add(new Rule<LogicalOperator, LogicalPlan>(nodes, edges,
+                required, new OpLimitOptimizer(plan, mode)));
     }
 
 }
-
-

Modified: 
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/OpLimitOptimizer.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/OpLimitOptimizer.java?rev=741763&r1=741762&r2=741763&view=diff
==============================================================================
--- 
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/OpLimitOptimizer.java
 (original)
+++ 
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/OpLimitOptimizer.java
 Fri Feb  6 22:34:59 2009
@@ -23,6 +23,7 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.pig.ExecType;
 import org.apache.pig.PigException;
 import org.apache.pig.impl.logicalLayer.LOCogroup;
 import org.apache.pig.impl.logicalLayer.LOCross;
@@ -53,11 +54,17 @@
 public class OpLimitOptimizer extends LogicalTransformer {
 
     private static final Log log = LogFactory.getLog(OpLimitOptimizer.class);
+    private ExecType mode = ExecType.MAPREDUCE;
 
     public OpLimitOptimizer(LogicalPlan plan) {
         super(plan, new DepthFirstWalker<LogicalOperator, LogicalPlan>(plan));
     }
 
+    public OpLimitOptimizer(LogicalPlan plan, ExecType mode) {
+        super(plan, new DepthFirstWalker<LogicalOperator, LogicalPlan>(plan));
+        this.mode = mode;
+    }
+
     @Override
     public boolean check(List<LogicalOperator> nodes) throws 
OptimizerException {
         if((nodes == null) || (nodes.size() <= 0)) {
@@ -193,18 +200,23 @@
             // Limit can be merged into LOSort, result a "limited sort"
             else if (predecessor instanceof LOSort)
             {
-               LOSort sort = (LOSort)predecessor;
-               if (sort.getLimit()==-1)
-                       sort.setLimit(limit.getLimit());
-               else
-                   
sort.setLimit(sort.getLimit()<limit.getLimit()?sort.getLimit():limit.getLimit());
-               try {
-                       removeFromChain(limit, null);
-               } catch (Exception e) {
-                   int errCode = 2012;
-                   String msg = "Can not remove LOLimit after LOSort";
-                       throw new OptimizerException(msg, errCode, 
PigException.BUG, e);
-               }
+                if(mode == ExecType.LOCAL) {
+                    //We don't need this optimisation to happen in the local 
mode.
+                    //so we do nothing here.
+                } else {
+                    LOSort sort = (LOSort)predecessor;
+                    if (sort.getLimit()==-1)
+                        sort.setLimit(limit.getLimit());
+                    else
+                        
sort.setLimit(sort.getLimit()<limit.getLimit()?sort.getLimit():limit.getLimit());
+                    try {
+                        removeFromChain(limit, null);
+                    } catch (Exception e) {
+                        int errCode = 2012;
+                        String msg = "Can not remove LOLimit after LOSort";
+                        throw new OptimizerException(msg, errCode, 
PigException.BUG, e);
+                    }
+                }
             }
             // Limit is merged into another LOLimit
             else if (predecessor instanceof LOLimit)

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestLogicalOptimizer.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestLogicalOptimizer.java?rev=741763&r1=741762&r2=741763&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestLogicalOptimizer.java 
(original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestLogicalOptimizer.java Fri Feb 
 6 22:34:59 2009
@@ -17,8 +17,11 @@
  */
 package org.apache.pig.test;
 
+import java.io.File;
 import java.io.FileInputStream;
+import java.io.PrintWriter;
 
+import org.apache.pig.ExecType;
 import org.apache.pig.impl.logicalLayer.*;
 import org.apache.pig.impl.logicalLayer.optimizer.*;
 import org.apache.pig.test.utils.LogicalPlanTester;
@@ -58,6 +61,11 @@
         optimizer.optimize();
     }
     
+    public static void optimizePlan(LogicalPlan plan, ExecType mode) throws 
OptimizerException {
+        LogicalOptimizer optimizer = new LogicalOptimizer(plan, mode);
+        optimizer.optimize();
+    }
+    
     void compareWithGoldenFile(LogicalPlan plan, String filename) throws 
Exception
     {
         FileInputStream fis = new FileInputStream(filename);
@@ -153,6 +161,28 @@
         optimizePlan(plan);
         compareWithGoldenFile(plan, FILE_BASE_LOCATION + "optlimitplan7.dot");
     }
+    
+    @Test
+    //Limit in the local mode, need to make sure limit stays after a sort
+    public void testOPLimit8Optimizer() throws Exception {
+        planTester.buildPlan("A = load 'myfile';");
+        planTester.buildPlan("B = order A by $0;");
+        LogicalPlan plan = planTester.buildPlan("C = limit B 10;");
+        optimizePlan(plan, ExecType.LOCAL);
+        compareWithGoldenFile(plan, FILE_BASE_LOCATION + "optlimitplan8.dot");
+        
+    }
+    
+    @Test
+    //Limit in the local mode, need to make sure limit stays after a sort
+    public void testOPLimit9Optimizer() throws Exception {
+        planTester.buildPlan("A = load 'myfile';");
+        planTester.buildPlan("B = order A by $0;");
+        LogicalPlan plan = planTester.buildPlan("C = limit B 10;");
+        optimizePlan(plan);
+        compareWithGoldenFile(plan, FILE_BASE_LOCATION + "optlimitplan9.dot");
+        
+    }
 
     @Test
     //Test to ensure that the right exception is thrown

Added: hadoop/pig/trunk/test/org/apache/pig/test/data/DotFiles/optlimitplan8.dot
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/data/DotFiles/optlimitplan8.dot?rev=741763&view=auto
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/data/DotFiles/optlimitplan8.dot 
(added)
+++ hadoop/pig/trunk/test/org/apache/pig/test/data/DotFiles/optlimitplan8.dot 
Fri Feb  6 22:34:59 2009
@@ -0,0 +1,6 @@
+digraph graph1 {
+    LOLoad55 -> LOSort57;
+    LOSort57 [limit="-1"];
+    LOSort57 -> LOLimit58;
+    LOLimit58 [limit="10"];
+}

Added: hadoop/pig/trunk/test/org/apache/pig/test/data/DotFiles/optlimitplan9.dot
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/data/DotFiles/optlimitplan9.dot?rev=741763&view=auto
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/data/DotFiles/optlimitplan9.dot 
(added)
+++ hadoop/pig/trunk/test/org/apache/pig/test/data/DotFiles/optlimitplan9.dot 
Fri Feb  6 22:34:59 2009
@@ -0,0 +1,4 @@
+digraph graph1 {
+    LOLoad59 -> LOSort61;
+    LOSort61 [limit="10"];
+}


Reply via email to