Author: gates
Date: Tue May  5 15:49:58 2009
New Revision: 771844

URL: http://svn.apache.org/viewvc?rev=771844&view=rev
Log:
PIG-741 Allow limit to be nested in foreach (gates).


Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/src/org/apache/pig/PigServer.java
    
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java
    
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java
    
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java
    
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLimit.java
    
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
    hadoop/pig/trunk/test/org/apache/pig/test/TestForEachNestedPlanLocal.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=771844&r1=771843&r2=771844&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Tue May  5 15:49:58 2009
@@ -34,6 +34,8 @@
 PIG-775: PORelationToExprProject should create a NonSpillableDataBag to create
 empty bags (pradeepkth)
 
+PIG-741: Allow limit to be nested in a foreach (gates)
+
 PIG-743: To implement clover (gkesavan)
 
 PIG-701: Implement IVY for resolving pig dependencies (gkesavan)

Modified: hadoop/pig/trunk/src/org/apache/pig/PigServer.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/PigServer.java?rev=771844&r1=771843&r2=771844&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/PigServer.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/PigServer.java Tue May  5 15:49:58 2009
@@ -185,7 +185,7 @@
      * Starts batch execution mode.
      */
     public void setBatchOn() {
-        log.info("Create a new graph.");
+        log.debug("Create a new graph.");
         
         if (currDAG != null) {
             graphs.push(currDAG);

Modified: 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java?rev=771844&r1=771843&r2=771844&view=diff
==============================================================================
--- 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java
 (original)
+++ 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java
 Tue May  5 15:49:58 2009
@@ -38,6 +38,7 @@
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODistinct;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCombinerPackage;
@@ -762,6 +763,10 @@
             sawNonAlgebraic = true;
         }
         
+        public void visitLimit(POLimit limit) throws VisitorException { // visitor callback for a POLimit; `limit` itself is not examined
+            sawNonAlgebraic = true; // same flag the preceding handler sets; presumably this rules the plan out for the combiner — confirm against the flag's readers
+        }
+
         private boolean checkSuccessorIsLeaf(PhysicalOperator leaf, 
PhysicalOperator opToCheck) {
             List<PhysicalOperator> succs = mPlan.getSuccessors(opToCheck);
             if(succs.size() == 1) {

Modified: 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java?rev=771844&r1=771843&r2=771844&view=diff
==============================================================================
--- 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java
 (original)
+++ 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java
 Tue May  5 15:49:58 2009
@@ -94,9 +94,7 @@
         res.result = it.next();
         if (res.result == null){
             res.returnStatus = POStatus.STATUS_EOP;
-            inputsAccumulated = false;
-            distinctBag = null;
-            it = null;
+            reset();
         } else {
             res.returnStatus = POStatus.STATUS_OK;
         }
@@ -119,6 +117,13 @@
     }
 
     @Override
+    public void reset() { // clears the same three fields the STATUS_EOP path of this class previously cleared inline
+        inputsAccumulated = false; // NOTE(review): presumably makes the next call re-read the inputs — confirm in getNext
+        distinctBag = null;
+        it = null;
+    }
+
+    @Override
     public void visit(PhyPlanVisitor v) throws VisitorException {
         v.visitDistinct(this);
     }

Modified: 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java?rev=771844&r1=771843&r2=771844&view=diff
==============================================================================
--- 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java
 (original)
+++ 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java
 Tue May  5 15:49:58 2009
@@ -201,7 +201,6 @@
             for (PhysicalOperator po : opsToBeReset) {
                 po.reset();
             }
-
             res = processPlan();
             
             processingPlan = true;

Modified: 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLimit.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLimit.java?rev=771844&r1=771843&r2=771844&view=diff
==============================================================================
--- 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLimit.java
 (original)
+++ 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLimit.java
 Tue May  5 15:49:58 2009
@@ -29,6 +29,7 @@
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ComparisonOperator;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.VisitorException;
 
@@ -115,4 +116,18 @@
     public void visit(PhyPlanVisitor v) throws VisitorException {
         v.visitLimit(this);
     }
+
+    @Override
+    public void reset() { // POForEach calls reset() on each operator in opsToBeReset before processing its plan
+        soFar = 0; // presumably the running count compared against mLimit — confirm in getNext
+    }
+
+    @Override
+    public POLimit clone() throws CloneNotSupportedException { // builds a new POLimit via the constructor; super.clone() is not called
+        POLimit newLimit = new POLimit(new OperatorKey(this.mKey.scope, // same scope as this operator, with a newly generated node id
+            NodeIdGenerator.getGenerator().getNextNodeId(this.mKey.scope)),
+            this.requestedParallelism, this.inputs); // NOTE(review): inputs is passed by reference — confirm the constructor does not need a copy
+        newLimit.mLimit = this.mLimit; // only mLimit is copied here; soFar is not carried over to the new instance
+        return newLimit;
+    }
 }

Modified: 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java?rev=771844&r1=771843&r2=771844&view=diff
==============================================================================
--- 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java
 (original)
+++ 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java
 Tue May  5 15:49:58 2009
@@ -282,9 +282,7 @@
             res.returnStatus = POStatus.STATUS_OK;
         } else {
             res.returnStatus = POStatus.STATUS_EOP;
-            inputsAccumulated = false;
-            sortedBag = null;
-            it = null;
+            reset();
         }
                return res;
        }
