Author: daijy
Date: Mon Sep  6 05:38:53 2010
New Revision: 992943

URL: http://svn.apache.org/viewvc?rev=992943&view=rev
Log:
PIG-1575: Complete the migration of optimization rule PushUpFilter including 
missing test cases

Modified:
    
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/DereferenceExpression.java
    
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/LogicalExpressionPlan.java
    
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/UserFuncExpression.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/PushUpFilter.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestNewPlanFilterAboveForeach.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestNewPlanFilterRule.java

Modified: 
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/DereferenceExpression.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/DereferenceExpression.java?rev=992943&r1=992942&r2=992943&view=diff
==============================================================================
--- 
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/DereferenceExpression.java
 (original)
+++ 
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/DereferenceExpression.java
 Mon Sep  6 05:38:53 2010
@@ -161,6 +161,13 @@ public class DereferenceExpression exten
         LogicalExpression copy = new DereferenceExpression(
                 lgExpPlan,
                 columnsCopy);
+        
+        // Only one input is expected.
+        LogicalExpression input = (LogicalExpression) plan.getPredecessors( 
this ).get( 0 );
+        LogicalExpression inputCopy = input.deepCopy( lgExpPlan );
+        lgExpPlan.add( inputCopy );
+        lgExpPlan.connect( inputCopy, copy );
+        
         return copy;
     }
 

Modified: 
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/LogicalExpressionPlan.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/LogicalExpressionPlan.java?rev=992943&r1=992942&r2=992943&view=diff
==============================================================================
--- 
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/LogicalExpressionPlan.java
 (original)
+++ 
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/LogicalExpressionPlan.java
 Mon Sep  6 05:38:53 2010
@@ -94,4 +94,12 @@ public class LogicalExpressionPlan exten
         return sources;
     }
 
+    public LogicalExpressionPlan deepCopy() throws FrontendException {
+        LogicalExpressionPlan result = new LogicalExpressionPlan();
+        LogicalExpression root = (LogicalExpression)getSources().get( 0 );
+        LogicalExpression newRoot = root.deepCopy( result );
+        result.add( newRoot );
+        return result;
+    }
+
 }

Modified: 
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/UserFuncExpression.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/UserFuncExpression.java?rev=992943&r1=992942&r2=992943&view=diff
==============================================================================
--- 
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/UserFuncExpression.java
 (original)
+++ 
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/UserFuncExpression.java
 Mon Sep  6 05:38:53 2010
@@ -170,6 +170,18 @@ public class UserFuncExpression extends 
                     lgExpPlan,
                     this.getFuncSpec().clone() );
             
copy.setImplicitReferencedOperator(this.getImplicitReferencedOperator());
+            
+            // Deep copy the input expressions.
+            List<Operator> inputs = plan.getPredecessors( this );
+            if( inputs != null ) {
+                for( Operator op : inputs ) {
+                    LogicalExpression input = (LogicalExpression)op;
+                    LogicalExpression inputCopy = input.deepCopy( lgExpPlan );
+                    lgExpPlan.add( inputCopy );
+                    lgExpPlan.connect( inputCopy, copy );
+                }
+            }
+            
         } catch(CloneNotSupportedException e) {
              e.printStackTrace();
         }

Modified: 
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/PushUpFilter.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/PushUpFilter.java?rev=992943&r1=992942&r2=992943&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/PushUpFilter.java 
(original)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/PushUpFilter.java 
Mon Sep  6 05:38:53 2010
@@ -15,23 +15,41 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.pig.newplan.logical.rules;
 
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
+import java.util.Map.Entry;
 
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.util.Pair;
 import org.apache.pig.newplan.Operator;
 import org.apache.pig.newplan.OperatorPlan;
 import org.apache.pig.newplan.OperatorSubPlan;
+import org.apache.pig.newplan.logical.expression.LogicalExpression;
 import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
 import org.apache.pig.newplan.logical.expression.ProjectExpression;
+import org.apache.pig.newplan.logical.relational.LOCogroup;
+import org.apache.pig.newplan.logical.relational.LOCross;
+import org.apache.pig.newplan.logical.relational.LODistinct;
 import org.apache.pig.newplan.logical.relational.LOFilter;
 import org.apache.pig.newplan.logical.relational.LOForEach;
 import org.apache.pig.newplan.logical.relational.LOJoin;
