Author: daijy Date: Mon Sep 6 05:38:53 2010 New Revision: 992943 URL: http://svn.apache.org/viewvc?rev=992943&view=rev Log: PIG-1575: Complete the migration of optimization rule PushUpFilter including missing test cases
Modified: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/DereferenceExpression.java hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/LogicalExpressionPlan.java hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/UserFuncExpression.java hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/PushUpFilter.java hadoop/pig/trunk/test/org/apache/pig/test/TestNewPlanFilterAboveForeach.java hadoop/pig/trunk/test/org/apache/pig/test/TestNewPlanFilterRule.java Modified: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/DereferenceExpression.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/DereferenceExpression.java?rev=992943&r1=992942&r2=992943&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/DereferenceExpression.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/DereferenceExpression.java Mon Sep 6 05:38:53 2010 @@ -161,6 +161,13 @@ public class DereferenceExpression exten LogicalExpression copy = new DereferenceExpression( lgExpPlan, columnsCopy); + + // Only one input is expected. 
+ LogicalExpression input = (LogicalExpression) plan.getPredecessors( this ).get( 0 ); + LogicalExpression inputCopy = input.deepCopy( lgExpPlan ); + lgExpPlan.add( inputCopy ); + lgExpPlan.connect( inputCopy, copy ); + return copy; } Modified: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/LogicalExpressionPlan.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/LogicalExpressionPlan.java?rev=992943&r1=992942&r2=992943&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/LogicalExpressionPlan.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/LogicalExpressionPlan.java Mon Sep 6 05:38:53 2010 @@ -94,4 +94,12 @@ public class LogicalExpressionPlan exten return sources; } + public LogicalExpressionPlan deepCopy() throws FrontendException { + LogicalExpressionPlan result = new LogicalExpressionPlan(); + LogicalExpression root = (LogicalExpression)getSources().get( 0 ); + LogicalExpression newRoot = root.deepCopy( result ); + result.add( newRoot ); + return result; + } + } Modified: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/UserFuncExpression.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/UserFuncExpression.java?rev=992943&r1=992942&r2=992943&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/UserFuncExpression.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/UserFuncExpression.java Mon Sep 6 05:38:53 2010 @@ -170,6 +170,18 @@ public class UserFuncExpression extends lgExpPlan, this.getFuncSpec().clone() ); copy.setImplicitReferencedOperator(this.getImplicitReferencedOperator()); + + // Deep copy the input expressions. 
+ List<Operator> inputs = plan.getPredecessors( this ); + if( inputs != null ) { + for( Operator op : inputs ) { + LogicalExpression input = (LogicalExpression)op; + LogicalExpression inputCopy = input.deepCopy( lgExpPlan ); + lgExpPlan.add( inputCopy ); + lgExpPlan.connect( inputCopy, copy ); + } + } + } catch(CloneNotSupportedException e) { e.printStackTrace(); } Modified: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/PushUpFilter.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/PushUpFilter.java?rev=992943&r1=992942&r2=992943&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/PushUpFilter.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/PushUpFilter.java Mon Sep 6 05:38:53 2010 @@ -15,23 +15,41 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ + package org.apache.pig.newplan.logical.rules; +import java.util.ArrayList; +import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Set; +import java.util.Map.Entry; import org.apache.pig.impl.logicalLayer.FrontendException; import org.apache.pig.impl.util.Pair; import org.apache.pig.newplan.Operator; import org.apache.pig.newplan.OperatorPlan; import org.apache.pig.newplan.OperatorSubPlan; +import org.apache.pig.newplan.logical.expression.LogicalExpression; import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan; import org.apache.pig.newplan.logical.expression.ProjectExpression; +import org.apache.pig.newplan.logical.relational.LOCogroup; +import org.apache.pig.newplan.logical.relational.LOCross; +import org.apache.pig.newplan.logical.relational.LODistinct; import org.apache.pig.newplan.logical.relational.LOFilter; import org.apache.pig.newplan.logical.relational.LOForEach; import org.apache.pig.newplan.logical.relational.LOJoin; +import org.apache.pig.newplan.logical.relational.LOLimit; +import org.apache.pig.newplan.logical.relational.LOLoad; +import org.apache.pig.newplan.logical.relational.LONative; +import org.apache.pig.newplan.logical.relational.LOSort; +import org.apache.pig.newplan.logical.relational.LOSplit; +import org.apache.pig.newplan.logical.relational.LOSplitOutput; +import org.apache.pig.newplan.logical.relational.LOStore; +import org.apache.pig.newplan.logical.relational.LOStream; +import org.apache.pig.newplan.logical.relational.LOUnion; import org.apache.pig.newplan.logical.relational.LogicalPlan; import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator; import org.apache.pig.newplan.logical.relational.LogicalSchema; @@ -50,116 +68,226 @@ public class PushUpFilter extends Rule { } public class PushUpFilterTransformer extends Transformer { - private OperatorSubPlan subPlan; @Override public boolean check(OperatorPlan 
matched) throws FrontendException { // check if it is inner join - LOJoin join = (LOJoin)matched.getSources().get(0); - boolean[] innerFlags = join.getInnerFlags(); - for(boolean inner: innerFlags) { - if (!inner){ - return false; - } + Operator current = matched.getSources().get(0); + + Operator pred = findNonFilterPredecessor( current ); + if( pred == null ) + return false; + + // sort, distinct, or union is always okay. + if( pred instanceof LOSort || pred instanceof LODistinct || pred instanceof LOUnion ) { + return true; } - - Operator next = matched.getSinks().get(0); - while(next != null && next instanceof LOFilter) { - LOFilter filter = (LOFilter)next; - LogicalExpressionPlan filterPlan = filter.getFilterPlan(); + + if( pred instanceof LOCogroup ) { + LOCogroup cogrp = (LOCogroup)pred; + if( currentPlan.getPredecessors( cogrp ).size() == 1 ) { + // A single-input cogroup is always ok. + return true; + } - // collect all uids used in the filter plan - Set<Long> uids = new HashSet<Long>(); - Iterator<Operator> iter = filterPlan.getOperators(); - while(iter.hasNext()) { - Operator op = iter.next(); - if (op instanceof ProjectExpression) { - long uid = ((ProjectExpression)op).getFieldSchema().uid; - uids.add(uid); + if( 1 == cogrp.getExpressionPlans().get( 0 ).size() ) { + // Optimization is okay if there is only a single key. 
+ return true; + } + } + + // if the predecessor is one of LOLoad/LOStore/LOStream/LOLimit/LONative + // if predecessor is LOForEach, it is optimized by rule FilterAboveForeach + // return false + if( pred instanceof LOLoad || pred instanceof LOStore || pred instanceof LOStream || + pred instanceof LOFilter || pred instanceof LOSplit || pred instanceof LOSplitOutput || + pred instanceof LOLimit || pred instanceof LONative || pred instanceof LOForEach) { + return false; + } + + + // if the predecessor is a multi-input operator then detailed + // checks are required + if( pred instanceof LOCross || pred instanceof LOJoin ) { + boolean[] innerFlags = null; + boolean isFullOuter = true; + boolean isInner = true; + if( pred instanceof LOJoin ) { + innerFlags = ((LOJoin)pred).getInnerFlags(); + // If all innerFlag is false, means a full outer join, + for (boolean inner : innerFlags) { + if (inner) { + isFullOuter = false; + } else { + isInner = false; + } } + if (isFullOuter) + return false; } - - List<Operator> preds = currentPlan.getPredecessors(join); - + + LOFilter filter = (LOFilter)current; + LogicalExpressionPlan filterPlan = filter.getFilterPlan(); + + // collect all uids used in the filter plan + Set<Long> uids = collectUidFromExpPlan(filterPlan); + + List<Operator> preds = currentPlan.getPredecessors(pred); + for(int j=0; j<preds.size(); j++) { - if (hasAll((LogicalRelationalOperator)preds.get(j), uids)) { - return true; + if (hasAll((LogicalRelationalOperator)preds.get(j), uids)) { + // For LOJoin, innerFlag==true indicate that branch is the outer join side + // which has the exact opposite semantics + if (pred instanceof LOCross || pred instanceof LOJoin && (isInner || innerFlags[j])) + return true; } } - - // if current filter can not move up, check next filter - List<Operator> l = currentPlan.getSuccessors(filter); - if (l != null) { - next = l.get(0); - } else { - next = null; - } } return false; } + + Set<Long> collectUidFromExpPlan(LogicalExpressionPlan 
filterPlan) throws FrontendException { + Set<Long> uids = new HashSet<Long>(); + Iterator<Operator> iter = filterPlan.getOperators(); + while(iter.hasNext()) { + Operator op = iter.next(); + if (op instanceof ProjectExpression) { + long uid = ((ProjectExpression)op).getFieldSchema().uid; + uids.add(uid); + } + } + return uids; + } + + /** + * Starting from current operator (which is a filter), search its predecessors until + * locating a non-filter operator. Null is returned if none is found. + */ + private Operator findNonFilterPredecessor(Operator current) { + Operator op = current; + do { + List<Operator> predecessors = currentPlan.getPredecessors( op ); + + // if there are no predecessors return null + if( predecessors == null || predecessors.size() == 0 ) { + return null; + } + + Operator pred = predecessors.get( 0 ); + if( pred instanceof LOFilter ) { + op = pred; + continue; + } else { + return pred; + } + } while( true ); + + } @Override public void transform(OperatorPlan matched) throws FrontendException { subPlan = new OperatorSubPlan(currentPlan); - LOJoin join = (LOJoin)matched.getSources().get(0); - subPlan.add(join); + LOFilter filter = (LOFilter)matched.getSources().get(0); + + // This is the one that we will insert filter between it and its input. + Operator predecessor = this.findNonFilterPredecessor( filter ); + subPlan.add( predecessor) ; - Operator next = matched.getSinks().get(0); - while(next != null && next instanceof LOFilter) { - LOFilter filter = (LOFilter)next; - subPlan.add(filter); + // Disconnect the filter in the plan without removing it from the plan. 
+ Operator predec = currentPlan.getPredecessors( filter ).get( 0 ); + Operator succed; + + if (currentPlan.getSuccessors(filter)!=null) + succed = currentPlan.getSuccessors(filter).get(0); + else + succed = null; + + Pair<Integer, Integer> p1 = currentPlan.disconnect(predec, filter); + + if (succed!=null) { + subPlan.add(succed); + Pair<Integer, Integer> p2 = currentPlan.disconnect(filter, succed); + currentPlan.connect(predec, p1.first, succed, p2.second); + } + + if( predecessor instanceof LOSort || predecessor instanceof LODistinct || + ( predecessor instanceof LOCogroup && currentPlan.getPredecessors( predecessor ).size() == 1 ) ) { + // For sort, put the filter in front of it. + Operator prev = currentPlan.getPredecessors( predecessor ).get( 0 ); - LogicalExpressionPlan filterPlan = filter.getFilterPlan(); + insertFilter( prev, predecessor, filter ); + return; + } + + // Find the predecessor of join that contains all required uids. + LogicalExpressionPlan filterPlan = filter.getFilterPlan(); + List<Operator> preds = currentPlan.getPredecessors( predecessor ); + Map<Integer, Operator> inputs = findInputsToAddFilter( filterPlan, predecessor, preds ); + + LOFilter newFilter = null; + for( Entry<Integer, Operator> entry : inputs.entrySet() ) { + int inputIndex = entry.getKey(); + Operator pred = entry.getValue(); - // collect all uids used in the filter plan - Set<Long> uids = new HashSet<Long>(); - Iterator<Operator> iter = filterPlan.getOperators(); - while(iter.hasNext()) { - Operator op = iter.next(); - if (op instanceof ProjectExpression) { - long uid = ((ProjectExpression)op).getFieldSchema().uid; - uids.add(uid); + // Find projection field offset + int columnOffset = 0; + if( predecessor instanceof LOJoin || predecessor instanceof LOCross ) { + for( int i = 0; i < inputIndex; i++ ) { + columnOffset += ( (LogicalRelationalOperator)preds.get( i ) ).getSchema().size(); } } - // Find the predecessor of join that contains all required uids. 
- Operator input = null; - List<Operator> preds = currentPlan.getPredecessors(join); - for(int j=0; j<preds.size(); j++) { - if( hasAll((LogicalRelationalOperator)preds.get(j), uids) ) { - input = preds.get(j); - subPlan.add(input); - break; - } + // Reuse the filter for the first match. For others, need to make a copy of the filter + // and add it between input and predecessor. + newFilter = newFilter == null ? filter : new LOFilter( (LogicalPlan)currentPlan ); + + currentPlan.add( newFilter ); + subPlan.add( newFilter ); + subPlan.add( pred ); + LogicalExpressionPlan fPlan = filterPlan.deepCopy(); + List<Operator> sinks = fPlan.getSinks(); + List<ProjectExpression> projExprs = new ArrayList<ProjectExpression>(); + for( Operator sink : sinks ) { + if( sink instanceof ProjectExpression ) + projExprs.add( (ProjectExpression)sink ); } - - if( input != null ) { - // Found one of the join's predeccessors of the join which has all the uids. - Operator pred = currentPlan.getPredecessors(filter).get(0); - Operator succed = currentPlan.getSuccessors(filter).get(0); - subPlan.add(succed); - - Pair<Integer, Integer> p1 = currentPlan.disconnect(pred, filter); - Pair<Integer, Integer> p2 = currentPlan.disconnect(filter, succed); - currentPlan.connect(pred, p1.first, succed, p2.second); - - succed = currentPlan.getSuccessors(input).get(0); - Pair<Integer, Integer> p3 = currentPlan.disconnect(input, succed); - currentPlan.connect(input, p3.first, filter, 0); - currentPlan.connect(filter, 0, succed, p3.second); - return; - } else { - // Didn't find the opeartor, so looking at the next one after the filter. 
- List<Operator> l = currentPlan.getSuccessors(filter ); - if( l != null ) { - next = l.get( 0 ); - } else { - next = null; + + if( predecessor instanceof LOCogroup ) { + for( ProjectExpression projExpr : projExprs ) { + // Need to merge filter condition and cogroup by expression; + LogicalExpressionPlan plan = ((LOCogroup) predecessor).getExpressionPlans().get( inputIndex ).iterator().next(); + LogicalExpressionPlan copy = plan.deepCopy(); + LogicalExpression root = (LogicalExpression)copy.getSinks().get( 0 ); + List<Operator> predecessors = fPlan.getPredecessors( projExpr ); + if( predecessors == null || predecessors.size() == 0 ) { + fPlan.remove( projExpr ); + fPlan.add( root ); + } else { + fPlan.add( root ); + Operator pred1 = predecessors.get( 0 ); + Pair<Integer, Integer> pair = fPlan.disconnect( pred1, projExpr ); + fPlan.connect( pred1, pair.first, root, pair.second ); + fPlan.remove( projExpr ); + } } } + + // Now, reset the projection expressions in the new filter plan. + sinks = fPlan.getSinks(); + for( Operator sink : sinks ) { + if( sink instanceof ProjectExpression ) { + ProjectExpression projE = (ProjectExpression)sink; + projE.setAttachedRelationalOp( newFilter ); + projE.setInputNum( 0 ); + projE.setColNum( projE.getColNum() - columnOffset ); + } + } + newFilter.setFilterPlan( fPlan ); + + insertFilter( pred, predecessor, newFilter ); } } @@ -177,23 +305,65 @@ public class PushUpFilter extends Rule { @Override public OperatorPlan reportChanges() { - return subPlan; + return currentPlan; } + // Insert the filter in between the given two operators. + private void insertFilter(Operator prev, Operator predecessor, LOFilter filter) + throws FrontendException { + Pair<Integer, Integer> p3 = currentPlan.disconnect( prev, predecessor ); + currentPlan.connect( prev, p3.first, filter, 0 ); + currentPlan.connect( filter, 0, predecessor, p3.second ); + } + + // Identify those among preds that will need to have a filter between it and the predecessor. 
+ private Map<Integer, Operator> findInputsToAddFilter(LogicalExpressionPlan filterPlan, Operator predecessor, + List<Operator> preds) throws FrontendException { + Map<Integer, Operator> inputs = new HashMap<Integer, Operator>(); + + if( predecessor instanceof LOUnion || predecessor instanceof LOCogroup ) { + for( int i = 0; i < preds.size(); i++ ) { + inputs.put( i, preds.get( i ) ); + } + return inputs; + } + + // collect all uids used in the filter plan + Set<Long> uids = collectUidFromExpPlan(filterPlan); + boolean[] innerFlags = null; + boolean isInner = true; + if (predecessor instanceof LOJoin) { + innerFlags = ((LOJoin)predecessor).getInnerFlags(); + for (boolean inner : innerFlags) { + if (!inner) { + isInner = false; + break; + } + } + } + + // Find the predecessor of join that contains all required uids. + for(int j=0; j<preds.size(); j++) { + // Filter can push to LOJoin outer branch, but no inner branch + if( hasAll((LogicalRelationalOperator)preds.get(j), uids) && + (predecessor instanceof LOCross || predecessor instanceof LOJoin && (isInner || innerFlags[j]))) { + Operator input = preds.get(j); + subPlan.add(input); + inputs.put( j, input ); + } + } + return inputs; + } } @Override protected OperatorPlan buildPattern() { - // the pattern that this rule looks for - // is join -> filter LogicalPlan plan = new LogicalPlan(); - LogicalRelationalOperator op1 = new LOJoin(plan); - LogicalRelationalOperator op2 = new LOFilter(plan); - plan.add(op1); - plan.add(op2); - plan.connect(op1, op2); + LogicalRelationalOperator op1 = new LOFilter(plan); + plan.add( op1 ); return plan; } + } Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestNewPlanFilterAboveForeach.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestNewPlanFilterAboveForeach.java?rev=992943&r1=992942&r2=992943&view=diff ============================================================================== --- 
hadoop/pig/trunk/test/org/apache/pig/test/TestNewPlanFilterAboveForeach.java (original) +++ hadoop/pig/trunk/test/org/apache/pig/test/TestNewPlanFilterAboveForeach.java Mon Sep 6 05:38:53 2010 @@ -35,6 +35,7 @@ import org.apache.pig.newplan.logical.op import org.apache.pig.newplan.logical.relational.LOFilter; import org.apache.pig.newplan.logical.relational.LOForEach; import org.apache.pig.newplan.logical.relational.LOLoad; +import org.apache.pig.newplan.logical.relational.LOStore; import org.apache.pig.newplan.logical.relational.LogicalPlan; import org.apache.pig.newplan.logical.rules.FilterAboveForeach; import org.apache.pig.newplan.logical.rules.LoadTypeCastInserter; @@ -263,7 +264,141 @@ public class TestNewPlanFilterAboveForea Operator filter = newLogicalPlan.getSuccessors( fe2 ).get( 0 ); Assert.assertTrue( filter instanceof LOFilter ); } - + + @Test + public void testFilterForeach() throws Exception { + planTester.buildPlan("A = load 'myfile' as (name, age, gpa);"); + planTester.buildPlan("B = foreach A generate $1, $2;"); + planTester.buildPlan("C = filter B by $0 < 18;"); + org.apache.pig.impl.logicalLayer.LogicalPlan plan = planTester.buildPlan( "D = STORE C INTO 'empty';" ); + + LogicalPlan newLogicalPlan = migrateAndOptimizePlan( plan ); + + Operator load = newLogicalPlan.getSources().get( 0 ); + Assert.assertTrue( load instanceof LOLoad ); + Operator filter = newLogicalPlan.getSuccessors( load ).get( 0 ); + Assert.assertTrue( filter instanceof LOFilter ); + Operator fe = newLogicalPlan.getSuccessors( filter ).get( 0 ); + Assert.assertTrue( fe instanceof LOForEach ); + Operator store = newLogicalPlan.getSuccessors( fe ).get( 0 ); + Assert.assertTrue( store instanceof LOStore ); + } + + @Test + public void testFilterForeachAddedField() throws Exception { + planTester.buildPlan("A = load 'myfile' as (name, age, gpa);"); + planTester.buildPlan("B = foreach A generate $1, $2, COUNT({(1)});"); + planTester.buildPlan("C = filter B by $2 < 18;"); + 
org.apache.pig.impl.logicalLayer.LogicalPlan plan = planTester.buildPlan( "D = STORE C INTO 'empty';" ); + + LogicalPlan newLogicalPlan = migrateAndOptimizePlan( plan ); + + Operator load = newLogicalPlan.getSources().get( 0 ); + Assert.assertTrue( load instanceof LOLoad ); + Operator fe = newLogicalPlan.getSuccessors( load ).get( 0 ); + Assert.assertTrue( fe instanceof LOForEach ); + Operator filter = newLogicalPlan.getSuccessors( fe ).get( 0 ); + Assert.assertTrue( filter instanceof LOFilter ); + Operator store = newLogicalPlan.getSuccessors( filter ).get( 0 ); + Assert.assertTrue( store instanceof LOStore ); + } + + @Test + public void testFilterForeachCast() throws Exception { + planTester.buildPlan("A = load 'myfile' as (name, age, gpa);"); + planTester.buildPlan("B = foreach A generate (int)$1, $2;"); + planTester.buildPlan("C = filter B by $0 < 18;"); + org.apache.pig.impl.logicalLayer.LogicalPlan plan = planTester.buildPlan( "D = STORE C INTO 'empty';" ); + + LogicalPlan newLogicalPlan = migrateAndOptimizePlan( plan ); + + Operator load = newLogicalPlan.getSources().get( 0 ); + Assert.assertTrue( load instanceof LOLoad ); + Operator fe = newLogicalPlan.getSuccessors( load ).get( 0 ); + Assert.assertTrue( fe instanceof LOForEach ); + Operator filter = newLogicalPlan.getSuccessors( fe ).get( 0 ); + Assert.assertTrue( filter instanceof LOFilter ); + Operator store = newLogicalPlan.getSuccessors( filter ).get( 0 ); + Assert.assertTrue( store instanceof LOStore ); + } + + @Test + public void testFilterCastForeach() throws Exception { + planTester.buildPlan("A = load 'myfile' as (name, age, gpa);"); + planTester.buildPlan("B = foreach A generate $1, $2;"); + planTester.buildPlan("C = filter B by (int)$0 < 18;"); + org.apache.pig.impl.logicalLayer.LogicalPlan plan = planTester.buildPlan( "D = STORE C INTO 'empty';" ); + + LogicalPlan newLogicalPlan = migrateAndOptimizePlan( plan ); + + Operator load = newLogicalPlan.getSources().get( 0 ); + Assert.assertTrue( load 
instanceof LOLoad ); + Operator filter = newLogicalPlan.getSuccessors( load ).get( 0 ); + Assert.assertTrue( filter instanceof LOFilter ); + Operator fe = newLogicalPlan.getSuccessors( filter ).get( 0 ); + Assert.assertTrue( fe instanceof LOForEach ); + Operator store = newLogicalPlan.getSuccessors( fe ).get( 0 ); + Assert.assertTrue( store instanceof LOStore ); + } + + + @Test + public void testFilterConstantConditionForeach() throws Exception { + planTester.buildPlan("A = load 'myfile' as (name, age, gpa);"); + planTester.buildPlan("B = foreach A generate $1, $2;"); + planTester.buildPlan("C = filter B by 1 == 1;"); + org.apache.pig.impl.logicalLayer.LogicalPlan plan = planTester.buildPlan( "D = STORE C INTO 'empty';" ); + + LogicalPlan newLogicalPlan = migrateAndOptimizePlan( plan ); + + Operator load = newLogicalPlan.getSources().get( 0 ); + Assert.assertTrue( load instanceof LOLoad ); + Operator filter = newLogicalPlan.getSuccessors( load ).get( 0 ); + Assert.assertTrue( filter instanceof LOFilter ); + Operator fe = newLogicalPlan.getSuccessors( filter ).get( 0 ); + Assert.assertTrue( fe instanceof LOForEach ); + Operator store = newLogicalPlan.getSuccessors( fe ).get( 0 ); + Assert.assertTrue( store instanceof LOStore ); + } + + @Test + public void testFilterUDFForeach() throws Exception { + planTester.buildPlan("A = load 'myfile' as (name, age, gpa);"); + planTester.buildPlan("B = foreach A generate $1, $2;"); + planTester.buildPlan("C = filter B by " + Identity.class.getName() + "($1) ;"); + org.apache.pig.impl.logicalLayer.LogicalPlan plan = planTester.buildPlan( "D = STORE C INTO 'empty';" ); + + LogicalPlan newLogicalPlan = migrateAndOptimizePlan( plan ); + + Operator load = newLogicalPlan.getSources().get( 0 ); + Assert.assertTrue( load instanceof LOLoad ); + Operator filter = newLogicalPlan.getSuccessors( load ).get( 0 ); + Assert.assertTrue( filter instanceof LOFilter ); + Operator fe = newLogicalPlan.getSuccessors( filter ).get( 0 ); + 
Assert.assertTrue( fe instanceof LOForEach ); + Operator store = newLogicalPlan.getSuccessors( fe ).get( 0 ); + Assert.assertTrue( store instanceof LOStore ); + } + + @Test + public void testFilterForeachFlatten() throws Exception { + planTester.buildPlan("A = load 'myfile' as (name, age, gpa);"); + planTester.buildPlan("B = foreach A generate $1, flatten($2);"); + planTester.buildPlan("C = filter B by $0 < 18;"); + org.apache.pig.impl.logicalLayer.LogicalPlan plan = planTester.buildPlan( "D = STORE C INTO 'empty';" ); + + LogicalPlan newLogicalPlan = migrateAndOptimizePlan( plan ); + + Operator load = newLogicalPlan.getSources().get( 0 ); + Assert.assertTrue( load instanceof LOLoad ); + Operator filter = newLogicalPlan.getSuccessors( load ).get( 0 ); + Assert.assertTrue( filter instanceof LOFilter ); + Operator fe = newLogicalPlan.getSuccessors( filter ).get( 0 ); + Assert.assertTrue( fe instanceof LOForEach ); + Operator store = newLogicalPlan.getSuccessors( fe ).get( 0 ); + Assert.assertTrue( store instanceof LOStore ); + } + private LogicalPlan migrateAndOptimizePlan(org.apache.pig.impl.logicalLayer.LogicalPlan plan) throws FrontendException { LogicalPlan newLogicalPlan = migratePlan( plan ); PlanOptimizer optimizer = new MyPlanOptimizer( newLogicalPlan, 3 ); Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestNewPlanFilterRule.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestNewPlanFilterRule.java?rev=992943&r1=992942&r2=992943&view=diff ============================================================================== --- hadoop/pig/trunk/test/org/apache/pig/test/TestNewPlanFilterRule.java (original) +++ hadoop/pig/trunk/test/org/apache/pig/test/TestNewPlanFilterRule.java Mon Sep 6 05:38:53 2010 @@ -325,9 +325,7 @@ public class TestNewPlanFilterRule exten assertEquals(next.getClass(), LOStore.class); transformed = listener.getTransformed(); - assertEquals(transformed.size(), 4); - 
assertEquals(transformed.getSinks().get(0).getClass(), LOFilter.class); - assertEquals(transformed.getSources().get(0).getClass(), LOLoad.class); + assertEquals(transformed.size(), 7); // run merge filter rule r = new MergeFilter("MergeFilter");