Author: rding
Date: Wed Feb 17 23:16:58 2010
New Revision: 911219

URL: http://svn.apache.org/viewvc?rev=911219&view=rev
Log:
PIG-1169: Top-N queries produce incorrect results when a store statement is added between order by and limit statements

Modified: hadoop/pig/trunk/CHANGES.txt hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/OpLimitOptimizer.java hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java Modified: hadoop/pig/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=911219&r1=911218&r2=911219&view=diff ============================================================================== --- hadoop/pig/trunk/CHANGES.txt (original) +++ hadoop/pig/trunk/CHANGES.txt Wed Feb 17 23:16:58 2010 @@ -95,6 +95,8 @@ BUG FIXES +PIG-1169: Top-N queries produce incorrect results when a store statement is added between order by and limit statement (rding) + PIG-1131: Pig simple join does not work when it contains empty lines (ashutoshc) PIG-834: incorrect plan when algebraic functions are nested (ashutoshc) Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/OpLimitOptimizer.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/OpLimitOptimizer.java?rev=911219&r1=911218&r2=911219&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/OpLimitOptimizer.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/OpLimitOptimizer.java Wed Feb 17 23:16:58 2010 @@ -40,7 +40,11 @@ import org.apache.pig.impl.logicalLayer.LOJoin; import org.apache.pig.impl.logicalLayer.LogicalOperator; import org.apache.pig.impl.logicalLayer.LogicalPlan; +import org.apache.pig.impl.logicalLayer.LogicalPlanCloner; import org.apache.pig.impl.plan.DepthFirstWalker; +import org.apache.pig.impl.plan.NodeIdGenerator; +import org.apache.pig.impl.plan.OperatorKey; +import org.apache.pig.impl.plan.PlanException; import org.apache.pig.impl.plan.optimizer.OptimizerException; /** @@ -138,7 +142,7 @@ // Limit cannot be pushed up if (predecessor instanceof LOCogroup || predecessor instanceof LOFilter || 
predecessor instanceof LOLoad || predecessor instanceof LOSplit || - predecessor instanceof LOSplitOutput || predecessor instanceof LODistinct || predecessor instanceof LOJoin) + predecessor instanceof LODistinct || predecessor instanceof LOJoin) { return; } @@ -234,6 +238,48 @@ throw new OptimizerException(msg, errCode, PigException.BUG, e); } } + // Limit and OrderBy (LOSort) can be separated by split + else if (predecessor instanceof LOSplitOutput) { + if(mode == ExecType.LOCAL) { + //We don't need this optimisation to happen in the local mode. + //so we do nothing here. + } else { + List<LogicalOperator> grandparants = mPlan + .getPredecessors(predecessor); + // After insertion of splitters, any node in the plan can + // have at most one predecessor + if (grandparants != null && grandparants.size() != 0 + && grandparants.get(0) instanceof LOSplit) { + List<LogicalOperator> greatGrandparants = mPlan + .getPredecessors(grandparants.get(0)); + if (greatGrandparants != null + && greatGrandparants.size() != 0 + && greatGrandparants.get(0) instanceof LOSort) { + LOSort sort = (LOSort)greatGrandparants.get(0); + LOSort newSort = new LOSort( + sort.getPlan(), + new OperatorKey( + sort.getOperatorKey().scope, + NodeIdGenerator + .getGenerator() + .getNextNodeId( + sort.getOperatorKey().scope)), + sort.getSortColPlans(), + sort.getAscendingCols(), + sort.getUserFunc()); + + newSort.setLimit(limit.getLimit()); + try { + mPlan.replace(limit, newSort); + } catch (PlanException e) { + int errCode = 2012; + String msg = "Can not replace LOLimit with LOSort after splitter"; + throw new OptimizerException(msg, errCode, PigException.BUG, e); + } + } + } + } + } else { int errCode = 2013; String msg = "Moving LOLimit in front of " + predecessor.getClass().getSimpleName() + " is not implemented"; Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java?rev=911219&r1=911218&r2=911219&view=diff ============================================================================== --- hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java (original) +++ hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java Wed Feb 17 23:16:58 2010 @@ -88,6 +88,66 @@ myPig = null; } + public void testMultiQueryJiraPig1169() { + + // test case: Problems with some top N queries + + String INPUT_FILE = "abc"; + + try { + + PrintWriter w = new PrintWriter(new FileWriter(INPUT_FILE)); + w.println("1\t2\t3"); + w.println("2\t3\t4"); + w.println("3\t4\t5"); + w.println("5\t6\t7"); + w.println("6\t7\t8"); + w.close(); + + Util.copyFromLocalToCluster(cluster, INPUT_FILE, INPUT_FILE); + + myPig.setBatchOn(); + + myPig.registerQuery("A = load '" + INPUT_FILE + + "' as (a:int, b, c);"); + myPig.registerQuery("A1 = Order A by a desc parallel 3;"); + myPig.registerQuery("A2 = limit A1 2;"); + myPig.registerQuery("store A1 into '/tmp/input1';"); + myPig.registerQuery("store A2 into '/tmp/input2';"); + + myPig.executeBatch(); + + myPig.registerQuery("B = load '/tmp/input2' as (a:int, b, c);"); + + Iterator<Tuple> iter = myPig.openIterator("B"); + + List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings( + new String[] { + "(6,7,8)", + "(5,6,7)" + }); + + int counter = 0; + while (iter.hasNext()) { + assertEquals(expectedResults.get(counter++).toString(), iter.next().toString()); + } + + assertEquals(expectedResults.size(), counter); + + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(); + } finally { + new File(INPUT_FILE).delete(); + try { + Util.deleteFile(cluster, INPUT_FILE); + } catch (IOException e) { + e.printStackTrace(); + Assert.fail(); + } + } + } + public void testMultiQueryJiraPig1171() { // test case: Problems with some top N queries