Author: daijy
Date: Fri Sep 24 23:45:04 2010
New Revision: 1001110

URL: http://svn.apache.org/viewvc?rev=1001110&view=rev
Log:
PIG-1639: New logical plan: PushUpFilter should not push before group/cogroup 
if filter condition contains UDF

Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/PushUpFilter.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestNewPlanFilterRule.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=1001110&r1=1001109&r2=1001110&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Fri Sep 24 23:45:04 2010
@@ -207,6 +207,9 @@ PIG-1309: Map-side Cogroup (ashutoshc)
 
 BUG FIXES
 
+PIG-1639: New logical plan: PushUpFilter should not push before group/cogroup 
+if filter condition contains UDF (xuefuz via daijy)
+
 PIG-1643: join fails for a query with input having 'load using pigstorage 
 without schema' + 'foreach' (thejas)
 

Modified: 
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/PushUpFilter.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/PushUpFilter.java?rev=1001110&r1=1001109&r2=1001110&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/PushUpFilter.java 
(original)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/PushUpFilter.java 
Fri Sep 24 23:45:04 2010
@@ -35,6 +35,7 @@ import org.apache.pig.newplan.OperatorSu
 import org.apache.pig.newplan.logical.expression.LogicalExpression;
 import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
 import org.apache.pig.newplan.logical.expression.ProjectExpression;
+import org.apache.pig.newplan.logical.expression.UserFuncExpression;
 import org.apache.pig.newplan.logical.relational.LOCogroup;
 import org.apache.pig.newplan.logical.relational.LOCross;
 import org.apache.pig.newplan.logical.relational.LODistinct;
@@ -84,19 +85,6 @@ public class PushUpFilter extends Rule {
                 return true;
             }
             
-            if( pred instanceof LOCogroup ) {
-                LOCogroup cogrp = (LOCogroup)pred;
-                if( currentPlan.getPredecessors( cogrp ).size() == 1 ) {
-                    // Order by is always ok.
-                    return true;
-                }
-                
-                if( 1 == cogrp.getExpressionPlans().get( 0 ).size() ) {
-                    // Optimization is okay if there is only a single key.
-                    return true;
-                }
-            }
-            
             // if the predecessor is one of 
LOLoad/LOStore/LOStream/LOLimit/LONative
             // if predecessor is LOForEach, it is optimized by rule 
FilterAboveForeach
             // return false
@@ -106,6 +94,27 @@ public class PushUpFilter extends Rule {
                 return false;
             }
             
+            LOFilter filter = (LOFilter)current;            
+            List<Operator> preds = currentPlan.getPredecessors( pred );
+            LogicalExpressionPlan filterPlan = filter.getFilterPlan();
+                
+            // collect all uids used in the filter plan
+            Set<Long> uids = collectUidFromExpPlan(filterPlan);
+                                
+            if( pred instanceof LOCogroup ) {
+                LOCogroup cogrp = (LOCogroup)pred;
+                if( preds.size() == 1 ) { 
+                    if( hasAll( (LogicalRelationalOperator)preds.get( 0 ), 
uids )    ) {
+                        // Order by is ok if all UIDs can be found from 
previous operator.
+                        return true;
+                    }
+                } else if ( 1 == cogrp.getExpressionPlans().get( 0 ).size() && 
!containUDF( filterPlan ) ) {
+                    // Optimization is possible if there is only a single key.
+                    // For regular cogroup, we cannot use UIDs to determine if 
filter can be pushed up.
+                    // But if there is no UDF, it's okay, as only UDF can take 
bag field as input.
+                    return true;
+                }
+            }
             
             // if the predecessor is a multi-input operator then detailed
             // checks are required
@@ -127,13 +136,6 @@ public class PushUpFilter extends Rule {
                         return false;
                 }
                 