@@ -307,6 +305,13 @@
                v.visitSort(this);
        }
 
+    @Override
+    public void reset() { // clears the same three fields the STATUS_EOP branch of this class previously cleared inline
+        inputsAccumulated = false; // NOTE(review): presumably makes the next call re-read and re-sort the inputs — confirm in getNext
+        sortedBag = null;
+        it = null;
+    }
+
     public List<PhysicalPlan> getSortPlans() {
         return sortPlans;
     }

Modified: 
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=771844&r1=771843&r2=771844&view=diff
==============================================================================
--- 
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt 
(original)
+++ 
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt 
Tue May  5 15:49:58 2009
@@ -1278,7 +1278,7 @@
 }
 {
         (
-        input = NestedExpr(lp) {log.debug("Filter input: " + input);}
+        input = NestedExpr(lp) {log.debug("Limit input: " + input);}
          t = <INTEGER>
          )
         {
@@ -2184,6 +2184,7 @@
 |      item = NestedFilter(over,specs,lp, input)        
 |      item = NestedSortOrArrange(over,specs,lp, input)
 |      item = NestedDistinct(over,specs,lp, input)     
+|      item = NestedLimit(over,specs,lp, input)        
        )
        )       
        {
@@ -2411,7 +2412,40 @@
        }
 }
        
