Author: daijy Date: Wed Aug 25 17:42:06 2010 New Revision: 989251 URL: http://svn.apache.org/viewvc?rev=989251&view=rev Log: PIG-1497: Mandatory rule PartitionFilterOptimizer (xuefuz via daijy)
Added: hadoop/pig/trunk/src/org/apache/pig/newplan/PColFilterExtractor.java hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/PartitionFilterPushDown.java hadoop/pig/trunk/test/org/apache/pig/test/TestPartitionFilterPushDown.java Added: hadoop/pig/trunk/src/org/apache/pig/newplan/PColFilterExtractor.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/PColFilterExtractor.java?rev=989251&view=auto ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/newplan/PColFilterExtractor.java (added) +++ hadoop/pig/trunk/src/org/apache/pig/newplan/PColFilterExtractor.java Wed Aug 25 17:42:06 2010 @@ -0,0 +1,520 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.pig.newplan; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.pig.Expression; +import org.apache.pig.PigException; +import org.apache.pig.impl.logicalLayer.FrontendException; +import org.apache.pig.impl.util.Pair; + +import org.apache.pig.Expression.OpType; +import org.apache.pig.newplan.logical.expression.AddExpression; +import org.apache.pig.newplan.logical.expression.AndExpression; +import org.apache.pig.newplan.logical.expression.BinCondExpression; +import org.apache.pig.newplan.logical.expression.BinaryExpression; +import org.apache.pig.newplan.logical.expression.CastExpression; +import org.apache.pig.newplan.logical.expression.ConstantExpression; +import org.apache.pig.newplan.logical.expression.DivideExpression; +import org.apache.pig.newplan.logical.expression.EqualExpression; +import org.apache.pig.newplan.logical.expression.GreaterThanEqualExpression; +import org.apache.pig.newplan.logical.expression.GreaterThanExpression; +import org.apache.pig.newplan.logical.expression.IsNullExpression; +import org.apache.pig.newplan.logical.expression.LessThanEqualExpression; +import org.apache.pig.newplan.logical.expression.LessThanExpression; +import org.apache.pig.newplan.logical.expression.LogicalExpression; +import org.apache.pig.newplan.logical.expression.ModExpression; +import org.apache.pig.newplan.logical.expression.MultiplyExpression; +import org.apache.pig.newplan.logical.expression.NotEqualExpression; +import org.apache.pig.newplan.logical.expression.NotExpression; +import org.apache.pig.newplan.logical.expression.OrExpression; +import org.apache.pig.newplan.logical.expression.ProjectExpression; +import org.apache.pig.newplan.logical.expression.RegexExpression; +import org.apache.pig.newplan.logical.expression.SubtractExpression; +import org.apache.pig.newplan.logical.expression.UserFuncExpression; +import org.apache.pig.newplan.DepthFirstWalker; + +/** + * This Visitor 
works on the filter condition of a LOFilter which immediately + * follows a LOLoad that interacts with a metadata system (currently OWL) to + * read table data. The visitor looks for conditions on partition columns in the + * filter condition and extracts those conditions out of the filter condition. + * The condition on partition cols will be used to prune partitions of the table. + * + */ +public class PColFilterExtractor extends PlanVisitor { + /** + * partition columns associated with the table + * present in the load on which the filter whose + * inner plan is being visited is applied + */ + private List<String> partitionCols; + + /** + * will contain the partition column filter conditions + * accumulated during the visit - the final condition will be an expression + * built from these sub expressions connected with AND + */ + private ArrayList<Expression> pColConditions = new ArrayList<Expression>(); + + /** + * flag used during visit to indicate if a partition key + * was seen + */ + private boolean sawKey; + + private boolean sawNonKeyCol; + + private enum Side { LEFT, RIGHT, NONE }; + private Side replaceSide = Side.NONE; + + private boolean filterRemovable = false; + + @Override + public void visit() throws FrontendException { + // we will visit the leaf and it will recursively walk the plan + LogicalExpression leaf = (LogicalExpression)plan.getSources().get( 0 ); + // if the leaf is a unary operator it should be a FilterFunc in + // which case we don't try to extract partition filter conditions + if(leaf instanceof BinaryExpression) { + BinaryExpression binExpr = (BinaryExpression)leaf; + visit( binExpr ); + replaceChild( binExpr ); + // if the entire expression is to be removed, then the above + // replaceChild will not set sawKey to false (sawKey is set to + // false only in replaceChild()) + if(sawKey == true) { + //there are only conditions on partition columns in the filter + //extract it + pColConditions.add( getExpression( leaf ) ); + filterRemovable 
= true; + } + } + } + + /** + * + * @param plan logical plan corresponding the filter's comparison condition + * @param partitionCols list of partition columns of the table which is + * being loaded in the LOAD statement which is input to the filter + */ + public PColFilterExtractor(OperatorPlan plan, + List<String> partitionCols) { + // though we configure a DepthFirstWalker to be the walker, we will not + // use it - we will visit the leaf and it will recursively walk the + // plan + super( plan, new DepthFirstWalker( plan ) ); + this.partitionCols = new ArrayList<String>(partitionCols); + } + + protected void visit(ProjectExpression project) throws FrontendException { + String fieldName = project.getFieldSchema().alias; + if(partitionCols.contains(fieldName)) { + sawKey = true; + // The condition on partition column will be used to prune the + // scan and removed from the filter condition. Hence the condition + // on the partition column will not be re applied when data is read, + // so the following cases should throw error until that changes. 
+ List<Class<?>> opsToCheckFor = new ArrayList<Class<?>>(); + opsToCheckFor.add(RegexExpression.class); + int errCode = 1110; + if(checkSuccessors(project, opsToCheckFor)) { + throw new FrontendException("Unsupported query: " + + "You have an partition column (" + + fieldName + ") inside a regexp operator in the " + + "filter condition.", errCode, PigException.INPUT); + } + opsToCheckFor.set(0, UserFuncExpression.class); + if(checkSuccessors(project, opsToCheckFor)) { + throw new FrontendException("Unsupported query: " + + "You have an partition column (" + + fieldName + ") inside a function in the " + + "filter condition.", errCode, PigException.INPUT); + } + opsToCheckFor.set(0, CastExpression.class); + if(checkSuccessors(project, opsToCheckFor)) { + throw new FrontendException("Unsupported query: " + + "You have an partition column (" + + fieldName + ") inside a cast in the " + + "filter condition.", errCode, PigException.INPUT); + } + + opsToCheckFor.set(0, IsNullExpression.class); + if(checkSuccessors(project, opsToCheckFor)) { + throw new FrontendException("Unsupported query: " + + "You have an partition column (" + + fieldName + ") inside a null check operator in the " + + "filter condition.", errCode, PigException.INPUT); + } + opsToCheckFor.set(0, BinCondExpression.class); + if(checkSuccessors(project, opsToCheckFor)) { + throw new FrontendException("Unsupported query: " + + "You have an partition column (" + + fieldName + ") inside a bincond operator in the " + + "filter condition.", errCode, PigException.INPUT); + } + opsToCheckFor.set(0, AndExpression.class); + opsToCheckFor.add(OrExpression.class); + if(checkSuccessors(project, opsToCheckFor)) { + errCode = 1112; + throw new FrontendException("Unsupported query: " + + "You have an partition column (" + fieldName + + " ) in a construction like: " + + "(pcond and ...) or (pcond and ...) 
" + + "where pcond is a condition on a partition column.", + errCode, PigException.INPUT); + } + } else { + sawNonKeyCol = true; + } + } + + private void visit(BinaryExpression binOp) throws FrontendException { + boolean lhsSawKey = false; + boolean rhsSawKey = false; + boolean lhsSawNonKeyCol = false; + boolean rhsSawNonKeyCol = false; + + sawKey = false; + sawNonKeyCol = false; + visit( binOp.getLhs() ); + replaceChild(binOp.getLhs()); + lhsSawKey = sawKey; + lhsSawNonKeyCol = sawNonKeyCol; + + sawKey = false; + sawNonKeyCol = false; + visit( binOp.getRhs() ); + replaceChild(binOp.getRhs()); + rhsSawKey = sawKey; + rhsSawNonKeyCol = sawNonKeyCol; + + // only in the case of an AND, we potentially split the AND to + // remove conditions on partition columns out of the AND. For this + // we set replaceSide accordingly so that when we reach a predecessor + // we can trim the appropriate side. If both sides of the AND have + // conditions on partition columns, we will remove the AND completely - + // in this case, we will not set replaceSide, but sawKey will be + // true so that as we go to higher predecessor ANDs we can trim later. + if(binOp instanceof AndExpression) { + if(lhsSawKey && rhsSawNonKeyCol){ + replaceSide = Side.LEFT; + }else if(rhsSawKey && lhsSawNonKeyCol){ + replaceSide = Side.RIGHT; + } + } else if(lhsSawKey && rhsSawNonKeyCol || rhsSawKey && lhsSawNonKeyCol){ + int errCode = 1111; + String errMsg = "Use of partition column/condition with" + + " non partition column/condition in filter expression is not " + + "supported." 
; + throw new FrontendException(errMsg, errCode, PigException.INPUT); + } + + sawKey = lhsSawKey || rhsSawKey; + sawNonKeyCol = lhsSawNonKeyCol || rhsSawNonKeyCol; + } + + + + /** + * @return the condition on partition columns extracted from filter + */ + public Expression getPColCondition(){ + if(pColConditions.size() == 0) + return null; + Expression cond = pColConditions.get(0); + for(int i=1; i<pColConditions.size(); i++){ + //if there is more than one condition expression + // connect them using "AND"s + cond = new Expression.BinaryExpression(cond, pColConditions.get(i), + OpType.OP_AND); + } + return cond; + } + + /** + * @return the filterRemovable + */ + public boolean isFilterRemovable() { + return filterRemovable; + } + + //////// helper methods ///////////////////////// + /** + * check for the presence of a certain operator type in the Successors + * @param opToStartFrom + * @param opsToCheckFor operators to be checked for at each level of + * Successors - the ordering in the list is the order in which the ops + * will be checked. 
+ * @return true if opsToCheckFor are found + * @throws FrontendException + */ + private boolean checkSuccessors(Operator opToStartFrom, + List<Class<?>> opsToCheckFor) throws FrontendException { + boolean done = checkSuccessorsHelper(opToStartFrom, opsToCheckFor); + if(!done && !opsToCheckFor.isEmpty()) { + // continue checking if there is more to check + while(!done) { + opToStartFrom = plan.getPredecessors(opToStartFrom).get(0); + done = checkSuccessorsHelper(opToStartFrom, opsToCheckFor); + } + } + return opsToCheckFor.isEmpty(); + } + + private boolean checkSuccessorsHelper(Operator opToStartFrom, + List<Class<?>> opsToCheckFor) throws FrontendException { + List<Operator> successors = plan.getPredecessors( + opToStartFrom); + if(successors == null || successors.size() == 0) { + return true; // further checking cannot be done + } + if(successors.size() == 1) { + Operator suc = successors.get(0); + if(suc.getClass().getCanonicalName().equals( + opsToCheckFor.get(0).getCanonicalName())) { + // trim the list of operators to check + opsToCheckFor.remove(0); + if(opsToCheckFor.isEmpty()) { + return true; //no further checks required + } + } + } else { + throwException(); + } + return false; // more checking can be done + } + + private void replaceChild(LogicalExpression childExpr) throws FrontendException { + + if(replaceSide == Side.NONE) { + // the child is trimmed when the appropriate + // flag is set to indicate that it needs to be trimmed. 
+ return; + } + + // eg if replaceSide == Side.LEFT + // binexpop + // / \ \ + // child (this is the childExpr argument sent in) + // / \ + // Lt Rt + // + // gets converted to + // binexpop + // / + // Rt + + if( !( childExpr instanceof BinaryExpression ) ) { + throwException(); + } + // child's lhs operand + LogicalExpression leftChild = + ((BinaryExpression)childExpr).getLhs(); + // child's rhs operand + LogicalExpression rightChild = + ((BinaryExpression)childExpr).getRhs(); + + plan.disconnect( childExpr, leftChild ); + plan.disconnect( childExpr, rightChild ); + + if(replaceSide == Side.LEFT) { + // remove left child and replace childExpr with its right child + remove( leftChild ); + replace(childExpr, rightChild); + } else if(replaceSide == Side.RIGHT){ + // remove right child and replace childExpr with its left child + remove(rightChild); + replace(childExpr, leftChild); + }else { + throwException(); + } + //reset + replaceSide = Side.NONE; + sawKey = false; + + } + + private void replace(Operator oldOp, Operator newOp) throws FrontendException { + List<Operator> grandParents = plan.getPredecessors( oldOp ); + if( grandParents == null || grandParents.size() == 0 ) { + plan.remove( oldOp ); + return; + } + Operator grandParent = plan.getPredecessors( oldOp ).get( 0 ); + Pair<Integer, Integer> pair = plan.disconnect( grandParent, oldOp ); + plan.add( newOp ); + plan.connect( grandParent, pair.first, newOp, pair.second ); + plan.remove( oldOp ); + } + + /** + * Adds the Expression built from op to pColConditions and then + * removes the subtree rooted at op from the plan. + * @param op + * @throws FrontendException + */ + private void remove(LogicalExpression op) throws FrontendException { + pColConditions.add( getExpression( op ) ); + removeTree( op ); + } + + /** + * Assume that the given operator is already disconnected from its predecessors. 
+ * @param op + * @throws FrontendException + */ + private void removeTree(Operator op) throws FrontendException { + List<Operator> succs = plan.getSuccessors( op ); + if( succs == null ) { + plan.remove( op ); + return; + } + + Operator[] children = new Operator[succs.size()]; + for( int i = 0; i < succs.size(); i++ ) { + children[i] = succs.get(i); + } + + for( Operator succ : children ) { + plan.disconnect( op, succ ); + removeTree( succ ); + } + + plan.remove( op ); + } + + public static Expression getExpression(LogicalExpression op) throws FrontendException + { + if(op instanceof ConstantExpression) { + ConstantExpression constExpr =(ConstantExpression)op ; + return new Expression.Const( constExpr.getValue() ); + } else if (op instanceof ProjectExpression) { + ProjectExpression projExpr = (ProjectExpression)op; + String fieldName = projExpr.getFieldSchema().alias; + return new Expression.Column(fieldName); + } else { + if( !( op instanceof BinaryExpression ) ) { + throwException(); + } + BinaryExpression binOp = (BinaryExpression)op; + if(binOp instanceof AddExpression) { + return getExpression( binOp, OpType.OP_PLUS ); + } else if(binOp instanceof SubtractExpression) { + return getExpression(binOp, OpType.OP_MINUS); + } else if(binOp instanceof MultiplyExpression) { + return getExpression(binOp, OpType.OP_TIMES); + } else if(binOp instanceof DivideExpression) { + return getExpression(binOp, OpType.OP_DIV); + } else if(binOp instanceof ModExpression) { + return getExpression(binOp, OpType.OP_MOD); + } else if(binOp instanceof AndExpression) { + return getExpression(binOp, OpType.OP_AND); + } else if(binOp instanceof OrExpression) { + return getExpression(binOp, OpType.OP_OR); + } else if(binOp instanceof EqualExpression) { + return getExpression(binOp, OpType.OP_EQ); + } else if(binOp instanceof NotEqualExpression) { + return getExpression(binOp, OpType.OP_NE); + } else if(binOp instanceof GreaterThanExpression) { + return getExpression(binOp, OpType.OP_GT); + 
} else if(binOp instanceof GreaterThanEqualExpression) { + return getExpression(binOp, OpType.OP_GE); + } else if(binOp instanceof LessThanExpression) { + return getExpression(binOp, OpType.OP_LT); + } else if(binOp instanceof LessThanEqualExpression) { + return getExpression(binOp, OpType.OP_LE); + } else { + throwException(); + } + } + return null; + } + + private static Expression getExpression(BinaryExpression binOp, OpType + opType) throws FrontendException { + return new Expression.BinaryExpression(getExpression(binOp.getLhs()) + ,getExpression(binOp.getRhs()), opType); + } + private static void throwException() throws FrontendException { + int errCode = 2209; + throw new FrontendException( + "Internal error while processing any partition filter " + + "conditions in the filter after the load" , + errCode, + PigException.BUG + ); + } + + // this might get called from some visit() - in that case, delegate to + // the other visit()s which we have defined here + private void visit(LogicalExpression op) throws FrontendException { + if(op instanceof ProjectExpression) { + visit((ProjectExpression)op); + } else if (op instanceof BinaryExpression) { + visit((BinaryExpression)op); + } else if (op instanceof CastExpression) { + visit((CastExpression)op); + } else if (op instanceof BinCondExpression) { + visit((BinCondExpression)op); + } else if (op instanceof UserFuncExpression) { + visit((UserFuncExpression)op); + } else if (op instanceof IsNullExpression) { + visit((IsNullExpression)op); + } else if( op instanceof NotExpression ) { + visit( (NotExpression)op ); + } else if( op instanceof RegexExpression ) { + visit( (RegexExpression)op ); + } + } + + // some specific operators which are of interest to catch some + // unsupported scenarios + private void visit(CastExpression cast) throws FrontendException { + visit(cast.getExpression()); + } + + private void visit(NotExpression not) throws FrontendException { + visit(not.getExpression()); + } + + private void 
visit(RegexExpression regexp) throws FrontendException { + visit((BinaryExpression)regexp); + } + + private void visit(BinCondExpression binCond) throws FrontendException { + visit(binCond.getCondition()); + visit(binCond.getLhs()); + visit(binCond.getRhs()); + } + + private void visit(UserFuncExpression udf) throws FrontendException { + for (LogicalExpression op : udf.getArguments()) { + visit(op); + } + } + + private void visit(IsNullExpression isNull) throws FrontendException { + visit(isNull.getExpression()); + } + +} Added: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/PartitionFilterPushDown.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/PartitionFilterPushDown.java?rev=989251&view=auto ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/PartitionFilterPushDown.java (added) +++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/PartitionFilterPushDown.java Wed Aug 25 17:42:06 2010 @@ -0,0 +1,218 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.pig.newplan.logical.rules; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.mapreduce.Job; +import org.apache.pig.Expression; +import org.apache.pig.LoadFunc; +import org.apache.pig.LoadMetadata; +import org.apache.pig.Expression.BinaryExpression; +import org.apache.pig.Expression.Column; +import org.apache.pig.newplan.logical.relational.LOFilter; +import org.apache.pig.newplan.logical.relational.LOLoad; +import org.apache.pig.newplan.logical.relational.LogicalPlan; +import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator; +import org.apache.pig.newplan.logical.relational.LogicalSchema; +import org.apache.pig.newplan.Operator; +import org.apache.pig.newplan.OperatorPlan; +import org.apache.pig.newplan.OperatorSubPlan; +import org.apache.pig.newplan.PColFilterExtractor; +import org.apache.pig.newplan.optimizer.Rule; +import org.apache.pig.newplan.optimizer.Transformer; +import org.apache.pig.impl.logicalLayer.FrontendException; + +public class PartitionFilterPushDown extends Rule { + private String[] partitionKeys; + + /** + * a reference to the LoadMetadata implementation + */ + private LoadMetadata loadMetadata; + + /** + * a reference to the LoadFunc implementation + */ + private LoadFunc loadFunc; + + private LOLoad loLoad; + private LOFilter loFilter; + + /** + * a map between column names as reported in + * {@link LoadMetadata#getSchema(String, org.apache.hadoop.conf.Configuration)} + * and as present in {@link LOLoad#getSchema()}. The two will be different + * when the user has provided a schema in the load statement + */ + private Map<String, String> colNameMap = new HashMap<String, String>(); + + /** + * a map between column names as present in {@link LOLoad#getSchema()} and + * as reported in + * {@link LoadMetadata#getSchema(String, org.apache.hadoop.conf.Configuration)}. 
+ * The two will be different when the user has provided a schema in the + * load statement. + */ + private Map<String, String> reverseColNameMap = new HashMap<String, String>(); + + public PartitionFilterPushDown(String name) { + super( name, false ); + } + + @Override + protected OperatorPlan buildPattern() { + // match each load; the filter that follows it is matched in check(). + LogicalPlan plan = new LogicalPlan(); + LogicalRelationalOperator load = new LOLoad (null, null, plan, null ); + plan.add( load ); +// LogicalRelationalOperator filter = new LOFilter( plan ); +// plan.add( filter ); +// plan.connect( load, filter ); + return plan; + } + + @Override + public Transformer getNewTransformer() { + return new PartitionFilterPushDownTransformer(); + } + + public class PartitionFilterPushDownTransformer extends Transformer { + private OperatorSubPlan subPlan; + + @Override + public boolean check(OperatorPlan matched) throws FrontendException { + loLoad = (LOLoad)matched.getSources().get(0); + // Match filter. + List<Operator> succeds = currentPlan.getSuccessors( loLoad ); + if( succeds == null || succeds.size() == 0 || !( succeds.get(0) instanceof LOFilter ) ) + return false; + loFilter = (LOFilter)succeds.get(0); + + // we have to check more only if LoadFunc implements LoadMetadata + loadFunc = loLoad.getLoadFunc(); + if(!( loadFunc instanceof LoadMetadata ) ) { + return false; + } + + loadMetadata = (LoadMetadata)loadFunc; + try { + partitionKeys = loadMetadata.getPartitionKeys( + loLoad.getFileSpec().getFileName(), new Job( loLoad.getConfiguration() ) ); + } catch (IOException e) { + throw new FrontendException( e ); + } + if( partitionKeys == null || partitionKeys.length == 0 ) { + return false; + } + +// LogicalExpressionPlan filterExpr = filter.getFilterPlan(); + + // we found a load-filter pattern where the load returns partition keys + return true; + } + + @Override + public OperatorPlan reportChanges() { + return subPlan; + } + + @Override + public void transform(OperatorPlan matched) throws 
FrontendException { + subPlan = new OperatorSubPlan( currentPlan ); + + setupColNameMaps(); + PColFilterExtractor pColFilterFinder = new PColFilterExtractor( + loFilter.getFilterPlan(), getMappedKeys( partitionKeys ) ); + pColFilterFinder.visit(); + Expression partitionFilter = pColFilterFinder.getPColCondition(); + if(partitionFilter != null) { + // the column names in the filter may be the ones provided by + // the user in the schema in the load statement - we may need + // to replace them with partition column names as given by + // LoadFunc.getSchema() + updateMappedColNames(partitionFilter); + try { + loadMetadata.setPartitionFilter(partitionFilter); + } catch (IOException e) { + throw new FrontendException( e ); + } + if(pColFilterFinder.isFilterRemovable()) { + // remove this filter from the plan + Operator from = currentPlan.getPredecessors( loFilter ).get( 0 ); + currentPlan.disconnect( from, loFilter ); + List<Operator> succs = currentPlan.getSuccessors( loFilter ); + if( succs != null ) { + Operator to = succs.get( 0 ); + currentPlan.disconnect( loFilter, to ); + currentPlan.connect( from, to ); + } + currentPlan.remove( loFilter ); + } + } + } + + private void updateMappedColNames(Expression expr) { + if(expr instanceof BinaryExpression) { + updateMappedColNames(((BinaryExpression) expr).getLhs()); + updateMappedColNames(((BinaryExpression) expr).getRhs()); + } else if (expr instanceof Column) { + Column col = (Column) expr; + col.setName(reverseColNameMap.get(col.getName())); + } + } + + /** + * The partition keys in the argument are as reported by + * {@link LoadMetadata#getPartitionKeys(String, org.apache.hadoop.conf.Configuration)}. + * The user may have renamed these by providing a schema with different names + * in the load statement - this method will replace the former names with + * the latter names. 
+ * @param partitionKeys + * @return + */ + private List<String> getMappedKeys(String[] partitionKeys) { + List<String> mappedKeys = new ArrayList<String>(partitionKeys.length); + for (int i = 0; i < partitionKeys.length; i++) { + mappedKeys.add(colNameMap.get(partitionKeys[i])); + } + return mappedKeys; + } + + private void setupColNameMaps() throws FrontendException { + LogicalSchema loLoadSchema = loLoad.getSchema(); + LogicalSchema loadFuncSchema = loLoad.getDeterminedSchema(); + for(int i = 0; i < loadFuncSchema.size(); i++) { + colNameMap.put(loadFuncSchema.getField(i).alias, + (i < loLoadSchema.size() ? loLoadSchema.getField(i).alias : + loadFuncSchema.getField(i).alias)); + + reverseColNameMap.put((i < loLoadSchema.size() ? loLoadSchema.getField(i).alias : + loadFuncSchema.getField(i).alias), + loadFuncSchema.getField(i).alias); + } + } + + } + +} Added: hadoop/pig/trunk/test/org/apache/pig/test/TestPartitionFilterPushDown.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestPartitionFilterPushDown.java?rev=989251&view=auto ============================================================================== --- hadoop/pig/trunk/test/org/apache/pig/test/TestPartitionFilterPushDown.java (added) +++ hadoop/pig/trunk/test/org/apache/pig/test/TestPartitionFilterPushDown.java Wed Aug 25 17:42:06 2010 @@ -0,0 +1,628 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Properties; +import java.util.Set; + + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.pig.ExecType; +import org.apache.pig.Expression; +import org.apache.pig.LoadFunc; +import org.apache.pig.LoadMetadata; +import org.apache.pig.ResourceSchema; +import org.apache.pig.ResourceStatistics; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit; +import org.apache.pig.data.Tuple; +import org.apache.pig.newplan.logical.LogicalPlanMigrationVistor; +import org.apache.pig.newplan.logical.expression.LogicalExpression; +import org.apache.pig.newplan.logical.optimizer.LogicalPlanOptimizer; +import org.apache.pig.newplan.logical.relational.LOFilter; +import org.apache.pig.newplan.logical.relational.LOLoad; +import org.apache.pig.newplan.logical.relational.LogicalPlan; +import org.apache.pig.newplan.logical.rules.PartitionFilterPushDown; +import org.apache.pig.newplan.logical.rules.TypeCastInserter; +import org.apache.pig.newplan.Operator; +import org.apache.pig.newplan.OperatorPlan; +import org.apache.pig.newplan.PColFilterExtractor; +import org.apache.pig.newplan.optimizer.PlanOptimizer; +import org.apache.pig.newplan.optimizer.Rule; +import org.apache.pig.impl.PigContext; 
+import org.apache.pig.impl.logicalLayer.PlanSetter; +import org.apache.pig.impl.logicalLayer.parser.ParseException; +import org.apache.pig.impl.logicalLayer.schema.Schema; +import org.apache.pig.impl.plan.VisitorException; +import org.apache.pig.impl.util.LogUtils; +import org.apache.pig.test.utils.LogicalPlanTester; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +/** + * unit tests to test extracting partition filter conditions out of the filter + * condition in the filter following a load which talks to metadata system (i.e. + * implements {@link LoadMetadata}) + */ +public class TestPartitionFilterPushDown { + static PigContext pc = new PigContext(ExecType.LOCAL, new Properties()); + static LogicalPlanTester lpTester; + + @BeforeClass + public static void setup() throws Exception { + lpTester = new LogicalPlanTester(pc); + lpTester.buildPlan("a = load 'foo' as (srcid, mrkt, dstid, name, age);"); + } + + @AfterClass + public static void tearDown() { + } + + /** + * test case where there is a single expression on partition columns in + * the filter expression along with an expression on non partition column + * @throws IOException + */ + @Test + public void testSimpleMixed() throws IOException { + org.apache.pig.impl.logicalLayer.LogicalPlan lp = + lpTester.buildPlan("b = filter a by srcid == 10 and name == 'foo';"); + test(lp, Arrays.asList("srcid"), "(srcid == 10)", "(name == 'foo')"); + } + + /** + * test case where filter does not contain any condition on partition cols + * @throws Exception + */ + @Test + public void testNoPartFilter() throws Exception { + org.apache.pig.impl.logicalLayer.LogicalPlan lp = + lpTester.buildPlan("b = filter a by age == 20 and name == 'foo';"); + test(lp, Arrays.asList("srcid"), null, + "((age == 20) and (name == 'foo'))"); + } + + /** + * test case where filter only contains condition on partition cols + * @throws Exception + */ + @Test + public void 
testOnlyPartFilter1() throws Exception { + org.apache.pig.impl.logicalLayer.LogicalPlan lp = + lpTester.buildPlan("b = filter a by srcid > 20 and mrkt == 'us';"); + test(lp, Arrays.asList("srcid", "mrkt"), + "((srcid > 20) and (mrkt == 'us'))", null); + + } + + /** + * test case where filter only contains condition on partition cols + * @throws Exception + */ + @Test + public void testOnlyPartFilter2() throws Exception { + org.apache.pig.impl.logicalLayer.LogicalPlan lp = + lpTester.buildPlan("b = filter a by mrkt == 'us';"); + test(lp, Arrays.asList("srcid", "mrkt"), + "(mrkt == 'us')", null); + + } + + /** + * test case where filter only contains condition on partition cols + * @throws Exception + */ + @Test + public void testOnlyPartFilter3() throws Exception { + org.apache.pig.impl.logicalLayer.LogicalPlan lp = + lpTester.buildPlan("b = filter a by srcid == 20 or mrkt == 'us';"); + test(lp, Arrays.asList("srcid", "mrkt"), + "((srcid == 20) or (mrkt == 'us'))", null); + + } + + /** + * test case where filter has both conditions on partition cols and non + * partition cols and the filter condition will be split to extract the + * conditions on partition columns + */ + @Test + public void testMixed1() throws Exception { + org.apache.pig.impl.logicalLayer.LogicalPlan lp = + lpTester.buildPlan("b = filter a by " + + "(age < 20 and mrkt == 'us') and (srcid == 10 and " + + "name == 'foo');"); + test(lp, Arrays.asList("srcid", "mrkt"), + "((mrkt == 'us') and (srcid == 10))", + "((age < 20) and (name == 'foo'))"); + } + + + /** + * test case where filter has both conditions on partition cols and non + * partition cols and the filter condition will be split to extract the + * conditions on partition columns + */ + @Test + public void testMixed2() throws Exception { + org.apache.pig.impl.logicalLayer.LogicalPlan lp = + lpTester.buildPlan("b = filter a by " + + "(age >= 20 and mrkt == 'us') and (srcid == 10 and " + + "dstid == 15);"); + test(lp, Arrays.asList("srcid", 
"dstid", "mrkt"), + "((mrkt == 'us') and ((srcid == 10) and (dstid == 15)))", + "(age >= 20)"); + } + + /** + * test case where filter has both conditions on partition cols and non + * partition cols and the filter condition will be split to extract the + * conditions on partition columns + */ + @Test + public void testMixed3() throws Exception { + org.apache.pig.impl.logicalLayer.LogicalPlan lp = + lpTester.buildPlan("b = filter a by " + + "age >= 20 and mrkt == 'us' and srcid == 10;"); + test(lp, Arrays.asList("srcid", "dstid", "mrkt"), + "((mrkt == 'us') and (srcid == 10))", "(age >= 20)"); + } + + /** + * test case where filter has both conditions on partition cols and non + * partition cols and the filter condition will be split to extract the + * conditions on partition columns - this testcase also has a condition + * based on comparison of two partition columns + */ + @Test + public void testMixed4() throws Exception { + org.apache.pig.impl.logicalLayer.LogicalPlan lp = + lpTester.buildPlan("b = filter a by " + + "age >= 20 and mrkt == 'us' and name == 'foo' and " + + "srcid == dstid;"); + test(lp, Arrays.asList("srcid", "dstid", "mrkt"), + "((mrkt == 'us') and (srcid == dstid))", + "((age >= 20) and (name == 'foo'))"); + } + + /** + * test case where filter has both conditions on partition cols and non + * partition cols and the filter condition will be split to extract the + * conditions on partition columns - + * This testcase has two partition col conditions with OR + non parition + * col conditions + */ + @Test + public void testMixed5() throws Exception { + org.apache.pig.impl.logicalLayer.LogicalPlan lp = + lpTester.buildPlan("b = filter a by " + + "(srcid == 10 or mrkt == 'us') and name == 'foo' and " + + "dstid == 30;"); + test(lp, Arrays.asList("srcid", "dstid", "mrkt"), + "(((srcid == 10) or (mrkt == 'us')) and (dstid == 30))", + "(name == 'foo')"); + } + + /** + * test case where filter has both conditions on partition cols and non + * partition 
cols and the filter condition will be split to extract the + * conditions on partition columns - + * This testcase has two partition col conditions with OR + non parition + * col conditions + */ + @Test + public void testMixed6() throws Exception { + org.apache.pig.impl.logicalLayer.LogicalPlan lp = + lpTester.buildPlan("b = filter a by " + + "dstid == 30 and (srcid == 10 or mrkt == 'us') and name == 'foo';"); + test(lp, Arrays.asList("srcid", "dstid", "mrkt"), + "((dstid == 30) and ((srcid == 10) or (mrkt == 'us')))", + "(name == 'foo')"); + } + /** + * test case where filter has both conditions on partition cols and non + * partition cols and the filter condition will be split to extract the + * conditions on partition columns. This testcase also tests arithmetic + * in partition column conditions + */ + @Test + public void testMixedArith() throws Exception { + org.apache.pig.impl.logicalLayer.LogicalPlan lp = + lpTester.buildPlan("b = filter a by " + + "mrkt == 'us' and srcid * 10 == 150 + 20 and age != 15;"); + test(lp, Arrays.asList("srcid", "dstid", "mrkt"), + "((mrkt == 'us') and ((srcid * 10) == (150 + 20)))", + "(age != 15)"); + } + + @Test + public void testNegPColConditionWithNonPCol() throws Exception { + // use of partition column condition and non partition column in + // same condition should fail + org.apache.pig.impl.logicalLayer.LogicalPlan lp = lpTester.buildPlan("b = filter a by " + + "srcid > age;"); + negativeTest(lp, Arrays.asList("srcid"), 1111); + lp = lpTester.buildPlan("b = filter a by " + + "srcid + age == 20;"); + negativeTest(lp, Arrays.asList("srcid"), 1111); + + // OR of partition column condition and non partiton col condition + // should fail + lp = lpTester.buildPlan("b = filter a by " + + "srcid > 10 or name == 'foo';"); + negativeTest(lp, Arrays.asList("srcid"), 1111); + } + + @Test + public void testNegPColInWrongPlaces() throws Exception { + + int expectedErrCode = 1112; + org.apache.pig.impl.logicalLayer.LogicalPlan lp = 
lpTester.buildPlan("b = filter a by " + + "(srcid > 10 and name == 'foo') or dstid == 10;"); + negativeTest(lp, Arrays.asList("srcid", "dstid"), expectedErrCode); + + expectedErrCode = 1110; + lp = lpTester.buildPlan("b = filter a by " + + "CONCAT(mrkt, '_10') == 'US_10' and age == 20;"); + negativeTest(lp, Arrays.asList("srcid", "dstid", "mrkt"), expectedErrCode); + + lp = lpTester.buildPlan("b = filter a by " + + "mrkt matches '.*us.*' and age < 15;"); + negativeTest(lp, Arrays.asList("srcid", "dstid", "mrkt"), expectedErrCode); + + lp = lpTester.buildPlan("b = filter a by " + + "(int)mrkt == 10 and name matches '.*foo.*';"); + negativeTest(lp, Arrays.asList("srcid", "dstid", "mrkt"),expectedErrCode); + + lp = lpTester.buildPlan("b = filter a by " + + "(mrkt == 'us' ? age : age + 10) == 40 and name matches '.*foo.*';"); + negativeTest(lp, Arrays.asList("srcid", "dstid", "mrkt"), expectedErrCode); + + lp = lpTester.buildPlan("b = filter a by " + + "(mrkt is null) and name matches '.*foo.*';"); + negativeTest(lp, Arrays.asList("srcid", "dstid", "mrkt"), expectedErrCode); + + lp = lpTester.buildPlan("b = filter a by " + + "(mrkt is not null) and name matches '.*foo.*';"); + negativeTest(lp, Arrays.asList("srcid", "dstid", "mrkt"), expectedErrCode); + } + + + /** + * Test that pig sends correct partition column names in setPartitionFilter + * when the user has a schema in the load statement which renames partition + * columns + * @throws Exception + */ + @Test + public void testColNameMapping1() throws Exception { + TestLoader.partFilter = null; + lpTester.buildPlan("a = load 'foo' using " + + TestLoader.class.getName() + + "('srcid:int, mrkt:chararray, dstid:int, name:chararray, age:int', " + + "'srcid,mrkt') as (f1, f2, f3, f4, f5);"); + org.apache.pig.impl.logicalLayer.LogicalPlan lp = lpTester.buildPlan("b = filter a by " + + "(f5 >= 20 and f2 == 'us') and (f1 == 10 and f3 == 15);"); + + LogicalPlan newLogicalPlan = migrateAndOptimizePlan( lp ); + + 
Assert.assertEquals("checking partition filter:", + "((mrkt == 'us') and (srcid == 10))", + TestLoader.partFilter.toString()); + LOFilter filter = (LOFilter)newLogicalPlan.getSinks().get(0); + String actual = PColFilterExtractor.getExpression( + (LogicalExpression)filter.getFilterPlan().getSources().get(0)). + toString().toLowerCase(); + Assert.assertEquals("checking trimmed filter expression:", + "((f5 >= 20) and (f3 == 15))", actual); + } + + private LogicalPlan migrateAndOptimizePlan(org.apache.pig.impl.logicalLayer.LogicalPlan plan) throws IOException { + LogicalPlan newLogicalPlan = migratePlan( plan ); + PlanOptimizer optimizer = new MyPlanOptimizer( newLogicalPlan, 3 ); + optimizer.optimize(); + return newLogicalPlan; + } + + + /** + * Test that pig sends correct partition column names in setPartitionFilter + * when the user has a schema in the load statement which renames partition + * columns - in this test case there is no condition on partition columns + * - so setPartitionFilter() should not be called and the filter condition + * should remain as is. + * @throws Exception + */ + @Test + public void testColNameMapping2() throws Exception { + TestLoader.partFilter = null; + lpTester.buildPlan("a = load 'foo' using " + + TestLoader.class.getName() + + "('srcid:int, mrkt:chararray, dstid:int, name:chararray, age:int', " + + "'srcid') as (f1, f2, f3, f4, f5);"); + org.apache.pig.impl.logicalLayer.LogicalPlan lp = lpTester.buildPlan("b = filter a by " + + "f5 >= 20 and f2 == 'us' and f3 == 15;"); + + LogicalPlan newLogicalPlan = migrateAndOptimizePlan( lp ); + + Assert.assertEquals("checking partition filter:", + null, + TestLoader.partFilter); + LOFilter filter = (LOFilter) newLogicalPlan.getSinks().get(0); + String actual = PColFilterExtractor.getExpression( + (LogicalExpression) filter.getFilterPlan(). + getSources().get(0)). 
+ toString().toLowerCase(); + Assert.assertEquals("checking trimmed filter expression:", + "(((f5 >= 20) and (f2 == 'us')) and (f3 == 15))", actual); + } + + /** + * Test that pig sends correct partition column names in setPartitionFilter + * when the user has a schema in the load statement which renames partition + * columns - in this test case the filter only has conditions on partition + * columns + * @throws Exception + */ + @Test + public void testColNameMapping3() throws Exception { + TestLoader.partFilter = null; + lpTester.buildPlan("a = load 'foo' using " + + TestLoader.class.getName() + + "('srcid:int, mrkt:chararray, dstid:int, name:chararray, age:int', " + + "'srcid,mrkt,dstid,age') as (f1, f2, f3, f4, f5);"); + org.apache.pig.impl.logicalLayer.LogicalPlan lp = lpTester.buildPlan("b = filter a by " + + "(f5 >= 20 or f2 == 'us') and (f1 == 10 and f3 == 15);"); + + LogicalPlan newLogicalPlan = migrateAndOptimizePlan( lp ); + + Assert.assertEquals("checking partition filter:", + "(((age >= 20) or (mrkt == 'us')) and ((srcid == 10) and " + + "(dstid == 15)))", + TestLoader.partFilter.toString()); + Iterator<Operator> it = newLogicalPlan.getOperators(); + Assert.assertTrue("Checking that filter has been removed since it contained" + + " only conditions on partition cols:", + (it.next() instanceof LOLoad)); + Assert.assertFalse("Checking that filter has been removed since it contained" + + " only conditions on partition cols:", + it.hasNext()); + + } + + /** + * Test that pig sends correct partition column names in setPartitionFilter + * when the user has a schema in the load statement which renames partition + * columns - in this test case the schema in load statement is a prefix + * (with columns renamed) of the schema returned by + * {...@link LoadMetadata#getSchema(String, Configuration)} + * @throws Exception + */ + @Test + public void testColNameMapping4() throws Exception { + TestLoader.partFilter = null; + lpTester.buildPlan("a = load 'foo' using " + 
+ TestLoader.class.getName() + + "('srcid:int, mrkt:chararray, dstid:int, name:chararray, age:int', " + + "'srcid,mrkt') as (f1, f2, f3);"); + org.apache.pig.impl.logicalLayer.LogicalPlan lp = lpTester.buildPlan("b = filter a by " + + "(age >= 20 and f2 == 'us') and (f1 == 10 and f3 == 15);"); + + LogicalPlan newLogicalPlan = migrateAndOptimizePlan( lp ); + + Assert.assertEquals("checking partition filter:", + "((mrkt == 'us') and (srcid == 10))", + TestLoader.partFilter.toString()); + LOFilter filter = (LOFilter) newLogicalPlan.getSinks().get(0); + String actual = PColFilterExtractor.getExpression( + (LogicalExpression) filter.getFilterPlan().getSources().get(0)). + toString().toLowerCase(); + Assert.assertEquals("checking trimmed filter expression:", + "((age >= 20) and (f3 == 15))", actual); + } + + /** + * Test PIG-1267 + * @throws Exception + */ + @Test + public void testColNameMapping5() throws Exception { + TestLoader.partFilter = null; + lpTester.buildPlan("a = load 'foo' using " + + TestLoader.class.getName() + + "('mrkt:chararray, a1:chararray, a2:chararray, srcid:int, bcookie:chararray', " + + "'srcid');"); + lpTester.buildPlan("b = load 'bar' using " + + TestLoader.class.getName() + + "('dstid:int, b1:int, b2:int, srcid:int, bcookie:chararray, mrkt:chararray'," + + "'srcid');"); + lpTester.buildPlan("a1 = filter a by srcid == 10;"); + lpTester.buildPlan("b1 = filter b by srcid == 20;"); + lpTester.buildPlan("c = join a1 by bcookie, b1 by bcookie;"); + org.apache.pig.impl.logicalLayer.LogicalPlan lp = lpTester + .buildPlan("d = foreach c generate $4 as bcookie:chararray, " + + "$5 as dstid:int, $0 as mrkt:chararray;"); + + new PlanSetter(lp).visit(); + + LogicalPlan newLogicalPlan = migrateAndOptimizePlan( lp ); + + String partFilter = TestLoader.partFilter.toString(); + Assert.assertTrue( "(srcid == 20)".equals( partFilter ) || "(srcid == 10)".equals( partFilter ) ); + + int counter = 0; + Iterator<Operator> iter = newLogicalPlan.getOperators(); + while 
(iter.hasNext()) { + Assert.assertTrue(!(iter.next() instanceof LOFilter)); + counter++; + } + Assert.assertEquals(counter, 4); + } + + //// helper methods /////// + + private PColFilterExtractor test(org.apache.pig.impl.logicalLayer.LogicalPlan lp, List<String> partitionCols, + String expPartFilterString, String expFilterString) + throws IOException { + LogicalPlan newLogicalPlan = migratePlan( lp ); + LOFilter filter = (LOFilter)newLogicalPlan.getSinks().get(0); + PColFilterExtractor pColExtractor = new PColFilterExtractor( + filter.getFilterPlan(), partitionCols); + pColExtractor.visit(); + + if(expPartFilterString == null) { + Assert.assertEquals("Checking partition column filter:", null, + pColExtractor.getPColCondition()); + } else { + Assert.assertEquals("Checking partition column filter:", + expPartFilterString.toLowerCase(), + pColExtractor.getPColCondition().toString().toLowerCase()); + } + + if(expFilterString == null) { + Assert.assertTrue("Check that filter can be removed:", + pColExtractor.isFilterRemovable()); + } else { + String actual = PColFilterExtractor.getExpression( + (LogicalExpression)filter.getFilterPlan().getSources().get(0)). 
+ toString().toLowerCase(); + Assert.assertEquals("checking trimmed filter expression:", expFilterString, + actual); + } + return pColExtractor; + } + + private void negativeTest(org.apache.pig.impl.logicalLayer.LogicalPlan lp, List<String> partitionCols, + int expectedErrorCode) throws VisitorException { + LogicalPlan newLogicalPlan = migratePlan( lp ); + LOFilter filter = (LOFilter)newLogicalPlan.getSinks().get(0); + PColFilterExtractor pColExtractor = new PColFilterExtractor( + filter.getFilterPlan(), partitionCols); + try { + pColExtractor.visit(); + } catch(Exception e) { + Assert.assertEquals("Checking if exception has right error code", + expectedErrorCode, LogUtils.getPigException(e).getErrorCode()); + return; + } + Assert.fail("Exception expected!"); + } + + /** + * this loader is only used to test that parition column filters are given + * in the manner expected in terms of column names - hence it does not + * implement many of the methods and only implements required ones. + */ + public static class TestLoader extends LoadFunc implements LoadMetadata { + + Schema schema; + String[] partCols; + static Expression partFilter = null; + + public TestLoader(String schemaString, String commaSepPartitionCols) + throws ParseException { + schema = Util.getSchemaFromString(schemaString); + partCols = commaSepPartitionCols.split(","); + } + + @Override + public InputFormat getInputFormat() throws IOException { + return null; + } + + @Override + public Tuple getNext() throws IOException { + return null; + } + + @Override + public void prepareToRead(RecordReader reader, PigSplit split) + throws IOException { + } + + @Override + public void setLocation(String location, Job job) throws IOException { + } + + @Override + public String[] getPartitionKeys(String location, Job job) + throws IOException { + return partCols; + } + + @Override + public ResourceSchema getSchema(String location, Job job) + throws IOException { + return new ResourceSchema(schema); + } + + 
@Override + public ResourceStatistics getStatistics(String location, + Job job) throws IOException { + return null; + } + + @Override + public void setPartitionFilter(Expression partitionFilter) + throws IOException { + partFilter = partitionFilter; + } + + } + + public class MyPlanOptimizer extends LogicalPlanOptimizer { + protected MyPlanOptimizer(OperatorPlan p, int iterations) { + super( p, iterations, new HashSet<String>() ); + } + + protected List<Set<Rule>> buildRuleSets() { + List<Set<Rule>> ls = new ArrayList<Set<Rule>>(); + + Set<Rule> s = new HashSet<Rule>(); + // add split filter rule + Rule r = new PartitionFilterPushDown("PartitionFilterPushDown"); + s = new HashSet<Rule>(); + s.add(r); + ls.add(s); + + r = new TypeCastInserter( "TypeCastInserter", LOLoad.class.getName() ); + s = new HashSet<Rule>(); + s.add(r); + ls.add(s); + return ls; + } + } + + private LogicalPlan migratePlan(org.apache.pig.impl.logicalLayer.LogicalPlan lp) throws VisitorException{ + LogicalPlanMigrationVistor visitor = new LogicalPlanMigrationVistor(lp); + visitor.visit(); + LogicalPlan newPlan = visitor.getNewLogicalPlan(); + return newPlan; + } + +}