Author: rding Date: Thu Mar 4 18:21:21 2010 New Revision: 919109 URL: http://svn.apache.org/viewvc?rev=919109&view=rev Log: PIG-1267: Problems with partition filter optimizer
Modified: hadoop/pig/trunk/CHANGES.txt hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PartitionFilterOptimizer.java hadoop/pig/trunk/src/org/apache/pig/impl/plan/OperatorPlan.java hadoop/pig/trunk/test/org/apache/pig/test/TestPartitionFilterOptimization.java Modified: hadoop/pig/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=919109&r1=919108&r2=919109&view=diff ============================================================================== --- hadoop/pig/trunk/CHANGES.txt (original) +++ hadoop/pig/trunk/CHANGES.txt Thu Mar 4 18:21:21 2010 @@ -139,6 +139,8 @@ BUG FIXES +PIG-1267: Problems with partition filter optimizer (rding) + PIG-1079: Modify merge join to use distributed cache to maintain the index (rding) Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PartitionFilterOptimizer.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PartitionFilterOptimizer.java?rev=919109&r1=919108&r2=919109&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PartitionFilterOptimizer.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PartitionFilterOptimizer.java Thu Mar 4 18:21:21 2010 @@ -19,10 +19,11 @@ import java.io.IOException; import java.util.ArrayList; -import java.util.Arrays; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.hadoop.mapreduce.Job; import org.apache.pig.Expression; @@ -31,7 +32,6 @@ import org.apache.pig.PigException; import org.apache.pig.Expression.BinaryExpression; import org.apache.pig.Expression.Column; -import org.apache.pig.Expression.OpType; import org.apache.pig.impl.logicalLayer.FrontendException; import org.apache.pig.impl.logicalLayer.LOFilter; import org.apache.pig.impl.logicalLayer.LOLoad; @@ -69,9 +69,9 @@ private LOFilter loFilter; /** - * flag to ensure we only do the optimization once for performance reasons + * to ensure we only do the optimization once for performance reasons */ - private boolean alreadyCalled = false; + private Set<LogicalOperator> alreadyChecked = new HashSet<LogicalOperator>(); /** * a map between column names as reported in @@ -98,13 +98,6 @@ @Override public boolean check(List<LogicalOperator> nodes) throws OptimizerException { - if(!alreadyCalled) { - // first call - alreadyCalled = true; - } else { - // already called, just return - return false; - } if((nodes == null) || (nodes.size() <= 0)) { int errCode = 2052; String msg = "Internal error. Cannot retrieve operator from null " + @@ -114,6 +107,9 @@ if(nodes.size() != 1|| !(nodes.get(0) instanceof LOLoad)) { return false; } + if (!alreadyChecked.add(nodes.get(0))) { + return false; + } loLoad = (LOLoad)nodes.get(0); List<LogicalOperator> sucs = mPlan.getSuccessors(loLoad); if(sucs == null || sucs.size() != 1 || !(sucs.get(0) instanceof LOFilter)) { @@ -164,7 +160,7 @@ updateMappedColNames(partitionFilter); loadMetadata.setPartitionFilter(partitionFilter); if(pColFilterFinder.isFilterRemovable()) { - // remove this filter from the plan + // remove this filter from the plan mPlan.removeAndReconnect(loFilter); } } Modified: hadoop/pig/trunk/src/org/apache/pig/impl/plan/OperatorPlan.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/plan/OperatorPlan.java?rev=919109&r1=919108&r2=919109&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/impl/plan/OperatorPlan.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/impl/plan/OperatorPlan.java Thu Mar 4 18:21:21 2010 @@ -669,7 +669,10 @@ pred = preds.get(0); disconnect(pred, node); } - + + int oldPos = -1; + int newPos = -1; + List<E> succs = getSuccessors(node); E succ = null; if (succs != null) { @@ -681,13 +684,51 @@ throw pe; } succ = succs.get(0); + List<E> plst = getPredecessors(succ); + for (int i=0; i<plst.size(); i++) { + if (plst.get(i).equals(node)) { + oldPos = i; + } + } disconnect(node, succ); } remove(node); if (pred != null && succ != null) { connect(pred, succ); - succ.rewire(node, 0, pred, true); + List<E> plst = getPredecessors(succ); + for (int i=0; i<plst.size(); i++) { + if (plst.get(i).equals(pred)) { + newPos = i; + } + } + + if (oldPos < 0 || newPos < 0) { + throw new PlanException("Invalid position index: " + oldPos + + " : " + newPos); + } + + if (oldPos != newPos) { + List<E> nlst = new ArrayList<E>(); + for (int i=0; i<plst.size(); i++) { + E nod = plst.get(i); + if (i == oldPos) { + nlst.add(pred); + } + if (i == newPos) continue; + nlst.add(nod); + } + + if (nlst.size() != plst.size()) { + throw new PlanException("Invalid list size: " + nlst.size() + + " : " + plst.size()); + } + + mToEdges.removeKey(succ); + mToEdges.put(succ, nlst); + } + + succ.rewire(node, oldPos, pred, true); } } Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestPartitionFilterOptimization.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestPartitionFilterOptimization.java?rev=919109&r1=919108&r2=919109&view=diff ============================================================================== --- hadoop/pig/trunk/test/org/apache/pig/test/TestPartitionFilterOptimization.java (original) +++ hadoop/pig/trunk/test/org/apache/pig/test/TestPartitionFilterOptimization.java Thu Mar 4 18:21:21 2010 @@ -45,6 +45,7 @@ import org.apache.pig.impl.logicalLayer.LogicalOperator; import org.apache.pig.impl.logicalLayer.LogicalPlan; import org.apache.pig.impl.logicalLayer.PColFilterExtractor; +import org.apache.pig.impl.logicalLayer.PlanSetter; import org.apache.pig.impl.logicalLayer.parser.ParseException; import org.apache.pig.impl.logicalLayer.schema.Schema; import org.apache.pig.impl.util.LogUtils; @@ -420,6 +421,46 @@ "((age >= 20) and (f3 == 15))", actual); } + /** + * Test PIG-1267 + * @throws Exception + */ + @Test + public void testColNameMapping5() throws Exception { + TestLoader.partFilter = null; + lpTester.buildPlan("a = load 'foo' using " + + TestLoader.class.getName() + + "('mrkt:chararray, a1:chararray, a2:chararray, srcid:int, bcookie:chararray', " + + "'srcid');"); + lpTester.buildPlan("b = load 'bar' using " + + TestLoader.class.getName() + + "('dstid:int, b1:int, b2:int, srcid:int, bcookie:chararray, mrkt:chararray'," + + "'srcid');"); + lpTester.buildPlan("a1 = filter a by srcid == 10;"); + lpTester.buildPlan("b1 = filter b by srcid == 20;"); + lpTester.buildPlan("c = join a1 by bcookie, b1 by bcookie;"); + LogicalPlan lp = lpTester + .buildPlan("d = foreach c generate $4 as bcookie:chararray, " + + "$5 as dstid:int, $0 as mrkt:chararray;"); + + new PlanSetter(lp).visit(); + + lpTester.typeCheckPlan(lp); + lpTester.optimizePlan(lp); + + assertEquals("checking partition filter:", + "(srcid == 20)", + TestLoader.partFilter.toString()); + + int counter = 0; + Iterator<LogicalOperator> iter = lp.getKeys().values().iterator(); + while (iter.hasNext()) { + assertTrue(!(iter.next() instanceof LOFilter)); + counter++; + } + assertEquals(counter, 6); + } + //// helper methods /////// private PColFilterExtractor test(LogicalPlan lp, List<String> partitionCols,