Author: pradeepkth Date: Mon Mar 22 18:02:34 2010 New Revision: 926228 URL: http://svn.apache.org/viewvc?rev=926228&view=rev Log: PIG-1308: Infinite loop in JobClient when reading from BinStorage Message: [org.apache.hadoop.mapreduce.lib.input.FileInputFormat - Total input paths to process : 2] (pradeepkth)
Modified: hadoop/pig/trunk/CHANGES.txt hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalOptimizer.java hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/OpLimitOptimizer.java hadoop/pig/trunk/src/org/apache/pig/impl/plan/optimizer/PlanOptimizer.java hadoop/pig/trunk/test/org/apache/pig/test/TestLogicalOptimizer.java Modified: hadoop/pig/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=926228&r1=926227&r2=926228&view=diff ============================================================================== --- hadoop/pig/trunk/CHANGES.txt (original) +++ hadoop/pig/trunk/CHANGES.txt Mon Mar 22 18:02:34 2010 @@ -68,6 +68,10 @@ manner (rding via pradeepkth) IMPROVEMENTS +PIG-1308: Inifinite loop in JobClient when reading from BinStorage Message: +[org.apache.hadoop.mapreduce.lib.input.FileInputFormat - Total input paths to +process : 2] (pradeepkth) + PIG-1285: Allow SingleTupleBag to be serialized (dvryaboy) PIG-1117: Pig reading hive columnar rc tables (gerritjvv via dvryaboy) Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalOptimizer.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalOptimizer.java?rev=926228&r1=926227&r2=926228&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalOptimizer.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalOptimizer.java Mon Mar 22 18:02:34 2010 @@ -185,7 +185,7 @@ public class LogicalOptimizer extends } @Override - public final void optimize() throws OptimizerException { + public final int optimize() throws OptimizerException { //the code that follows is a copy of the code in the //base class. 
see the todo note in the base class boolean sawMatch = false; @@ -240,5 +240,6 @@ public class LogicalOptimizer extends ((PruneColumns)pruneRule.getTransformer()).prune(); } } + return numIterations; } } \ No newline at end of file Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/OpLimitOptimizer.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/OpLimitOptimizer.java?rev=926228&r1=926227&r2=926228&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/OpLimitOptimizer.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/OpLimitOptimizer.java Mon Mar 22 18:02:34 2010 @@ -87,6 +87,34 @@ public class OpLimitOptimizer extends Lo + (lo == null ? lo : lo.getClass().getSimpleName()); throw new OptimizerException(msg, errCode, PigException.BUG); } + List<LogicalOperator> predecessors = mPlan.getPredecessors(lo); + if (predecessors.size()!=1) { + int errCode = 2008; + String msg = "Limit cannot have more than one input. 
Found " + predecessors.size() + " inputs."; + throw new OptimizerException(msg, errCode, PigException.BUG); + } + LogicalOperator predecessor = predecessors.get(0); + + // Limit cannot be pushed up + if (predecessor instanceof LOCogroup || predecessor instanceof LOFilter || + predecessor instanceof LOLoad || predecessor instanceof LOSplit || + predecessor instanceof LODistinct || predecessor instanceof LOJoin) + { + return false; + } + // Limit cannot be pushed in front of ForEach if it has a flatten + if (predecessor instanceof LOForEach) + { + LOForEach loForEach = (LOForEach)predecessor; + List<Boolean> mFlatten = loForEach.getFlatten(); + boolean hasFlatten = false; + for (Boolean b:mFlatten) + if (b.equals(true)) hasFlatten = true; + + if (hasFlatten) { + return false; + } + } } catch (Exception e) { int errCode = 2049; String msg = "Error while performing checks to optimize limit operator."; Modified: hadoop/pig/trunk/src/org/apache/pig/impl/plan/optimizer/PlanOptimizer.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/plan/optimizer/PlanOptimizer.java?rev=926228&r1=926227&r2=926228&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/impl/plan/optimizer/PlanOptimizer.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/impl/plan/optimizer/PlanOptimizer.java Mon Mar 22 18:02:34 2010 @@ -64,9 +64,11 @@ public abstract class PlanOptimizer<O ex * method of the associated Transformer to give the it a chance to * check whether it really wants to do the optimization. If that * returns true as well, then Transformer.transform is called. 
+ * @return number of iterations the optimizer tried check and transform for + * the various rules * @throws OptimizerException */ - public void optimize() throws OptimizerException { + public int optimize() throws OptimizerException { //TODO //made the method non-final //we need a call back for transformer specific actions @@ -94,5 +96,6 @@ public abstract class PlanOptimizer<O ex } } } while(sawMatch && ++numIterations < mMaxIterations); + return numIterations; } } Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestLogicalOptimizer.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestLogicalOptimizer.java?rev=926228&r1=926227&r2=926228&view=diff ============================================================================== --- hadoop/pig/trunk/test/org/apache/pig/test/TestLogicalOptimizer.java (original) +++ hadoop/pig/trunk/test/org/apache/pig/test/TestLogicalOptimizer.java Mon Mar 22 18:02:34 2010 @@ -54,10 +54,10 @@ public class TestLogicalOptimizer extend return rep; } - public static void optimizePlan(LogicalPlan plan) throws Exception + public static int optimizePlan(LogicalPlan plan) throws Exception { LogicalOptimizer optimizer = new LogicalOptimizer(plan); - optimizer.optimize(); + return optimizer.optimize(); } public static void optimizePlan(LogicalPlan plan, ExecType mode) throws OptimizerException { @@ -193,6 +193,23 @@ public class TestLogicalOptimizer extend compareWithGoldenFile(plan, FILE_BASE_LOCATION + "optlimitplan10.dot"); } + /** + * Test that {@link OpLimitOptimizer} returns false on the check if + * pre-conditions for pushing limit up are not met + * @throws Exception + */ + @Test + public void testOpLimitOptimizerCheck() throws Exception { + planTester.buildPlan("A = load 'myfile';"); + planTester.buildPlan("B = foreach A generate $0;"); + LogicalPlan plan = planTester.buildPlan("C = limit B 100;"); + LogicalOptimizerDerivative optimizer = new LogicalOptimizerDerivative(plan); + int numIterations = 
optimizer.optimize(); + assertFalse("Checking number of iterations of the optimizer [actual = " + + numIterations + ", expected < " + optimizer.getMaxIterations() + + "]", optimizer.getMaxIterations() == numIterations); + + } @Test //Test to ensure that the right exception is thrown @@ -237,5 +254,17 @@ public class TestLogicalOptimizer extend LogicalPlan plan = planTester.buildPlan("B = foreach (limit (order (load 'myfile' AS (a0, a1, a2)) by $1) 10) generate $0;"); optimizePlan(plan); } + + // a subclass of LogicalOptimizer which can return the maximum iterations + // the optimizer would try the check() and transform() methods + static class LogicalOptimizerDerivative extends LogicalOptimizer { + public LogicalOptimizerDerivative(LogicalPlan plan) { + super(plan); + } + + public int getMaxIterations() { + return mMaxIterations; + } + } }