Author: daijy Date: Fri Sep 24 23:46:10 2010 New Revision: 1001111 URL: http://svn.apache.org/viewvc?rev=1001111&view=rev Log: PIG-1639: New logical plan: PushUpFilter should not push before group/cogroup if filter condition contains UDF
Modified: hadoop/pig/branches/branch-0.8/CHANGES.txt hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/PushUpFilter.java hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/TestNewPlanFilterRule.java Modified: hadoop/pig/branches/branch-0.8/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/CHANGES.txt?rev=1001111&r1=1001110&r2=1001111&view=diff ============================================================================== --- hadoop/pig/branches/branch-0.8/CHANGES.txt (original) +++ hadoop/pig/branches/branch-0.8/CHANGES.txt Fri Sep 24 23:46:10 2010 @@ -198,6 +198,9 @@ PIG-1309: Map-side Cogroup (ashutoshc) BUG FIXES +PIG-1639: New logical plan: PushUpFilter should not push before group/cogroup +if filter condition contains UDF (xuefuz via daijy) + PIG-1643: join fails for a query with input having 'load using pigstorage without schema' + 'foreach' (thejas) Modified: hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/PushUpFilter.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/PushUpFilter.java?rev=1001111&r1=1001110&r2=1001111&view=diff ============================================================================== --- hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/PushUpFilter.java (original) +++ hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/PushUpFilter.java Fri Sep 24 23:46:10 2010 @@ -35,6 +35,7 @@ import org.apache.pig.newplan.OperatorSu import org.apache.pig.newplan.logical.expression.LogicalExpression; import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan; import org.apache.pig.newplan.logical.expression.ProjectExpression; +import org.apache.pig.newplan.logical.expression.UserFuncExpression; import org.apache.pig.newplan.logical.relational.LOCogroup; import org.apache.pig.newplan.logical.relational.LOCross; import 
org.apache.pig.newplan.logical.relational.LODistinct; @@ -84,19 +85,6 @@ public class PushUpFilter extends Rule { return true; } - if( pred instanceof LOCogroup ) { - LOCogroup cogrp = (LOCogroup)pred; - if( currentPlan.getPredecessors( cogrp ).size() == 1 ) { - // Order by is always ok. - return true; - } - - if( 1 == cogrp.getExpressionPlans().get( 0 ).size() ) { - // Optimization is okay if there is only a single key. - return true; - } - } - // if the predecessor is one of LOLoad/LOStore/LOStream/LOLimit/LONative // if predecessor is LOForEach, it is optimized by rule FilterAboveForeach // return false @@ -106,6 +94,27 @@ public class PushUpFilter extends Rule { return false; } + LOFilter filter = (LOFilter)current; + List<Operator> preds = currentPlan.getPredecessors( pred ); + LogicalExpressionPlan filterPlan = filter.getFilterPlan(); + + // collect all uids used in the filter plan + Set<Long> uids = collectUidFromExpPlan(filterPlan); + + if( pred instanceof LOCogroup ) { + LOCogroup cogrp = (LOCogroup)pred; + if( preds.size() == 1 ) { + if( hasAll( (LogicalRelationalOperator)preds.get( 0 ), uids ) ) { + // Group by is ok if all UIDs can be found from previous operator. + return true; + } + } else if ( 1 == cogrp.getExpressionPlans().get( 0 ).size() && !containUDF( filterPlan ) ) { + // Optimization is possible if there is only a single key. + // For regular cogroup, we cannot use UIDs to determine if filter can be pushed up. + // But if there is no UDF, it's okay, as only a UDF can take a bag field as input.
+ return true; + } + } // if the predecessor is a multi-input operator then detailed // checks are required @@ -127,13 +136,6 @@ public class PushUpFilter extends Rule { return false; } - LOFilter filter = (LOFilter)current; - LogicalExpressionPlan filterPlan = filter.getFilterPlan(); - - // collect all uids used in the filter plan - Set<Long> uids = collectUidFromExpPlan(filterPlan); - - List<Operator> preds = currentPlan.getPredecessors(pred); for(int j=0; j<preds.size(); j++) { if (hasAll((LogicalRelationalOperator)preds.get(j), uids)) { @@ -148,6 +150,15 @@ public class PushUpFilter extends Rule { return false; } + private boolean containUDF(LogicalExpressionPlan filterPlan) { + Iterator<Operator> it = filterPlan.getOperators(); + while( it.hasNext() ) { + if( it.next() instanceof UserFuncExpression ) + return true; + } + return false; + } + Set<Long> collectUidFromExpPlan(LogicalExpressionPlan filterPlan) throws FrontendException { Set<Long> uids = new HashSet<Long>(); Iterator<Operator> iter = filterPlan.getOperators(); Modified: hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/TestNewPlanFilterRule.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/TestNewPlanFilterRule.java?rev=1001111&r1=1001110&r2=1001111&view=diff ============================================================================== --- hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/TestNewPlanFilterRule.java (original) +++ hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/TestNewPlanFilterRule.java Fri Sep 24 23:46:10 2010 @@ -19,31 +19,43 @@ package org.apache.pig.test; import java.util.*; + +import org.apache.pig.ExecType; import org.apache.pig.data.DataType; +import org.apache.pig.impl.PigContext; import org.apache.pig.impl.logicalLayer.FrontendException; +import org.apache.pig.impl.plan.VisitorException; import org.apache.pig.impl.util.MultiMap; import org.apache.pig.newplan.Operator; import 
org.apache.pig.newplan.OperatorPlan; +import org.apache.pig.newplan.logical.LogicalPlanMigrationVistor; import org.apache.pig.newplan.logical.expression.*; +import org.apache.pig.newplan.logical.optimizer.LogicalPlanOptimizer; import org.apache.pig.newplan.logical.optimizer.ProjectionPatcher; import org.apache.pig.newplan.logical.optimizer.SchemaPatcher; +import org.apache.pig.newplan.logical.relational.LOCogroup; import org.apache.pig.newplan.logical.relational.LOFilter; +import org.apache.pig.newplan.logical.relational.LOForEach; import org.apache.pig.newplan.logical.relational.LOJoin; import org.apache.pig.newplan.logical.relational.LOLoad; import org.apache.pig.newplan.logical.relational.LOStore; import org.apache.pig.newplan.logical.relational.LogicalPlan; import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator; import org.apache.pig.newplan.logical.relational.LogicalSchema; +import org.apache.pig.newplan.logical.rules.LoadTypeCastInserter; import org.apache.pig.newplan.logical.rules.MergeFilter; import org.apache.pig.newplan.logical.rules.PushUpFilter; import org.apache.pig.newplan.logical.rules.SplitFilter; import org.apache.pig.newplan.optimizer.PlanOptimizer; import org.apache.pig.newplan.optimizer.PlanTransformListener; import org.apache.pig.newplan.optimizer.Rule; +import org.apache.pig.test.utils.LogicalPlanTester; +import org.junit.Assert; +import org.junit.Test; -import junit.framework.TestCase; - -public class TestNewPlanFilterRule extends TestCase { +public class TestNewPlanFilterRule { + PigContext pc = new PigContext(ExecType.LOCAL, new Properties()); + LogicalPlanTester planTester = new LogicalPlanTester(pc) ; LogicalPlan plan = null; LogicalRelationalOperator load1 = null; @@ -124,6 +136,7 @@ public class TestNewPlanFilterRule exten load2 = l2; } + @Test public void testFilterRule() throws Exception { prep(); // run split filter rule @@ -135,8 +148,8 @@ public class TestNewPlanFilterRule exten MyPlanOptimizer optimizer = new 
MyPlanOptimizer(plan, ls, 3); optimizer.optimize(); - assertEquals(plan.getPredecessors(filter).get(0), join); - assertEquals(plan.getSuccessors(filter).get(0), store); + Assert.assertEquals(plan.getPredecessors(filter).get(0), join); + Assert.assertEquals(plan.getSuccessors(filter).get(0), store); // run push up filter rule r = new PushUpFilter("PushUpFilter"); @@ -148,9 +161,9 @@ public class TestNewPlanFilterRule exten optimizer.optimize(); // the filter should be moved up to be after load - assertEquals(plan.getSuccessors(load1).get(0), filter); - assertEquals(plan.getSuccessors(filter).get(0), join); - assertEquals(plan.getSuccessors(join).get(0), store); + Assert.assertEquals(plan.getSuccessors(load1).get(0), filter); + Assert.assertEquals(plan.getSuccessors(filter).get(0), join); + Assert.assertEquals(plan.getSuccessors(join).get(0), store); // run merge filter rule r = new MergeFilter("MergeFilter"); @@ -162,12 +175,13 @@ public class TestNewPlanFilterRule exten optimizer.optimize(); // the filter should the same as before, nothing to merge - assertEquals(plan.getSuccessors(load1).get(0), filter); - assertEquals(plan.getSuccessors(filter).get(0), join); - assertEquals(plan.getSuccessors(join).get(0), store); + Assert.assertEquals(plan.getSuccessors(load1).get(0), filter); + Assert.assertEquals(plan.getSuccessors(filter).get(0), join); + Assert.assertEquals(plan.getSuccessors(join).get(0), store); } // build an expression with 1 AND, it should split into 2 filters + @Test public void testFilterRuleWithAnd() throws Exception { prep(); @@ -199,11 +213,11 @@ public class TestNewPlanFilterRule exten PlanOptimizer optimizer = new MyPlanOptimizer(plan, ls, 3); optimizer.optimize(); - assertEquals(plan.getPredecessors(filter).get(0), join); + Assert.assertEquals(plan.getPredecessors(filter).get(0), join); Operator next = plan.getSuccessors(filter).get(0); - assertEquals(LOFilter.class, next.getClass()); + Assert.assertEquals(LOFilter.class, next.getClass()); next = 
plan.getSuccessors(next).get(0); - assertEquals(LOStore.class, next.getClass()); + Assert.assertEquals(LOStore.class, next.getClass()); // run push up filter rule r = new PushUpFilter("PushUpFilter"); @@ -216,14 +230,14 @@ public class TestNewPlanFilterRule exten // both filters should be moved up to be after each load next = plan.getSuccessors(load1).get(0); - assertEquals(next.getClass(), LOFilter.class); - assertEquals(plan.getSuccessors(next).get(0), join); + Assert.assertEquals(next.getClass(), LOFilter.class); + Assert.assertEquals(plan.getSuccessors(next).get(0), join); next = plan.getSuccessors(load2).get(0); - assertEquals(next.getClass(), LOFilter.class); - assertEquals(plan.getSuccessors(next).get(0), join); + Assert.assertEquals(next.getClass(), LOFilter.class); + Assert.assertEquals(plan.getSuccessors(next).get(0), join); - assertEquals(plan.getSuccessors(join).get(0), store); + Assert.assertEquals(plan.getSuccessors(join).get(0), store); // run merge filter rule r = new MergeFilter("MergeFilter"); @@ -236,16 +250,17 @@ public class TestNewPlanFilterRule exten // the filters should the same as before, nothing to merge next = plan.getSuccessors(load1).get(0); - assertEquals(next.getClass(), LOFilter.class); - assertEquals(plan.getSuccessors(next).get(0), join); + Assert.assertEquals(next.getClass(), LOFilter.class); + Assert.assertEquals(plan.getSuccessors(next).get(0), join); next = plan.getSuccessors(load2).get(0); - assertEquals(next.getClass(), LOFilter.class); - assertEquals(plan.getSuccessors(next).get(0), join); + Assert.assertEquals(next.getClass(), LOFilter.class); + Assert.assertEquals(plan.getSuccessors(next).get(0), join); - assertEquals(plan.getSuccessors(join).get(0), store); + Assert.assertEquals(plan.getSuccessors(join).get(0), store); } + @Test public void testFilterRuleWith2And() throws Exception { prep(); // build an expression with 2 AND, it should split into 3 filters @@ -287,16 +302,16 @@ public class TestNewPlanFilterRule exten 
optimizer.addPlanTransformListener(listener); optimizer.optimize(); - assertEquals(plan.getPredecessors(filter).get(0), join); + Assert.assertEquals(plan.getPredecessors(filter).get(0), join); Operator next = plan.getSuccessors(filter).get(0); - assertEquals(next.getClass(), LOFilter.class); + Assert.assertEquals(next.getClass(), LOFilter.class); next = plan.getSuccessors(next).get(0); - assertEquals(next.getClass(), LOFilter.class); + Assert.assertEquals(next.getClass(), LOFilter.class); next = plan.getSuccessors(next).get(0); - assertEquals(LOStore.class, next.getClass()); + Assert.assertEquals(LOStore.class, next.getClass()); OperatorPlan transformed = listener.getTransformed(); - assertEquals(transformed.size(), 3); + Assert.assertEquals(transformed.size(), 3); // run push up filter rule r = new PushUpFilter("PushUpFilter"); @@ -311,21 +326,21 @@ public class TestNewPlanFilterRule exten // 2 filters should be moved up to be after each load, and one filter should remain next = plan.getSuccessors(load1).get(0); - assertEquals(next.getClass(), LOFilter.class); - assertEquals(plan.getSuccessors(next).get(0), join); + Assert.assertEquals(next.getClass(), LOFilter.class); + Assert.assertEquals(plan.getSuccessors(next).get(0), join); next = plan.getSuccessors(load2).get(0); - assertEquals(next.getClass(), LOFilter.class); - assertEquals(plan.getSuccessors(next).get(0), join); + Assert.assertEquals(next.getClass(), LOFilter.class); + Assert.assertEquals(plan.getSuccessors(next).get(0), join); next = plan.getSuccessors(join).get(0); - assertEquals(next.getClass(), LOFilter.class); + Assert.assertEquals(next.getClass(), LOFilter.class); next = plan.getSuccessors(next).get(0); - assertEquals(next.getClass(), LOStore.class); + Assert.assertEquals(next.getClass(), LOStore.class); transformed = listener.getTransformed(); - assertEquals(transformed.size(), 7); + Assert.assertEquals(transformed.size(), 7); // run merge filter rule r = new MergeFilter("MergeFilter"); @@ -340,23 
+355,24 @@ public class TestNewPlanFilterRule exten // the filters should the same as before, nothing to merge next = plan.getSuccessors(load1).get(0); - assertEquals(next.getClass(), LOFilter.class); - assertEquals(plan.getSuccessors(next).get(0), join); + Assert.assertEquals(next.getClass(), LOFilter.class); + Assert.assertEquals(plan.getSuccessors(next).get(0), join); next = plan.getSuccessors(load2).get(0); - assertEquals(next.getClass(), LOFilter.class); - assertEquals(plan.getSuccessors(next).get(0), join); + Assert.assertEquals(next.getClass(), LOFilter.class); + Assert.assertEquals(plan.getSuccessors(next).get(0), join); next = plan.getSuccessors(join).get(0); - assertEquals(next.getClass(), LOFilter.class); + Assert.assertEquals(next.getClass(), LOFilter.class); next = plan.getSuccessors(next).get(0); - assertEquals(next.getClass(), LOStore.class); + Assert.assertEquals(next.getClass(), LOStore.class); transformed = listener.getTransformed(); - assertNull(transformed); + Assert.assertNull(transformed); } + @Test public void testFilterRuleWith2And2() throws Exception { prep(); // build an expression with 2 AND, it should split into 3 filters @@ -396,13 +412,13 @@ public class TestNewPlanFilterRule exten MyPlanOptimizer optimizer = new MyPlanOptimizer(plan, ls, 3); optimizer.optimize(); - assertEquals(plan.getPredecessors(filter).get(0), join); + Assert.assertEquals(plan.getPredecessors(filter).get(0), join); Operator next = plan.getSuccessors(filter).get(0); - assertEquals(next.getClass(), LOFilter.class); + Assert.assertEquals(next.getClass(), LOFilter.class); next = plan.getSuccessors(next).get(0); - assertEquals(next.getClass(), LOFilter.class); + Assert.assertEquals(next.getClass(), LOFilter.class); next = plan.getSuccessors(next).get(0); - assertEquals(LOStore.class, next.getClass()); + Assert.assertEquals(LOStore.class, next.getClass()); // run push up filter rule r = new PushUpFilter("PushUpFilter"); @@ -415,20 +431,20 @@ public class 
TestNewPlanFilterRule exten // 1 filter should be moved up to be after a load, and 2 filters should remain next = plan.getSuccessors(load1).get(0); - assertEquals(next.getClass(), LOFilter.class); - assertEquals(plan.getSuccessors(next).get(0), join); + Assert.assertEquals(next.getClass(), LOFilter.class); + Assert.assertEquals(plan.getSuccessors(next).get(0), join); next = plan.getSuccessors(load2).get(0); - assertEquals(next, join); + Assert.assertEquals(next, join); next = plan.getSuccessors(join).get(0); - assertEquals(next.getClass(), LOFilter.class); + Assert.assertEquals(next.getClass(), LOFilter.class); next = plan.getSuccessors(next).get(0); - assertEquals(next.getClass(), LOFilter.class); + Assert.assertEquals(next.getClass(), LOFilter.class); next = plan.getSuccessors(next).get(0); - assertEquals(next.getClass(), LOStore.class); + Assert.assertEquals(next.getClass(), LOStore.class); // run merge filter rule r = new MergeFilter("MergeFilter"); @@ -443,27 +459,88 @@ public class TestNewPlanFilterRule exten // the 2 filters after join should merge next = plan.getSuccessors(load1).get(0); - assertEquals(next.getClass(), LOFilter.class); - assertEquals(plan.getSuccessors(next).get(0), join); + Assert.assertEquals(next.getClass(), LOFilter.class); + Assert.assertEquals(plan.getSuccessors(next).get(0), join); next = plan.getSuccessors(load2).get(0); - assertEquals(next, join); + Assert.assertEquals(next, join); next = plan.getSuccessors(join).get(0); - assertEquals(next.getClass(), LOFilter.class); + Assert.assertEquals(next.getClass(), LOFilter.class); next = plan.getSuccessors(next).get(0); - assertEquals(next.getClass(), LOStore.class); + Assert.assertEquals(next.getClass(), LOStore.class); OperatorPlan transformed = listener.getTransformed(); - assertEquals(transformed.size(), 2); + Assert.assertEquals(transformed.size(), 2); } + // See pig-1639 + @Test + public void testFilterUDFNegative() throws Exception { + planTester.buildPlan("A = load 'myfile' as 
(name, age, gpa);"); + planTester.buildPlan("B = group A by age;"); + planTester.buildPlan("C = filter B by COUNT(A) < 18;"); + org.apache.pig.impl.logicalLayer.LogicalPlan plan = planTester.buildPlan( "D = STORE C INTO 'empty';" ); + + LogicalPlan newLogicalPlan = migrateAndOptimizePlan( plan ); + + Operator load = newLogicalPlan.getSources().get( 0 ); + Assert.assertTrue( load instanceof LOLoad ); + Operator group = newLogicalPlan.getSuccessors( load ).get( 0 ); + Assert.assertTrue( group instanceof LOCogroup ); + Operator filter = newLogicalPlan.getSuccessors( group ).get( 0 ); + Assert.assertTrue( filter instanceof LOFilter ); + Operator store = newLogicalPlan.getSuccessors( filter ).get( 0 ); + Assert.assertTrue( store instanceof LOStore ); + } + + private LogicalPlan migrateAndOptimizePlan(org.apache.pig.impl.logicalLayer.LogicalPlan plan) throws FrontendException { + LogicalPlan newLogicalPlan = migratePlan( plan ); + PlanOptimizer optimizer = new NewPlanOptimizer( newLogicalPlan, 3 ); + optimizer.optimize(); + return newLogicalPlan; + } + + private LogicalPlan migratePlan(org.apache.pig.impl.logicalLayer.LogicalPlan lp) throws VisitorException{ + LogicalPlanMigrationVistor visitor = new LogicalPlanMigrationVistor(lp); + visitor.visit(); + org.apache.pig.newplan.logical.relational.LogicalPlan newPlan = visitor.getNewLogicalPlan(); + return newPlan; + } + + public class NewPlanOptimizer extends LogicalPlanOptimizer { + protected NewPlanOptimizer(OperatorPlan p, int iterations) { + super(p, iterations, new HashSet<String>()); + } + + public void addPlanTransformListener(PlanTransformListener listener) { + super.addPlanTransformListener(listener); + } + + protected List<Set<Rule>> buildRuleSets() { + List<Set<Rule>> ls = new ArrayList<Set<Rule>>(); + + Set<Rule> s = new HashSet<Rule>(); + // add load type cast inserter rule + Rule r = new LoadTypeCastInserter( "TypeCastInserter" ); + s.add(r); + ls.add(s); + + s = new HashSet<Rule>(); + r = new PushUpFilter(
"PushUpFilter" ); + s.add(r); + ls.add(s); + + return ls; + } + } + public class MyPlanOptimizer extends PlanOptimizer { protected MyPlanOptimizer(OperatorPlan p, List<Set<Rule>> rs, int iterations) { - super(p, rs, iterations); + super(p, rs, iterations); addPlanTransformListener(new SchemaPatcher()); addPlanTransformListener(new ProjectionPatcher()); }