-       
+LogicalOperator NestedLimit(Schema over, Map<String, LogicalOperator> specs, 
LogicalPlan lp, LogicalOperator input):
+{   // Nested-foreach form of LIMIT: <LIMIT> <input expression> <INTEGER>; returns the new LOLimit.
+    LogicalOperator eOp;
+    Schema subSchema = null; // assigned below but never read within this production
+    Token t;
+    log.trace("Entering NestedLimit");
+}
+{
+    (
+    <LIMIT>
+    (   // input is a nested projection, an alias already present in specs, or a base eval spec
+    LOOKAHEAD(NestedProject(over, specs, lp, input)) eOp = NestedProject(over, 
specs, lp, input)
+|   LOOKAHEAD({ null != specs.get(getToken(1).image) }) t = <IDENTIFIER> {eOp 
= specs.get(t.image);}
+|   eOp = BaseEvalSpec(over, specs, lp, input)
+    )
+    {subSchema = eOp.getSchema();}
+    t = <INTEGER>
+    )
+    {
+        lp.add(eOp);
+        log.debug("Added " + eOp.getAlias() + " to the logical plan");
+        long l = Long.parseLong(t.image); // limit is held as a long; Integer.parseInt rejected values above Integer.MAX_VALUE
+        LogicalOperator limit = new LOLimit(lp, new OperatorKey(scope, 
getNextId()), l);
+        lp.add(limit);
+        log.debug("Added operator " + limit.getClass().getName() + " to the 
logical plan");
+
+        lp.connect(eOp, limit); // the limit consumes the output of its input expression
+        log.debug("Connected the limit input to the limit");
+
+        log.trace("Exiting NestedLimit");
+        return limit;
+    }
+}
+
 LogicalOperator GenerateStatement(Schema over, Map<String, LogicalOperator> 
specs, LogicalPlan lp, LogicalOperator input):
 {
        LogicalOperator spec = null; 

Modified: 
hadoop/pig/trunk/test/org/apache/pig/test/TestForEachNestedPlanLocal.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestForEachNestedPlanLocal.java?rev=771844&r1=771843&r2=771844&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestForEachNestedPlanLocal.java 
(original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestForEachNestedPlanLocal.java 
Tue May  5 15:49:58 2009
@@ -27,8 +27,7 @@
 
 import java.util.Iterator;
 import java.util.Random;
-import java.io.File;
-import java.io.IOException;
+import java.io.*;
 import java.text.DecimalFormat;
 
 public class TestForEachNestedPlanLocal extends TestCase {
@@ -65,6 +64,30 @@
         }
     }
 
+    @Test
+    public void testInnerLimit() throws Exception { // exercises a limit nested inside a foreach block (PIG-741)
+        File tmpFile = genDataSetFileOneGroup(); // rows keyed on column 0: 10 "lost", 5 "lotr", 3 "3stooges"
+        pig.registerQuery("a = load '" + Util.generateURI(tmpFile.toString()) 
+ "'; ");
+        pig.registerQuery("b = group a by $0; ");
+        pig.registerQuery("c = foreach b { " + "     c1 = limit $1 5; "
+                + "    generate COUNT(c1); " + "};");
+        Iterator<Tuple> it = pig.openIterator("c");
+        Tuple t = null;
+        long count[] = new long[3];
+        for (int i = 0; i < 3 && it.hasNext(); i++) { // reads at most three result tuples
+            t = it.next();
+            count[i] = (Long)t.get(0);
+        }
+
+        Assert.assertFalse(it.hasNext()); // there must be no fourth result tuple
+
+        Assert.assertEquals(3L, count[0]); // NOTE(review): assumes the 3-row "3stooges" group is returned first — confirm the ordering is guaranteed
+        Assert.assertEquals(5L, count[1]); // the two larger groups are capped at the limit of 5
+        Assert.assertEquals(5L, count[2]);
+    }
+
+
+
 
     /*
     @Test
@@ -113,4 +136,34 @@
 
         return TestHelper.createTempFile(data) ;
     }
+
+    private File genDataSetFileOneGroup() throws IOException { // writes tab-separated rows: 10 "lost", 5 "lotr", 3 "3stooges"
+        File fp1 = File.createTempFile("test", "txt");
+        PrintStream ps = new PrintStream(new FileOutputStream(fp1));
+        try { // PrintStream swallows write errors, so they are checked explicitly before returning
+            ps.println("lost\tjack");
+            ps.println("lost\tkate");
+            ps.println("lost\tsawyer");
+            ps.println("lost\tdesmond");
+            ps.println("lost\thurley");
+            ps.println("lost\tlocke");
+            ps.println("lost\tsun");
+            ps.println("lost\tcharlie");
+            ps.println("lost\tjin");
+            ps.println("lost\tben");
+            ps.println("lotr\tfrodo");
+            ps.println("lotr\tsam");
+            ps.println("lotr\tmerry");
+            ps.println("lotr\tpippen");
+            ps.println("lotr\tbilbo");
+            ps.println("3stooges\tlarry");
+            ps.println("3stooges\tmoe");
+            ps.println("3stooges\tcurly");
+            if (ps.checkError()) { throw new IOException("Failed to write test data to " + fp1); }
+        } finally {
+            ps.close(); // release the file handle whether or not the writes succeeded
+        }
+        return fp1;
+    }
+
 }


Reply via email to