Author: daijy
Date: Fri Jun 19 18:02:44 2009
New Revision: 786607

URL: http://svn.apache.org/viewvc?rev=786607&view=rev
Log:
PIG-797: Limit with ORDER BY producing wrong results

Modified:
    hadoop/pig/trunk/CHANGES.txt
    
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
    
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java
    
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=786607&r1=786606&r2=786607&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Fri Jun 19 18:02:44 2009
@@ -84,6 +84,8 @@
 
 BUG FIXES
 
+PIG-797: Limit with ORDER BY producing wrong results (daijy)
+
 PIG-850: Dump produce wrong result while "store into" is ok (daijy)
 
 PIG-846: MultiQuery optimization in some cases has an issue when there is a

Modified: 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=786607&r1=786606&r2=786607&view=diff
==============================================================================
--- 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
 (original)
+++ 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
 Fri Jun 19 18:02:44 2009
@@ -712,9 +712,38 @@
         }
     }
     
+    public void connectMapToReduceLimitedSort(MapReduceOper mro, MapReduceOper 
sortMROp) throws PlanException, VisitorException
+    {
+        POLocalRearrange slr = 
(POLocalRearrange)sortMROp.mapPlan.getLeaves().get(0);
+        
+        POLocalRearrange lr = null;
+        try {
+            lr = slr.clone();
+        } catch (CloneNotSupportedException e) {
+            int errCode = 2147;
+            String msg = "Error cloning POLocalRearrange for limit after sort";
+            throw new MRCompilerException(msg, errCode, PigException.BUG, e);
+        }
+        
+        mro.mapPlan.addAsLeaf(lr);
+        
+        POPackage spkg = (POPackage)sortMROp.reducePlan.getRoots().get(0);
+
+        POPackage pkg = null;
+        try {
+            pkg = spkg.clone();
+        } catch (Exception e) {
+            int errCode = 2148;
+            String msg = "Error cloning POPackageLite for limit after sort";
+            throw new MRCompilerException(msg, errCode, PigException.BUG, e);
+        }
+        mro.reducePlan.add(pkg);
+        mro.reducePlan.addAsLeaf(getPlainForEachOP());
+    }
+    
     public void simpleConnectMapToReduce(MapReduceOper mro) throws 
