Author: gates Date: Wed Sep 1 20:35:25 2010 New Revision: 991695 URL: http://svn.apache.org/viewvc?rev=991695&view=rev Log: PIG-1399: Filter expression optimizations.
Added: hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/ConstExpEvaluator.java hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/DNFExpression.java hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/DNFPlan.java hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/DNFPlanGenerator.java hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/LogicalExpressionProxy.java hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/LogicalExpressionSimplifier.java hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/NotConversionVisitor.java hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/TestFilterSimplification.java Modified: hadoop/pig/branches/branch-0.8/CHANGES.txt hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/expression/LogicalExpression.java hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/TestFilterOpNumeric.java Modified: hadoop/pig/branches/branch-0.8/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/CHANGES.txt?rev=991695&r1=991694&r2=991695&view=diff ============================================================================== --- hadoop/pig/branches/branch-0.8/CHANGES.txt (original) +++ hadoop/pig/branches/branch-0.8/CHANGES.txt Wed Sep 1 20:35:25 2010 @@ -26,6 +26,8 @@ PIG-1249: Safe-guards against misconfigu IMPROVEMENTS +PIG-1399: Filter expression optimizations (yanz via gates) + PIG-1531: Pig gobbles up error messages (nrai via hashutosh) PIG-1458: aggregate files for replicated join (rding) Modified: hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/expression/LogicalExpression.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/expression/LogicalExpression.java?rev=991695&r1=991694&r2=991695&view=diff ============================================================================== --- hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/expression/LogicalExpression.java (original) +++ hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/expression/LogicalExpression.java Wed Sep 1 20:35:25 2010 @@ -99,6 +99,7 @@ public abstract class LogicalExpression uidOnlyFieldSchema = fieldSchema.mergeUid(uidOnlyFieldSchema); } + /** * Create the deep copy of this expression and add that into the passed * LogicalExpressionPlan Return the copy of this expression with updated Modified: hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java?rev=991695&r1=991694&r2=991695&view=diff ============================================================================== --- hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java (original) +++ hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java Wed Sep 1 20:35:25 2010 @@ -33,6 +33,7 @@ import org.apache.pig.newplan.logical.ru import org.apache.pig.newplan.logical.rules.PushUpFilter; import org.apache.pig.newplan.logical.rules.SplitFilter; import org.apache.pig.newplan.logical.rules.StreamTypeCastInserter; +import org.apache.pig.newplan.logical.rules.LogicalExpressionSimplifier; import org.apache.pig.newplan.optimizer.PlanOptimizer; import org.apache.pig.newplan.optimizer.Rule; @@ -53,11 +54,17 @@ public class LogicalPlanOptimizer extend protected List<Set<Rule>> buildRuleSets() { List<Set<Rule>> ls = new ArrayList<Set<Rule>>(); + // Logical expression simplifier + Set<Rule> s = new HashSet<Rule>(); + // add logical expression simplification rule + Rule r = new LogicalExpressionSimplifier("FilterLogicExpressionSimplifier"); + checkAndAddRule(s, r); + ls.add(s); + // ImplicitSplitInserter set // This set of rules Insert Foreach dedicated for casting after load - Set<Rule> s = new HashSet<Rule>(); - // add split filter rule - Rule r = new ImplicitSplitInserter("ImplicitSplitInserter"); + s = new HashSet<Rule>(); + r = new ImplicitSplitInserter("ImplicitSplitInserter"); checkAndAddRule(s, r); if (!s.isEmpty()) ls.add(s); @@ -186,7 +193,7 @@ public class LogicalPlanOptimizer extend ruleSet.add(rule); } - + private void addListeners() { addPlanTransformListener(new SchemaPatcher()); addPlanTransformListener(new ProjectionPatcher()); Added: hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/ConstExpEvaluator.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/ConstExpEvaluator.java?rev=991695&view=auto ============================================================================== --- hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/ConstExpEvaluator.java (added) +++ hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/ConstExpEvaluator.java Wed Sep 1 20:35:25 2010 @@ -0,0 +1,430 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.newplan.logical.rules; + +import java.util.List; +import java.util.Stack; + +import org.apache.pig.impl.logicalLayer.FrontendException; +import org.apache.pig.newplan.logical.expression.*; +import org.apache.pig.newplan.ReverseDependencyOrderWalker; +import org.apache.pig.newplan.Operator; +import org.apache.pig.newplan.OperatorPlan; +import org.apache.pig.data.DataType; + +/** + * a constant expression evaluation visitor that will evaluate the constant expressions. + * It works by traversing the expression tree in a bottom-up manner and evaluate all + * subexpressions that have all constant subexpressions. All results from constant + * children are pushed to a stack for the parent to digest for its own evaluation. + * Any non-constant expression will push a null to the stack and consequently will + * cause all of its ancestors not to be evaluated. + * + * For simplicity, only constant binary expressions and constant unary expressions are + * evaluated. More complex expressions are not evaluated at this moment. For UDF, + * this evaluation is not planned at all for fear of possible stateful consequences + * resulting from the original evaluations + * + */ +class ConstExpEvaluator extends LogicalExpressionVisitor { + Stack<ConstantExpression> result; + + ConstExpEvaluator(OperatorPlan plan) throws FrontendException { + super(plan, new ReverseDependencyOrderWalker(plan)); + result = new Stack<ConstantExpression>(); + } + + @SuppressWarnings("unchecked") + private void binaryExpressionConstPrune(LogicalExpression parent) + throws FrontendException { + ConstantExpression rhs = result.pop(), lhs = result.pop(); + if (rhs == null || lhs == null) { + result.push(null); + } + else { + ConstantExpression newExp = null; + if (parent instanceof AndExpression) newExp = new ConstantExpression( + plan, + (Boolean) rhs.getValue() && (Boolean) lhs.getValue(), + parent.getFieldSchema()); + else if (parent instanceof OrExpression) newExp = new ConstantExpression( + plan, + (Boolean) rhs.getValue() || (Boolean) lhs.getValue(), + parent.getFieldSchema()); + else if (parent instanceof EqualExpression) newExp = new ConstantExpression( + plan, rhs.isEqual(lhs), + parent.getFieldSchema()); + else if (parent instanceof GreaterThanExpression) newExp = new ConstantExpression( + plan, + ((Comparable) rhs.getValue()).compareTo((Comparable) lhs.getValue()) > 0, + parent.getFieldSchema()); + else if (parent instanceof GreaterThanEqualExpression) newExp = new ConstantExpression( + plan, + ((Comparable) rhs.getValue()).compareTo((Comparable) lhs.getValue()) >= 0, + parent.getFieldSchema()); + else if (parent instanceof LessThanExpression) newExp = new ConstantExpression( + plan, + ((Comparable) rhs.getValue()).compareTo((Comparable) lhs.getValue()) < 0, + parent.getFieldSchema()); + else if (parent instanceof LessThanExpression) newExp = new ConstantExpression( + plan, + ((Comparable) rhs.getValue()).compareTo((Comparable) lhs.getValue()) <= 0, + parent.getFieldSchema()); + else if (parent instanceof AddExpression) { + byte type = parent.getFieldSchema().type; + switch (type) { + case DataType.INTEGER: + newExp = new ConstantExpression( + plan, + (Integer) lhs.getValue() + (Integer) rhs.getValue(), + parent.getFieldSchema()); + break; + case DataType.LONG: + newExp = new ConstantExpression( + plan, + (Long) lhs.getValue() + (Long) rhs.getValue(), + parent.getFieldSchema()); + break; + case DataType.FLOAT: + newExp = new ConstantExpression( + plan, + (Float) lhs.getValue() + (Float) rhs.getValue(), + parent.getFieldSchema()); + break; + case DataType.DOUBLE: + newExp = new ConstantExpression( + plan, + (Double) lhs.getValue() + (Double) rhs.getValue(), + parent.getFieldSchema()); + break; + default: + throw new FrontendException("Invalid type"); + } + } + else if (parent instanceof SubtractExpression) { + byte type = parent.getFieldSchema().type; + switch (type) { + case DataType.INTEGER: + newExp = new ConstantExpression( + plan, + (Integer) lhs.getValue() - (Integer) rhs.getValue(), + parent.getFieldSchema()); + break; + case DataType.LONG: + newExp = new ConstantExpression( + plan, + (Long) lhs.getValue() - (Long) rhs.getValue(), + parent.getFieldSchema()); + break; + case DataType.FLOAT: + newExp = new ConstantExpression( + plan, + (Float) lhs.getValue() - (Float) rhs.getValue(), + parent.getFieldSchema()); + break; + case DataType.DOUBLE: + newExp = new ConstantExpression( + plan, + (Double) lhs.getValue() - (Double) rhs.getValue(), + parent.getFieldSchema()); + break; + default: + throw new FrontendException("Invalid type"); + } + } + else if (parent instanceof MultiplyExpression) { + byte type = parent.getFieldSchema().type; + switch (type) { + case DataType.INTEGER: + newExp = new ConstantExpression( + plan, + (Integer) lhs.getValue() * (Integer) rhs.getValue(), + parent.getFieldSchema()); + break; + case DataType.LONG: + newExp = new ConstantExpression( + plan, + (Long) lhs.getValue() * (Long) rhs.getValue(), + parent.getFieldSchema()); + break; + case DataType.FLOAT: + newExp = new ConstantExpression( + plan, + (Float) lhs.getValue() * (Float) rhs.getValue(), + parent.getFieldSchema()); + break; + case DataType.DOUBLE: + newExp = new ConstantExpression( + plan, + (Double) lhs.getValue() * (Double) rhs.getValue(), + parent.getFieldSchema()); + break; + default: + throw new FrontendException("Invalid type"); + } + } + else if (parent instanceof ModExpression) { + byte type = parent.getFieldSchema().type; + switch (type) { + case DataType.INTEGER: + newExp = new ConstantExpression( + plan, + (Integer) lhs.getValue() % (Integer) rhs.getValue(), + parent.getFieldSchema()); + break; + case DataType.LONG: + newExp = new ConstantExpression( + plan, + (Long) lhs.getValue() % (Long) rhs.getValue(), + parent.getFieldSchema()); + break; + default: + throw new FrontendException("Invalid type"); + } + } + else if (parent instanceof DivideExpression) { + byte type = parent.getFieldSchema().type; + switch (type) { + case DataType.INTEGER: + if ((Integer) rhs.getValue() != 0) + newExp = new ConstantExpression( + plan, + (Integer) lhs.getValue() / (Integer) rhs.getValue(), + parent.getFieldSchema()); + break; + case DataType.LONG: + if ((Long) rhs.getValue() != 0) + newExp = new ConstantExpression( + plan, + (Long) lhs.getValue() / (Long) rhs.getValue(), + parent.getFieldSchema()); + break; + case DataType.FLOAT: + if ((Float) rhs.getValue() != 0) + newExp = new ConstantExpression( + plan, + (Float) lhs.getValue() / (Float) rhs.getValue(), + parent.getFieldSchema()); + break; + case DataType.DOUBLE: + if ((Double) rhs.getValue() != 0) + newExp = new ConstantExpression( + plan, + (Double) lhs.getValue() / (Double) rhs.getValue(), + parent.getFieldSchema()); + break; + default: + throw new FrontendException("Invalid type"); + } + } + else throw new FrontendException("Invalid instance type."); + if (newExp != null) { + plan.disconnect(parent, rhs); + plan.remove(rhs); + plan.disconnect(parent, lhs); + plan.remove(lhs); + plan.add(newExp); + List<Operator> predList = plan.getPredecessors(parent); + if (predList != null) { + Operator[] preds = predList.toArray(new Operator[0]); + for (Object p : preds) { + Operator pred = (Operator) p; + plan.disconnect(pred, parent); + plan.connect(pred, newExp); + } + } + plan.remove(parent); + result.push(newExp); + } + else result.push(null); + } + } + + private void unaryExpressionConstPrune(LogicalExpression op) + throws FrontendException { + ConstantExpression operand = result.pop(); + if (operand == null) { + result.push(null); + return; + } + if (op instanceof CastExpression) { + // no simplification on cast for now + result.push(null); + } + else { + plan.remove(operand); + plan.remove(op); + ConstantExpression newExp; + if (op instanceof NotExpression) newExp = new ConstantExpression( + plan, !((Boolean) operand.getValue()), + op.getFieldSchema()); + else if (op instanceof IsNullExpression) newExp = new ConstantExpression( + plan, operand.getValue() == null, + op.getFieldSchema()); + else if (op instanceof NegativeExpression) { + byte type = operand.getFieldSchema().type; + switch (type) { + case DataType.INTEGER: + newExp = new ConstantExpression( + plan, + -1 * ((Integer) operand.getValue()), + op.getFieldSchema()); + break; + case DataType.LONG: + newExp = new ConstantExpression( + plan, + -1L * ((Long) operand.getValue()), + op.getFieldSchema()); + break; + case DataType.FLOAT: + newExp = new ConstantExpression( + plan, + -1.0F * ((Float) operand.getValue()), + op.getFieldSchema()); + break; + case DataType.DOUBLE: + newExp = new ConstantExpression( + plan, + -1.0D * ((Integer) operand.getValue()), + op.getFieldSchema()); + break; + default: + throw new FrontendException("Invalid type"); + } + } + else throw new FrontendException("Invalid instance type."); + plan.add(newExp); + result.push(newExp); + } + } + + private void noConstPrune(LogicalExpression op) + throws FrontendException { + // universal handler of an expression that can not or currently does not prune constant + // sub expressions after pre-calculation + List<Operator> sucs = op.getPlan().getSuccessors(op); + int nSucs = (sucs == null ? 0 : sucs.size()); + for (int i = 0; i < nSucs; i++) + result.pop(); + result.push(null); + } + + @Override + public void visit(ConstantExpression constant) + throws FrontendException { + result.push(constant); + } + + @Override + public void visit(AndExpression andExpr) throws FrontendException { + binaryExpressionConstPrune(andExpr); + } + + @Override + public void visit(OrExpression orExpr) throws FrontendException { + binaryExpressionConstPrune(orExpr); + } + + @Override + public void visit(EqualExpression equal) throws FrontendException { + binaryExpressionConstPrune(equal); + } + + @Override + public void visit(ProjectExpression project) + throws FrontendException { + noConstPrune(project); + } + + @Override + public void visit(CastExpression cast) throws FrontendException { + unaryExpressionConstPrune(cast); + } + + @Override + public void visit(GreaterThanExpression greaterThanExpression) + throws FrontendException { + binaryExpressionConstPrune(greaterThanExpression); + } + + @Override + public void visit(NotExpression op) throws FrontendException { + unaryExpressionConstPrune(op); + } + + @Override + public void visit(IsNullExpression op) throws FrontendException { + unaryExpressionConstPrune(op); + } + + @Override + public void visit(NegativeExpression op) throws FrontendException { + unaryExpressionConstPrune(op); + } + + @Override + public void visit(AddExpression op) throws FrontendException { + binaryExpressionConstPrune(op); + } + + @Override + public void visit(SubtractExpression op) throws FrontendException { + binaryExpressionConstPrune(op); + } + + @Override + public void visit(ModExpression op) throws FrontendException { + binaryExpressionConstPrune(op); + } + + @Override + public void visit(MultiplyExpression op) throws FrontendException { + binaryExpressionConstPrune(op); + } + + @Override + public void visit(DivideExpression op) throws FrontendException { + binaryExpressionConstPrune(op); + } + + @Override + public void visit(MapLookupExpression op) throws FrontendException { + noConstPrune(op); + } + + @Override + public void visit(BinCondExpression op) throws FrontendException { + noConstPrune(op); + } + + @Override + public void visit(UserFuncExpression op) throws FrontendException { + noConstPrune(op); + } + + @Override + public void visit(DereferenceExpression bagDerefenceExpression) + throws FrontendException { + noConstPrune(bagDerefenceExpression); + } + + @Override + public void visit(RegexExpression op) throws FrontendException { + noConstPrune(op); + } +} Added: hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/DNFExpression.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/DNFExpression.java?rev=991695&view=auto ============================================================================== --- hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/DNFExpression.java (added) +++ hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/DNFExpression.java Wed Sep 1 20:35:25 2010 @@ -0,0 +1,98 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.newplan.logical.rules; +import java.util.List; + +import org.apache.pig.impl.logicalLayer.FrontendException; +import org.apache.pig.newplan.logical.expression.*; +import org.apache.pig.newplan.logical.relational.LogicalSchema; +import org.apache.pig.newplan.Operator; +import org.apache.pig.newplan.OperatorPlan; +import org.apache.pig.newplan.PlanVisitor; +import org.apache.pig.data.DataType; + +/** + * + * A boolean expression with multiple children vs. the usual binary + * boolean expression. + * + */ +class DNFExpression extends LogicalExpression { + enum DNFExpressionType { + AND, OR + } + final DNFExpressionType type; + + DNFExpression(String name, OperatorPlan plan, DNFExpressionType type) { + super(name, plan); + this.type = type; + } + + @Override + public void accept(PlanVisitor v) throws FrontendException { + throw new FrontendException( + "DNF expression does not accept any visitor"); + } + + @Override + public boolean isEqual(Operator other) { + if (other != null && other instanceof DNFExpression) { + DNFExpression otherexp = (DNFExpression) other; + if (type != otherexp.type) return false; + + try { + // this equality check is too strong: it does not allow for permutations; + // should be improved in the future + List<Operator> thisChildren = plan.getSuccessors(this), thatChildren = plan.getSuccessors(otherexp); + if (thisChildren == null && thatChildren == null) return true; + else if (thisChildren == null || thatChildren == null) return false; + else { + if (thisChildren.size() != thatChildren.size()) + return false; + for (int i = 0; i < thisChildren.size(); i++) { + if (!thisChildren.get(i).isEqual( + thatChildren.get(i))) + return false; + } + } + } + catch (FrontendException e) { + throw new RuntimeException(e); + } + return true; + } + else { + return false; + } + } + + @Override + public LogicalSchema.LogicalFieldSchema getFieldSchema() + throws FrontendException { + if (fieldSchema != null) return fieldSchema; + fieldSchema = new LogicalSchema.LogicalFieldSchema(null, null, + DataType.BOOLEAN); + fieldSchema.stampFieldSchema(); + return fieldSchema; + } + + @Override + public LogicalExpression deepCopy(LogicalExpressionPlan lgExpPlan) throws FrontendException { + throw new FrontendException("Deepcopy not expected"); + } +} Added: hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/DNFPlan.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/DNFPlan.java?rev=991695&view=auto ============================================================================== --- hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/DNFPlan.java (added) +++ hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/DNFPlan.java Wed Sep 1 20:35:25 2010 @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.pig.newplan.logical.rules; + +import java.io.PrintStream; + +import org.apache.pig.impl.logicalLayer.FrontendException; +import org.apache.pig.newplan.logical.expression.*; +import org.apache.pig.newplan.Operator; + +/** + * A plan for DNF boolean tree so the size can be controlled. + * + */ +class DNFPlan extends LogicalExpressionPlan { + private static final int sizeLimit = 100; + + @Override + public void explain(PrintStream ps, String format, boolean verbose) + throws FrontendException { + ps.println("#-----------------------------------------------"); + ps.println("# DNF Plan:"); + ps.println("#-----------------------------------------------"); + } + + public void safeAdd(Operator op) throws FrontendException { + // addition after size limit check + if (size() + 1 > sizeLimit) + throw new FrontendException( + "DNF size limit of " + sizeLimit + " is reached"); + super.add(op); + } +} Added: hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/DNFPlanGenerator.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/DNFPlanGenerator.java?rev=991695&view=auto ============================================================================== --- hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/DNFPlanGenerator.java (added) +++ hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/DNFPlanGenerator.java Wed Sep 1 20:35:25 2010 @@ -0,0 +1,384 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.newplan.logical.rules; + +import java.util.Stack; +import org.apache.pig.impl.logicalLayer.FrontendException; +import org.apache.pig.newplan.logical.expression.*; +import org.apache.pig.newplan.ReverseDependencyOrderWalker; +import org.apache.pig.newplan.Operator; +import org.apache.pig.newplan.OperatorPlan; + +/** + * + * A utility class to generate a DNF plan from a base plan. + * Not that DNF plan can be generated only AFTER all NOTs + * are pushed below the OR/AND expression, namely, the + * NOTConversionVisitor should be applied before DNF can be + * generated. + * + */ +class DNFPlanGenerator extends AllSameExpressionVisitor { + private OperatorPlan dnfPlan = null; + Stack<LogicalExpression> result; + + DNFPlanGenerator(OperatorPlan plan) throws FrontendException { + super(plan, new ReverseDependencyOrderWalker(plan)); + result = new Stack<LogicalExpression>(); + } + + @Override + protected void execute(LogicalExpression op) + throws FrontendException { + if (op instanceof UnaryExpression) { + result.pop(); + } + else if (op instanceof BinaryExpression) { + result.pop(); + result.pop(); + } + else if (op instanceof BinCondExpression) { + result.pop(); + result.pop(); + result.pop(); + } + + result.push(op); + } + + OperatorPlan getDNFPlan() { + if (dnfPlan == null) dnfPlan = result.pop().getPlan(); + return dnfPlan; + } + + @Override + public void visit(AndExpression exp) throws FrontendException { + LogicalExpression rhsExp = result.pop(); + LogicalExpression lhsExp = result.pop(); + if (!(lhsExp instanceof AndExpression) && !(lhsExp instanceof DNFExpression) && !(lhsExp instanceof OrExpression) && !(rhsExp instanceof AndExpression) && !(rhsExp instanceof OrExpression) && !(rhsExp instanceof DNFExpression)) result.push(exp); + else { + if (dnfPlan == null) dnfPlan = new DNFPlan(); + boolean isLhsAnd = lhsExp instanceof AndExpression || + (lhsExp instanceof DNFExpression && + ((DNFExpression) lhsExp).type == DNFExpression.DNFExpressionType.AND), + isRhsAnd = rhsExp instanceof AndExpression || (rhsExp instanceof DNFExpression && ((DNFExpression) rhsExp).type == DNFExpression.DNFExpressionType.AND); + + LogicalExpression current; + if (isLhsAnd && isRhsAnd) { + current = new DNFExpression("dnfAnd", dnfPlan, + DNFExpression.DNFExpressionType.AND); + ((DNFPlan) dnfPlan).safeAdd(current); + result.push(current); + addChildren(current, lhsExp); + addChildren(current, rhsExp); + } + else { + boolean isLhsOr = lhsExp instanceof OrExpression || + (lhsExp instanceof DNFExpression && + ((DNFExpression) lhsExp).type == DNFExpression.DNFExpressionType.OR), + isRhsOr = rhsExp instanceof OrExpression || + (rhsExp instanceof DNFExpression && ((DNFExpression) rhsExp).type == DNFExpression.DNFExpressionType.OR); + + if (isLhsOr || isRhsOr) current = new DNFExpression( + "dnfOr", dnfPlan, DNFExpression.DNFExpressionType.OR); + else current = new DNFExpression("dnfAnd", dnfPlan, + DNFExpression.DNFExpressionType.AND); + + ((DNFPlan) dnfPlan).safeAdd(current); + result.push(current); + if (!isLhsOr && !isRhsOr) { + if (isLhsAnd) addChildren(current, lhsExp); + else if (!isLhsOr) { + // lhs is a simple expression + ((DNFPlan) dnfPlan).safeAdd(lhsExp); + dnfPlan.connect(current, lhsExp); + } + if (isRhsAnd) addChildren(current, rhsExp); + else if (!isRhsOr) { + // rhs is a simple expression + ((DNFPlan) dnfPlan).safeAdd(rhsExp); + dnfPlan.connect(current, rhsExp); + } + } + else if (!isLhsOr) { + // rhs is OR + if (!isLhsAnd) { + // lhs is simple + mergeSimpleOr(current, lhsExp, rhsExp, true); + } + else { + // lhs is AND + mergeAndOr(current, lhsExp, rhsExp, true); + } + } + else if (!isRhsOr) { + // lhs is OR + if (!isRhsAnd) { + // rhs is simple + mergeSimpleOr(current, rhsExp, lhsExp, false); + } + else { + // rhs is AND + mergeAndOr(current, rhsExp, lhsExp, false); + } + } + else { + // both lhs and rhs are OR + Operator[] lhsChildren = lhsExp.getPlan().getSuccessors( + lhsExp).toArray(new Operator[0]); + Operator[] rhsChildren = rhsExp.getPlan().getSuccessors( + rhsExp).toArray(new Operator[0]); + boolean lhsDNF = lhsExp.getPlan() == dnfPlan, rhsDNF = rhsExp.getPlan() == dnfPlan; + int lsize = lhsChildren.length, rsize = rhsChildren.length; + LogicalExpression[][] grandChildrenL = new LogicalExpression[lsize][];; + for (int i = 0; i < lsize; i++) { + if (lhsChildren[i] instanceof AndExpression || lhsChildren[i] instanceof DNFExpression) grandChildrenL[i] = dnfPlan.getSuccessors( + lhsChildren[i]).toArray( + new LogicalExpression[0]); + else { + grandChildrenL[i] = new LogicalExpression[1]; + grandChildrenL[i][0] = (LogicalExpression) lhsChildren[i]; + } + } + LogicalExpression[][] grandChildrenR = new LogicalExpression[rsize][];; + for (int i = 0; i < rsize; i++) { + if (rhsChildren[i] instanceof AndExpression || rhsChildren[i] instanceof DNFExpression) grandChildrenR[i] = dnfPlan.getSuccessors( + rhsChildren[i]).toArray( + new LogicalExpression[0]); + else { + grandChildrenR[i] = new LogicalExpression[1]; + grandChildrenR[i][0] = (LogicalExpression) rhsChildren[i]; + } + } + if (lhsDNF) { + removeDescendants(dnfPlan, lhsExp); + dnfPlan.remove(lhsExp); + } + if (rhsDNF) { + removeDescendants(dnfPlan, rhsExp); + dnfPlan.remove(rhsExp); + } + for (int i = 0; i < lsize; i++) + for (LogicalExpression lgchild : grandChildrenL[i]) + if (lgchild instanceof LogicalExpressionProxy) + ((LogicalExpressionProxy) lgchild).restoreSrc(); + for (int i = 0; i < rsize; i++) + for (LogicalExpression rgchild : grandChildrenR[i]) + if (rgchild instanceof LogicalExpressionProxy) + ((LogicalExpressionProxy) rgchild).restoreSrc(); + for (int i = 0; i < lsize; i++) { + for (int j = 0; j < rsize; j++) { + LogicalExpression child = new DNFExpression( + "dnfAnd", dnfPlan, + DNFExpression.DNFExpressionType.AND); + ((DNFPlan) dnfPlan).safeAdd(child); + dnfPlan.connect(current, child); + for (LogicalExpression lgchild : grandChildrenL[i]) { + LogicalExpressionProxy lhsClone; + if (lgchild instanceof LogicalExpressionProxy) { + lhsClone = new LogicalExpressionProxy( + dnfPlan, + ((LogicalExpressionProxy) lgchild).src); + } + else { + lhsClone = new LogicalExpressionProxy( + dnfPlan, lgchild); + } + dnfPlan.add(lhsClone); + dnfPlan.connect(child, lhsClone); + } + for (LogicalExpression rgchild : grandChildrenR[j]) { + LogicalExpressionProxy rhsClone; + if (rgchild instanceof LogicalExpressionProxy) { + rhsClone = new LogicalExpressionProxy( + dnfPlan, + ((LogicalExpressionProxy) rgchild).src); + } + else { + rhsClone = new LogicalExpressionProxy( + dnfPlan, + rgchild); + } + dnfPlan.add(rhsClone); + dnfPlan.connect(child, rhsClone); + } + } + } + } + } + } + } + + private void removeDescendants(OperatorPlan plan, Operator op) + throws FrontendException { + // remove recursively a operator and it descendants from the plan + if (plan.getSuccessors(op) == null) return; + + Object[] children = plan.getSuccessors(op).toArray(); + if (children != null) { + for (Object c : children) { + Operator child = (Operator) c; + removeDescendants(plan, child); + plan.disconnect(op, child); + plan.remove(child); + } + } + } + + private void mergeSimpleOr(LogicalExpression current, + LogicalExpression simple, LogicalExpression or, + boolean simpleFirst) throws FrontendException { + Operator[] orChildren = or.getPlan().getSuccessors(or).toArray( + new Operator[0]); + int size = orChildren.length; + LogicalExpression[][] grandChildrenOr = new LogicalExpression[size][];; + for (int i = 0; i < size; i++) { + if (orChildren[i] instanceof AndExpression || orChildren[i] instanceof DNFExpression) grandChildrenOr[i] = dnfPlan.getSuccessors( + orChildren[i]).toArray( + new LogicalExpression[0]); + else { + grandChildrenOr[i] = new LogicalExpression[1]; + grandChildrenOr[i][0] = (LogicalExpression) orChildren[i]; + } + } + boolean simpleDNF = simple.getPlan() == dnfPlan, orDNF = or.getPlan() == dnfPlan; + if (simpleDNF) { + if (simple instanceof LogicalExpressionProxy) + ((LogicalExpressionProxy) simple).restoreSrc(); + dnfPlan.remove(simple); + } + if (orDNF) { + removeDescendants(dnfPlan, or); + dnfPlan.remove(or); + } + for (int i = 0; i < size; i++) { + LogicalExpression child = new DNFExpression("dnfAnd", + dnfPlan, DNFExpression.DNFExpressionType.AND); + ((DNFPlan) dnfPlan).safeAdd(child); + dnfPlan.connect(current, child); + LogicalExpressionProxy simpleClone; + if (simple instanceof LogicalExpressionProxy) simpleClone = new LogicalExpressionProxy( + dnfPlan, + ((LogicalExpressionProxy) simple).src); + else simpleClone = new LogicalExpressionProxy(dnfPlan, simple); + dnfPlan.add(simpleClone); + if (simpleFirst) dnfPlan.connect(child, simpleClone); + for (Operator gchild : grandChildrenOr[i]) { + LogicalExpression childClone; + if (gchild instanceof LogicalExpressionProxy) childClone = (LogicalExpression) gchild; + else childClone = new LogicalExpressionProxy(dnfPlan, + (LogicalExpression) gchild); + dnfPlan.add(childClone); + dnfPlan.connect(child, childClone); + } + if (!simpleFirst) dnfPlan.connect(child, simpleClone); + } + } + + private void mergeAndOr(LogicalExpression current, + LogicalExpression and, LogicalExpression or, + boolean andFirst) throws FrontendException { + Operator[] andChildren = and.getPlan().getSuccessors(and).toArray( + new Operator[0]); + Operator[] orChildren = or.getPlan().getSuccessors(or).toArray( + new Operator[0]); + int orSize = orChildren.length; + int andSize = andChildren.length; + boolean andDNF = and.getPlan() == dnfPlan, orDNF = or.getPlan() == dnfPlan; + LogicalExpression[][] grandChildrenOr = new LogicalExpression[orSize][];; + for (int i = 0; i < orSize; i++) { + if (orChildren[i] instanceof AndExpression || orChildren[i] instanceof DNFExpression) grandChildrenOr[i] = dnfPlan.getSuccessors( + orChildren[i]).toArray( + new LogicalExpression[0]); + else { + grandChildrenOr[i] = new LogicalExpression[1]; + grandChildrenOr[i][0] = (LogicalExpression) orChildren[i]; + } + } + for (Operator andChild : andChildren) { + if (andChild instanceof LogicalExpressionProxy) + ((LogicalExpressionProxy) andChild).restoreSrc(); + } + if (andDNF) { + removeDescendants(dnfPlan, and); + dnfPlan.remove(and); + } + if (orDNF) { + removeDescendants(dnfPlan, or); + dnfPlan.remove(or); + } + for (int i = 0; i < orSize; i++) { + LogicalExpression child = new DNFExpression("dnfAnd", + dnfPlan, DNFExpression.DNFExpressionType.AND); + ((DNFPlan) dnfPlan).safeAdd(child); + if (!andFirst) for (Operator gchild : grandChildrenOr[i]) { + dnfPlan.connect(child, gchild); + } + for (int j = 0; j < andSize; j++) { + LogicalExpressionProxy andChildClone; + if (andChildren[j] instanceof LogicalExpressionProxy) andChildClone = new LogicalExpressionProxy( + dnfPlan, + ((LogicalExpressionProxy) andChildren[j]).src); + else andChildClone = new LogicalExpressionProxy( + dnfPlan, + (LogicalExpression) andChildren[j]); + dnfPlan.connect(child, andChildClone); + } + if (andFirst) for (Operator gchild : grandChildrenOr[i]) { + dnfPlan.connect(child, gchild); + } + dnfPlan.connect(current, child); + } + } + + @Override + public void visit(OrExpression exp) throws FrontendException { + LogicalExpression rhsExp = result.pop(); + LogicalExpression lhsExp = result.pop(); + if (!(lhsExp instanceof OrExpression) && + (!(lhsExp instanceof DNFExpression) || + ((DNFExpression) lhsExp).type == DNFExpression.DNFExpressionType.AND) && !(rhsExp instanceof OrExpression) && (!(rhsExp instanceof DNFExpression) || ((DNFExpression) rhsExp).type == DNFExpression.DNFExpressionType.AND)) result.push(exp); + else { + if (dnfPlan == null) dnfPlan = new DNFPlan(); + LogicalExpression current = new DNFExpression("dnfOr", + dnfPlan, DNFExpression.DNFExpressionType.OR); + result.push(current); + ((DNFPlan) dnfPlan).safeAdd(current); + if (lhsExp instanceof OrExpression || (lhsExp instanceof DNFExpression && ((DNFExpression) lhsExp).type == DNFExpression.DNFExpressionType.OR)) + addChildren(current, lhsExp); + else + dnfPlan.connect(current, lhsExp); + if (rhsExp instanceof OrExpression || (rhsExp instanceof DNFExpression && ((DNFExpression) rhsExp).type == DNFExpression.DNFExpressionType.OR)) + addChildren(current, rhsExp); + else + dnfPlan.connect(current, rhsExp); + } + } + + private void addChildren(LogicalExpression current, + LogicalExpression exp) throws FrontendException { + OperatorPlan childPlan = exp.getPlan(); + Operator[] children = childPlan.getSuccessors(exp).toArray( + new Operator[0]); + int size = children.length; + for (int i = 0; i < size; ++i) { + ((DNFPlan) dnfPlan).safeAdd(children[i]); + dnfPlan.connect(current, children[i]); + } + } +} Added: hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/LogicalExpressionProxy.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/LogicalExpressionProxy.java?rev=991695&view=auto ============================================================================== --- hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/LogicalExpressionProxy.java (added) +++ hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/LogicalExpressionProxy.java Wed Sep 1 20:35:25 2010 @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.newplan.logical.rules; + +import org.apache.pig.impl.logicalLayer.FrontendException; +import org.apache.pig.newplan.logical.expression.*; +import org.apache.pig.newplan.logical.relational.LogicalSchema; +import org.apache.pig.newplan.Operator; +import org.apache.pig.newplan.OperatorPlan; +import org.apache.pig.newplan.PlanVisitor; +import org.apache.pig.newplan.logical.rules.LogicalExpressionSimplifier.LogicalExpressionSimplifierTransformer; + +/** + * A proxy for logical expression. Useful if the original plan is not + * touched while a side plan with the nodes associated with the + * original is worked on. + * + * + */ +class LogicalExpressionProxy extends LogicalExpression { + LogicalExpression src; + + LogicalExpressionProxy(OperatorPlan plan, LogicalExpression src) { + super("Proxy: " + src.getName(), plan); + this.src = src; + LogicalExpressionSimplifierTransformer.incrDNFSplitCount(src); + } + + @Override + public void accept(PlanVisitor v) throws FrontendException { + throw new FrontendException("Visitor not accepted by proxy."); + } + + @Override + public boolean isEqual(Operator other) throws FrontendException { + if (other != null && other instanceof LogicalExpressionProxy) { + return src.isEqual(((LogicalExpressionProxy) other).src); + } + else { + return false; + } + } + + @Override + public LogicalSchema.LogicalFieldSchema getFieldSchema() + throws FrontendException { + return src.getFieldSchema(); + } + + public void decrSrcDNFSplitCounter() { + LogicalExpressionSimplifierTransformer.decrDNFSplitCount(src); + if (LogicalExpressionSimplifierTransformer.getSplitCount(src) == 1) + LogicalExpressionSimplifierTransformer.decrDNFSplitCount(src); + } + + public void restoreSrc() { + while (LogicalExpressionSimplifierTransformer.getSplitCount(src) > 1) + LogicalExpressionSimplifierTransformer.decrDNFSplitCount(src); + } + + @Override + public LogicalExpression deepCopy(LogicalExpressionPlan lgExpPlan) throws FrontendException { + throw new FrontendException("Deepcopy not expected"); + } +}