-                LOFilter filter = (LOFilter)current;            
-                LogicalExpressionPlan filterPlan = filter.getFilterPlan();
-                    
-                // collect all uids used in the filter plan
-                Set<Long> uids = collectUidFromExpPlan(filterPlan);
-                                    
-                List<Operator> preds = currentPlan.getPredecessors(pred);
 
                 for(int j=0; j<preds.size(); j++) {
                     if (hasAll((LogicalRelationalOperator)preds.get(j), uids)) 
{
@@ -148,6 +150,15 @@ public class PushUpFilter extends Rule {
             return false;
         }
         
+        private boolean containUDF(LogicalExpressionPlan filterPlan) {
+            Iterator<Operator> it = filterPlan.getOperators();
+            while( it.hasNext() ) {
+                if( it.next() instanceof UserFuncExpression )
+                    return true;
+            }
+            return false;
+        }
+
         Set<Long> collectUidFromExpPlan(LogicalExpressionPlan filterPlan) 
throws FrontendException {
             Set<Long> uids = new HashSet<Long>();
             Iterator<Operator> iter = filterPlan.getOperators();

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestNewPlanFilterRule.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestNewPlanFilterRule.java?rev=1001110&r1=1001109&r2=1001110&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestNewPlanFilterRule.java 
(original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestNewPlanFilterRule.java Fri 
Sep 24 23:45:04 2010
@@ -19,31 +19,43 @@
 package org.apache.pig.test;
 
 import java.util.*;
+
+import org.apache.pig.ExecType;
 import org.apache.pig.data.DataType;
+import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.util.MultiMap;
 import org.apache.pig.newplan.Operator;
 import org.apache.pig.newplan.OperatorPlan;
+import org.apache.pig.newplan.logical.LogicalPlanMigrationVistor;
 import org.apache.pig.newplan.logical.expression.*;
+import org.apache.pig.newplan.logical.optimizer.LogicalPlanOptimizer;
 import org.apache.pig.newplan.logical.optimizer.ProjectionPatcher;
 import org.apache.pig.newplan.logical.optimizer.SchemaPatcher;
+import org.apache.pig.newplan.logical.relational.LOCogroup;
 import org.apache.pig.newplan.logical.relational.LOFilter;
+import org.apache.pig.newplan.logical.relational.LOForEach;
 import org.apache.pig.newplan.logical.relational.LOJoin;
 import org.apache.pig.newplan.logical.relational.LOLoad;
 import org.apache.pig.newplan.logical.relational.LOStore;
 import org.apache.pig.newplan.logical.relational.LogicalPlan;
 import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
 import org.apache.pig.newplan.logical.relational.LogicalSchema;
+import org.apache.pig.newplan.logical.rules.LoadTypeCastInserter;
 import org.apache.pig.newplan.logical.rules.MergeFilter;
 import org.apache.pig.newplan.logical.rules.PushUpFilter;
 import org.apache.pig.newplan.logical.rules.SplitFilter;
 import org.apache.pig.newplan.optimizer.PlanOptimizer;
 import org.apache.pig.newplan.optimizer.PlanTransformListener;
 import org.apache.pig.newplan.optimizer.Rule;
+import org.apache.pig.test.utils.LogicalPlanTester;
+import org.junit.Assert;
+import org.junit.Test;
 
-import junit.framework.TestCase;
-
-public class TestNewPlanFilterRule extends TestCase {
+public class TestNewPlanFilterRule {
+    PigContext pc = new PigContext(ExecType.LOCAL, new Properties());
+    LogicalPlanTester planTester = new LogicalPlanTester(pc) ;
 
     LogicalPlan plan = null;
     LogicalRelationalOperator load1 = null;
@@ -124,6 +136,7 @@ public class TestNewPlanFilterRule exten
         load2 = l2;
     }
     
+    @Test
     public void testFilterRule() throws Exception  {
         prep();
         // run split filter rule
@@ -135,8 +148,8 @@ public class TestNewPlanFilterRule exten
         MyPlanOptimizer optimizer = new MyPlanOptimizer(plan, ls, 3);
         optimizer.optimize();
         
-        assertEquals(plan.getPredecessors(filter).get(0), join);
-        assertEquals(plan.getSuccessors(filter).get(0), store);
+        Assert.assertEquals(plan.getPredecessors(filter).get(0), join);
+        Assert.assertEquals(plan.getSuccessors(filter).get(0), store);
         
         // run push up filter rule
         r = new PushUpFilter("PushUpFilter");
@@ -148,9 +161,9 @@ public class TestNewPlanFilterRule exten
         optimizer.optimize();
         
         // the filter should be moved up to be after load
-        assertEquals(plan.getSuccessors(load1).get(0), filter);
-        assertEquals(plan.getSuccessors(filter).get(0), join);
-        assertEquals(plan.getSuccessors(join).get(0), store);
+        Assert.assertEquals(plan.getSuccessors(load1).get(0), filter);
+        Assert.assertEquals(plan.getSuccessors(filter).get(0), join);
+        Assert.assertEquals(plan.getSuccessors(join).get(0), store);
         
         // run merge filter rule
         r = new MergeFilter("MergeFilter");
@@ -162,12 +175,13 @@ public class TestNewPlanFilterRule exten
         optimizer.optimize();
         
         // the filter should the same as before, nothing to merge
-        assertEquals(plan.getSuccessors(load1).get(0), filter);
-        assertEquals(plan.getSuccessors(filter).get(0), join);
-        assertEquals(plan.getSuccessors(join).get(0), store);
+        Assert.assertEquals(plan.getSuccessors(load1).get(0), filter);
+        Assert.assertEquals(plan.getSuccessors(filter).get(0), join);
+        Assert.assertEquals(plan.getSuccessors(join).get(0), store);
     }
         
     // build an expression with 1 AND, it should split into 2 filters
+    @Test
     public void testFilterRuleWithAnd() throws Exception  {
         prep();
         
@@ -199,11 +213,11 @@ public class TestNewPlanFilterRule exten
         PlanOptimizer optimizer = new MyPlanOptimizer(plan, ls, 3);
         optimizer.optimize();
         
-        assertEquals(plan.getPredecessors(filter).get(0), join);
+        Assert.assertEquals(plan.getPredecessors(filter).get(0), join);
         Operator next = plan.getSuccessors(filter).get(0);
-        assertEquals(LOFilter.class, next.getClass());        
+        Assert.assertEquals(LOFilter.class, next.getClass());        
         next = plan.getSuccessors(next).get(0);
-        assertEquals(LOStore.class, next.getClass());
+        Assert.assertEquals(LOStore.class, next.getClass());
         
         // run push up filter rule
         r = new PushUpFilter("PushUpFilter");
@@ -216,14 +230,14 @@ public class TestNewPlanFilterRule exten
         
         // both filters should be moved up to be after each load
         next = plan.getSuccessors(load1).get(0);
-        assertEquals(next.getClass(), LOFilter.class);
-        assertEquals(plan.getSuccessors(next).get(0), join);
+        Assert.assertEquals(next.getClass(), LOFilter.class);
+        Assert.assertEquals(plan.getSuccessors(next).get(0), join);
         
         next = plan.getSuccessors(load2).get(0);
-        assertEquals(next.getClass(), LOFilter.class);
-        assertEquals(plan.getSuccessors(next).get(0), join);
+        Assert.assertEquals(next.getClass(), LOFilter.class);
+        Assert.assertEquals(plan.getSuccessors(next).get(0), join);
         
-        assertEquals(plan.getSuccessors(join).get(0), store);
+        Assert.assertEquals(plan.getSuccessors(join).get(0), store);
         
         // run merge filter rule
         r = new MergeFilter("MergeFilter");
@@ -236,16 +250,17 @@ public class TestNewPlanFilterRule exten
         
         // the filters should the same as before, nothing to merge
         next = plan.getSuccessors(load1).get(0);
-        assertEquals(next.getClass(), LOFilter.class);
-        assertEquals(plan.getSuccessors(next).get(0), join);
+        Assert.assertEquals(next.getClass(), LOFilter.class);
+        Assert.assertEquals(plan.getSuccessors(next).get(0), join);
         
         next = plan.getSuccessors(load2).get(0);
-        assertEquals(next.getClass(), LOFilter.class);
-        assertEquals(plan.getSuccessors(next).get(0), join);
+        Assert.assertEquals(next.getClass(), LOFilter.class);
+        Assert.assertEquals(plan.getSuccessors(next).get(0), join);
         
-        assertEquals(plan.getSuccessors(join).get(0), store);
+        Assert.assertEquals(plan.getSuccessors(join).get(0), store);
     }
     
+    @Test
     public void testFilterRuleWith2And() throws Exception  {
         prep();
         // build an expression with 2 AND, it should split into 3 filters
@@ -287,16 +302,16 @@ public class TestNewPlanFilterRule exten
         optimizer.addPlanTransformListener(listener);
         optimizer.optimize();
         
-        assertEquals(plan.getPredecessors(filter).get(0), join);
+        Assert.assertEquals(plan.getPredecessors(filter).get(0), join);
         Operator next = plan.getSuccessors(filter).get(0);
-        assertEquals(next.getClass(), LOFilter.class);
+        Assert.assertEquals(next.getClass(), LOFilter.class);
         next = plan.getSuccessors(next).get(0);
-        assertEquals(next.getClass(), LOFilter.class);
+        Assert.assertEquals(next.getClass(), LOFilter.class);
         next = plan.getSuccessors(next).get(0);
-        assertEquals(LOStore.class, next.getClass());
+        Assert.assertEquals(LOStore.class, next.getClass());
         
         OperatorPlan transformed = listener.getTransformed();
-        assertEquals(transformed.size(), 3);
+        Assert.assertEquals(transformed.size(), 3);
         
         // run push up filter rule
         r = new PushUpFilter("PushUpFilter");
@@ -311,21 +326,21 @@ public class TestNewPlanFilterRule exten
         
         // 2 filters should be moved up to be after each load, and one filter 
should remain
         next = plan.getSuccessors(load1).get(0);
-        assertEquals(next.getClass(), LOFilter.class);
-        assertEquals(plan.getSuccessors(next).get(0), join);
+        Assert.assertEquals(next.getClass(), LOFilter.class);
+        Assert.assertEquals(plan.getSuccessors(next).get(0), join);
         
         next = plan.getSuccessors(load2).get(0);
-        assertEquals(next.getClass(), LOFilter.class);
-        assertEquals(plan.getSuccessors(next).get(0), join);
+        Assert.assertEquals(next.getClass(), LOFilter.class);
+        Assert.assertEquals(plan.getSuccessors(next).get(0), join);
         
         next = plan.getSuccessors(join).get(0);
-        assertEquals(next.getClass(), LOFilter.class);
+        Assert.assertEquals(next.getClass(), LOFilter.class);
         
         next = plan.getSuccessors(next).get(0);
-        assertEquals(next.getClass(), LOStore.class);
+        Assert.assertEquals(next.getClass(), LOStore.class);
         
         transformed = listener.getTransformed();
-        assertEquals(transformed.size(), 7);
+        Assert.assertEquals(transformed.size(), 7);
         
         // run merge filter rule
         r = new MergeFilter("MergeFilter");
@@ -340,23 +355,24 @@ public class TestNewPlanFilterRule exten
         
         // the filters should the same as before, nothing to merge
         next = plan.getSuccessors(load1).get(0);
-        assertEquals(next.getClass(), LOFilter.class);
-        assertEquals(plan.getSuccessors(next).get(0), join);
+        Assert.assertEquals(next.getClass(), LOFilter.class);
+        Assert.assertEquals(plan.getSuccessors(next).get(0), join);
         
         next = plan.getSuccessors(load2).get(0);
-        assertEquals(next.getClass(), LOFilter.class);
-        assertEquals(plan.getSuccessors(next).get(0), join);
+        Assert.assertEquals(next.getClass(), LOFilter.class);
+        Assert.assertEquals(plan.getSuccessors(next).get(0), join);
         
         next = plan.getSuccessors(join).get(0);
-        assertEquals(next.getClass(), LOFilter.class);
+        Assert.assertEquals(next.getClass(), LOFilter.class);
         
         next = plan.getSuccessors(next).get(0);
-        assertEquals(next.getClass(), LOStore.class);
+        Assert.assertEquals(next.getClass(), LOStore.class);
         
         transformed = listener.getTransformed();
-        assertNull(transformed);
+        Assert.assertNull(transformed);
     }   
     
+    @Test
     public void testFilterRuleWith2And2() throws Exception  {
         prep();
         // build an expression with 2 AND, it should split into 3 filters
@@ -396,13 +412,13 @@ public class TestNewPlanFilterRule exten
         MyPlanOptimizer optimizer = new MyPlanOptimizer(plan, ls, 3);
         optimizer.optimize();
         
-        assertEquals(plan.getPredecessors(filter).get(0), join);
+        Assert.assertEquals(plan.getPredecessors(filter).get(0), join);
         Operator next = plan.getSuccessors(filter).get(0);
-        assertEquals(next.getClass(), LOFilter.class);
+        Assert.assertEquals(next.getClass(), LOFilter.class);
         next = plan.getSuccessors(next).get(0);
-        assertEquals(next.getClass(), LOFilter.class);
+        Assert.assertEquals(next.getClass(), LOFilter.class);
         next = plan.getSuccessors(next).get(0);
-        assertEquals(LOStore.class, next.getClass());
+        Assert.assertEquals(LOStore.class, next.getClass());
         
         // run push up filter rule
         r = new PushUpFilter("PushUpFilter");
@@ -415,20 +431,20 @@ public class TestNewPlanFilterRule exten
         
         // 1 filter should be moved up to be after a load, and 2 filters 
should remain
         next = plan.getSuccessors(load1).get(0);
-        assertEquals(next.getClass(), LOFilter.class);
-        assertEquals(plan.getSuccessors(next).get(0), join);
+        Assert.assertEquals(next.getClass(), LOFilter.class);
+        Assert.assertEquals(plan.getSuccessors(next).get(0), join);
         
         next = plan.getSuccessors(load2).get(0);
-        assertEquals(next, join);     
+        Assert.assertEquals(next, join);     
         
         next = plan.getSuccessors(join).get(0);
-        assertEquals(next.getClass(), LOFilter.class);
+        Assert.assertEquals(next.getClass(), LOFilter.class);
         
         next = plan.getSuccessors(next).get(0);
-        assertEquals(next.getClass(), LOFilter.class);
+        Assert.assertEquals(next.getClass(), LOFilter.class);
                 
         next = plan.getSuccessors(next).get(0);
-        assertEquals(next.getClass(), LOStore.class);
+        Assert.assertEquals(next.getClass(), LOStore.class);
         
         // run merge filter rule
         r = new MergeFilter("MergeFilter");
@@ -443,27 +459,88 @@ public class TestNewPlanFilterRule exten
         
         // the 2 filters after join should merge
         next = plan.getSuccessors(load1).get(0);
-        assertEquals(next.getClass(), LOFilter.class);
-        assertEquals(plan.getSuccessors(next).get(0), join);
+        Assert.assertEquals(next.getClass(), LOFilter.class);
+        Assert.assertEquals(plan.getSuccessors(next).get(0), join);
         
         next = plan.getSuccessors(load2).get(0);
-        assertEquals(next, join);        
+        Assert.assertEquals(next, join);        
         
         next = plan.getSuccessors(join).get(0);
-        assertEquals(next.getClass(), LOFilter.class);
+        Assert.assertEquals(next.getClass(), LOFilter.class);
         
         next = plan.getSuccessors(next).get(0);
-        assertEquals(next.getClass(), LOStore.class);
+        Assert.assertEquals(next.getClass(), LOStore.class);
         
         OperatorPlan transformed = listener.getTransformed();
-        assertEquals(transformed.size(), 2);
+        Assert.assertEquals(transformed.size(), 2);
     }   
     
+    // See pig-1639
+    @Test
+    public void testFilterUDFNegative() throws Exception {
+        planTester.buildPlan("A = load 'myfile' as (name, age, gpa);");
+        planTester.buildPlan("B = group A by age;");        
+        planTester.buildPlan("C = filter B by COUNT(A) < 18;");
+        org.apache.pig.impl.logicalLayer.LogicalPlan plan = 
planTester.buildPlan( "D = STORE C INTO 'empty';" ); 
+
+        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( plan );
+
+        Operator load = newLogicalPlan.getSources().get( 0 );
+        Assert.assertTrue( load instanceof LOLoad );
+        Operator group = newLogicalPlan.getSuccessors( load ).get( 0 );
+        Assert.assertTrue( group instanceof LOCogroup );
+        Operator filter = newLogicalPlan.getSuccessors( group ).get( 0 );
+        Assert.assertTrue( filter instanceof LOFilter );
+        Operator store = newLogicalPlan.getSuccessors( filter ).get( 0 );
+        Assert.assertTrue( store instanceof LOStore );
+    }
+
+    private LogicalPlan 
migrateAndOptimizePlan(org.apache.pig.impl.logicalLayer.LogicalPlan plan) 
throws FrontendException {
+        LogicalPlan newLogicalPlan = migratePlan( plan );
+        PlanOptimizer optimizer = new NewPlanOptimizer( newLogicalPlan, 3 );
+        optimizer.optimize();
+        return newLogicalPlan;
+    }
+    
+    private LogicalPlan 
migratePlan(org.apache.pig.impl.logicalLayer.LogicalPlan lp) throws 
VisitorException{
+        LogicalPlanMigrationVistor visitor = new 
LogicalPlanMigrationVistor(lp);        
+        visitor.visit();
+        org.apache.pig.newplan.logical.relational.LogicalPlan newPlan = 
visitor.getNewLogicalPlan();
+        return newPlan;
+    }
+
+    public class NewPlanOptimizer extends LogicalPlanOptimizer {
+        protected NewPlanOptimizer(OperatorPlan p,  int iterations) {
+            super(p, iterations, new HashSet<String>());
+        }
+        
+        public void addPlanTransformListener(PlanTransformListener listener) {
+            super.addPlanTransformListener(listener);
+        }
+        
+       protected List<Set<Rule>> buildRuleSets() {            
+            List<Set<Rule>> ls = new ArrayList<Set<Rule>>();
+            
+            Set<Rule> s = new HashSet<Rule>();
+            // add split filter rule
+            Rule r = new LoadTypeCastInserter( "TypeCastInserter" );
+            s.add(r);
+            ls.add(s);
+             
+            s = new HashSet<Rule>();
+            r = new PushUpFilter( "PushUpFilter" );
+            s.add(r);            
+            ls.add(s);
+            
+            return ls;
+        }
+    }    
+
     public class MyPlanOptimizer extends PlanOptimizer {
 
         protected MyPlanOptimizer(OperatorPlan p, List<Set<Rule>> rs,
                 int iterations) {
-            super(p, rs, iterations);                  
+            super(p, rs, iterations);            
             addPlanTransformListener(new SchemaPatcher());
             addPlanTransformListener(new ProjectionPatcher());
         }


Reply via email to