Author: daijy
Date: Wed Sep 29 05:25:35 2010
New Revision: 1002474

URL: http://svn.apache.org/viewvc?rev=1002474&view=rev
Log:
PIG-1637: Combiner not used because optimizer inserts a foreach between group 
and algebraic function

Modified:
    hadoop/pig/branches/branch-0.8/CHANGES.txt
    
hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/expression/DereferenceExpression.java
    
hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/expression/MapLookupExpression.java
    
hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/expression/UserFuncExpression.java
    
hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/MergeForEach.java
    
hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/TestMergeForEachOptimization.java

Modified: hadoop/pig/branches/branch-0.8/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/CHANGES.txt?rev=1002474&r1=1002473&r2=1002474&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.8/CHANGES.txt (original)
+++ hadoop/pig/branches/branch-0.8/CHANGES.txt Wed Sep 29 05:25:35 2010
@@ -198,6 +198,9 @@ PIG-1309: Map-side Cogroup (ashutoshc)
 
 BUG FIXES
 
+PIG-1637: Combiner not use because optimizor inserts a foreach between group
+and algebric function (daijy)
+
 PIG-1648: Split combination may return too many block locations to map/reduce 
framework (yanz)
 
 PIG-1641: Incorrect counters in local mode (rding)

Modified: 
hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/expression/DereferenceExpression.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/expression/DereferenceExpression.java?rev=1002474&r1=1002473&r2=1002474&view=diff
==============================================================================
--- 
hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/expression/DereferenceExpression.java
 (original)
+++ 
hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/expression/DereferenceExpression.java
 Wed Sep 29 05:25:35 2010
@@ -163,10 +163,10 @@ public class DereferenceExpression exten
                 columnsCopy);
         
         // Only one input is expected.
-        LogicalExpression input = (LogicalExpression) plan.getPredecessors( 
this ).get( 0 );
+        LogicalExpression input = (LogicalExpression) plan.getSuccessors( this 
).get( 0 );
         LogicalExpression inputCopy = input.deepCopy( lgExpPlan );
         lgExpPlan.add( inputCopy );
-        lgExpPlan.connect( inputCopy, copy );
+        lgExpPlan.connect( copy, inputCopy );
         
         return copy;
     }

Modified: 
hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/expression/MapLookupExpression.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/expression/MapLookupExpression.java?rev=1002474&r1=1002473&r2=1002474&view=diff
==============================================================================
--- 
hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/expression/MapLookupExpression.java
 (original)
+++ 
hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/expression/MapLookupExpression.java
 Wed Sep 29 05:25:35 2010
@@ -113,6 +113,13 @@ public class MapLookupExpression extends
                 lgExpPlan,
                 this.getLookupKey(),
                 this.getFieldSchema().deepCopy());
+        
+        // Only one input is expected.
+        LogicalExpression input = (LogicalExpression) plan.getSuccessors( this 
).get( 0 );
+        LogicalExpression inputCopy = input.deepCopy( lgExpPlan );
+        lgExpPlan.add( inputCopy );
+        lgExpPlan.connect( copy, inputCopy );
+        
         return copy;
     }
 

Modified: 
hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/expression/UserFuncExpression.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/expression/UserFuncExpression.java?rev=1002474&r1=1002473&r2=1002474&view=diff
==============================================================================
--- 
hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/expression/UserFuncExpression.java
 (original)
+++ 
hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/expression/UserFuncExpression.java
 Wed Sep 29 05:25:35 2010
@@ -175,13 +175,13 @@ public class UserFuncExpression extends 
             
copy.setImplicitReferencedOperator(this.getImplicitReferencedOperator());
             
             // Deep copy the input expressions.
-            List<Operator> inputs = plan.getPredecessors( this );
+            List<Operator> inputs = plan.getSuccessors( this );
             if( inputs != null ) {
                 for( Operator op : inputs ) {
                     LogicalExpression input = (LogicalExpression)op;
                     LogicalExpression inputCopy = input.deepCopy( lgExpPlan );
                     lgExpPlan.add( inputCopy );
-                    lgExpPlan.connect( inputCopy, copy );
+                    lgExpPlan.connect( copy, inputCopy );
                 }
             }
             

Modified: 
hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/MergeForEach.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/MergeForEach.java?rev=1002474&r1=1002473&r2=1002474&view=diff
==============================================================================
--- 
hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/MergeForEach.java
 (original)
+++ 
hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/MergeForEach.java
 Wed Sep 29 05:25:35 2010
