Author: pradeepkth
Date: Mon Mar 22 18:02:34 2010
New Revision: 926228

URL: http://svn.apache.org/viewvc?rev=926228&view=rev
Log:
PIG-1308: Infinite loop in JobClient when reading from BinStorage Message: 
[org.apache.hadoop.mapreduce.lib.input.FileInputFormat - Total input paths to 
process : 2] (pradeepkth)

Modified:
    hadoop/pig/trunk/CHANGES.txt
    
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalOptimizer.java
    
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/OpLimitOptimizer.java
    hadoop/pig/trunk/src/org/apache/pig/impl/plan/optimizer/PlanOptimizer.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestLogicalOptimizer.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=926228&r1=926227&r2=926228&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Mon Mar 22 18:02:34 2010
@@ -68,6 +68,10 @@ manner (rding via pradeepkth)
 
 IMPROVEMENTS
 
+PIG-1308: Infinite loop in JobClient when reading from BinStorage Message:
+[org.apache.hadoop.mapreduce.lib.input.FileInputFormat - Total input paths to
+process : 2] (pradeepkth)
+
 PIG-1285: Allow SingleTupleBag to be serialized (dvryaboy)
 
 PIG-1117: Pig reading hive columnar rc tables (gerritjvv via dvryaboy)

Modified: 
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalOptimizer.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalOptimizer.java?rev=926228&r1=926227&r2=926228&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalOptimizer.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalOptimizer.java Mon Mar 22 18:02:34 2010
@@ -185,7 +185,7 @@ public class LogicalOptimizer extends
     }
 
     @Override
-    public final void optimize() throws OptimizerException {
+    public final int optimize() throws OptimizerException {
         //the code that follows is a copy of the code in the
         //base class. see the todo note in the base class
         boolean sawMatch = false;
@@ -240,5 +240,6 @@ public class LogicalOptimizer extends
                 ((PruneColumns)pruneRule.getTransformer()).prune();
             }
         }
+        return numIterations;
     }
 }
\ No newline at end of file

Modified: 
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/OpLimitOptimizer.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/OpLimitOptimizer.java?rev=926228&r1=926227&r2=926228&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/OpLimitOptimizer.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/OpLimitOptimizer.java Mon Mar 22 18:02:34 2010
@@ -87,6 +87,34 @@ public class OpLimitOptimizer extends Lo
                         + (lo == null ? lo : lo.getClass().getSimpleName());
                 throw new OptimizerException(msg, errCode, PigException.BUG);
             }
+            List<LogicalOperator> predecessors = mPlan.getPredecessors(lo);
+            if (predecessors.size()!=1) {
+                int errCode = 2008;
+                String msg = "Limit cannot have more than one input. Found " + predecessors.size() + " inputs.";
+                throw new OptimizerException(msg, errCode, PigException.BUG);
+            }
+            LogicalOperator predecessor = predecessors.get(0);
+            
+            // Limit cannot be pushed up
+            if (predecessor instanceof LOCogroup || predecessor instanceof LOFilter ||
+                    predecessor instanceof LOLoad || predecessor instanceof LOSplit ||
+                    predecessor instanceof LODistinct || predecessor instanceof LOJoin)
+            {
+                return false;
+            }
+            // Limit cannot be pushed in front of ForEach if it has a flatten
+            if (predecessor instanceof LOForEach)
+            {
+                LOForEach loForEach = (LOForEach)predecessor;
+                List<Boolean> mFlatten = loForEach.getFlatten();
+                boolean hasFlatten = false;
+                for (Boolean b:mFlatten)
+                    if (b.equals(true)) hasFlatten = true;
+                
+                if (hasFlatten) {
+                    return false;
+                }
+            }
         } catch (Exception e) {
             int errCode = 2049;
             String msg = "Error while performing checks to optimize limit operator.";

Modified: 
hadoop/pig/trunk/src/org/apache/pig/impl/plan/optimizer/PlanOptimizer.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/plan/optimizer/PlanOptimizer.java?rev=926228&r1=926227&r2=926228&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/plan/optimizer/PlanOptimizer.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/plan/optimizer/PlanOptimizer.java Mon Mar 22 18:02:34 2010
@@ -64,9 +64,11 @@ public abstract class PlanOptimizer<O ex
      * method of the associated Transformer to give the it a chance to
      * check whether it really wants to do the optimization.  If that
      * returns true as well, then Transformer.transform is called. 
+     * @return number of iterations in which the optimizer ran the check and
+     * transform steps over the various rules
      * @throws OptimizerException
      */
-    public void optimize() throws OptimizerException {
+    public int optimize() throws OptimizerException {
         //TODO
         //made the method non-final
         //we need a call back for transformer specific actions
@@ -94,5 +96,6 @@ public abstract class PlanOptimizer<O ex
                 }
             }
         } while(sawMatch && ++numIterations < mMaxIterations);
+        return numIterations;
     }
 }

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestLogicalOptimizer.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestLogicalOptimizer.java?rev=926228&r1=926227&r2=926228&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestLogicalOptimizer.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestLogicalOptimizer.java Mon Mar 22 18:02:34 2010
@@ -54,10 +54,10 @@ public class TestLogicalOptimizer extend
         return rep;
     }
     
-    public static void optimizePlan(LogicalPlan plan) throws Exception
+    public static int optimizePlan(LogicalPlan plan) throws Exception
     {
         LogicalOptimizer optimizer = new LogicalOptimizer(plan);
-        optimizer.optimize();
+        return optimizer.optimize();
     }
     
     public static void optimizePlan(LogicalPlan plan, ExecType mode) throws OptimizerException {
@@ -193,6 +193,23 @@ public class TestLogicalOptimizer extend
         compareWithGoldenFile(plan, FILE_BASE_LOCATION + "optlimitplan10.dot");
     }
 
+    /**
+     * Test that {@link OpLimitOptimizer} returns false on the check if 
+     * pre-conditions for pushing limit up are not met
+     * @throws Exception
+     */
+    @Test
+    public void testOpLimitOptimizerCheck() throws Exception {
+        planTester.buildPlan("A = load 'myfile';");
+        planTester.buildPlan("B = foreach A generate $0;");
+        LogicalPlan plan = planTester.buildPlan("C = limit B 100;");
+        LogicalOptimizerDerivative optimizer = new LogicalOptimizerDerivative(plan);
+        int numIterations = optimizer.optimize();
+        assertFalse("Checking number of iterations of the optimizer [actual = "
+                + numIterations + ", expected < " + optimizer.getMaxIterations() + 
+                "]", optimizer.getMaxIterations() == numIterations);
+    
+    }
     
     @Test
     //Test to ensure that the right exception is thrown
@@ -237,5 +254,17 @@ public class TestLogicalOptimizer extend
         LogicalPlan plan = planTester.buildPlan("B = foreach (limit (order (load 'myfile' AS (a0, a1, a2)) by $1) 10) generate $0;");
         optimizePlan(plan);
     }
+
+    // a subclass of LogicalOptimizer which can return the maximum iterations
+    // the optimizer would try the check() and transform() methods 
+    static class LogicalOptimizerDerivative extends LogicalOptimizer {
+        public LogicalOptimizerDerivative(LogicalPlan plan) {
+            super(plan);
+        }
+        
+        public int getMaxIterations() {
+            return mMaxIterations;
+        }
+    }
 }
 


Reply via email to