Author: daijy
Date: Wed Aug 25 17:42:06 2010
New Revision: 989251

URL: http://svn.apache.org/viewvc?rev=989251&view=rev
Log:
PIG-1497: Mandatory rule PartitionFilterOptimizer (xuefuz via daijy)

Added:
    hadoop/pig/trunk/src/org/apache/pig/newplan/PColFilterExtractor.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/PartitionFilterPushDown.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestPartitionFilterPushDown.java

Added: hadoop/pig/trunk/src/org/apache/pig/newplan/PColFilterExtractor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/PColFilterExtractor.java?rev=989251&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/PColFilterExtractor.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/PColFilterExtractor.java Wed Aug 25 17:42:06 2010
@@ -0,0 +1,520 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.newplan;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.pig.Expression;
+import org.apache.pig.PigException;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.util.Pair;
+
+import org.apache.pig.Expression.OpType;
+import org.apache.pig.newplan.logical.expression.AddExpression;
+import org.apache.pig.newplan.logical.expression.AndExpression;
+import org.apache.pig.newplan.logical.expression.BinCondExpression;
+import org.apache.pig.newplan.logical.expression.BinaryExpression;
+import org.apache.pig.newplan.logical.expression.CastExpression;
+import org.apache.pig.newplan.logical.expression.ConstantExpression;
+import org.apache.pig.newplan.logical.expression.DivideExpression;
+import org.apache.pig.newplan.logical.expression.EqualExpression;
+import org.apache.pig.newplan.logical.expression.GreaterThanEqualExpression;
+import org.apache.pig.newplan.logical.expression.GreaterThanExpression;
+import org.apache.pig.newplan.logical.expression.IsNullExpression;
+import org.apache.pig.newplan.logical.expression.LessThanEqualExpression;
+import org.apache.pig.newplan.logical.expression.LessThanExpression;
+import org.apache.pig.newplan.logical.expression.LogicalExpression;
+import org.apache.pig.newplan.logical.expression.ModExpression;
+import org.apache.pig.newplan.logical.expression.MultiplyExpression;
+import org.apache.pig.newplan.logical.expression.NotEqualExpression;
+import org.apache.pig.newplan.logical.expression.NotExpression;
+import org.apache.pig.newplan.logical.expression.OrExpression;
+import org.apache.pig.newplan.logical.expression.ProjectExpression;
+import org.apache.pig.newplan.logical.expression.RegexExpression;
+import org.apache.pig.newplan.logical.expression.SubtractExpression;
+import org.apache.pig.newplan.logical.expression.UserFuncExpression;
+import org.apache.pig.newplan.DepthFirstWalker;
+
+/**
+ * This Visitor works on the filter condition of a LOFilter which immediately 
+ * follows a LOLoad that interacts with a metadata system (currently OWL) to 
+ * read table data. The visitor looks for conditions on partition columns in the
+ * filter condition and extracts those conditions out of the filter condition.
+ * The condition on partition cols will be used to prune partitions of the table.
+ *
+ */
+public class PColFilterExtractor extends PlanVisitor {
+       /**
+        * partition columns associated with the table
+        * present in the load on which the filter whose
+        * inner plan is being visited is applied
+        */
+       private List<String> partitionCols;
+
+       /**
+        * will contain the partition column filter conditions
+        * accumulated during the visit - the final condition will be an expression
+        * built from these sub expressions connected with AND
+        */
+       private ArrayList<Expression> pColConditions = new ArrayList<Expression>();
+
+       /**
+        * flag used during visit to indicate if a partition key
+        * was seen
+        */
+       private boolean sawKey;
+
+       private boolean sawNonKeyCol;
+
+       private enum Side { LEFT, RIGHT, NONE };
+       private Side replaceSide = Side.NONE;
+
+       private boolean filterRemovable = false;
+
+       @Override
+       public void visit() throws FrontendException {
+               // we will visit the leaf and it will recursively walk the plan
+               LogicalExpression leaf = (LogicalExpression)plan.getSources().get( 0 );
+               // if the leaf is a unary operator it should be a FilterFunc in
+               // which case we don't try to extract partition filter conditions
+               if(leaf instanceof BinaryExpression) {
+                       BinaryExpression binExpr = (BinaryExpression)leaf;
+                       visit( binExpr );
+                       replaceChild( binExpr );
+                       // if the entire expression is to be removed, then the above
+                       // replaceChild will not set sawKey to false (sawKey is set to
+                       // false only in replaceChild())
+                       if(sawKey == true) {
+                               //there are only conditions on partition columns in the filter
+                               //extract it
+                               pColConditions.add( getExpression( leaf ) );
+                               filterRemovable = true;
+                       }
+               }
+       }
+
+       /**
+        * 
+        * @param plan logical plan corresponding to the filter's comparison condition
+        * @param partitionCols list of partition columns of the table which is
+        * being loaded in the LOAD statement which is input to the filter
+        */
+       public PColFilterExtractor(OperatorPlan plan,
+                       List<String> partitionCols) {
+               // though we configure a DepthFirstWalker to be the walker, we will not
+               // use it - we will visit the leaf and it will recursively walk the
+               // plan
+               super( plan, new DepthFirstWalker( plan ) );
+               this.partitionCols = new ArrayList<String>(partitionCols);
+       }
+
+       protected void visit(ProjectExpression project) throws FrontendException {
+               String fieldName = project.getFieldSchema().alias;
+               if(partitionCols.contains(fieldName)) {
+                       sawKey = true;
+                       // The condition on partition column will be used to prune the
+                       // scan and removed from the filter condition. Hence the condition
+                       // on the partition column will not be re-applied when data is read,
+                       // so the following cases should throw error until that changes.
+                       List<Class<?>> opsToCheckFor = new ArrayList<Class<?>>();
+                       opsToCheckFor.add(RegexExpression.class);
+                       int errCode = 1110;
+                       if(checkSuccessors(project, opsToCheckFor)) {
+                               throw new FrontendException("Unsupported query: " +
+                                               "You have a partition column ("
+                                               + fieldName + ") inside a regexp operator in the " +
+                                               "filter condition.", errCode, PigException.INPUT);
+                       }
+                       opsToCheckFor.set(0, UserFuncExpression.class);
+                       if(checkSuccessors(project, opsToCheckFor)) {
+                               throw new FrontendException("Unsupported query: " +
+                                               "You have a partition column ("
+                                               + fieldName + ") inside a function in the " +
+                                               "filter condition.", errCode, PigException.INPUT);
+                       }
+                       opsToCheckFor.set(0, CastExpression.class);
+                       if(checkSuccessors(project, opsToCheckFor)) {
+                               throw new FrontendException("Unsupported query: " +
+                                               "You have a partition column ("
+                                               + fieldName + ") inside a cast in the " +
+                                               "filter condition.", errCode, PigException.INPUT);
+                       }
+
+                       opsToCheckFor.set(0, IsNullExpression.class);
+                       if(checkSuccessors(project, opsToCheckFor)) {
+                               throw new FrontendException("Unsupported query: " +
+                                               "You have a partition column ("
+                                               + fieldName + ") inside a null check operator in the " +
+                                               "filter condition.", errCode, PigException.INPUT);
+                       }
+                       opsToCheckFor.set(0, BinCondExpression.class);
+                       if(checkSuccessors(project, opsToCheckFor)) {
+                               throw new FrontendException("Unsupported query: " +
+                                               "You have a partition column ("
+                                               + fieldName + ") inside a bincond operator in the " +
+                                               "filter condition.", errCode, PigException.INPUT);
+                       }
+                       opsToCheckFor.set(0, AndExpression.class);
+                       opsToCheckFor.add(OrExpression.class);
+                       if(checkSuccessors(project, opsToCheckFor)) {
+                               errCode = 1112;
+                               throw new FrontendException("Unsupported query: " +
+                                               "You have a partition column (" + fieldName +
+                                               ") in a construction like: " +
+                                               "(pcond and ...) or (pcond and ...) " +
+                                               "where pcond is a condition on a partition column.",
+                                               errCode, PigException.INPUT);
+                       }
+               } else {
+                       sawNonKeyCol = true;
+               }
+       }
+
+       private void visit(BinaryExpression binOp) throws FrontendException {
+               boolean lhsSawKey = false;        
+               boolean rhsSawKey = false;        
+               boolean lhsSawNonKeyCol = false;        
+               boolean rhsSawNonKeyCol = false;        
+
+               sawKey = false;
+               sawNonKeyCol = false;
+               visit( binOp.getLhs() );
+               replaceChild(binOp.getLhs());
+               lhsSawKey = sawKey;
+               lhsSawNonKeyCol = sawNonKeyCol;
+
+               sawKey = false;
+               sawNonKeyCol = false;
+               visit( binOp.getRhs() );
+               replaceChild(binOp.getRhs());
+               rhsSawKey = sawKey;
+               rhsSawNonKeyCol = sawNonKeyCol;
+
+               // only in the case of an AND, we potentially split the AND to
+               // remove conditions on partition columns out of the AND. For this
+               // we set replaceSide accordingly so that when we reach a predecessor
+               // we can trim the appropriate side. If both sides of the AND have
+               // conditions on partition columns, we will remove the AND completely -
+               // in this case, we will not set replaceSide, but sawKey will be
+               // true so that as we go to higher predecessor ANDs we can trim later.
+               if(binOp instanceof AndExpression) {
+                       if(lhsSawKey && rhsSawNonKeyCol){
+                               replaceSide = Side.LEFT;
+                       }else if(rhsSawKey && lhsSawNonKeyCol){
+                               replaceSide = Side.RIGHT;
+                       }
+               } else if(lhsSawKey && rhsSawNonKeyCol || rhsSawKey && lhsSawNonKeyCol){
+                       int errCode = 1111;
+                       String errMsg = "Use of partition column/condition with" +
+                       " non partition column/condition in filter expression is not " +
+                       "supported." ;
+                       throw new FrontendException(errMsg, errCode, PigException.INPUT);
+               }
+
+               sawKey = lhsSawKey || rhsSawKey;
+               sawNonKeyCol = lhsSawNonKeyCol || rhsSawNonKeyCol;
+       }
+
+
+
+       /**
+        * @return the condition on partition columns extracted from filter
+        */
+       public  Expression getPColCondition(){
+               if(pColConditions.size() == 0)
+                       return null;
+               Expression cond =  pColConditions.get(0);
+               for(int i=1; i<pColConditions.size(); i++){
+                       //if there is more than one condition expression
+                       // connect them using "AND"s
+                       cond = new Expression.BinaryExpression(cond, pColConditions.get(i),
+                    OpType.OP_AND);
+               }
+               return cond;
+       }
+
+       /**
+        * @return the filterRemovable
+        */
+       public boolean isFilterRemovable() {
+               return filterRemovable;
+       }
+
+       //////// helper methods /////////////////////////
+       /**
+        * check for the presence of a certain operator type in the Successors
+        * @param opToStartFrom
+        * @param opsToCheckFor operators to be checked for at each level of 
+        * Successors - the ordering in the list is the order in which the ops 
+        * will be checked.
+        * @return true if opsToCheckFor are found
+        * @throws FrontendException
+        */
+       private boolean checkSuccessors(Operator opToStartFrom, 
+                       List<Class<?>> opsToCheckFor) throws FrontendException {
+               boolean done = checkSuccessorsHelper(opToStartFrom, opsToCheckFor);
+               if(!done && !opsToCheckFor.isEmpty()) {
+                       // continue checking if there is more to check
+                       while(!done) {
+                               opToStartFrom = plan.getPredecessors(opToStartFrom).get(0);
+                               done = checkSuccessorsHelper(opToStartFrom, opsToCheckFor);
+                       }
+               }
+               return opsToCheckFor.isEmpty();
+       }
+
+       private boolean checkSuccessorsHelper(Operator opToStartFrom, 
+                       List<Class<?>> opsToCheckFor) throws FrontendException {
+               List<Operator> successors = plan.getPredecessors(
+                               opToStartFrom);
+               if(successors == null || successors.size() == 0) {
+                       return true; // further checking cannot be done
+               }
+               if(successors.size() == 1) {
+                       Operator suc  = successors.get(0);
+                       if(suc.getClass().getCanonicalName().equals(
+                                       opsToCheckFor.get(0).getCanonicalName())) {
+                               // trim the list of operators to check
+                               opsToCheckFor.remove(0);
+                               if(opsToCheckFor.isEmpty()) {
+                                       return true; //no further checks required
+                               }
+                       }
+               } else {
+                       throwException();
+               }
+               return false; // more checking can be done
+       }
+
+       private void replaceChild(LogicalExpression childExpr) throws FrontendException {
+
+               if(replaceSide == Side.NONE) {
+                       // the child is trimmed when the appropriate
+                       // flag is set to indicate that it needs to be trimmed.
+                       return;
+               }
+
+               // eg if replaceSide == Side.LEFT
+               //    binexpop
+               //   /   \ \ 
+               // child (this is the childExpr argument sent in)
+               //  /  \
+               // Lt   Rt 
+               //
+               // gets converted to 
+               //  binexpop
+               //  /
+               // Rt
+
+               if( !( childExpr instanceof BinaryExpression ) ) {
+                       throwException();
+               }
+               // child's lhs operand
+               LogicalExpression leftChild = 
+                       ((BinaryExpression)childExpr).getLhs();
+               // child's rhs operand
+               LogicalExpression rightChild = 
+                       ((BinaryExpression)childExpr).getRhs();
+
+               plan.disconnect( childExpr, leftChild );
+               plan.disconnect( childExpr, rightChild );
+
+               if(replaceSide == Side.LEFT) {
+                       // remove left child and replace childExpr with its right child
+                       remove( leftChild );
+                       replace(childExpr, rightChild);
+               } else if(replaceSide == Side.RIGHT){
+                       // remove right child and replace childExpr with its left child
+                       remove(rightChild);
+                       replace(childExpr, leftChild);
+               }else {
+                       throwException();
+               }
+               //reset 
+               replaceSide = Side.NONE;
+               sawKey = false;
+
+       }
+       
+       private void replace(Operator oldOp, Operator newOp) throws FrontendException {
+               List<Operator> grandParents = plan.getPredecessors( oldOp );
+               if( grandParents == null || grandParents.size() == 0 ) {
+                       plan.remove( oldOp );
+                       return;
+               }
+               Operator grandParent = plan.getPredecessors( oldOp ).get( 0 );
+               Pair<Integer, Integer> pair = plan.disconnect( grandParent, 
oldOp );
+               plan.add( newOp );
+               plan.connect( grandParent, pair.first, newOp, pair.second );
+               plan.remove( oldOp );
+       }
+
+       /**
+        * @param op
+        * @throws FrontendException
+        */
+       private void remove(LogicalExpression op) throws FrontendException {
+               pColConditions.add( getExpression( op ) );
+               removeTree( op );
+       }
+       
+       /**
+        * Assume that the given operator is already disconnected from its predecessors.
+        * @param op
+        * @throws FrontendException 
+        */
+       private void removeTree(Operator op) throws FrontendException {
+               List<Operator> succs = plan.getSuccessors( op );
+               if( succs == null ) {
+                       plan.remove( op );
+                       return;
+               }
+               
+               Operator[] children = new Operator[succs.size()];
+               for( int i = 0; i < succs.size(); i++ ) {
+                       children[i] = succs.get(i);
+               }
+               
+               for( Operator succ : children ) {
+                       plan.disconnect( op, succ );
+                       removeTree( succ );
+               }
+               
+               plan.remove( op );
+       }
+
+       public static Expression getExpression(LogicalExpression op) throws FrontendException
+        {
+               if(op instanceof ConstantExpression) {
+                       ConstantExpression constExpr =(ConstantExpression)op ;
+                       return new Expression.Const( constExpr.getValue() );
+               } else if (op instanceof ProjectExpression) {
+                       ProjectExpression projExpr = (ProjectExpression)op;
+                       String fieldName = projExpr.getFieldSchema().alias;
+            return new Expression.Column(fieldName);
+        } else {
+                       if( !( op instanceof BinaryExpression ) ) {
+                               throwException();
+                       }
+                       BinaryExpression binOp = (BinaryExpression)op;
+                       if(binOp instanceof AddExpression) {
+                               return getExpression( binOp, OpType.OP_PLUS );
+                       } else if(binOp instanceof SubtractExpression) {
+                               return getExpression(binOp, OpType.OP_MINUS);
+                       } else if(binOp instanceof MultiplyExpression) {
+                               return getExpression(binOp, OpType.OP_TIMES);
+                       } else if(binOp instanceof DivideExpression) {
+                               return getExpression(binOp, OpType.OP_DIV);
+                       } else if(binOp instanceof ModExpression) {
+                               return getExpression(binOp, OpType.OP_MOD);
+                       } else if(binOp instanceof AndExpression) {
+                               return getExpression(binOp, OpType.OP_AND);
+                       } else if(binOp instanceof OrExpression) {
+                               return getExpression(binOp, OpType.OP_OR);
+                       } else if(binOp instanceof EqualExpression) {
+                               return getExpression(binOp, OpType.OP_EQ);
+                       } else if(binOp instanceof NotEqualExpression) {
+                               return getExpression(binOp, OpType.OP_NE);
+                       } else if(binOp instanceof GreaterThanExpression) {
+                               return getExpression(binOp, OpType.OP_GT);
+                       } else if(binOp instanceof GreaterThanEqualExpression) {
+                               return getExpression(binOp, OpType.OP_GE);
+                       } else if(binOp instanceof LessThanExpression) {
+                               return getExpression(binOp, OpType.OP_LT);
+                       } else if(binOp instanceof LessThanEqualExpression) {
+                               return getExpression(binOp, OpType.OP_LE);
+                       } else {
+                               throwException();
+                       }
+               }
+               return null;
+       }
+
+    private static Expression getExpression(BinaryExpression binOp, OpType 
+            opType) throws FrontendException {
+        return new Expression.BinaryExpression(getExpression(binOp.getLhs())
+                ,getExpression(binOp.getRhs()), opType);
+    }
+       private static void throwException() throws FrontendException {
+               int errCode = 2209;
+               throw new FrontendException(
+                               "Internal error while processing any partition filter " +
+                               "conditions in the filter after the load" ,
+                               errCode,
+                               PigException.BUG
+               );
+       }
+
+       // this might get called from some visit() - in that case, delegate to
+       // the other visit()s which we have defined here 
+       private void visit(LogicalExpression op) throws FrontendException {
+               if(op instanceof ProjectExpression) {
+                       visit((ProjectExpression)op);
+               } else if (op instanceof BinaryExpression) {
+                       visit((BinaryExpression)op);
+               } else if (op instanceof CastExpression) {
+                       visit((CastExpression)op);
+               } else if (op instanceof BinCondExpression) {
+                       visit((BinCondExpression)op);
+               } else if (op instanceof UserFuncExpression) {
+                       visit((UserFuncExpression)op);
+               } else if (op instanceof IsNullExpression) {
+                       visit((IsNullExpression)op);
+               } else if( op instanceof NotExpression ) {
+                       visit( (NotExpression)op );
+               } else if( op instanceof RegexExpression ) {
+                       visit( (RegexExpression)op );
+               }
+       }
+
+       // some specific operators which are of interest to catch some
+       // unsupported scenarios
+       private void visit(CastExpression cast) throws FrontendException {
+               visit(cast.getExpression());
+       }
+
+       private void visit(NotExpression not) throws FrontendException {
+               visit(not.getExpression());   
+       }
+
+       private void visit(RegexExpression regexp) throws FrontendException {
+               visit((BinaryExpression)regexp);    
+       }
+
+       private void visit(BinCondExpression binCond) throws FrontendException {
+               visit(binCond.getCondition());
+               visit(binCond.getLhs());
+               visit(binCond.getRhs());
+       }
+
+       private void visit(UserFuncExpression udf) throws FrontendException {
+               for (LogicalExpression op : udf.getArguments()) {
+                       visit(op);
+               }
+       }
+
+       private void visit(IsNullExpression isNull) throws FrontendException {
+               visit(isNull.getExpression());
+       }
+
+}
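
For reference, a minimal sketch (not part of this commit) of how the extractor above is
typically driven, mirroring what PartitionFilterPushDownTransformer and the tests below do.
The class and method names in the sketch (PColFilterExtractorUsageSketch, summarize) are
illustrative only:

import java.util.List;

import org.apache.pig.Expression;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.newplan.PColFilterExtractor;
import org.apache.pig.newplan.logical.relational.LOFilter;

public class PColFilterExtractorUsageSketch {
    public static void summarize(LOFilter filter, List<String> partitionCols)
            throws FrontendException {
        // Walk the filter's expression plan, pulling out conditions that touch
        // only the given partition columns.
        PColFilterExtractor extractor =
                new PColFilterExtractor(filter.getFilterPlan(), partitionCols);
        extractor.visit();

        // Conditions on partition columns, ANDed together; null if none were
        // found. A caller would hand this to LoadMetadata.setPartitionFilter().
        Expression partitionFilter = extractor.getPColCondition();

        // True when the whole filter consisted of partition-column conditions,
        // so the LOFilter itself could be dropped from the plan.
        boolean removable = extractor.isFilterRemovable();

        System.out.println("partition filter: " + partitionFilter
                + ", filter removable: " + removable);
    }
}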

Added: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/PartitionFilterPushDown.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/PartitionFilterPushDown.java?rev=989251&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/PartitionFilterPushDown.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/PartitionFilterPushDown.java Wed Aug 25 17:42:06 2010
@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.newplan.logical.rules;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.pig.Expression;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.LoadMetadata;
+import org.apache.pig.Expression.BinaryExpression;
+import org.apache.pig.Expression.Column;
+import org.apache.pig.newplan.logical.relational.LOFilter;
+import org.apache.pig.newplan.logical.relational.LOLoad;
+import org.apache.pig.newplan.logical.relational.LogicalPlan;
+import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
+import org.apache.pig.newplan.logical.relational.LogicalSchema;
+import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.OperatorPlan;
+import org.apache.pig.newplan.OperatorSubPlan;
+import org.apache.pig.newplan.PColFilterExtractor;
+import org.apache.pig.newplan.optimizer.Rule;
+import org.apache.pig.newplan.optimizer.Transformer;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+
+public class PartitionFilterPushDown extends Rule {
+    private String[] partitionKeys;
+    
+    /**
+     * a reference to the LoadMetadata implementation
+     */
+    private LoadMetadata loadMetadata;
+
+    /**
+     * a reference to the LoadFunc implementation
+     */
+    private LoadFunc loadFunc;
+    
+    private LOLoad loLoad;
+    private LOFilter loFilter;
+    
+    /**
+     * a map between column names as reported in
+     * {@link LoadMetadata#getSchema(String, org.apache.hadoop.conf.Configuration)}
+     * and as present in {@link LOLoad#getSchema()}. The two will be different
+     * when the user has provided a schema in the load statement
+     */
+    private Map<String, String> colNameMap = new HashMap<String, String>();
+    
+    /**
+     * a map between column names as present in {@link LOLoad#getSchema()} and
+     * as reported in
+     * {@link LoadMetadata#getSchema(String, org.apache.hadoop.conf.Configuration)}.
+     * The two will be different when the user has provided a schema in the
+     * load statement.
+     */
+    private Map<String, String> reverseColNameMap = new HashMap<String, String>();
+
+    public PartitionFilterPushDown(String name) {
+        super( name, false );
+    }
+
+    @Override
+    protected OperatorPlan buildPattern() {
+        // match each load.
+        LogicalPlan plan = new LogicalPlan();
+        LogicalRelationalOperator load = new LOLoad (null, null, plan, null );
+        plan.add( load );
+//        LogicalRelationalOperator filter = new LOFilter( plan );
+//        plan.add( filter );
+//        plan.connect( load, filter );
+        return plan;
+    }
+
+    @Override
+    public Transformer getNewTransformer() {
+        return new PartitionFilterPushDownTransformer();
+    }
+    
+    public class PartitionFilterPushDownTransformer extends Transformer {
+        private OperatorSubPlan subPlan;
+
+        @Override
+        public boolean check(OperatorPlan matched) throws FrontendException {
+            loLoad = (LOLoad)matched.getSources().get(0);
+            // Match filter.
+            List<Operator> succeds = currentPlan.getSuccessors( loLoad );
+            if( succeds == null || succeds.size() == 0 || !( succeds.get(0) instanceof LOFilter ) )
+                return false;
+            loFilter =  (LOFilter)succeds.get(0);
+            
+            // we have to check more only if LoadFunc implements LoadMetadata
+            loadFunc = loLoad.getLoadFunc();
+            if(!( loadFunc instanceof LoadMetadata ) ) {
+                return false;
+            }
+            
+            loadMetadata = (LoadMetadata)loadFunc;
+            try {
+                               partitionKeys = loadMetadata.getPartitionKeys(
+                                               loLoad.getFileSpec().getFileName(), new Job( loLoad.getConfiguration() ) );
+                       } catch (IOException e) {
+                               throw new FrontendException( e );
+                       }
+            if( partitionKeys == null || partitionKeys.length == 0 ) {
+               return false;
+            }
+            
+//            LogicalExpressionPlan filterExpr = filter.getFilterPlan();
+            
+            // we found a load-filter pattern where the load returns partition keys
+            return true;
+        }
+
+        @Override
+        public OperatorPlan reportChanges() {
+            return subPlan;
+        }
+
+        @Override
+        public void transform(OperatorPlan matched) throws FrontendException {
+               subPlan = new OperatorSubPlan( currentPlan );
+
+               setupColNameMaps();
+               PColFilterExtractor pColFilterFinder = new PColFilterExtractor(
+                               loFilter.getFilterPlan(), getMappedKeys( partitionKeys ) );
+               pColFilterFinder.visit();
+               Expression partitionFilter = pColFilterFinder.getPColCondition();
+               if(partitionFilter != null) {
+                       // the column names in the filter may be the ones provided by
+                       // the user in the schema in the load statement - we may need
+                       // to replace them with partition column names as given by
+                       // LoadFunc.getSchema()
+                       updateMappedColNames(partitionFilter);
+                       try {
+                               loadMetadata.setPartitionFilter(partitionFilter);
+                       } catch (IOException e) {
+                               throw new FrontendException( e );
+                       }
+                       if(pColFilterFinder.isFilterRemovable()) {
+                               // remove this filter from the plan
+                               Operator from = currentPlan.getPredecessors( loFilter ).get( 0 );
+                               currentPlan.disconnect( from, loFilter );
+                               List<Operator> succs = currentPlan.getSuccessors( loFilter );
+                               if( succs != null ) {
+                                       Operator to = succs.get( 0 );
+                                       currentPlan.disconnect( loFilter, to );
+                                       currentPlan.connect( from, to );
+                               }
+                               currentPlan.remove( loFilter );
+                       }
+               }
+        }
+        
+        private void updateMappedColNames(Expression expr) {
+            if(expr instanceof BinaryExpression) {
+                updateMappedColNames(((BinaryExpression) expr).getLhs());
+                updateMappedColNames(((BinaryExpression) expr).getRhs());
+            } else if (expr instanceof Column) {
+                Column col = (Column) expr;
+                col.setName(reverseColNameMap.get(col.getName()));
+            }
+        }
+
+        /**
+         * The partition keys in the argument are as reported by
+         * {@link LoadMetadata#getPartitionKeys(String, org.apache.hadoop.conf.Configuration)}.
+         * The user may have renamed these by providing a schema with different names
+         * in the load statement - this method will replace the former names with
+         * the latter names.
+         * @param partitionKeys
+         * @return the partition keys mapped to the corresponding names in the load statement's schema
+         */
+        private List<String> getMappedKeys(String[] partitionKeys) {
+            List<String> mappedKeys = new ArrayList<String>(partitionKeys.length);
+            for (int i = 0; i < partitionKeys.length; i++) {
+                mappedKeys.add(colNameMap.get(partitionKeys[i]));
+            }
+            return mappedKeys;
+        }
+
+        private void setupColNameMaps() throws FrontendException {
+            LogicalSchema loLoadSchema = loLoad.getSchema();
+            LogicalSchema loadFuncSchema = loLoad.getDeterminedSchema();
+             for(int i = 0; i < loadFuncSchema.size(); i++) {
+                colNameMap.put(loadFuncSchema.getField(i).alias, 
+                        (i < loLoadSchema.size() ? loLoadSchema.getField(i).alias :
+                            loadFuncSchema.getField(i).alias));
+                
+                reverseColNameMap.put((i < loLoadSchema.size() ? loLoadSchema.getField(i).alias :
+                            loadFuncSchema.getField(i).alias), 
+                            loadFuncSchema.getField(i).alias);
+            }
+        }
+
+    }
+
+}

Added: hadoop/pig/trunk/test/org/apache/pig/test/TestPartitionFilterPushDown.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestPartitionFilterPushDown.java?rev=989251&view=auto
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestPartitionFilterPushDown.java (added)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestPartitionFilterPushDown.java Wed Aug 25 17:42:06 2010
@@ -0,0 +1,628 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.pig.ExecType;
+import org.apache.pig.Expression;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.LoadMetadata;
+import org.apache.pig.ResourceSchema;
+import org.apache.pig.ResourceStatistics;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.newplan.logical.LogicalPlanMigrationVistor;
+import org.apache.pig.newplan.logical.expression.LogicalExpression;
+import org.apache.pig.newplan.logical.optimizer.LogicalPlanOptimizer;
+import org.apache.pig.newplan.logical.relational.LOFilter;
+import org.apache.pig.newplan.logical.relational.LOLoad;
+import org.apache.pig.newplan.logical.relational.LogicalPlan;
+import org.apache.pig.newplan.logical.rules.PartitionFilterPushDown;
+import org.apache.pig.newplan.logical.rules.TypeCastInserter;
+import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.OperatorPlan;
+import org.apache.pig.newplan.PColFilterExtractor;
+import org.apache.pig.newplan.optimizer.PlanOptimizer;
+import org.apache.pig.newplan.optimizer.Rule;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.logicalLayer.PlanSetter;
+import org.apache.pig.impl.logicalLayer.parser.ParseException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.LogUtils;
+import org.apache.pig.test.utils.LogicalPlanTester;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * unit tests to test extracting partition filter conditions out of the filter
+ * condition in the filter following a load which talks to a metadata system (i.e.
+ * implements {@link LoadMetadata})
+ */
+public class TestPartitionFilterPushDown {
+    static PigContext pc = new PigContext(ExecType.LOCAL, new Properties());
+    static LogicalPlanTester lpTester;
+    
+    @BeforeClass
+    public static void setup() throws Exception {
+        lpTester = new LogicalPlanTester(pc);
+        lpTester.buildPlan("a = load 'foo' as (srcid, mrkt, dstid, name, age);");
+    }
+
+    @AfterClass
+    public static void tearDown() {
+    }
+
+    /**
+     * test case where there is a single expression on partition columns in 
+     * the filter expression along with an expression on non partition column
+     * @throws IOException 
+     */
+    @Test
+    public void testSimpleMixed() throws IOException {
+       org.apache.pig.impl.logicalLayer.LogicalPlan lp = 
+            lpTester.buildPlan("b = filter a by srcid == 10 and name == 'foo';");
+        test(lp, Arrays.asList("srcid"), "(srcid == 10)", "(name == 'foo')");
+    }
+    
+    /**
+     * test case where filter does not contain any condition on partition cols
+     * @throws Exception
+     */
+    @Test
+    public void testNoPartFilter() throws Exception {
+       org.apache.pig.impl.logicalLayer.LogicalPlan lp = 
+            lpTester.buildPlan("b = filter a by age == 20 and name == 'foo';");
+        test(lp, Arrays.asList("srcid"), null, 
+                "((age == 20) and (name == 'foo'))");
+    }
+    
+    /**
+     * test case where filter only contains condition on partition cols
+     * @throws Exception
+     */
+    @Test
+    public void testOnlyPartFilter1() throws Exception {
+       org.apache.pig.impl.logicalLayer.LogicalPlan lp = 
+            lpTester.buildPlan("b = filter a by srcid > 20 and mrkt == 'us';");
+        test(lp, Arrays.asList("srcid", "mrkt"), 
+                    "((srcid > 20) and (mrkt == 'us'))", null);
+        
+    }
+    
+    /**
+     * test case where filter only contains condition on partition cols
+     * @throws Exception
+     */
+    @Test
+    public void testOnlyPartFilter2() throws Exception {
+       org.apache.pig.impl.logicalLayer.LogicalPlan lp = 
+            lpTester.buildPlan("b = filter a by mrkt == 'us';");
+        test(lp, Arrays.asList("srcid", "mrkt"), 
+                    "(mrkt == 'us')", null);
+        
+    }
+    
+    /**
+     * test case where filter only contains condition on partition cols
+     * @throws Exception
+     */
+    @Test
+    public void testOnlyPartFilter3() throws Exception {
+       org.apache.pig.impl.logicalLayer.LogicalPlan lp = 
+            lpTester.buildPlan("b = filter a by srcid == 20 or mrkt == 'us';");
+        test(lp, Arrays.asList("srcid", "mrkt"), 
+                    "((srcid == 20) or (mrkt == 'us'))", null);
+        
+    }
+    
+    /**
+     * test case where filter has both conditions on partition cols and non
+     * partition cols and the filter condition will be split to extract the
+     * conditions on partition columns
+     */
+    @Test
+    public void testMixed1() throws Exception {
+       org.apache.pig.impl.logicalLayer.LogicalPlan lp = 
+            lpTester.buildPlan("b = filter a by " +
+                       "(age < 20 and  mrkt == 'us') and (srcid == 10 and " +
+                       "name == 'foo');");
+        test(lp, Arrays.asList("srcid", "mrkt"), 
+                "((mrkt == 'us') and (srcid == 10))", 
+                "((age < 20) and (name == 'foo'))");
+    }
+    
+    
+    /**
+     * test case where filter has both conditions on partition cols and non
+     * partition cols and the filter condition will be split to extract the
+     * conditions on partition columns
+     */
+    @Test
+    public void testMixed2() throws Exception {
+       org.apache.pig.impl.logicalLayer.LogicalPlan lp = 
+            lpTester.buildPlan("b = filter a by " +
+                    "(age >= 20 and  mrkt == 'us') and (srcid == 10 and " +
+                    "dstid == 15);");
+        test(lp, Arrays.asList("srcid", "dstid", "mrkt"), 
+                "((mrkt == 'us') and ((srcid == 10) and (dstid == 15)))", 
+                "(age >= 20)");
+    }
+    
+    /**
+     * test case where filter has both conditions on partition cols and non
+     * partition cols and the filter condition will be split to extract the
+     * conditions on partition columns
+     */
+    @Test
+    public void testMixed3() throws Exception {
+       org.apache.pig.impl.logicalLayer.LogicalPlan lp = 
+            lpTester.buildPlan("b = filter a by " +
+                    "age >= 20 and  mrkt == 'us' and srcid == 10;");
+        test(lp, Arrays.asList("srcid", "dstid", "mrkt"), 
+                "((mrkt == 'us') and (srcid == 10))", "(age >= 20)");
+    }
+    
+    /**
+     * test case where filter has both conditions on partition cols and non
+     * partition cols and the filter condition will be split to extract the
+     * conditions on partition columns - this testcase also has a condition
+     * based on comparison of two partition columns
+     */
+    @Test
+    public void testMixed4() throws Exception {
+       org.apache.pig.impl.logicalLayer.LogicalPlan lp = 
+            lpTester.buildPlan("b = filter a by " +
+                    "age >= 20 and  mrkt == 'us' and name == 'foo' and " +
+                    "srcid == dstid;");
+        test(lp, Arrays.asList("srcid", "dstid", "mrkt"), 
+                "((mrkt == 'us') and (srcid == dstid))", 
+                "((age >= 20) and (name == 'foo'))");
+    }
+    
+    /**
+     * test case where filter has both conditions on partition cols and non
+     * partition cols and the filter condition will be split to extract the
+     * conditions on partition columns - 
+     * This testcase has two partition col conditions with OR + non partition
+     * col conditions
+     */
+    @Test
+    public void testMixed5() throws Exception {
+       org.apache.pig.impl.logicalLayer.LogicalPlan lp = 
+            lpTester.buildPlan("b = filter a by " +
+                    "(srcid == 10 or mrkt == 'us') and name == 'foo' and " +
+                    "dstid == 30;");
+        test(lp, Arrays.asList("srcid", "dstid", "mrkt"), 
+                "(((srcid == 10) or (mrkt == 'us')) and (dstid == 30))", 
+                "(name == 'foo')");
+    }
+    
+    /**
+     * test case where filter has both conditions on partition cols and non
+     * partition cols and the filter condition will be split to extract the
+     * conditions on partition columns - 
+     * This testcase has two partition col conditions with OR + non partition
+     * col conditions
+     */
+    @Test
+    public void testMixed6() throws Exception {
+       org.apache.pig.impl.logicalLayer.LogicalPlan lp = 
+            lpTester.buildPlan("b = filter a by " +
+                    "dstid == 30 and (srcid == 10 or mrkt == 'us') and name == 'foo';");
+        test(lp, Arrays.asList("srcid", "dstid", "mrkt"), 
+                "((dstid == 30) and ((srcid == 10) or (mrkt == 'us')))", 
+                "(name == 'foo')");
+    }
+    /**
+     * test case where filter has both conditions on partition cols and non
+     * partition cols and the filter condition will be split to extract the
+     * conditions on partition columns. This testcase also tests arithmetic
+     * in partition column conditions
+     */
+    @Test
+    public void testMixedArith() throws Exception {
+       org.apache.pig.impl.logicalLayer.LogicalPlan lp = 
+            lpTester.buildPlan("b = filter a by " +
+                    "mrkt == 'us' and srcid * 10 == 150 + 20 and age != 15;");
+        test(lp, Arrays.asList("srcid", "dstid", "mrkt"), 
+                "((mrkt == 'us') and ((srcid * 10) == (150 + 20)))", 
+                "(age != 15)");
+    }
+    
+    @Test
+    public void testNegPColConditionWithNonPCol() throws Exception {
+        // use of partition column condition and non partition column in 
+        // same condition should fail
+       org.apache.pig.impl.logicalLayer.LogicalPlan lp = lpTester.buildPlan("b = filter a by " +
+                    "srcid > age;");
+        negativeTest(lp, Arrays.asList("srcid"), 1111);
+        lp =  lpTester.buildPlan("b = filter a by " +
+                    "srcid + age == 20;");
+        negativeTest(lp, Arrays.asList("srcid"), 1111);
+
+        // OR of partition column condition and non partition col condition
+        // should fail
+        lp = lpTester.buildPlan("b = filter a by " +
+                    "srcid > 10 or name == 'foo';");
+        negativeTest(lp, Arrays.asList("srcid"), 1111);
+    }
+    
+    @Test
+    public void testNegPColInWrongPlaces() throws Exception {
+        
+        int expectedErrCode = 1112;
+        org.apache.pig.impl.logicalLayer.LogicalPlan lp = lpTester.buildPlan("b = filter a by " +
+        "(srcid > 10 and name == 'foo') or dstid == 10;");
+        negativeTest(lp, Arrays.asList("srcid", "dstid"), expectedErrCode);
+        
+        expectedErrCode = 1110;
+        lp = lpTester.buildPlan("b = filter a by " +
+                "CONCAT(mrkt, '_10') == 'US_10' and age == 20;");
+        negativeTest(lp, Arrays.asList("srcid", "dstid", "mrkt"), expectedErrCode);
+        
+        lp = lpTester.buildPlan("b = filter a by " +
+                "mrkt matches '.*us.*' and age < 15;");
+        negativeTest(lp, Arrays.asList("srcid", "dstid", "mrkt"), expectedErrCode);
+        
+        lp = lpTester.buildPlan("b = filter a by " +
+                "(int)mrkt == 10 and name matches '.*foo.*';");
+        negativeTest(lp, Arrays.asList("srcid", "dstid", "mrkt"), expectedErrCode);
+        
+        lp = lpTester.buildPlan("b = filter a by " +
+            "(mrkt == 'us' ? age : age + 10) == 40 and name matches '.*foo.*';");
+        negativeTest(lp, Arrays.asList("srcid", "dstid", "mrkt"), expectedErrCode);
+        
+        lp = lpTester.buildPlan("b = filter a by " +
+            "(mrkt is null) and name matches '.*foo.*';");
+        negativeTest(lp, Arrays.asList("srcid", "dstid", "mrkt"), expectedErrCode);
+        
+        lp = lpTester.buildPlan("b = filter a by " +
+            "(mrkt is not null) and name matches '.*foo.*';");
+        negativeTest(lp, Arrays.asList("srcid", "dstid", "mrkt"), expectedErrCode);
+    }
+    
+    
+    /**
+     * Test that pig sends correct partition column names in setPartitionFilter
+     * when the user has a schema in the load statement which renames partition
+     * columns
+     * @throws Exception
+     */
+    @Test
+    public void testColNameMapping1() throws Exception {
+        TestLoader.partFilter = null;
+        lpTester.buildPlan("a = load 'foo' using "
+            + TestLoader.class.getName() + 
+            "('srcid:int, mrkt:chararray, dstid:int, name:chararray, age:int', " +
+            "'srcid,mrkt') as (f1, f2, f3, f4, f5);");
+        org.apache.pig.impl.logicalLayer.LogicalPlan lp = lpTester.buildPlan("b = filter a by " +
+                       "(f5 >= 20 and f2 == 'us') and (f1 == 10 and f3 == 15);");
+        
+        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( lp );
+        
+        Assert.assertEquals("checking partition filter:",             
+                    "((mrkt == 'us') and (srcid == 10))",
+                    TestLoader.partFilter.toString());
+        LOFilter filter = (LOFilter)newLogicalPlan.getSinks().get(0);
+        String actual = PColFilterExtractor.getExpression(
+                (LogicalExpression)filter.getFilterPlan().getSources().get(0)).
+                toString().toLowerCase();
+        Assert.assertEquals("checking trimmed filter expression:", 
+                "((f5 >= 20) and (f3 == 15))", actual);
+    }
+    
+    private LogicalPlan migrateAndOptimizePlan(org.apache.pig.impl.logicalLayer.LogicalPlan plan) throws IOException {
+        LogicalPlan newLogicalPlan = migratePlan( plan );
+        PlanOptimizer optimizer = new MyPlanOptimizer( newLogicalPlan, 3 );
+        optimizer.optimize();
+        return newLogicalPlan;
+    }
+    
+    
+    /**
+     * Test that pig sends correct partition column names in setPartitionFilter
+     * when the user has a schema in the load statement which renames partition
+     * columns - in this test case there is no condition on partition columns
+     * - so setPartitionFilter() should not be called and the filter condition
+     * should remain as is.
+     * @throws Exception
+     */
+    @Test
+    public void testColNameMapping2() throws Exception {
+        TestLoader.partFilter = null;
+        lpTester.buildPlan("a = load 'foo' using "
+            + TestLoader.class.getName() + 
+            "('srcid:int, mrkt:chararray, dstid:int, name:chararray, age:int', " +
+            "'srcid') as (f1, f2, f3, f4, f5);");
+        org.apache.pig.impl.logicalLayer.LogicalPlan lp = lpTester.buildPlan("b = filter a by " +
+                "f5 >= 20 and f2 == 'us' and f3 == 15;");
+
+        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( lp );
+
+        Assert.assertEquals("checking partition filter:",             
+                    null,
+                    TestLoader.partFilter);
+        LOFilter filter = (LOFilter) newLogicalPlan.getSinks().get(0);
+        String actual = PColFilterExtractor.getExpression(
+                (LogicalExpression) filter.getFilterPlan().
+                getSources().get(0)).
+                toString().toLowerCase();
+        Assert.assertEquals("checking trimmed filter expression:", 
+                "(((f5 >= 20) and (f2 == 'us')) and (f3 == 15))", actual);
+    }
+    
+    /**
+     * Test that pig sends correct partition column names in setPartitionFilter
+     * when the user has a schema in the load statement which renames partition
+     * columns - in this test case the filter only has conditions on partition
+     * columns
+     * @throws Exception
+     */
+    @Test
+    public void testColNameMapping3() throws Exception {
+        TestLoader.partFilter = null;
+        lpTester.buildPlan("a = load 'foo' using "
+            + TestLoader.class.getName() + 
+            "('srcid:int, mrkt:chararray, dstid:int, name:chararray, age:int', " +
+            "'srcid,mrkt,dstid,age') as (f1, f2, f3, f4, f5);");
+        org.apache.pig.impl.logicalLayer.LogicalPlan lp = lpTester.buildPlan("b = filter a by " +
+                "(f5 >= 20 or f2 == 'us') and (f1 == 10 and f3 == 15);");
+
+        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( lp );
+
+        Assert.assertEquals("checking partition filter:",             
+                    "(((age >= 20) or (mrkt == 'us')) and ((srcid == 10) and " +
+                    "(dstid == 15)))",
+                    TestLoader.partFilter.toString());
+        Iterator<Operator> it = newLogicalPlan.getOperators();
+        Assert.assertTrue("Checking that filter has been removed since it contained" +
+                       " only conditions on partition cols:", 
+                       (it.next() instanceof LOLoad));
+        Assert.assertFalse("Checking that filter has been removed since it contained" +
+                " only conditions on partition cols:", 
+                it.hasNext());
+        
+    }
+    
+    /**
+     * Test that pig sends correct partition column names in setPartitionFilter
+     * when the user has a schema in the load statement which renames partition
+     * columns - in this test case the schema in load statement is a prefix 
+     * (with columns renamed) of the schema returned by 
+     * {@link LoadMetadata#getSchema(String, Configuration)}
+     * @throws Exception
+     */
+    @Test
+    public void testColNameMapping4() throws Exception {
+        TestLoader.partFilter = null;
+        lpTester.buildPlan("a = load 'foo' using "
+            + TestLoader.class.getName() + 
+            "('srcid:int, mrkt:chararray, dstid:int, name:chararray, age:int', " +
+            "'srcid,mrkt') as (f1, f2, f3);");
+        org.apache.pig.impl.logicalLayer.LogicalPlan lp = lpTester.buildPlan("b = filter a by " +
+                "(age >= 20 and f2 == 'us') and (f1 == 10 and f3 == 15);");
+
+        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( lp );
+        
+        Assert.assertEquals("checking partition filter:",             
+                    "((mrkt == 'us') and (srcid == 10))",
+                    TestLoader.partFilter.toString());
+        LOFilter filter = (LOFilter) newLogicalPlan.getSinks().get(0);
+        String actual = PColFilterExtractor.getExpression(
+                (LogicalExpression) filter.getFilterPlan().getSources().get(0)).
+                toString().toLowerCase();
+        Assert.assertEquals("checking trimmed filter expression:", 
+                "((age >= 20) and (f3 == 15))", actual);
+    }
+    
+    /**
+     * Test PIG-1267
+     * @throws Exception
+     */
+    @Test
+    public void testColNameMapping5() throws Exception {
+        TestLoader.partFilter = null;
+        lpTester.buildPlan("a = load 'foo' using "
+            + TestLoader.class.getName() + 
+            "('mrkt:chararray, a1:chararray, a2:chararray, srcid:int, bcookie:chararray', " +
+            "'srcid');");
+        lpTester.buildPlan("b = load 'bar' using "
+                + TestLoader.class.getName() + 
+                "('dstid:int, b1:int, b2:int, srcid:int, bcookie:chararray, mrkt:chararray'," +
+                "'srcid');");
+        lpTester.buildPlan("a1 = filter a by srcid == 10;");
+        lpTester.buildPlan("b1 = filter b by srcid == 20;");
+        lpTester.buildPlan("c = join a1 by bcookie, b1 by bcookie;");
+        org.apache.pig.impl.logicalLayer.LogicalPlan lp = lpTester
+                .buildPlan("d = foreach c generate $4 as bcookie:chararray, " +
+                               "$5 as dstid:int, $0 as mrkt:chararray;");
+        
+        new PlanSetter(lp).visit();
+        
+        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( lp );
+        
+        String partFilter = TestLoader.partFilter.toString();
+        Assert.assertTrue( "(srcid == 20)".equals( partFilter ) ||  "(srcid == 10)".equals( partFilter ) );
+        
+        int counter = 0;
+        Iterator<Operator> iter = newLogicalPlan.getOperators();
+        while (iter.hasNext()) {
+                Assert.assertTrue(!(iter.next() instanceof LOFilter));
+            counter++;
+        }      
+        Assert.assertEquals(counter, 4);
+    }
+    
+    //// helper methods ///////
+    
+    private PColFilterExtractor test(org.apache.pig.impl.logicalLayer.LogicalPlan lp, List<String> partitionCols,
+            String expPartFilterString, String expFilterString) 
+    throws IOException {
+       LogicalPlan newLogicalPlan = migratePlan( lp );
+        LOFilter filter = (LOFilter)newLogicalPlan.getSinks().get(0);
+        PColFilterExtractor pColExtractor = new PColFilterExtractor(
+                filter.getFilterPlan(), partitionCols);
+        pColExtractor.visit();
+        
+        if(expPartFilterString == null) {
+                Assert.assertEquals("Checking partition column filter:", null, 
+                    pColExtractor.getPColCondition());
+        } else  {
+                Assert.assertEquals("Checking partition column filter:", 
+                    expPartFilterString.toLowerCase(), 
+                    pColExtractor.getPColCondition().toString().toLowerCase());
+        }
+        
+        if(expFilterString == null) {
+                Assert.assertTrue("Check that filter can be removed:", 
+                    pColExtractor.isFilterRemovable());
+        } else {
+            String actual = PColFilterExtractor.getExpression(
+                                (LogicalExpression)filter.getFilterPlan().getSources().get(0)).
+                                toString().toLowerCase();
+            Assert.assertEquals("checking trimmed filter expression:", expFilterString,
+                    actual);
+        }
+        return pColExtractor;
+    }
+    
+    private void negativeTest(org.apache.pig.impl.logicalLayer.LogicalPlan lp, List<String> partitionCols,
+            int expectedErrorCode) throws VisitorException {
+       LogicalPlan newLogicalPlan = migratePlan( lp );
+        LOFilter filter = (LOFilter)newLogicalPlan.getSinks().get(0);
+        PColFilterExtractor pColExtractor = new PColFilterExtractor(
+                filter.getFilterPlan(), partitionCols);
+        try {
+            pColExtractor.visit();
+        } catch(Exception e) {
+                Assert.assertEquals("Checking if exception has right error code", 
+                    expectedErrorCode, LogUtils.getPigException(e).getErrorCode());
+            return;
+        }
+        Assert.fail("Exception expected!");
+    }
+    
+    /**
+     * this loader is only used to test that partition column filters are given
+     * in the manner expected in terms of column names - hence it does not
+     * implement many of the methods and only implements required ones.
+     */
+    public static class TestLoader extends LoadFunc implements LoadMetadata {
+
+        Schema schema;
+        String[] partCols;
+        static Expression partFilter = null;
+        
+        public TestLoader(String schemaString, String commaSepPartitionCols) 
+        throws ParseException {
+            schema = Util.getSchemaFromString(schemaString);
+            partCols = commaSepPartitionCols.split(",");
+        }
+        
+        @Override
+        public InputFormat getInputFormat() throws IOException {
+            return null;
+        }
+
+        @Override
+        public Tuple getNext() throws IOException {
+            return null;
+        }
+
+        @Override
+        public void prepareToRead(RecordReader reader, PigSplit split)
+                throws IOException {
+        }
+
+        @Override
+        public void setLocation(String location, Job job) throws IOException {
+        }
+
+        @Override
+        public String[] getPartitionKeys(String location, Job job)
+                throws IOException {
+            return partCols;
+        }
+
+        @Override
+        public ResourceSchema getSchema(String location, Job job)
+                throws IOException {
+            return new ResourceSchema(schema);
+        }
+
+        @Override
+        public ResourceStatistics getStatistics(String location,
+                Job job) throws IOException {
+            return null;
+        }
+
+        @Override
+        public void setPartitionFilter(Expression partitionFilter)
+                throws IOException {
+            partFilter = partitionFilter;            
+        }
+        
+    }
+
+    public class MyPlanOptimizer extends LogicalPlanOptimizer {
+        protected MyPlanOptimizer(OperatorPlan p,  int iterations) {
+            super( p, iterations, new HashSet<String>() );
+        }
+        
+        protected List<Set<Rule>> buildRuleSets() {            
+            List<Set<Rule>> ls = new ArrayList<Set<Rule>>();
+            
+            Set<Rule> s = new HashSet<Rule>();
+            // add partition filter push down rule
+            Rule r = new PartitionFilterPushDown("PartitionFilterPushDown");
+            s = new HashSet<Rule>();
+            s.add(r);            
+            ls.add(s);
+            
+            r = new TypeCastInserter( "TypeCastInserter", LOLoad.class.getName() );
+            s = new HashSet<Rule>();
+            s.add(r);
+            ls.add(s);
+            return ls;
+        }
+    }    
+
+    private LogicalPlan migratePlan(org.apache.pig.impl.logicalLayer.LogicalPlan lp) throws VisitorException {
+        LogicalPlanMigrationVistor visitor = new LogicalPlanMigrationVistor(lp);
+        visitor.visit();
+        LogicalPlan newPlan = visitor.getNewLogicalPlan();
+        return newPlan;
+    }
+    
+}