@@ -93,14 +93,17 @@ public class MergeForEach extends Rule {
             // Otherwise, we may do expression calculation more than once, 
defeat the benefit of this
             // optimization
             Set<Integer> inputs = new HashSet<Integer>();
+            boolean duplicateInputs = false;
             for (Operator op : foreach2.getInnerPlan().getSources()) {
                 // If the source is not LOInnerLoad, then it must be 
LOGenerate. This happens when 
                 // the 1st ForEach does not rely on any input of 2nd ForEach
                 if (op instanceof LOInnerLoad) {
                     LOInnerLoad innerLoad = (LOInnerLoad)op;
                     int input = innerLoad.getProjection().getColNum();
-                    if (inputs.contains(input))
-                        return false;
+                    if (inputs.contains(input)) {
+                        duplicateInputs = true;
+                        break;
+                    }
                     else
                         inputs.add(input);
                     
@@ -109,6 +112,27 @@ public class MergeForEach extends Rule {
                 }
             }
             
+            // Duplicate inputs in the case first foreach only containing 
LOInnerLoad and
+            // LOGenerate is allowed, and output plan is simple projection
+            if (duplicateInputs) {
+                Iterator<Operator> it1 = 
foreach1.getInnerPlan().getOperators();
+                while( it1.hasNext() ) {
+                    Operator op = it1.next();
+                    if(!(op instanceof LOGenerate) && !(op instanceof 
LOInnerLoad))
+                        return false;
+                    if (op instanceof LOGenerate) {
+                        List<LogicalExpressionPlan> outputPlans = 
((LOGenerate)op).getOutputPlans();
+                        for (LogicalExpressionPlan outputPlan : outputPlans) {
+                            Iterator<Operator> iter = 
outputPlan.getOperators();
+                            while (iter.hasNext()) {
+                                if (!(iter.next() instanceof 
ProjectExpression))
+                                    return false;
+                            }
+                        }
+                    }
+                }
+            }
+            
             return true;
         }
 
@@ -117,25 +141,40 @@ public class MergeForEach extends Rule {
             return subPlan;
         }
 
-        private void addBranchToPlan(LOGenerate gen, int branch, OperatorPlan 
newPlan) {
+        // If op is LOInnerLoad, get a copy of it, otherwise, return op itself
+        private Operator getOperatorToMerge(Operator op, OperatorPlan newPlan, 
LOForEach newForEach) {
+            Operator opToMerge = op;
+            if (op instanceof LOInnerLoad) {
+                opToMerge = new LOInnerLoad(newPlan, newForEach, 
((LOInnerLoad)op).getColNum());
+            } else {
+                opToMerge.setPlan(newPlan);
+            }
+            return opToMerge;
+        }
+        
+        private Operator addBranchToPlan(LOGenerate gen, int branch, 
OperatorPlan newPlan, LOForEach newForEach) {
+            Operator opNextToGen;
             Operator op = gen.getPlan().getPredecessors(gen).get(branch);
-            newPlan.add(op);
-            op.setPlan(newPlan);
+            Operator opToMerge = getOperatorToMerge(op, newPlan, newForEach);
+            newPlan.add(opToMerge);
+            opNextToGen = opToMerge;
+            
             Operator pred;
             if (gen.getPlan().getPredecessors(op)!=null)
                 pred = gen.getPlan().getPredecessors(op).get(0);
             else
                 pred = null;
             while (pred!=null) {
-                newPlan.add(pred);
-                pred.setPlan(newPlan);
-                newPlan.connect(pred, op);
+                Operator predToMerge = getOperatorToMerge(pred, newPlan, 
newForEach);
+                newPlan.add(predToMerge);
+                newPlan.connect(predToMerge, op);
                 op = pred;
                 if (gen.getPlan().getPredecessors(pred)!=null)
                     pred = gen.getPlan().getPredecessors(pred).get(0);
                 else
                     pred = null;
             }
+            return opNextToGen;
         }
         
         @Override
@@ -159,7 +198,8 @@ public class MergeForEach extends Rule {
             
             for (LogicalExpressionPlan exp2 : gen2.getOutputPlans()) {
                 LogicalExpressionPlan newExpPlan = new LogicalExpressionPlan();
-                newExpPlan.merge(exp2);
+                LogicalExpressionPlan exp2Copy = exp2.deepCopy();
+                newExpPlan.merge(exp2Copy);
                 
                 // Add expression plan in 2nd ForEach
                 List<Operator> exp2Sinks = new ArrayList<Operator>();
@@ -171,7 +211,8 @@ public class MergeForEach extends Rule {
                         LOInnerLoad innerLoad = 
(LOInnerLoad)foreach2.getInnerPlan().getPredecessors(gen2).get(proj.getInputNum());
                         int exp1Pos = innerLoad.getProjection().getColNum();
                         LogicalExpressionPlan exp1 = 
gen1.getOutputPlans().get(exp1Pos);
-                        List<Operator> exp1Sources = newExpPlan.merge(exp1);
+                        LogicalExpressionPlan exp1Copy = exp1.deepCopy();
+                        List<Operator> exp1Sources = 
newExpPlan.merge(exp1Copy);
                         
                         // Copy expression plan to the new ForEach, connect to 
the expression plan of 2nd ForEach
                         Operator exp1Source = exp1Sources.get(0);
@@ -191,8 +232,7 @@ public class MergeForEach extends Rule {
                 List<Operator> exp1Sinks = newExpPlan.getSinks();
                 for (Operator exp1Sink : exp1Sinks) {
                     if (exp1Sink instanceof ProjectExpression) {
-                        addBranchToPlan(gen1, 
((ProjectExpression)exp1Sink).getInputNum(), newForEachInnerPlan);
-                        Operator opNextToGen = 
foreach1.getInnerPlan().getPredecessors(gen1).get(((ProjectExpression)exp1Sink).getInputNum());
+                        Operator opNextToGen = addBranchToPlan(gen1, 
((ProjectExpression)exp1Sink).getInputNum(), newForEachInnerPlan, newForEach);
                         newForEachInnerPlan.connect(opNextToGen, newGen);
                         int input = 
newForEachInnerPlan.getPredecessors(newGen).indexOf(opNextToGen);
                         ((ProjectExpression)exp1Sink).setInputNum(input);

Modified: 
hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/TestMergeForEachOptimization.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/TestMergeForEachOptimization.java?rev=1002474&r1=1002473&r2=1002474&view=diff
==============================================================================
--- 
hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/TestMergeForEachOptimization.java
 (original)
+++ 
hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/TestMergeForEachOptimization.java
 Wed Sep 29 05:25:35 2010
@@ -128,6 +128,39 @@ public class TestMergeForEachOptimizatio
     }
     
     /**
+     * One output of first foreach was referred more than once in the second 
foreach
+     * 
+     * @throws IOException
+     */
+    @Test
+    public void testDuplicateInputs() throws IOException {
+        LogicalPlanTester lpt = new LogicalPlanTester( pc );
+        lpt.buildPlan( "A = load 'file.txt' as (a0:int, a1:double);" );
+        lpt.buildPlan( "A1 = foreach A generate (int)a0 as a0, (double)a1 as 
a1;" );
+        lpt.buildPlan( "B = group A1 all;" );
+        lpt.buildPlan( "C = foreach B generate A1;" );
+        lpt.buildPlan( "D = foreach C generate SUM(A1.a0), AVG(A1.a1);" );
+        org.apache.pig.impl.logicalLayer.LogicalPlan plan = lpt.buildPlan( 
"store D into 'empty';" );  
+        LogicalPlan newLogicalPlan = migratePlan( plan );
+        
+        Operator store = newLogicalPlan.getSinks().get(0);
+        int forEachCount1 = getForEachOperatorCount( newLogicalPlan );
+        LOForEach foreach1 = 
(LOForEach)newLogicalPlan.getPredecessors(store).get(0);
+        Assert.assertTrue( foreach1.getAlias().equals( "D" ) );
+               
+        PlanOptimizer optimizer = new MyPlanOptimizer( newLogicalPlan, 3 );
+        optimizer.optimize();
+        
+        int forEachCount2 = getForEachOperatorCount( newLogicalPlan );
+        // The number of FOREACHes didn't change because one is generated 
because of type cast and
+        // one is reduced because of the merge.
+        Assert.assertEquals( 1, forEachCount1 - forEachCount2 );
+        
+        LOForEach foreach2 = 
(LOForEach)newLogicalPlan.getPredecessors(store).get(0);
+        Assert.assertTrue( foreach2.getAlias().equals( "D" ) );
+    }
+    
+    /**
      * Not all consecutive FOREACHes can be merged. In this case, the second 
FOREACH statment
      * has inner plan, which cannot be merged with one before it.
      * 


Reply via email to