Author: olga Date: Fri Feb 6 22:34:59 2009 New Revision: 741763 URL: http://svn.apache.org/viewvc?rev=741763&view=rev Log: PIG-637 : limit after order by is broken in the local mode
Added: hadoop/pig/trunk/test/org/apache/pig/test/data/DotFiles/optlimitplan8.dot hadoop/pig/trunk/test/org/apache/pig/test/data/DotFiles/optlimitplan9.dot Modified: hadoop/pig/trunk/CHANGES.txt hadoop/pig/trunk/src/org/apache/pig/PigServer.java hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalOptimizer.java hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/OpLimitOptimizer.java hadoop/pig/trunk/test/org/apache/pig/test/TestLogicalOptimizer.java Modified: hadoop/pig/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=741763&r1=741762&r2=741763&view=diff ============================================================================== --- hadoop/pig/trunk/CHANGES.txt (original) +++ hadoop/pig/trunk/CHANGES.txt Fri Feb 6 22:34:59 2009 @@ -406,3 +406,6 @@ storing strings > 65536 bytes (in UTF8 form) using BinStorage() (sms) PIG-642: Limit after FRJ causes problems (daijy) + + PIG-637: Limit broken after order by in the local mode (shubhamc via + olgan) Modified: hadoop/pig/trunk/src/org/apache/pig/PigServer.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/PigServer.java?rev=741763&r1=741762&r2=741763&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/PigServer.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/PigServer.java Fri Feb 6 22:34:59 2009 @@ -758,7 +758,8 @@ // optimize if (optimize) { - LogicalOptimizer optimizer = new LogicalOptimizer(lpClone); + //LogicalOptimizer optimizer = new LogicalOptimizer(lpClone); + LogicalOptimizer optimizer = new LogicalOptimizer(lpClone, pigContext.getExecType()); optimizer.optimize(); } Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalOptimizer.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalOptimizer.java?rev=741763&r1=741762&r2=741763&view=diff 
============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalOptimizer.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalOptimizer.java Fri Feb 6 22:34:59 2009 @@ -22,6 +22,7 @@ import java.util.List; import java.util.Map; +import org.apache.pig.ExecType; import org.apache.pig.impl.logicalLayer.LogicalOperator; import org.apache.pig.impl.logicalLayer.LogicalPlan; import org.apache.pig.impl.plan.optimizer.*; @@ -29,21 +30,29 @@ /** * An optimizer for logical plans. */ -public class LogicalOptimizer extends PlanOptimizer<LogicalOperator, LogicalPlan> { - +public class LogicalOptimizer extends + PlanOptimizer<LogicalOperator, LogicalPlan> { + public static final String LOLOAD_CLASSNAME = "org.apache.pig.impl.logicalLayer.LOLoad"; public static final String LOSTREAM_CLASSNAME = "org.apache.pig.impl.logicalLayer.LOStream"; public LogicalOptimizer(LogicalPlan plan) { + this(plan, ExecType.MAPREDUCE); + } + + public LogicalOptimizer(LogicalPlan plan, ExecType mode) { super(plan); + runOptimizations(plan, mode); + } + private void runOptimizations(LogicalPlan plan, ExecType mode) { // List of rules for the logical optimizer - + // This one has to be first, as the type cast inserter expects the // load to only have one output. // Find any places in the plan that have an implicit split and make - // it explicit. Since the RuleMatcher doesn't handle trees properly, - // we cheat and say that we match any node. Then we'll do the actual + // it explicit. Since the RuleMatcher doesn't handle trees properly, + // we cheat and say that we match any node. Then we'll do the actual // test in the transformers check method. 
List<String> nodes = new ArrayList<String>(1); Map<Integer, Integer> edges = new HashMap<Integer, Integer>(); @@ -51,8 +60,8 @@ nodes.add("any"); required.add(true); mRules.add(new Rule<LogicalOperator, LogicalPlan>(nodes, edges, - required, new ImplicitSplitInserter(plan))); - + required, new ImplicitSplitInserter(plan))); + // Add type casting to plans where the schema has been declared (by // user, data, or data catalog). nodes = new ArrayList<String>(1); @@ -60,9 +69,9 @@ edges = new HashMap<Integer, Integer>(); required = new ArrayList<Boolean>(1); required.add(true); - mRules.add(new Rule<LogicalOperator, LogicalPlan>(nodes, edges, required, - new TypeCastInserter(plan, LOLOAD_CLASSNAME))); - + mRules.add(new Rule<LogicalOperator, LogicalPlan>(nodes, edges, + required, new TypeCastInserter(plan, LOLOAD_CLASSNAME))); + // Add type casting to plans where the schema has been declared by // user in a statement with stream operator. nodes = new ArrayList<String>(1); @@ -70,28 +79,24 @@ edges = new HashMap<Integer, Integer>(); required = new ArrayList<Boolean>(1); required.add(true); - mRules.add(new Rule(nodes, edges, required, - new TypeCastInserter(plan, LOSTREAM_CLASSNAME))); - + mRules.add(new Rule(nodes, edges, required, new TypeCastInserter(plan, + LOSTREAM_CLASSNAME))); + // Optimize when LOAD precedes STREAM and the loader class // is the same as the serializer for the STREAM. // Similarly optimize when STREAM is followed by store and the // deserializer class is same as the Storage class. - mRules.add(new Rule(nodes, edges, required, - new StreamOptimizer(plan, LOSTREAM_CLASSNAME))); - + mRules.add(new Rule(nodes, edges, required, new StreamOptimizer(plan, + LOSTREAM_CLASSNAME))); + // Push up limit where ever possible. 
nodes = new ArrayList<String>(1); edges = new HashMap<Integer, Integer>(); required = new ArrayList<Boolean>(1); nodes.add("org.apache.pig.impl.logicalLayer.LOLimit"); required.add(true); - mRules.add(new Rule<LogicalOperator, LogicalPlan>(nodes, edges, required, - new OpLimitOptimizer(plan))); - - + mRules.add(new Rule<LogicalOperator, LogicalPlan>(nodes, edges, + required, new OpLimitOptimizer(plan, mode))); } } - - Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/OpLimitOptimizer.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/OpLimitOptimizer.java?rev=741763&r1=741762&r2=741763&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/OpLimitOptimizer.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/OpLimitOptimizer.java Fri Feb 6 22:34:59 2009 @@ -23,6 +23,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.pig.ExecType; import org.apache.pig.PigException; import org.apache.pig.impl.logicalLayer.LOCogroup; import org.apache.pig.impl.logicalLayer.LOCross; @@ -53,11 +54,17 @@ public class OpLimitOptimizer extends LogicalTransformer { private static final Log log = LogFactory.getLog(OpLimitOptimizer.class); + private ExecType mode = ExecType.MAPREDUCE; public OpLimitOptimizer(LogicalPlan plan) { super(plan, new DepthFirstWalker<LogicalOperator, LogicalPlan>(plan)); } + public OpLimitOptimizer(LogicalPlan plan, ExecType mode) { + super(plan, new DepthFirstWalker<LogicalOperator, LogicalPlan>(plan)); + this.mode = mode; + } + @Override public boolean check(List<LogicalOperator> nodes) throws OptimizerException { if((nodes == null) || (nodes.size() <= 0)) { @@ -193,18 +200,23 @@ // Limit can be merged into LOSort, result a "limited sort" else if (predecessor instanceof LOSort) { - LOSort sort = 
(LOSort)predecessor; - if (sort.getLimit()==-1) - sort.setLimit(limit.getLimit()); - else - sort.setLimit(sort.getLimit()<limit.getLimit()?sort.getLimit():limit.getLimit()); - try { - removeFromChain(limit, null); - } catch (Exception e) { - int errCode = 2012; - String msg = "Can not remove LOLimit after LOSort"; - throw new OptimizerException(msg, errCode, PigException.BUG, e); - } + if(mode == ExecType.LOCAL) { + //We don't need this optimisation to happen in the local mode. + //so we do nothing here. + } else { + LOSort sort = (LOSort)predecessor; + if (sort.getLimit()==-1) + sort.setLimit(limit.getLimit()); + else + sort.setLimit(sort.getLimit()<limit.getLimit()?sort.getLimit():limit.getLimit()); + try { + removeFromChain(limit, null); + } catch (Exception e) { + int errCode = 2012; + String msg = "Can not remove LOLimit after LOSort"; + throw new OptimizerException(msg, errCode, PigException.BUG, e); + } + } } // Limit is merged into another LOLimit else if (predecessor instanceof LOLimit) Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestLogicalOptimizer.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestLogicalOptimizer.java?rev=741763&r1=741762&r2=741763&view=diff ============================================================================== --- hadoop/pig/trunk/test/org/apache/pig/test/TestLogicalOptimizer.java (original) +++ hadoop/pig/trunk/test/org/apache/pig/test/TestLogicalOptimizer.java Fri Feb 6 22:34:59 2009 @@ -17,8 +17,11 @@ */ package org.apache.pig.test; +import java.io.File; import java.io.FileInputStream; +import java.io.PrintWriter; +import org.apache.pig.ExecType; import org.apache.pig.impl.logicalLayer.*; import org.apache.pig.impl.logicalLayer.optimizer.*; import org.apache.pig.test.utils.LogicalPlanTester; @@ -58,6 +61,11 @@ optimizer.optimize(); } + public static void optimizePlan(LogicalPlan plan, ExecType mode) throws OptimizerException { + LogicalOptimizer optimizer = new 
LogicalOptimizer(plan, mode); + optimizer.optimize(); + } + void compareWithGoldenFile(LogicalPlan plan, String filename) throws Exception { FileInputStream fis = new FileInputStream(filename); @@ -153,6 +161,28 @@ optimizePlan(plan); compareWithGoldenFile(plan, FILE_BASE_LOCATION + "optlimitplan7.dot"); } + + @Test + //Limit in the local mode, need to make sure limit stays after a sort + public void testOPLimit8Optimizer() throws Exception { + planTester.buildPlan("A = load 'myfile';"); + planTester.buildPlan("B = order A by $0;"); + LogicalPlan plan = planTester.buildPlan("C = limit B 10;"); + optimizePlan(plan, ExecType.LOCAL); + compareWithGoldenFile(plan, FILE_BASE_LOCATION + "optlimitplan8.dot"); + + } + + @Test + //Limit in the default (MapReduce) mode, need to make sure limit is merged into the sort + public void testOPLimit9Optimizer() throws Exception { + planTester.buildPlan("A = load 'myfile';"); + planTester.buildPlan("B = order A by $0;"); + LogicalPlan plan = planTester.buildPlan("C = limit B 10;"); + optimizePlan(plan); + compareWithGoldenFile(plan, FILE_BASE_LOCATION + "optlimitplan9.dot"); + + } @Test //Test to ensure that the right exception is thrown Added: hadoop/pig/trunk/test/org/apache/pig/test/data/DotFiles/optlimitplan8.dot URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/data/DotFiles/optlimitplan8.dot?rev=741763&view=auto ============================================================================== --- hadoop/pig/trunk/test/org/apache/pig/test/data/DotFiles/optlimitplan8.dot (added) +++ hadoop/pig/trunk/test/org/apache/pig/test/data/DotFiles/optlimitplan8.dot Fri Feb 6 22:34:59 2009 @@ -0,0 +1,6 @@ +digraph graph1 { + LOLoad55 -> LOSort57; + LOSort57 [limit="-1"]; + LOSort57 -> LOLimit58; + LOLimit58 [limit="10"]; +} Added: hadoop/pig/trunk/test/org/apache/pig/test/data/DotFiles/optlimitplan9.dot URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/data/DotFiles/optlimitplan9.dot?rev=741763&view=auto 
============================================================================== --- hadoop/pig/trunk/test/org/apache/pig/test/data/DotFiles/optlimitplan9.dot (added) +++ hadoop/pig/trunk/test/org/apache/pig/test/data/DotFiles/optlimitplan9.dot Fri Feb 6 22:34:59 2009 @@ -0,0 +1,4 @@ +digraph graph1 { + LOLoad59 -> LOSort61; + LOSort61 [limit="10"]; +}