Author: daijy Date: Wed Aug 25 18:03:34 2010 New Revision: 989272 URL: http://svn.apache.org/viewvc?rev=989272&view=rev Log: File renaming for PIG-1514 and PIG-1497
Added: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/LimitOptimizer.java hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/PartitionFilterOptimizer.java Removed: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/OptimizeLimit.java hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/PartitionFilterPushDown.java Modified: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java hadoop/pig/trunk/test/org/apache/pig/test/TestOptimizeLimit.java hadoop/pig/trunk/test/org/apache/pig/test/TestPartitionFilterPushDown.java Modified: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java?rev=989272&r1=989271&r2=989272&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java Wed Aug 25 18:03:34 2010 @@ -23,24 +23,20 @@ import java.util.List; import java.util.Set; import org.apache.pig.newplan.OperatorPlan; -import org.apache.pig.newplan.logical.relational.LOLoad; -import org.apache.pig.newplan.logical.relational.LOStream; import org.apache.pig.newplan.logical.rules.AddForEach; import org.apache.pig.newplan.logical.rules.ColumnMapKeyPrune; import org.apache.pig.newplan.logical.rules.FilterAboveForeach; import org.apache.pig.newplan.logical.rules.ImplicitSplitInserter; import org.apache.pig.newplan.logical.rules.LoadTypeCastInserter; import org.apache.pig.newplan.logical.rules.MergeFilter; -import org.apache.pig.newplan.logical.rules.OptimizeLimit; import org.apache.pig.newplan.logical.rules.PushUpFilter; import org.apache.pig.newplan.logical.rules.SplitFilter; import org.apache.pig.newplan.logical.rules.StreamTypeCastInserter; -import 
org.apache.pig.newplan.logical.rules.TypeCastInserter; import org.apache.pig.newplan.optimizer.PlanOptimizer; import org.apache.pig.newplan.optimizer.Rule; -import org.apache.pig.newplan.logical.rules.OptimizeLimit; -import org.apache.pig.newplan.logical.rules.PartitionFilterPushDown; +import org.apache.pig.newplan.logical.rules.LimitOptimizer; +import org.apache.pig.newplan.logical.rules.PartitionFilterOptimizer; public class LogicalPlanOptimizer extends PlanOptimizer { private Set<String> mRulesOff = null; @@ -79,7 +75,7 @@ public class LogicalPlanOptimizer extend // This set of rules push partition filter to LoadFunc s = new HashSet<Rule>(); // Optimize partition filter - r = new PartitionFilterPushDown("PartitionFilterPushDown"); + r = new PartitionFilterOptimizer("PartitionFilterOptimizer"); checkAndAddRule(s, r); if (!s.isEmpty()) ls.add(s); @@ -88,7 +84,7 @@ public class LogicalPlanOptimizer extend // This set of rules push up limit s = new HashSet<Rule>(); // Optimize limit - r = new OptimizeLimit("OptimizeLimit"); + r = new LimitOptimizer("LimitOptimizer"); checkAndAddRule(s, r); if (!s.isEmpty()) ls.add(s); Added: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/LimitOptimizer.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/LimitOptimizer.java?rev=989272&view=auto ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/LimitOptimizer.java (added) +++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/LimitOptimizer.java Wed Aug 25 18:03:34 2010 @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.newplan.logical.rules; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +import org.apache.pig.impl.logicalLayer.FrontendException; +import org.apache.pig.newplan.logical.relational.LOCogroup; +import org.apache.pig.newplan.logical.relational.LOCross; +import org.apache.pig.newplan.logical.relational.LODistinct; +import org.apache.pig.newplan.logical.relational.LOFilter; +import org.apache.pig.newplan.logical.relational.LOForEach; +import org.apache.pig.newplan.logical.relational.LOGenerate; +import org.apache.pig.newplan.logical.relational.LOJoin; +import org.apache.pig.newplan.logical.relational.LOLimit; +import org.apache.pig.newplan.logical.relational.LOLoad; +import org.apache.pig.newplan.logical.relational.LOSort; +import org.apache.pig.newplan.logical.relational.LOSplit; +import org.apache.pig.newplan.logical.relational.LOSplitOutput; +import org.apache.pig.newplan.logical.relational.LOUnion; +import org.apache.pig.newplan.logical.relational.LogicalPlan; +import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator; +import org.apache.pig.newplan.Operator; +import org.apache.pig.newplan.OperatorPlan; +import org.apache.pig.newplan.optimizer.Rule; +import org.apache.pig.newplan.optimizer.Transformer; + +public class LimitOptimizer extends Rule { + + public LimitOptimizer(String name) { + super(name, 
false); + } + + @Override + protected OperatorPlan buildPattern() { + LogicalPlan plan = new LogicalPlan(); + LogicalRelationalOperator limit = new LOLimit(plan, 0); + plan.add(limit); + return plan; + } + + @Override + public Transformer getNewTransformer() { + return new OptimizeLimitTransformer(); + } + + public class OptimizeLimitTransformer extends Transformer { + + @Override + public boolean check(OperatorPlan matched) { + LOLimit limit = (LOLimit) matched.getSources().get(0); + // Match each foreach. + List<Operator> preds = currentPlan.getPredecessors(limit); + if (preds == null || preds.size() == 0) + return false; + + Operator pred = preds.get(0); + + // Limit cannot be pushed up + if (pred instanceof LOCogroup || pred instanceof LOFilter + || pred instanceof LOLoad || pred instanceof LOSplit + || pred instanceof LODistinct || pred instanceof LOJoin) { + return false; + } + + // Limit cannot be pushed in front of ForEach if it has a flatten + if (pred instanceof LOForEach) { + LOForEach foreach = (LOForEach) pred; + LogicalPlan innerPlan = foreach.getInnerPlan(); + Iterator<Operator> it = innerPlan.getOperators(); + while (it.hasNext()) { + Operator op = it.next(); + if (op instanceof LOGenerate) { + LOGenerate gen = (LOGenerate) op; + boolean[] flattenFlags = gen.getFlattenFlags(); + if (flattenFlags != null) { + for (boolean flatten : flattenFlags) { + if (flatten) + return false; + } + } + } + } + } + return true; + } + + @Override + public OperatorPlan reportChanges() { + return currentPlan; + } + + @Override + public void transform(OperatorPlan matched) throws FrontendException { + + LOLimit limit = (LOLimit) matched.getSources().get(0); + + // Find the next foreach operator. 
+ List<Operator> preds = currentPlan.getPredecessors(limit); + Operator pred = preds.get(0); + + if (pred instanceof LOForEach) { + // We can safely move LOLimit up + // Get operator before LOForEach + Operator prepredecessor = currentPlan.getPredecessors(pred) + .get(0); + Operator succ = currentPlan.getSuccessors(limit).get(0); + currentPlan.disconnect(prepredecessor, pred); + currentPlan.disconnect(pred, limit); + currentPlan.disconnect(limit, succ); + currentPlan.connect(prepredecessor, limit); + currentPlan.connect(limit, pred); + currentPlan.connect(pred, succ); + } else if (pred instanceof LOCross || pred instanceof LOUnion) { + // Limit can be duplicated, and the new instance pushed in front + // of an operator for the following operators + // (that is, if you have X->limit, you can transform that to + // limit->X->limit): + LOLimit newLimit = null; + List<Operator> nodesToProcess = new ArrayList<Operator>(); + for (Operator prepredecessor : currentPlan + .getPredecessors(pred)) + nodesToProcess.add(prepredecessor); + for (Operator prepredecessor : nodesToProcess) { + if (prepredecessor instanceof LOLimit) { + LOLimit l = (LOLimit) prepredecessor; + l.setLimit(l.getLimit() < limit.getLimit() ? l + .getLimit() : limit.getLimit()); + } else { + newLimit = new LOLimit((LogicalPlan) currentPlan, limit + .getLimit()); + currentPlan.add(newLimit); + currentPlan.disconnect(prepredecessor, pred); + currentPlan.connect(prepredecessor, newLimit); + currentPlan.connect(newLimit, pred); + } + } + } else if (pred instanceof LOSort) { + LOSort sort = (LOSort) pred; + if (sort.getLimit() == -1) + sort.setLimit(limit.getLimit()); + else + sort.setLimit(sort.getLimit() < limit.getLimit() ? 
sort + .getLimit() : limit.getLimit()); + + // remove the limit + Operator succ = currentPlan.getSuccessors(limit).get(0); + currentPlan.disconnect(sort, limit); + currentPlan.disconnect(limit, succ); + currentPlan.connect(sort, succ); + currentPlan.remove(limit); + } else if (pred instanceof LOLimit) { + // Limit is merged into another LOLimit + LOLimit beforeLimit = (LOLimit) pred; + beforeLimit + .setLimit(beforeLimit.getLimit() < limit.getLimit() ? beforeLimit + .getLimit() + : limit.getLimit()); + // remove the limit + Operator succ = currentPlan.getSuccessors(limit).get(0); + currentPlan.disconnect(beforeLimit, limit); + currentPlan.disconnect(limit, succ); + currentPlan.connect(beforeLimit, succ); + currentPlan.remove(limit); + } else if (pred instanceof LOSplitOutput) { + // Limit and OrderBy (LOSort) can be separated by split + List<Operator> grandparants = currentPlan.getPredecessors(pred); + // After insertion of splitters, any node in the plan can + // have at most one predecessor + if (grandparants != null && grandparants.size() != 0 + && grandparants.get(0) instanceof LOSplit) { + List<Operator> greatGrandparants = currentPlan + .getPredecessors(grandparants.get(0)); + if (greatGrandparants != null + && greatGrandparants.size() != 0 + && greatGrandparants.get(0) instanceof LOSort) { + LOSort sort = (LOSort) greatGrandparants.get(0); + LOSort newSort = new LOSort(sort.getPlan(), sort + .getSortColPlans(), sort.getAscendingCols(), + sort.getUserFunc()); + newSort.setLimit(limit.getLimit()); + + Operator succ = currentPlan.getSuccessors(limit).get(0); + currentPlan.disconnect(pred, limit); + currentPlan.disconnect(limit, succ); + currentPlan.add(newSort); + currentPlan.connect(pred, newSort); + currentPlan.connect(newSort, succ); + currentPlan.remove(limit); + } + } + } + } + } +} Added: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/PartitionFilterOptimizer.java URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/PartitionFilterOptimizer.java?rev=989272&view=auto ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/PartitionFilterOptimizer.java (added) +++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/PartitionFilterOptimizer.java Wed Aug 25 18:03:34 2010 @@ -0,0 +1,218 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.pig.newplan.logical.rules; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.mapreduce.Job; +import org.apache.pig.Expression; +import org.apache.pig.LoadFunc; +import org.apache.pig.LoadMetadata; +import org.apache.pig.Expression.BinaryExpression; +import org.apache.pig.Expression.Column; +import org.apache.pig.newplan.logical.relational.LOFilter; +import org.apache.pig.newplan.logical.relational.LOLoad; +import org.apache.pig.newplan.logical.relational.LogicalPlan; +import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator; +import org.apache.pig.newplan.logical.relational.LogicalSchema; +import org.apache.pig.newplan.Operator; +import org.apache.pig.newplan.OperatorPlan; +import org.apache.pig.newplan.OperatorSubPlan; +import org.apache.pig.newplan.PColFilterExtractor; +import org.apache.pig.newplan.optimizer.Rule; +import org.apache.pig.newplan.optimizer.Transformer; +import org.apache.pig.impl.logicalLayer.FrontendException; + +public class PartitionFilterOptimizer extends Rule { + private String[] partitionKeys; + + /** + * a reference to the LoadMetadata implementation + */ + private LoadMetadata loadMetadata; + + /** + * a reference to the LoadFunc implementation + */ + private LoadFunc loadFunc; + + private LOLoad loLoad; + private LOFilter loFilter; + + /** + * a map between column names as reported in + * {@link LoadMetadata#getSchema(String, org.apache.hadoop.conf.Configuration)} + * and as present in {@link LOLoad#getSchema()}. The two will be different + * when the user has provided a schema in the load statement + */ + private Map<String, String> colNameMap = new HashMap<String, String>(); + + /** + * a map between column names as present in {@link LOLoad#getSchema()} and + * as reported in + * {@link LoadMetadata#getSchema(String, org.apache.hadoop.conf.Configuration)}. 
+ * The two will be different when the user has provided a schema in the + * load statement. + */ + private Map<String, String> reverseColNameMap = new HashMap<String, String>(); + + public PartitionFilterOptimizer(String name) { + super( name, false ); + } + + @Override + protected OperatorPlan buildPattern() { + // match each load. + LogicalPlan plan = new LogicalPlan(); + LogicalRelationalOperator load = new LOLoad (null, null, plan, null ); + plan.add( load ); +// LogicalRelationalOperator filter = new LOFilter( plan ); +// plan.add( filter ); +// plan.connect( load, filter ); + return plan; + } + + @Override + public Transformer getNewTransformer() { + return new PartitionFilterPushDownTransformer(); + } + + public class PartitionFilterPushDownTransformer extends Transformer { + private OperatorSubPlan subPlan; + + @Override + public boolean check(OperatorPlan matched) throws FrontendException { + loLoad = (LOLoad)matched.getSources().get(0); + // Match filter. + List<Operator> succeds = currentPlan.getSuccessors( loLoad ); + if( succeds == null || succeds.size() == 0 || !( succeds.get(0) instanceof LOFilter ) ) + return false; + loFilter = (LOFilter)succeds.get(0); + + // we have to check more only if LoadFunc implements LoadMetadata + loadFunc = loLoad.getLoadFunc(); + if(!( loadFunc instanceof LoadMetadata ) ) { + return false; + } + + loadMetadata = (LoadMetadata)loadFunc; + try { + partitionKeys = loadMetadata.getPartitionKeys( + loLoad.getFileSpec().getFileName(), new Job( loLoad.getConfiguration() ) ); + } catch (IOException e) { + throw new FrontendException( e ); + } + if( partitionKeys == null || partitionKeys.length == 0 ) { + return false; + } + +// LogicalExpressionPlan filterExpr = filter.getFilterPlan(); + + // we found a load-filter pattern where the load returns partition keys + return true; + } + + @Override + public OperatorPlan reportChanges() { + return subPlan; + } + + @Override + public void transform(OperatorPlan matched) throws 
FrontendException { + subPlan = new OperatorSubPlan( currentPlan ); + + setupColNameMaps(); + PColFilterExtractor pColFilterFinder = new PColFilterExtractor( + loFilter.getFilterPlan(), getMappedKeys( partitionKeys ) ); + pColFilterFinder.visit(); + Expression partitionFilter = pColFilterFinder.getPColCondition(); + if(partitionFilter != null) { + // the column names in the filter may be the ones provided by + // the user in the schema in the load statement - we may need + // to replace them with partition column names as given by + // LoadFunc.getSchema() + updateMappedColNames(partitionFilter); + try { + loadMetadata.setPartitionFilter(partitionFilter); + } catch (IOException e) { + throw new FrontendException( e ); + } + if(pColFilterFinder.isFilterRemovable()) { + // remove this filter from the plan + Operator from = currentPlan.getPredecessors( loFilter ).get( 0 ); + currentPlan.disconnect( from, loFilter ); + List<Operator> succs = currentPlan.getSuccessors( loFilter ); + if( succs != null ) { + Operator to = succs.get( 0 ); + currentPlan.disconnect( loFilter, to ); + currentPlan.connect( from, to ); + } + currentPlan.remove( loFilter ); + } + } + } + + private void updateMappedColNames(Expression expr) { + if(expr instanceof BinaryExpression) { + updateMappedColNames(((BinaryExpression) expr).getLhs()); + updateMappedColNames(((BinaryExpression) expr).getRhs()); + } else if (expr instanceof Column) { + Column col = (Column) expr; + col.setName(reverseColNameMap.get(col.getName())); + } + } + + /** + * The partition keys in the argument are as reported by + * {...@link LoadMetadata#getPartitionKeys(String, org.apache.hadoop.conf.Configuration)}. + * The user may have renamed these by providing a schema with different names + * in the load statement - this method will replace the former names with + * the latter names. 
+ * @param partitionKeys + * @return + */ + private List<String> getMappedKeys(String[] partitionKeys) { + List<String> mappedKeys = new ArrayList<String>(partitionKeys.length); + for (int i = 0; i < partitionKeys.length; i++) { + mappedKeys.add(colNameMap.get(partitionKeys[i])); + } + return mappedKeys; + } + + private void setupColNameMaps() throws FrontendException { + LogicalSchema loLoadSchema = loLoad.getSchema(); + LogicalSchema loadFuncSchema = loLoad.getDeterminedSchema(); + for(int i = 0; i < loadFuncSchema.size(); i++) { + colNameMap.put(loadFuncSchema.getField(i).alias, + (i < loLoadSchema.size() ? loLoadSchema.getField(i).alias : + loadFuncSchema.getField(i).alias)); + + reverseColNameMap.put((i < loLoadSchema.size() ? loLoadSchema.getField(i).alias : + loadFuncSchema.getField(i).alias), + loadFuncSchema.getField(i).alias); + } + } + + } + +} Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestOptimizeLimit.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestOptimizeLimit.java?rev=989272&r1=989271&r2=989272&view=diff ============================================================================== --- hadoop/pig/trunk/test/org/apache/pig/test/TestOptimizeLimit.java (original) +++ hadoop/pig/trunk/test/org/apache/pig/test/TestOptimizeLimit.java Wed Aug 25 18:03:34 2010 @@ -27,7 +27,7 @@ import org.apache.pig.newplan.logical.Lo import org.apache.pig.newplan.logical.optimizer.LogicalPlanOptimizer; import org.apache.pig.newplan.logical.relational.LogicalPlan; import org.apache.pig.newplan.logical.rules.LoadTypeCastInserter; -import org.apache.pig.newplan.logical.rules.OptimizeLimit; +import org.apache.pig.newplan.logical.rules.LimitOptimizer; import org.apache.pig.newplan.OperatorPlan; import org.apache.pig.newplan.optimizer.PlanOptimizer; import org.apache.pig.newplan.optimizer.Rule; @@ -250,7 +250,7 @@ public class TestOptimizeLimit { ls.add(s); s = new HashSet<Rule>(); - r = new OptimizeLimit("OptimizeLimit"); + r = new 
LimitOptimizer("OptimizeLimit"); s.add(r); ls.add(s); Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestPartitionFilterPushDown.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestPartitionFilterPushDown.java?rev=989272&r1=989271&r2=989272&view=diff ============================================================================== --- hadoop/pig/trunk/test/org/apache/pig/test/TestPartitionFilterPushDown.java (original) +++ hadoop/pig/trunk/test/org/apache/pig/test/TestPartitionFilterPushDown.java Wed Aug 25 18:03:34 2010 @@ -45,7 +45,7 @@ import org.apache.pig.newplan.logical.op import org.apache.pig.newplan.logical.relational.LOFilter; import org.apache.pig.newplan.logical.relational.LOLoad; import org.apache.pig.newplan.logical.relational.LogicalPlan; -import org.apache.pig.newplan.logical.rules.PartitionFilterPushDown; +import org.apache.pig.newplan.logical.rules.PartitionFilterOptimizer; import org.apache.pig.newplan.logical.rules.LoadTypeCastInserter; import org.apache.pig.newplan.Operator; import org.apache.pig.newplan.OperatorPlan; @@ -605,7 +605,7 @@ public class TestPartitionFilterPushDown Set<Rule> s = new HashSet<Rule>(); // add split filter rule - Rule r = new PartitionFilterPushDown("PartitionFilterPushDown"); + Rule r = new PartitionFilterOptimizer("PartitionFilterPushDown"); s = new HashSet<Rule>(); s.add(r); ls.add(s);