+import org.apache.pig.newplan.logical.relational.LOLimit;
+import org.apache.pig.newplan.logical.relational.LOLoad;
+import org.apache.pig.newplan.logical.relational.LONative;
+import org.apache.pig.newplan.logical.relational.LOSort;
+import org.apache.pig.newplan.logical.relational.LOSplit;
+import org.apache.pig.newplan.logical.relational.LOSplitOutput;
+import org.apache.pig.newplan.logical.relational.LOStore;
+import org.apache.pig.newplan.logical.relational.LOStream;
+import org.apache.pig.newplan.logical.relational.LOUnion;
 import org.apache.pig.newplan.logical.relational.LogicalPlan;
 import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
 import org.apache.pig.newplan.logical.relational.LogicalSchema;
@@ -50,116 +68,226 @@ public class PushUpFilter extends Rule {
     }
 
     public class PushUpFilterTransformer extends Transformer {
-
         private OperatorSubPlan subPlan;
 
         @Override
         public boolean check(OperatorPlan matched) throws FrontendException {  
 
             // check if it is inner join
-            LOJoin join = (LOJoin)matched.getSources().get(0);
-            boolean[] innerFlags = join.getInnerFlags();
-            for(boolean inner: innerFlags) {
-                if (!inner){
-                    return false;
-                }
+            Operator current = matched.getSources().get(0);
+            
+            Operator pred = findNonFilterPredecessor( current );
+            if( pred == null )
+                return false;
+            
+            // sort, distinct, or union is always okay.
+            if( pred instanceof LOSort || pred instanceof LODistinct || pred 
instanceof LOUnion ) {
+                return true;
             }
-           
-            Operator next = matched.getSinks().get(0);
-            while(next != null && next instanceof LOFilter) {
-                LOFilter filter = (LOFilter)next;            
-                LogicalExpressionPlan filterPlan = filter.getFilterPlan();
+            
+            if( pred instanceof LOCogroup ) {
+                LOCogroup cogrp = (LOCogroup)pred;
+                if( currentPlan.getPredecessors( cogrp ).size() == 1 ) {
+                    // A cogroup with a single input is always ok.
+                    return true;
+                }
                 
-                // collect all uids used in the filter plan
-                Set<Long> uids = new HashSet<Long>();
-                Iterator<Operator> iter = filterPlan.getOperators();
-                while(iter.hasNext()) {
-                    Operator op = iter.next();
-                    if (op instanceof ProjectExpression) {
-                        long uid = 
((ProjectExpression)op).getFieldSchema().uid;
-                        uids.add(uid);
+                if( 1 == cogrp.getExpressionPlans().get( 0 ).size() ) {
+                    // Optimization is okay if there is only a single key.
+                    return true;
+                }
+            }
+            
+            // if the predecessor is one of 
LOLoad/LOStore/LOStream/LOFilter/LOSplit/LOSplitOutput/LOLimit/LONative
+            // if predecessor is LOForEach, it is optimized by rule 
FilterAboveForeach
+            // return false
+            if( pred instanceof LOLoad   || pred instanceof LOStore || pred 
instanceof LOStream      ||
+                pred instanceof LOFilter || pred instanceof LOSplit || pred 
instanceof LOSplitOutput || 
+                pred instanceof LOLimit  || pred instanceof LONative || pred 
instanceof LOForEach) {
+                return false;
+            }
+            
+            
+            // if the predecessor is a multi-input operator then detailed
+            // checks are required
+            if( pred instanceof LOCross || pred instanceof LOJoin ) {
+                boolean[] innerFlags = null;
+                boolean isFullOuter = true;
+                boolean isInner = true;
+                if( pred instanceof LOJoin ) {
+                    innerFlags = ((LOJoin)pred).getInnerFlags();
+                    // If all innerFlag is false, means a full outer join,
+                    for (boolean inner : innerFlags) {
+                        if (inner) {
+                            isFullOuter = false;
+                        } else {
+                            isInner = false;
+                        }
                     }
+                    if (isFullOuter)
+                        return false;
                 }
-                                
-                List<Operator> preds = currentPlan.getPredecessors(join);
-                            
+                
+                LOFilter filter = (LOFilter)current;            
+                LogicalExpressionPlan filterPlan = filter.getFilterPlan();
+                    
+                // collect all uids used in the filter plan
+                Set<Long> uids = collectUidFromExpPlan(filterPlan);
+                                    
+                List<Operator> preds = currentPlan.getPredecessors(pred);
+
                 for(int j=0; j<preds.size(); j++) {
-                    if (hasAll((LogicalRelationalOperator)preds.get(j), uids)) 
{                            
-                        return true;
+                    if (hasAll((LogicalRelationalOperator)preds.get(j), uids)) 
{
+                        // For LOJoin, innerFlag==true indicate that branch is 
the outer join side
+                        // which has the exact opposite semantics
+                        if (pred instanceof LOCross || pred instanceof LOJoin 
&& (isInner || innerFlags[j]))
+                            return true;
                     }
                 }                       
-             
-                // if current filter can not move up, check next filter
-                List<Operator> l = currentPlan.getSuccessors(filter);
-                if (l != null) {
-                    next = l.get(0);
-                } else {
-                    next = null;
-                }
             }
             
             return false;
         }
+        
+        Set<Long> collectUidFromExpPlan(LogicalExpressionPlan filterPlan) 
throws FrontendException {
+            Set<Long> uids = new HashSet<Long>();
+            Iterator<Operator> iter = filterPlan.getOperators();
+            while(iter.hasNext()) {
+                Operator op = iter.next();
+                if (op instanceof ProjectExpression) {
+                    long uid = ((ProjectExpression)op).getFieldSchema().uid;
+                    uids.add(uid);
+                }
+            }
+            return uids;
+        }
+        
+        /**
+         * Starting from current operator (which is a filter), search its 
predecessors until 
+         * locating a non-filter operator. Null is returned if none is found.
+         */
+        private Operator findNonFilterPredecessor(Operator current) {
+            Operator op = current;
+            do {
+                List<Operator> predecessors = currentPlan.getPredecessors( op 
);
+
+                // if there are no predecessors return null
+                if( predecessors == null || predecessors.size() == 0 ) {
+                    return null;
+                }
+
+                Operator pred = predecessors.get( 0 );
+                if( pred instanceof LOFilter ) {
+                    op = pred;
+                    continue;
+                } else {
+                    return pred;
+                }
+            } while( true );
+                
+        }
 
         @Override
         public void transform(OperatorPlan matched) throws FrontendException {
             subPlan = new OperatorSubPlan(currentPlan);
 
-            LOJoin join = (LOJoin)matched.getSources().get(0);
-            subPlan.add(join);     
+            LOFilter filter = (LOFilter)matched.getSources().get(0);
+            
+            // This is the one that we will insert the filter between it and its 
input.
+            Operator predecessor = this.findNonFilterPredecessor( filter );
+            subPlan.add( predecessor) ;
             
-            Operator next = matched.getSinks().get(0);
-            while(next != null && next instanceof LOFilter) {
-                LOFilter filter = (LOFilter)next;                
-                subPlan.add(filter);
+            // Disconnect the filter in the plan without removing it from the 
plan.
+            Operator predec = currentPlan.getPredecessors( filter ).get( 0 );
+            Operator succed;
+            
+            if (currentPlan.getSuccessors(filter)!=null)
+                succed = currentPlan.getSuccessors(filter).get(0);
+            else
+                succed = null;
+            
+            Pair<Integer, Integer> p1 = currentPlan.disconnect(predec, filter);
+            
+            if (succed!=null) {
+                subPlan.add(succed);
+                Pair<Integer, Integer> p2 = currentPlan.disconnect(filter, 
succed);
+                currentPlan.connect(predec, p1.first, succed, p2.second);
+            }
+            
+            if( predecessor instanceof LOSort || predecessor instanceof 
LODistinct ||
+                ( predecessor instanceof LOCogroup && 
currentPlan.getPredecessors( predecessor ).size() == 1 ) ) {
+                // For sort, put the filter in front of it.
+                Operator prev = currentPlan.getPredecessors( predecessor 
).get( 0 );
                 
-                LogicalExpressionPlan filterPlan = filter.getFilterPlan();
+                insertFilter( prev, predecessor, filter );
+                return;
+            }
+
+            // Find the predecessor of join that contains all required uids.
+            LogicalExpressionPlan filterPlan = filter.getFilterPlan();
+            List<Operator> preds = currentPlan.getPredecessors( predecessor );
+            Map<Integer, Operator> inputs = findInputsToAddFilter( filterPlan, 
predecessor, preds );
+            
+            LOFilter newFilter = null;                
+            for( Entry<Integer, Operator> entry : inputs.entrySet() ) {
+                int inputIndex = entry.getKey();
+                Operator pred = entry.getValue();
                 
-                // collect all uids used in the filter plan
-                Set<Long> uids = new HashSet<Long>();
-                Iterator<Operator> iter = filterPlan.getOperators();
-                while(iter.hasNext()) {
-                    Operator op = iter.next();
-                    if (op instanceof ProjectExpression) {
-                        long uid = 
((ProjectExpression)op).getFieldSchema().uid;
-                        uids.add(uid);
+                // Find projection field offset
+                int columnOffset = 0;
+                if( predecessor instanceof LOJoin || predecessor instanceof 
LOCross ) {
+                    for( int i = 0; i < inputIndex; i++ ) {
+                        columnOffset += ( 
(LogicalRelationalOperator)preds.get( i ) ).getSchema().size();
                     }
                 }
                 
-                // Find the predecessor of join that contains all required 
uids.
-                Operator input = null;
-                List<Operator> preds = currentPlan.getPredecessors(join);
-                for(int j=0; j<preds.size(); j++) {
-                    if( hasAll((LogicalRelationalOperator)preds.get(j), uids) 
) {
-                        input = preds.get(j);   
-                        subPlan.add(input);
-                        break;
-                    }
+                // Reuse the filter for the first match. For others, need to 
make a copy of the filter
+                // and add it between input and predecessor.
+                newFilter = newFilter == null ? filter : new LOFilter( 
(LogicalPlan)currentPlan );
+                
+                currentPlan.add( newFilter );
+                subPlan.add( newFilter );
+                subPlan.add( pred );
+                LogicalExpressionPlan fPlan = filterPlan.deepCopy();
+                List<Operator> sinks = fPlan.getSinks();
+                List<ProjectExpression> projExprs = new 
ArrayList<ProjectExpression>();
+                for( Operator sink : sinks ) {
+                    if( sink instanceof ProjectExpression )
+                        projExprs.add( (ProjectExpression)sink );
                 }
-                            
-                if( input != null ) {
-                    // Found one of the join's predeccessors of the join which 
has all the uids.
-                    Operator pred = currentPlan.getPredecessors(filter).get(0);
-                    Operator succed = currentPlan.getSuccessors(filter).get(0);
-                    subPlan.add(succed);
-                    
-                    Pair<Integer, Integer> p1 = currentPlan.disconnect(pred, 
filter);
-                    Pair<Integer, Integer> p2 = currentPlan.disconnect(filter, 
succed);
-                    currentPlan.connect(pred, p1.first, succed, p2.second);
-                    
-                    succed = currentPlan.getSuccessors(input).get(0);
-                    Pair<Integer, Integer> p3 = currentPlan.disconnect(input, 
succed);
-                    currentPlan.connect(input, p3.first, filter, 0);
-                    currentPlan.connect(filter, 0, succed, p3.second);
-                    return;
-                }  else {
-                    // Didn't find the opeartor, so looking at the next one 
after the filter.
-                    List<Operator> l = currentPlan.getSuccessors(filter );
-                    if( l != null ) {
-                        next = l.get( 0 );
-                    } else {
-                        next = null;
+                
+                if( predecessor instanceof LOCogroup ) {
+                    for( ProjectExpression projExpr : projExprs ) {
+                        // Need to merge filter condition and cogroup by 
expression;
+                        LogicalExpressionPlan plan = ((LOCogroup) 
predecessor).getExpressionPlans().get( inputIndex ).iterator().next();
+                        LogicalExpressionPlan copy = plan.deepCopy();
+                        LogicalExpression root = 
(LogicalExpression)copy.getSinks().get( 0 );
+                        List<Operator> predecessors = fPlan.getPredecessors( 
projExpr );
+                        if( predecessors == null || predecessors.size() == 0 ) 
{
+                            fPlan.remove( projExpr );
+                            fPlan.add( root );
+                        } else {
+                            fPlan.add( root );
+                            Operator pred1 = predecessors.get( 0 );
+                            Pair<Integer, Integer> pair = fPlan.disconnect( 
pred1, projExpr );
+                            fPlan.connect( pred1, pair.first, root, 
pair.second );
+                            fPlan.remove( projExpr );
+                        }
                     }
                 }
+                
+                // Now, reset the projection expressions in the new filter 
plan.
+                sinks = fPlan.getSinks();
+                for( Operator sink : sinks ) {
+                    if( sink instanceof ProjectExpression ) {
+                        ProjectExpression projE = (ProjectExpression)sink;
+                         projE.setAttachedRelationalOp( newFilter );
+                         projE.setInputNum( 0 );
+                         projE.setColNum( projE.getColNum() - columnOffset );
+                    }
+                 }
+                newFilter.setFilterPlan( fPlan );
+                
+                insertFilter( pred, predecessor, newFilter );
             }
         }
         
@@ -177,23 +305,65 @@ public class PushUpFilter extends Rule {
            
         @Override
         public OperatorPlan reportChanges() {            
-            return subPlan;
+            return currentPlan;
         }          
 
+        // Insert the filter in between the given two operators.
+        private void insertFilter(Operator prev, Operator predecessor, 
LOFilter filter)
+        throws FrontendException {
+            Pair<Integer, Integer> p3 = currentPlan.disconnect( prev, 
predecessor );
+            currentPlan.connect( prev, p3.first, filter, 0 );
+            currentPlan.connect( filter, 0, predecessor, p3.second );
+        }
+        
+        // Identify those among preds that will need to have a filter between 
them and the predecessor.
+        private Map<Integer, Operator> 
findInputsToAddFilter(LogicalExpressionPlan filterPlan, Operator predecessor, 
+                List<Operator> preds) throws FrontendException {
+            Map<Integer, Operator> inputs = new HashMap<Integer, Operator>();
+            
+            if( predecessor instanceof LOUnion || predecessor instanceof 
LOCogroup ) {
+                for( int i = 0; i < preds.size(); i++ ) {
+                    inputs.put( i, preds.get( i ) );
+                }
+                return inputs;
+            }
+            
+            // collect all uids used in the filter plan
+            Set<Long> uids = collectUidFromExpPlan(filterPlan);
+            boolean[] innerFlags = null;
+            boolean isInner = true;
+            if (predecessor instanceof LOJoin) {
+                innerFlags = ((LOJoin)predecessor).getInnerFlags();
+                for (boolean inner : innerFlags) {
+                    if (!inner) {
+                        isInner = false;
+                        break;
+                    }
+                }
+            }
+            
+            // Find the predecessor of join that contains all required uids.
+            for(int j=0; j<preds.size(); j++) {
+                // Filter can be pushed to the outer branch of an LOJoin, but not to the inner branch
+                if( hasAll((LogicalRelationalOperator)preds.get(j), uids) && 
+                        (predecessor instanceof LOCross || predecessor 
instanceof LOJoin && (isInner || innerFlags[j]))) {
+                    Operator input = preds.get(j);   
+                    subPlan.add(input);
+                    inputs.put( j, input );
+                }
+            }
+            return  inputs;
+        }
     }
 
     @Override
     protected OperatorPlan buildPattern() {        
-        // the pattern that this rule looks for
-        // is join -> filter
         LogicalPlan plan = new LogicalPlan();
-        LogicalRelationalOperator op1 = new LOJoin(plan);
-        LogicalRelationalOperator op2 = new LOFilter(plan);
-        plan.add(op1);
-        plan.add(op2);
-        plan.connect(op1, op2);
+        LogicalRelationalOperator op1 = new LOFilter(plan);
+        plan.add( op1 );
         
         return plan;
     }
+
 }
 

Modified: 
hadoop/pig/trunk/test/org/apache/pig/test/TestNewPlanFilterAboveForeach.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestNewPlanFilterAboveForeach.java?rev=992943&r1=992942&r2=992943&view=diff
==============================================================================
--- 
hadoop/pig/trunk/test/org/apache/pig/test/TestNewPlanFilterAboveForeach.java 
(original)
+++ 
hadoop/pig/trunk/test/org/apache/pig/test/TestNewPlanFilterAboveForeach.java 
Mon Sep  6 05:38:53 2010
@@ -35,6 +35,7 @@ import org.apache.pig.newplan.logical.op
 import org.apache.pig.newplan.logical.relational.LOFilter;
 import org.apache.pig.newplan.logical.relational.LOForEach;
 import org.apache.pig.newplan.logical.relational.LOLoad;
+import org.apache.pig.newplan.logical.relational.LOStore;
 import org.apache.pig.newplan.logical.relational.LogicalPlan;
 import org.apache.pig.newplan.logical.rules.FilterAboveForeach;
 import org.apache.pig.newplan.logical.rules.LoadTypeCastInserter;
@@ -263,7 +264,141 @@ public class TestNewPlanFilterAboveForea
         Operator filter = newLogicalPlan.getSuccessors( fe2 ).get( 0 );
         Assert.assertTrue( filter instanceof LOFilter );
     }
-    
+
+    @Test
+    public void testFilterForeach() throws Exception {
+        planTester.buildPlan("A = load 'myfile' as (name, age, gpa);");
+        planTester.buildPlan("B = foreach A generate $1, $2;");        
+        planTester.buildPlan("C = filter B by $0 < 18;");
+        org.apache.pig.impl.logicalLayer.LogicalPlan plan = 
planTester.buildPlan( "D = STORE C INTO 'empty';" ); 
+
+        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( plan );
+
+        Operator load = newLogicalPlan.getSources().get( 0 );
+        Assert.assertTrue( load instanceof LOLoad );
+        Operator filter = newLogicalPlan.getSuccessors( load ).get( 0 );
+        Assert.assertTrue( filter instanceof LOFilter );
+        Operator fe = newLogicalPlan.getSuccessors( filter ).get( 0 );
+        Assert.assertTrue( fe instanceof LOForEach );
+        Operator store = newLogicalPlan.getSuccessors( fe ).get( 0 );
+        Assert.assertTrue( store instanceof LOStore );
+    }
+
+    @Test
+    public void testFilterForeachAddedField() throws Exception {
+        planTester.buildPlan("A = load 'myfile' as (name, age, gpa);");
+        planTester.buildPlan("B = foreach A generate $1, $2, COUNT({(1)});");  
      
+        planTester.buildPlan("C = filter B by $2 < 18;");
+        org.apache.pig.impl.logicalLayer.LogicalPlan plan = 
planTester.buildPlan( "D = STORE C INTO 'empty';" ); 
+
+        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( plan );
+
+        Operator load = newLogicalPlan.getSources().get( 0 );
+        Assert.assertTrue( load instanceof LOLoad );
+        Operator fe = newLogicalPlan.getSuccessors( load ).get( 0 );
+        Assert.assertTrue( fe instanceof LOForEach );
+        Operator filter = newLogicalPlan.getSuccessors( fe ).get( 0 );
+        Assert.assertTrue( filter instanceof LOFilter );
+        Operator store = newLogicalPlan.getSuccessors( filter ).get( 0 );
+        Assert.assertTrue( store instanceof LOStore );
+    }
+
+    @Test
+    public void testFilterForeachCast() throws Exception {
+        planTester.buildPlan("A = load 'myfile' as (name, age, gpa);");
+        planTester.buildPlan("B = foreach A generate (int)$1, $2;");        
+        planTester.buildPlan("C = filter B by $0 < 18;");
+        org.apache.pig.impl.logicalLayer.LogicalPlan plan = 
planTester.buildPlan( "D = STORE C INTO 'empty';" ); 
+
+        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( plan );
+
+        Operator load = newLogicalPlan.getSources().get( 0 );
+        Assert.assertTrue( load instanceof LOLoad );
+        Operator fe = newLogicalPlan.getSuccessors( load ).get( 0 );
+        Assert.assertTrue( fe instanceof LOForEach );
+        Operator filter = newLogicalPlan.getSuccessors( fe ).get( 0 );
+        Assert.assertTrue( filter instanceof LOFilter );
+        Operator store = newLogicalPlan.getSuccessors( filter ).get( 0 );
+        Assert.assertTrue( store instanceof LOStore );
+    }
+
+    @Test
+    public void testFilterCastForeach() throws Exception {
+        planTester.buildPlan("A = load 'myfile' as (name, age, gpa);");
+        planTester.buildPlan("B = foreach A generate $1, $2;");        
+        planTester.buildPlan("C = filter B by (int)$0 < 18;");
+        org.apache.pig.impl.logicalLayer.LogicalPlan plan = 
planTester.buildPlan( "D = STORE C INTO 'empty';" ); 
+
+        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( plan );
+
+        Operator load = newLogicalPlan.getSources().get( 0 );
+        Assert.assertTrue( load instanceof LOLoad );
+        Operator filter = newLogicalPlan.getSuccessors( load ).get( 0 );
+        Assert.assertTrue( filter instanceof LOFilter );
+        Operator fe = newLogicalPlan.getSuccessors( filter ).get( 0 );
+        Assert.assertTrue( fe instanceof LOForEach );
+        Operator store = newLogicalPlan.getSuccessors( fe ).get( 0 );
+        Assert.assertTrue( store instanceof LOStore );
+    }
+
+
+    @Test
+    public void testFilterConstantConditionForeach() throws Exception {
+        planTester.buildPlan("A = load 'myfile' as (name, age, gpa);");
+        planTester.buildPlan("B = foreach A generate $1, $2;");        
+        planTester.buildPlan("C = filter B by 1 == 1;");
+        org.apache.pig.impl.logicalLayer.LogicalPlan plan = 
planTester.buildPlan( "D = STORE C INTO 'empty';" ); 
+
+        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( plan );
+
+        Operator load = newLogicalPlan.getSources().get( 0 );
+        Assert.assertTrue( load instanceof LOLoad );
+        Operator filter = newLogicalPlan.getSuccessors( load ).get( 0 );
+        Assert.assertTrue( filter instanceof LOFilter );
+        Operator fe = newLogicalPlan.getSuccessors( filter ).get( 0 );
+        Assert.assertTrue( fe instanceof LOForEach );
+        Operator store = newLogicalPlan.getSuccessors( fe ).get( 0 );
+        Assert.assertTrue( store instanceof LOStore );
+    }
+
+    @Test
+    public void testFilterUDFForeach() throws Exception {
+        planTester.buildPlan("A = load 'myfile' as (name, age, gpa);");
+        planTester.buildPlan("B = foreach A generate $1, $2;");        
+        planTester.buildPlan("C = filter B by " + Identity.class.getName() + 
"($1) ;");
+        org.apache.pig.impl.logicalLayer.LogicalPlan plan = 
planTester.buildPlan( "D = STORE C INTO 'empty';" ); 
+
+        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( plan );
+
+        Operator load = newLogicalPlan.getSources().get( 0 );
+        Assert.assertTrue( load instanceof LOLoad );
+        Operator filter = newLogicalPlan.getSuccessors( load ).get( 0 );
+        Assert.assertTrue( filter instanceof LOFilter );
+        Operator fe = newLogicalPlan.getSuccessors( filter ).get( 0 );
+        Assert.assertTrue( fe instanceof LOForEach );
+        Operator store = newLogicalPlan.getSuccessors( fe ).get( 0 );
+        Assert.assertTrue( store instanceof LOStore );
+    }
+
+    @Test
+    public void testFilterForeachFlatten() throws Exception {
+        planTester.buildPlan("A = load 'myfile' as (name, age, gpa);");
+        planTester.buildPlan("B = foreach A generate $1, flatten($2);");       
 
+        planTester.buildPlan("C = filter B by $0 < 18;");
+        org.apache.pig.impl.logicalLayer.LogicalPlan plan = 
planTester.buildPlan( "D = STORE C INTO 'empty';" ); 
+
+        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( plan );
+
+        Operator load = newLogicalPlan.getSources().get( 0 );
+        Assert.assertTrue( load instanceof LOLoad );
+        Operator filter = newLogicalPlan.getSuccessors( load ).get( 0 );
+        Assert.assertTrue( filter instanceof LOFilter );
+        Operator fe = newLogicalPlan.getSuccessors( filter ).get( 0 );
+        Assert.assertTrue( fe instanceof LOForEach );
+        Operator store = newLogicalPlan.getSuccessors( fe ).get( 0 );
+        Assert.assertTrue( store instanceof LOStore );
+    }
+
     private LogicalPlan 
migrateAndOptimizePlan(org.apache.pig.impl.logicalLayer.LogicalPlan plan) 
throws FrontendException {
         LogicalPlan newLogicalPlan = migratePlan( plan );
         PlanOptimizer optimizer = new MyPlanOptimizer( newLogicalPlan, 3 );

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestNewPlanFilterRule.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestNewPlanFilterRule.java?rev=992943&r1=992942&r2=992943&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestNewPlanFilterRule.java 
(original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestNewPlanFilterRule.java Mon 
Sep  6 05:38:53 2010
@@ -325,9 +325,7 @@ public class TestNewPlanFilterRule exten
         assertEquals(next.getClass(), LOStore.class);
         
         transformed = listener.getTransformed();
-        assertEquals(transformed.size(), 4);
-        assertEquals(transformed.getSinks().get(0).getClass(), LOFilter.class);
-        assertEquals(transformed.getSources().get(0).getClass(), LOLoad.class);
+        assertEquals(transformed.size(), 7);
         
         // run merge filter rule
         r = new MergeFilter("MergeFilter");


Reply via email to