Author: rding
Date: Wed Feb 17 23:16:58 2010
New Revision: 911219

URL: http://svn.apache.org/viewvc?rev=911219&view=rev
Log:
PIG-1169: Top-N queries produce incorrect results when a store statement is 
added between order by and limit statements

Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/OpLimitOptimizer.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=911219&r1=911218&r2=911219&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Wed Feb 17 23:16:58 2010
@@ -95,6 +95,8 @@
 
 BUG FIXES
 
+PIG-1169: Top-N queries produce incorrect results when a store statement is added between order by and limit statement (rding)
+
 PIG-1131: Pig simple join does not work when it contains empty lines (ashutoshc)
 
 PIG-834: incorrect plan when algebraic functions are nested (ashutoshc)

Modified: 
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/OpLimitOptimizer.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/OpLimitOptimizer.java?rev=911219&r1=911218&r2=911219&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/OpLimitOptimizer.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/OpLimitOptimizer.java Wed Feb 17 23:16:58 2010
@@ -40,7 +40,11 @@
 import org.apache.pig.impl.logicalLayer.LOJoin;
 import org.apache.pig.impl.logicalLayer.LogicalOperator;
 import org.apache.pig.impl.logicalLayer.LogicalPlan;
+import org.apache.pig.impl.logicalLayer.LogicalPlanCloner;
 import org.apache.pig.impl.plan.DepthFirstWalker;
+import org.apache.pig.impl.plan.NodeIdGenerator;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.PlanException;
 import org.apache.pig.impl.plan.optimizer.OptimizerException;
 
 /**
@@ -138,7 +142,7 @@
             // Limit cannot be pushed up
             if (predecessor instanceof LOCogroup || predecessor instanceof 
LOFilter ||
                        predecessor instanceof LOLoad || predecessor instanceof 
LOSplit ||
-                       predecessor instanceof LOSplitOutput || predecessor 
instanceof LODistinct || predecessor instanceof LOJoin)
+                       predecessor instanceof LODistinct || predecessor 
instanceof LOJoin)
             {
                return;
             }
@@ -234,6 +238,48 @@
                        throw new OptimizerException(msg, errCode, 
PigException.BUG, e);
                }
             }
+            // Limit and OrderBy (LOSort) can be separated by split
+            else if (predecessor instanceof LOSplitOutput) {               
+                if(mode == ExecType.LOCAL) {
+                    //We don't need this optimisation to happen in the local 
mode.
+                    //so we do nothing here.
+                } else {
+                    List<LogicalOperator> grandparants = mPlan
+                            .getPredecessors(predecessor);
+                    // After insertion of splitters, any node in the plan can 
+                    // have at most one predecessor
+                    if (grandparants != null && grandparants.size() != 0
+                            && grandparants.get(0) instanceof LOSplit) {       
                 
+                        List<LogicalOperator> greatGrandparants = mPlan
+                                .getPredecessors(grandparants.get(0));
+                        if (greatGrandparants != null
+                                && greatGrandparants.size() != 0
+                                && greatGrandparants.get(0) instanceof LOSort) 
{                           
+                            LOSort sort = (LOSort)greatGrandparants.get(0);
+                            LOSort newSort = new LOSort(
+                                    sort.getPlan(),
+                                    new OperatorKey(
+                                            sort.getOperatorKey().scope,
+                                            NodeIdGenerator
+                                                    .getGenerator()
+                                                    .getNextNodeId(
+                                                            
sort.getOperatorKey().scope)),
+                                    sort.getSortColPlans(), 
+                                    sort.getAscendingCols(), 
+                                    sort.getUserFunc()); 
+                                                  
+                            newSort.setLimit(limit.getLimit());
+                            try {
+                                mPlan.replace(limit, newSort);
+                            } catch (PlanException e) {
+                                int errCode = 2012;
+                                String msg = "Can not replace LOLimit with 
LOSort after splitter";
+                                throw new OptimizerException(msg, errCode, 
PigException.BUG, e);
+                            }
+                        }
+                    }
+                }
+            }
             else {
                 int errCode = 2013;
                 String msg = "Moving LOLimit in front of " + 
predecessor.getClass().getSimpleName() + " is not implemented";

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java?rev=911219&r1=911218&r2=911219&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java Wed Feb 17 23:16:58 2010
@@ -88,6 +88,66 @@
         myPig = null;
     }
 
+    public void testMultiQueryJiraPig1169() {
+
+        // test case: PIG-1169 - A1 (order by) is both stored and limited, so a split
+        // separates the sort from the limit; A2 must still be the top 2 rows by a desc.
+        String INPUT_FILE = "abc";
+        
+        try {
+    
+            PrintWriter w = new PrintWriter(new FileWriter(INPUT_FILE));
+            w.println("1\t2\t3");
+            w.println("2\t3\t4");
+            w.println("3\t4\t5");
+            w.println("5\t6\t7");
+            w.println("6\t7\t8");
+            w.close();
+    
+            Util.copyFromLocalToCluster(cluster, INPUT_FILE, INPUT_FILE);
+           
+            myPig.setBatchOn();
+    
+            myPig.registerQuery("A = load '" + INPUT_FILE 
+                    + "' as (a:int, b, c);");
+            myPig.registerQuery("A1 = Order A by a desc parallel 3;");
+            myPig.registerQuery("A2 = limit A1 2;");
+            myPig.registerQuery("store A1 into '/tmp/input1';");
+            myPig.registerQuery("store A2 into '/tmp/input2';");
+
+            myPig.executeBatch();
+
+            myPig.registerQuery("B = load '/tmp/input2' as (a:int, b, c);");
+            
+            Iterator<Tuple> iter = myPig.openIterator("B");
+
+            List<Tuple> expectedResults = 
Util.getTuplesFromConstantTupleStrings(
+                    new String[] { 
+                            "(6,7,8)",
+                            "(5,6,7)"
+                    });
+            
+            int counter = 0;
+            while (iter.hasNext()) {
+                assertEquals(expectedResults.get(counter++).toString(), 
iter.next().toString());      
+            }
+
+            assertEquals(expectedResults.size(), counter);
+            // NOTE(review): '/tmp/input1' and '/tmp/input2' are not removed in the finally block.
+        } catch (Exception e) {
+            e.printStackTrace();
+            Assert.fail();
+        } finally {
+            new File(INPUT_FILE).delete();
+            try {
+                Util.deleteFile(cluster, INPUT_FILE);
+            } catch (IOException e) {
+                e.printStackTrace();
+                Assert.fail();
+            }
+        }
+    }
+  
     public void testMultiQueryJiraPig1171() {
 
         // test case: Problems with some top N queries


Reply via email to