PlanException
     {
-       PhysicalPlan ep = new PhysicalPlan();
+        PhysicalPlan ep = new PhysicalPlan();
         POProject prjStar = new POProject(new 
OperatorKey(scope,nig.getNextNodeId(scope)));
         prjStar.setResultType(DataType.TUPLE);
         prjStar.setStar(true);
@@ -727,8 +756,8 @@
         try {
             lr.setIndex(0);
         } catch (ExecException e) {
-               int errCode = 2058;
-               String msg = "Unable to set index on the newly created 
POLocalRearrange.";
+            int errCode = 2058;
+            String msg = "Unable to set index on the newly created 
POLocalRearrange.";
             throw new PlanException(msg, errCode, PigException.BUG, e);
         }
         lr.setKeyType(DataType.TUPLE);
@@ -744,6 +773,11 @@
         pkg.setInner(inner);
         mro.reducePlan.add(pkg);
         
+        mro.reducePlan.addAsLeaf(getPlainForEachOP());
+    }
+    
+    public POForEach getPlainForEachOP()
+    {
         List<PhysicalPlan> eps1 = new ArrayList<PhysicalPlan>();
         List<Boolean> flat1 = new ArrayList<Boolean>();
         PhysicalPlan ep1 = new PhysicalPlan();
@@ -755,11 +789,10 @@
         ep1.add(prj1);
         eps1.add(ep1);
         flat1.add(true);
-        POForEach nfe1 = new POForEach(new OperatorKey(scope, nig
+        POForEach fe = new POForEach(new OperatorKey(scope, nig
                 .getNextNodeId(scope)), -1, eps1, flat1);
-        nfe1.setResultType(DataType.BAG);
-        
-        mro.reducePlan.addAsLeaf(nfe1);
+        fe.setResultType(DataType.BAG);
+        return fe;
     }
     
     public void visitLimit(POLimit op) throws VisitorException{
@@ -1719,7 +1752,10 @@
                 POLimit pLimit = new POLimit(new 
OperatorKey(scope,nig.getNextNodeId(scope)));
                 pLimit.setLimit(mr.limit);
                 limitAdjustMROp.mapPlan.addAsLeaf(pLimit);
-                simpleConnectMapToReduce(limitAdjustMROp);
+                if (mr.isGlobalSort()) 
+                    connectMapToReduceLimitedSort(limitAdjustMROp, mr);
+                else
+                    simpleConnectMapToReduce(limitAdjustMROp);
                 POLimit pLimit2 = new POLimit(new 
OperatorKey(scope,nig.getNextNodeId(scope)));
                 pLimit2.setLimit(mr.limit);
                 limitAdjustMROp.reducePlan.addAsLeaf(pLimit2);

Modified: 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java?rev=786607&r1=786606&r2=786607&view=diff
==============================================================================
--- 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java
 (original)
+++ 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java
 Fri Jun 19 18:02:44 2009
@@ -303,17 +303,13 @@
     }
 
     /**
-     * Make a deep copy of this operator.  This is declared here to make it
-     * public for all physical operators.  However, the default
-     * implementation is to throw an exception.  Operators we expect to clone
-     * need to implement this method.
+     * Make a deep copy of this operator. This base implementation simply 
+     * delegates to Object.clone() as a placeholder, so that subclasses can
      * @throws CloneNotSupportedException
      */
     @Override
     public PhysicalOperator clone() throws CloneNotSupportedException {
-        String s = new String("This physical operator does not " +
-            "implement clone.");
-        throw new CloneNotSupportedException(s);
+        return (PhysicalOperator)super.clone();
     }
 
     protected void cloneHelper(PhysicalOperator op) {

Modified: 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java?rev=786607&r1=786606&r2=786607&view=diff
==============================================================================
--- 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java
 (original)
+++ 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java
 Fri Jun 19 18:02:44 2009
@@ -321,15 +321,21 @@
      */
     @Override
     public POPackage clone() throws CloneNotSupportedException {
-        POPackage clone = new POPackage(new OperatorKey(mKey.scope,
-            NodeIdGenerator.getGenerator().getNextNodeId(mKey.scope)));
+        POPackage clone = (POPackage)super.clone();
+        clone.mKey = new OperatorKey(mKey.scope, 
NodeIdGenerator.getGenerator().getNextNodeId(mKey.scope));
+        clone.requestedParallelism = requestedParallelism;
         clone.resultType = resultType;
         clone.keyType = keyType;
         clone.numInputs = numInputs;
-        clone.inner = new boolean[inner.length];
-        for (int i = 0; i < inner.length; i++) {
-            clone.inner[i] = inner[i];
+        if (inner!=null)
+        {
+            clone.inner = new boolean[inner.length];
+            for (int i = 0; i < inner.length; i++) {
+                clone.inner[i] = inner[i];
+            }
         }
+        else
+            clone.inner = null;
         return clone;
     }
 

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java?rev=786607&r1=786606&r2=786607&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java Fri Jun 19 
18:02:44 2009
@@ -347,4 +347,36 @@
         assertEquals(10, numIdentity);
     }
 
+    @Test
+    public void testLimitAfterSort() throws Exception{
+        int LOOP_COUNT = 40;
+        File tmpFile = File.createTempFile("test", "txt");
+        PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
+        Random r = new Random(1);
+        int rand;
+        for(int i = 0; i < LOOP_COUNT; i++) {
+            rand = r.nextInt(100);
+            ps.println(rand);
+        }
+        ps.close();
+
+        pigServer.registerQuery("A = LOAD '" + 
Util.generateURI(tmpFile.toString()) + "' AS (num:int);");
+        pigServer.registerQuery("B = order A by num parallel 2;");
+        pigServer.registerQuery("C = limit B 10;");
+        Iterator<Tuple> iter = pigServer.openIterator("C");
+        if(!iter.hasNext()) fail("No output found");
+        int numIdentity = 0;
+        int oldNum = Integer.MIN_VALUE;
+        int newNum;
+        while(iter.hasNext()){
+            Tuple t = iter.next();
+            newNum = (Integer)t.get(0);
+            assertTrue(newNum>=oldNum);
+            oldNum = newNum;
+            ++numIdentity;
+        }
+        assertEquals(10, numIdentity);
+    }
+
+
 }


Reply via email to