Author: olga
Date: Fri Sep 26 14:20:31 2008
New Revision: 699504

URL: http://svn.apache.org/viewvc?rev=699504&view=rev
Log:
PIG-443: illustrate

Added:
    incubator/pig/branches/types/test/org/apache/pig/test/TestExampleGenerator.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestLocalPOSplit.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestPOCogroup.java
Modified:
    incubator/pig/branches/types/CHANGES.txt
    incubator/pig/branches/types/src/org/apache/pig/PigServer.java
    incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
    incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java
    incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
    incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFilter.java
    incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java
    incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java
    incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java
    incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POUnion.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogicalPlan.java
    incubator/pig/branches/types/src/org/apache/pig/impl/plan/DependencyOrderWalker.java
    incubator/pig/branches/types/src/org/apache/pig/impl/util/IdentityHashSet.java
    incubator/pig/branches/types/src/org/apache/pig/tools/grunt/GruntParser.java
    incubator/pig/branches/types/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj

Modified: incubator/pig/branches/types/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/CHANGES.txt?rev=699504&r1=699503&r2=699504&view=diff
==============================================================================
--- incubator/pig/branches/types/CHANGES.txt (original)
+++ incubator/pig/branches/types/CHANGES.txt Fri Sep 26 14:20:31 2008
@@ -253,3 +253,5 @@
 
     PIG-462: LIMIT N should create one output file with N rows (shravanmn via
     olgan)
+
+    PIG-443:  Illustrate for the Types branch (shubham via olgan)

Modified: incubator/pig/branches/types/src/org/apache/pig/PigServer.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/PigServer.java?rev=699504&r1=699503&r2=699504&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/PigServer.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/PigServer.java Fri Sep 26 
14:20:31 2008
@@ -41,6 +41,7 @@
 import org.apache.pig.backend.executionengine.ExecJob.JOB_STATUS;
 import org.apache.pig.builtin.BinStorage;
 import org.apache.pig.builtin.PigStorage;
+import org.apache.pig.data.DataBag;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileLocalizer;
@@ -65,6 +66,7 @@
 import org.apache.pig.impl.util.PropertiesUtil;
 import org.apache.pig.impl.logicalLayer.LODefine;
 import org.apache.pig.impl.logicalLayer.LOStore;
+import org.apache.pig.pen.ExampleGenerator;
 
 /**
  * 
@@ -610,6 +612,21 @@
         // pigContext.getExecutionEngine().reclaimScope(this.scope);
     }
 
+    public Map<LogicalOperator, DataBag> getExamples(String alias) {
+        //LogicalPlan plan = aliases.get(aliasOp.get(alias));
+        LogicalPlan plan = null;
+        try {
+            plan = clonePlan(alias);
+        } catch (IOException e) {
+            //Since the original script is parsed anyway, there should not be an
+            //error in this parsing. The only reason there can be an error is when
+            //the files being loaded in load don't exist anymore.
+            e.printStackTrace();
+        }
+        ExampleGenerator exgen = new ExampleGenerator(plan, pigContext);
+        return exgen.getExamples();
+    }
+    
     private ExecJob execute(String alias) throws FrontendException, ExecException {
         ExecJob job = null;
 //        lp.explain(System.out, System.err);

Modified: 
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java?rev=699504&r1=699503&r2=699504&view=diff
==============================================================================
--- 
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
 (original)
+++ 
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
 Fri Sep 26 14:20:31 2008
@@ -59,15 +59,15 @@
 
 public class LogToPhyTranslationVisitor extends LOVisitor {
 
-    Map<LogicalOperator, PhysicalOperator> LogToPhyMap;
+    protected Map<LogicalOperator, PhysicalOperator> LogToPhyMap;
 
     Random r = new Random();
 
-    Stack<PhysicalPlan> currentPlans;
+    protected Stack<PhysicalPlan> currentPlans;
 
-    PhysicalPlan currentPlan;
+    protected PhysicalPlan currentPlan;
 
-    NodeIdGenerator nodeGen = NodeIdGenerator.getGenerator();
+    protected NodeIdGenerator nodeGen = NodeIdGenerator.getGenerator();
 
     private Log log = LogFactory.getLog(getClass());
 

Modified: 
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java?rev=699504&r1=699503&r2=699504&view=diff
==============================================================================
--- 
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java
 (original)
+++ 
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java
 Fri Sep 26 14:20:31 2008
@@ -34,6 +34,7 @@
 import org.apache.pig.impl.plan.Operator;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.pen.util.LineageTracer;
 
 /**
  * 
@@ -114,6 +115,8 @@
     static protected DataBag dummyBag;
 
     static protected Map dummyMap;
+    
+    protected LineageTracer lineageTracer;
 
     public PhysicalOperator(OperatorKey k) {
         this(k, -1, null);
@@ -134,6 +137,10 @@
         res = new Result();
     }
 
+    public void setLineageTracer(LineageTracer lineage) {
+       this.lineageTracer = lineage;
+    }
+    
     public int getRequestedParallelism() {
         return requestedParallelism;
     }

Modified: 
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java?rev=699504&r1=699503&r2=699504&view=diff
==============================================================================
--- 
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
 (original)
+++ 
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
 Fri Sep 26 14:20:31 2008
@@ -22,6 +22,8 @@
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.*;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.*;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
+import org.apache.pig.backend.local.executionengine.physicalLayer.relationalOperators.*;
 import org.apache.pig.impl.plan.PlanVisitor;
 import org.apache.pig.impl.plan.PlanWalker;
 import org.apache.pig.impl.plan.VisitorException;
@@ -219,4 +221,15 @@
         
     }
 
+    public void visitCogroup(POCogroup cogroup) {
+       // TODO Auto-generated method stub
+       
+    }
+
+    public void visitSplit(org.apache.pig.backend.local.executionengine.physicalLayer.relationalOperators.POSplit split) {
+       // TODO Auto-generated method stub
+       
+    }
+
+
 }

Modified: 
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFilter.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFilter.java?rev=699504&r1=699503&r2=699504&view=diff
==============================================================================
--- 
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFilter.java
 (original)
+++ 
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFilter.java
 Fri Sep 26 14:20:31 2008
@@ -30,6 +30,7 @@
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.pen.util.ExampleTuple;
 
 /**
  * This is an implementation of the Filter operator. It has an Expression Plan
@@ -150,6 +151,11 @@
                 return res;
 
             if (res.result != null && (Boolean) res.result == true) {
+               if(lineageTracer != null) {
+                   ExampleTuple tIn = (ExampleTuple) inp.result;
+                   lineageTracer.insert(tIn);
+                   lineageTracer.union(tIn, tIn);
+               }
                 return inp;
             }
         }

Modified: 
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java?rev=699504&r1=699503&r2=699504&view=diff
==============================================================================
--- 
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java
 (original)
+++ 
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java
 Fri Sep 26 14:20:31 2008
@@ -23,6 +23,7 @@
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.pen.util.ExampleTuple;
 
 public class POForEach extends PhysicalOperator {
 
@@ -49,6 +50,8 @@
     //This is the template whcih contains tuples and is flattened out in CreateTuple() to generate the final output
     Object[] data = null;
     
+    ExampleTuple tIn = null;
+    
     public POForEach(OperatorKey k) {
         this(k,-1,null,null);
     }
@@ -122,10 +125,17 @@
         if(processingPlan){
             while(true) {
                 res = processPlan();
-                if(res.returnStatus==POStatus.STATUS_OK){
+                if(res.returnStatus==POStatus.STATUS_OK) {
+                    if(lineageTracer !=  null && res.result != null) {
+                       ExampleTuple tOut = new ExampleTuple((Tuple) res.result);
+                       tOut.synthetic = tIn.synthetic;
+                       lineageTracer.insert(tOut);
+                       lineageTracer.union(tOut, tIn);
+                       res.result = tOut;
+                    }
                     return res;
                 }
-                if(res.returnStatus==POStatus.STATUS_EOP){
+                if(res.returnStatus==POStatus.STATUS_EOP) {
                     processingPlan = false;
                     break;
                 }
@@ -156,6 +166,16 @@
             res = processPlan();
             
             processingPlan = true;
+
+            if(lineageTracer != null && res.result != null) {
+               //we check for res.result since that can also be null in the case of flatten
+               tIn = (ExampleTuple) inp.result;
+               ExampleTuple tOut = new ExampleTuple((Tuple) res.result);
+               tOut.synthetic = tIn.synthetic;
+               lineageTracer.insert(tOut);
+               lineageTracer.union(tOut, tIn);
+               res.result = tOut;
+            }
             
             return res;
         }
@@ -323,6 +343,11 @@
             } else
                 out.append(in);
         }
+        
+        if(lineageTracer != null) {
+            ExampleTuple tOut = new ExampleTuple();
+            tOut.reference(out);
+        }
         return out;
     }
 

Modified: 
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java?rev=699504&r1=699503&r2=699504&view=diff
==============================================================================
--- 
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java
 (original)
+++ 
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java
 Fri Sep 26 14:20:31 2008
@@ -36,6 +36,7 @@
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.pen.util.ExampleTuple;
 
 /**
  * The load operator which is used in two ways:
@@ -135,6 +136,10 @@
             }
             else
                 res.returnStatus = POStatus.STATUS_OK;
+            if(lineageTracer != null) {
+               ExampleTuple tOut = new ExampleTuple((Tuple) res.result);
+               res.result = tOut;
+            }
         } catch (IOException e) {
             log.error("Received error from loader function: " + e);
             return res;

Modified: 
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java?rev=699504&r1=699503&r2=699504&view=diff
==============================================================================
--- 
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java
 (original)
+++ 
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java
 Fri Sep 26 14:20:31 2008
@@ -272,6 +272,10 @@
         }
         if (it.hasNext()) {
             res.result = it.next();
+            if(lineageTracer != null) {
+                lineageTracer.insert((Tuple) res.result);
+                lineageTracer.union((Tuple)res.result, (Tuple)res.result);
+            }
             res.returnStatus = POStatus.STATUS_OK;
         } else {
             res.returnStatus = POStatus.STATUS_EOP;

Modified: 
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POUnion.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POUnion.java?rev=699504&r1=699503&r2=699504&view=diff
==============================================================================
--- 
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POUnion.java
 (original)
+++ 
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POUnion.java
 Fri Sep 26 14:20:31 2008
@@ -29,6 +29,7 @@
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.pen.util.ExampleTuple;
 
 /**
  * The union operator that combines the two inputs into a single
@@ -157,6 +158,11 @@
             res.returnStatus = POStatus.STATUS_OK;
             detachInput();
             nextReturnEOP = true ;
+            if(lineageTracer != null) {
+               ExampleTuple tOut = (ExampleTuple) res.result;
+               lineageTracer.insert(tOut);
+               lineageTracer.union(tOut, tOut);
+            }
             return res;
         }
 

Modified: 
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogicalPlan.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogicalPlan.java?rev=699504&r1=699503&r2=699504&view=diff
==============================================================================
--- 
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogicalPlan.java
 (original)
+++ 
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogicalPlan.java
 Fri Sep 26 14:20:31 2008
@@ -17,6 +17,7 @@
  */
 package org.apache.pig.impl.logicalLayer;
 
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.io.PrintStream;
@@ -55,4 +56,23 @@
 
         lpp.print(out);
     }
+    
+//    public String toString() {
+//        if(mOps.size() == 0)
+//            return "Empty Plan!";
+//        else{
+//            ByteArrayOutputStream baos = new ByteArrayOutputStream();
+//            PrintStream ps = new PrintStream(baos);
+//            try {
+//             explain(baos, ps);
+//         } catch (VisitorException e) {
+//             // TODO Auto-generated catch block
+//             e.printStackTrace();
+//         } catch (IOException e) {
+//             // TODO Auto-generated catch block
+//             e.printStackTrace();
+//         }
+//            return baos.toString();
+//        }
+//    }
 }

Modified: 
incubator/pig/branches/types/src/org/apache/pig/impl/plan/DependencyOrderWalker.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/plan/DependencyOrderWalker.java?rev=699504&r1=699503&r2=699504&view=diff
==============================================================================
--- 
incubator/pig/branches/types/src/org/apache/pig/impl/plan/DependencyOrderWalker.java
 (original)
+++ 
incubator/pig/branches/types/src/org/apache/pig/impl/plan/DependencyOrderWalker.java
 Fri Sep 26 14:20:31 2008
@@ -73,7 +73,7 @@
         return new DependencyOrderWalker<O, P>(plan);
     }
 
-    private void doAllPredecessors(O node,
+    protected void doAllPredecessors(O node,
                                    Set<O> seen,
                                    Collection<O> fifo) throws VisitorException {
         if (!seen.contains(node)) {

Modified: 
incubator/pig/branches/types/src/org/apache/pig/impl/util/IdentityHashSet.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/util/IdentityHashSet.java?rev=699504&r1=699503&r2=699504&view=diff
==============================================================================
--- 
incubator/pig/branches/types/src/org/apache/pig/impl/util/IdentityHashSet.java 
(original)
+++ 
incubator/pig/branches/types/src/org/apache/pig/impl/util/IdentityHashSet.java 
Fri Sep 26 14:20:31 2008
@@ -19,6 +19,7 @@
 package org.apache.pig.impl.util;
 
 import java.util.*;
+import java.util.Map.Entry;
 
 public class IdentityHashSet<E> implements Set<E> {
     
@@ -121,6 +122,23 @@
         throw new UnsupportedOperationException("Unsupported operation on IdentityHashSet.");
     }
 
-    
+    public String toString() {
+       StringBuffer buf = new StringBuffer();
+       buf.append("{");
+
+       Iterator<Entry<E, Object>> i = map.entrySet().iterator();
+       boolean hasNext = i.hasNext();
+       while (hasNext) {
+           Entry<E, Object> e = i.next();
+           E key = e.getKey();
+           buf.append(key);
+           hasNext = i.hasNext();
+           if (hasNext)
+               buf.append(", ");
+       }
+
+       buf.append("}");
+       return buf.toString();
+    }
     
 }

Modified: 
incubator/pig/branches/types/src/org/apache/pig/tools/grunt/GruntParser.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/tools/grunt/GruntParser.java?rev=699504&r1=699503&r2=699504&view=diff
==============================================================================
--- 
incubator/pig/branches/types/src/org/apache/pig/tools/grunt/GruntParser.java 
(original)
+++ 
incubator/pig/branches/types/src/org/apache/pig/tools/grunt/GruntParser.java 
Fri Sep 26 14:20:31 2008
@@ -273,6 +273,11 @@
             System.out.println(t);
         }
     }
+    
+    protected void processIllustrate(String alias) throws IOException
+    {
+       mPigServer.getExamples(alias);
+    }
 
     protected void processKill(String jobid) throws IOException
     {

Modified: 
incubator/pig/branches/types/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj?rev=699504&r1=699503&r2=699504&view=diff
==============================================================================
--- 
incubator/pig/branches/types/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj
 (original)
+++ 
incubator/pig/branches/types/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj
 Fri Sep 26 14:20:31 2008
@@ -95,6 +95,8 @@
        abstract protected void processPig(String cmd) throws IOException;
 
        abstract protected void processRemove(String path) throws IOException;
+       
+       abstract protected void processIllustrate(String alias) throws IOException;
 
         static String unquote(String s)
        {
@@ -137,6 +139,7 @@
 TOKEN: {<REGISTER: "register">}
 TOKEN: {<REMOVE: "rm">}
 TOKEN: {<SET: "set">}
+TOKEN: {<ILLUSTRATE: "illustrate">}
 
 // internal use commands
 TOKEN: {<SCRIPT_DONE: "scriptDone">}
@@ -379,6 +382,10 @@
        t1 = <IDENTIFIER>
        {processDump(t1.image);}
        |
+       <ILLUSTRATE>
+       t1 = <IDENTIFIER>
+       {processIllustrate(t1.image);}
+       |
        <DESCRIBE>
        t1 = <IDENTIFIER>
        {processDescribe(t1.image);}

Added: 
incubator/pig/branches/types/test/org/apache/pig/test/TestExampleGenerator.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestExampleGenerator.java?rev=699504&view=auto
==============================================================================
--- 
incubator/pig/branches/types/test/org/apache/pig/test/TestExampleGenerator.java 
(added)
+++ 
incubator/pig/branches/types/test/org/apache/pig/test/TestExampleGenerator.java 
Fri Sep 26 14:20:31 2008
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.test;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Random;
+
+import junit.framework.TestCase;
+
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.logicalLayer.LogicalOperator;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestExampleGenerator extends TestCase {
+
+    MiniCluster cluster = MiniCluster.buildCluster();
+    PigContext pigContext = new PigContext(ExecType.MAPREDUCE, cluster
+            .getProperties());
+
+    Random rand = new Random();
+    int MAX = 100;
+    String A, B;
+
+    {
+        try {
+            pigContext.connect();
+        } catch (ExecException e) {
+            // TODO Auto-generated catch block
+            e.printStackTrace();
+        }
+    }
+
+    @Before
+    public void setUp() throws Exception {
+        File fileA, fileB;
+
+        fileA = File.createTempFile("dataA", ".dat");
+        fileB = File.createTempFile("dataB", ".dat");
+
+        writeData(fileA);
+        writeData(fileB);
+
+        A = "'" + FileLocalizer.hadoopify(fileA.toString(), pigContext) + "'";
+        B = "'" + FileLocalizer.hadoopify(fileB.toString(), pigContext) + "'";
+        System.out.println("A : " + A + "\n" + "B : " + B);
+        System.out.println("Test data created.");
+
+    }
+
+    private void writeData(File dataFile) throws Exception {
+        // File dataFile = File.createTempFile(name, ".dat");
+        FileOutputStream dat = new FileOutputStream(dataFile);
+
+        Random rand = new Random();
+
+        for (int i = 0; i < MAX; i++)
+            dat.write((rand.nextInt(10) + "\t" + rand.nextInt(10) + "\n")
+                    .getBytes());
+
+        dat.close();
+    }
+
+    @Test
+    public void testFilter() throws Exception {
+
+        PigServer pigserver = new PigServer(pigContext);
+
+        String query = "A = load " + A
+                + " using PigStorage() as (x : int, y : int);\n";
+        pigserver.registerQuery(query);
+        query = "B = filter A by x > 10;";
+        pigserver.registerQuery(query);
+        Map<LogicalOperator, DataBag> derivedData = pigserver.getExamples("B");
+
+        assertTrue(derivedData != null);
+
+    }
+
+    @Test
+    public void testForeach() throws ExecException, IOException {
+        PigServer pigServer = new PigServer(pigContext);
+
+        pigServer.registerQuery("A = load " + A
+                + " using PigStorage() as (x : int, y : int);");
+        pigServer.registerQuery("B = foreach A generate x + y as sum;");
+
+        Map<LogicalOperator, DataBag> derivedData = pigServer.getExamples("B");
+
+        assertTrue(derivedData != null);
+    }
+
+    @Test
+    public void testJoin() throws IOException, ExecException {
+        PigServer pigServer = new PigServer(pigContext);
+        pigServer.registerQuery("A1 = load " + A + " as (x, y);");
+        pigServer.registerQuery("B1 = load " + B + " as (x, y);");
+
+        pigServer.registerQuery("E = join A1 by x, B1 by x;");
+
+        Map<LogicalOperator, DataBag> derivedData = pigServer.getExamples("E");
+
+        assertTrue(derivedData != null);
+    }
+
+    @Test
+    public void testCogroupMultipleCols() throws Exception {
+
+        PigServer pigServer = new PigServer(pigContext);
+        pigServer.registerQuery("A = load " + A + " as (x, y);");
+        pigServer.registerQuery("B = load " + B + " as (x, y);");
+        pigServer.registerQuery("C = cogroup A by (x, y), B by (x, y);");
+        Map<LogicalOperator, DataBag> derivedData = pigServer.getExamples("C");
+
+        assertTrue(derivedData != null);
+    }
+
+    @Test
+    public void testCogroup() throws Exception {
+        PigServer pigServer = new PigServer(pigContext);
+        pigServer.registerQuery("A = load " + A + " as (x, y);");
+        pigServer.registerQuery("B = load " + B + " as (x, y);");
+        pigServer.registerQuery("C = cogroup A by x, B by x;");
+        Map<LogicalOperator, DataBag> derivedData = pigServer.getExamples("C");
+
+        assertTrue(derivedData != null);
+    }
+
+    @Test
+    public void testGroup() throws Exception {
+        PigServer pigServer = new PigServer(pigContext);
+        pigServer.registerQuery("A = load " + A.toString() + " as (x, y);");
+        pigServer.registerQuery("B = group A by x;");
+        Map<LogicalOperator, DataBag> derivedData = pigServer.getExamples("B");
+
+        assertTrue(derivedData != null);
+
+    }
+    
+    @Test
+    public void testUnion() throws Exception {
+        PigServer pigServer = new PigServer(pigContext);
+        pigServer.registerQuery("A = load " + A.toString() + " as (x, y);");
+        pigServer.registerQuery("B = load " + B.toString() + " as (x, y);");
+        pigServer.registerQuery("C = union A, B;");
+        Map<LogicalOperator, DataBag> derivedData = pigServer.getExamples("C");
+
+        assertTrue(derivedData != null);
+    }
+
+}

Added: 
incubator/pig/branches/types/test/org/apache/pig/test/TestLocalPOSplit.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestLocalPOSplit.java?rev=699504&view=auto
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestLocalPOSplit.java 
(added)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestLocalPOSplit.java 
Fri Sep 26 14:20:31 2008
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.test;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.ExecType;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import 
org.apache.pig.backend.local.executionengine.physicalLayer.LocalLogToPhyTranslationVisitor;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.LODefine;
+import org.apache.pig.impl.logicalLayer.LOLoad;
+import org.apache.pig.impl.logicalLayer.LogicalOperator;
+import org.apache.pig.impl.logicalLayer.LogicalPlan;
+import org.apache.pig.impl.logicalLayer.LogicalPlanBuilder;
+import org.apache.pig.impl.logicalLayer.PlanSetter;
+import 
org.apache.pig.impl.logicalLayer.validators.LogicalPlanValidationExecutor;
+import org.apache.pig.impl.plan.CompilationMessageCollector;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.pen.util.FunctionalLogicalOptimizer;
+import org.junit.Test;
+
+public class TestLocalPOSplit extends TestCase {
+
+    Random r = new Random();
+
+    Log log = LogFactory.getLog(getClass());
+
+    PigContext pigContext = new PigContext(ExecType.LOCAL, new Properties());
+
+    @Test
+    public void testSplit() throws IOException, VisitorException, 
ExecException {
+        pigContext.connect();
+        File datFile = File.createTempFile("tempA", ".dat");
+
+        FileOutputStream dat = new FileOutputStream(datFile);
+
+        for (int i = 0; i < 100; i++) {
+            String str = r.nextInt(10) + "\n";
+            dat.write(str.getBytes());
+
+        }
+
+        dat.close();
+
+        String query = "split (load '" + datFile.getAbsolutePath()
+                + "') into a if $0 == 2, b if $0 == 9, c if $0 == 7 ;";
+
+        LogicalPlan plan = buildPlan(query);
+        PhysicalPlan pp = buildPhysicalPlan(plan);
+
+        DataBag[] bag = new DataBag[pp.getLeaves().size()];
+
+        for (int i = 0; i < bag.length; i++) {
+            bag[i] = BagFactory.getInstance().newDefaultBag();
+        }
+
+        for (int i = 0; i < pp.getLeaves().size(); i++) {
+            Tuple t = null;
+            for (Result res = pp.getLeaves().get(i).getNext(t); 
res.returnStatus != POStatus.STATUS_EOP; res = pp
+                    .getLeaves().get(i).getNext(t)) {
+                if (res.returnStatus == POStatus.STATUS_OK)
+                    bag[i].add((Tuple) res.result);
+            }
+        }
+        
+        // Depending on how the "maps" in the physical plan are
+        // built the leaves could be in different order between different runs.
+        // lets test the first tuple out of each leaf to 
+        // 1) ensure the value was not seen before
+        // 2) all the remaining tuples from that leaf are same
+        //    as the first value
+        Map<DataByteArray, Boolean> seen = new HashMap<DataByteArray, 
Boolean>();
+        seen.put(new DataByteArray("7".getBytes()), false);
+        seen.put(new DataByteArray("9".getBytes()), false);
+        seen.put(new DataByteArray("2".getBytes()), false);
+        
+        for (int i = 0; i < bag.length; i++) {
+            DataByteArray firstValue = null;
+            Iterator<Tuple> it = bag[i].iterator();
+            if (it.hasNext()) {
+                // check that we have not seen this value before
+                Tuple t = it.next();
+                System.out.println(t);
+                firstValue = (DataByteArray) t.get(0);
+                assertFalse((Boolean) seen.get(firstValue));
+                seen.put(firstValue, true);
+
+            }
+            // check that all remaining tuples from this 
+            // leaf have the same values as the first value
+            for (; it.hasNext();) {
+                Tuple t = it.next();
+                System.out.println(t);
+                assertEquals(t.get(0), firstValue);
+            }
+        }
+    }
+
+    public PhysicalPlan buildPhysicalPlan(LogicalPlan lp)
+            throws VisitorException {
+        LocalLogToPhyTranslationVisitor visitor = new 
LocalLogToPhyTranslationVisitor(
+                lp);
+        visitor.setPigContext(pigContext);
+        visitor.visit();
+        return visitor.getPhysicalPlan();
+    }
+
+    public LogicalPlan buildPlan(String query) {
+        return buildPlan(query, LogicalPlanBuilder.class.getClassLoader());
+    }
+
+    public LogicalPlan buildPlan(String query, ClassLoader cldr) {
+        LogicalPlanBuilder.classloader = cldr;
+
+        LogicalPlanBuilder builder = new LogicalPlanBuilder(pigContext); //
+
+        try {
+            LogicalPlan lp = builder.parse("Test-Plan-Builder", query, aliases,
+                    logicalOpTable, aliasOp);
+            List<LogicalOperator> roots = lp.getRoots();
+
+            if (roots.size() > 0) {
+                for (LogicalOperator op : roots) {
+                    if (!(op instanceof LOLoad) && !(op instanceof LODefine)) {
+                        throw new Exception(
+                                "Cannot have a root that is not the load or 
define operator. Found "
+                                        + op.getClass().getName());
+                    }
+                }
+            }
+
+            System.err.println("Query: " + query);
+
+            // Just the top level roots and their children
+            // Need a recursive one to travel down the tree
+
+            for (LogicalOperator op : lp.getRoots()) {
+                System.err.println("Logical Plan Root: "
+                        + op.getClass().getName() + " object " + op);
+
+                List<LogicalOperator> listOp = lp.getSuccessors(op);
+
+                if (null != listOp) {
+                    Iterator<LogicalOperator> iter = listOp.iterator();
+                    while (iter.hasNext()) {
+                        LogicalOperator lop = iter.next();
+                        System.err.println("Successor: "
+                                + lop.getClass().getName() + " object " + lop);
+                    }
+                }
+            }
+            lp = refineLogicalPlan(lp);
+            assertTrue(lp != null);
+            return lp;
+        } catch (IOException e) {
+            // log.error(e);
+            // System.err.println("IOException Stack trace for query: " +
+            // query);
+            // e.printStackTrace();
+            fail("IOException: " + e.getMessage());
+        } catch (Exception e) {
+            log.error(e);
+            // System.err.println("Exception Stack trace for query: " + query);
+            // e.printStackTrace();
+            fail(e.getClass().getName() + ": " + e.getMessage() + " -- "
+                    + query);
+        }
+        return null;
+    }
+
+    private LogicalPlan refineLogicalPlan(LogicalPlan plan) {
+        PlanSetter ps = new PlanSetter(plan);
+        try {
+            ps.visit();
+
+        } catch (VisitorException e) {
+            // TODO Auto-generated catch block
+            e.printStackTrace();
+        }
+
+        // run through validator
+        CompilationMessageCollector collector = new 
CompilationMessageCollector();
+        FrontendException caught = null;
+        try {
+            LogicalPlanValidationExecutor validator = new 
LogicalPlanValidationExecutor(
+                    plan, pigContext);
+            validator.validate(plan, collector);
+
+            FunctionalLogicalOptimizer optimizer = new 
FunctionalLogicalOptimizer(
+                    plan);
+            optimizer.optimize();
+        } catch (FrontendException fe) {
+            // Need to go through and see what the collector has in it. But
+            // remember what we've caught so we can wrap it into what we
+            // throw.
+            caught = fe;
+        }
+
+        return plan;
+
+    }
+
+    Map<LogicalOperator, LogicalPlan> aliases = new HashMap<LogicalOperator, 
LogicalPlan>();
+    Map<OperatorKey, LogicalOperator> logicalOpTable = new 
HashMap<OperatorKey, LogicalOperator>();
+    Map<String, LogicalOperator> aliasOp = new HashMap<String, 
LogicalOperator>();
+}

Added: incubator/pig/branches/types/test/org/apache/pig/test/TestPOCogroup.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestPOCogroup.java?rev=699504&view=auto
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestPOCogroup.java 
(added)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestPOCogroup.java 
Fri Sep 26 14:20:31 2008
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.test;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Random;
+
+import junit.framework.TestCase;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORead;
+import 
org.apache.pig.backend.local.executionengine.physicalLayer.relationalOperators.POCogroup;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.plan.OperatorKey;
+
+public class TestPOCogroup extends TestCase {
+    Random r = new Random();
+
+    public void testCogroup2Inputs() throws Exception {
+        DataBag bag1 = BagFactory.getInstance().newDefaultBag();
+        Tuple t = TupleFactory.getInstance().newTuple();
+        t.append(new Integer(2));
+        bag1.add(t);
+        t = TupleFactory.getInstance().newTuple();
+        t.append(new Integer(1));
+        bag1.add(t);
+        t = TupleFactory.getInstance().newTuple();
+        t.append(new Integer(1));
+        bag1.add(t);
+
+        DataBag bag2 = BagFactory.getInstance().newDefaultBag();
+        t = TupleFactory.getInstance().newTuple();
+        t.append(new Integer(2));
+        bag2.add(t);
+        t = TupleFactory.getInstance().newTuple();
+        t.append(new Integer(2));
+        bag2.add(t);
+        t = TupleFactory.getInstance().newTuple();
+        t.append(new Integer(1));
+        bag2.add(t);
+
+        PORead poread1 = new PORead(new OperatorKey("", r.nextLong()), bag1);
+        PORead poread2 = new PORead(new OperatorKey("", r.nextLong()), bag2);
+
+        List<PhysicalOperator> inputs1 = new LinkedList<PhysicalOperator>();
+        inputs1.add(poread1);
+
+        POProject prj1 = new POProject(new OperatorKey("", r.nextLong()), -1, 
0);
+        prj1.setResultType(DataType.INTEGER);
+        PhysicalPlan p1 = new PhysicalPlan();
+        p1.add(prj1);
+        List<PhysicalPlan> in1 = new LinkedList<PhysicalPlan>();
+        in1.add(p1);
+        POLocalRearrange lr1 = new POLocalRearrange(new OperatorKey("", r
+                .nextLong()), -1, inputs1);
+        lr1.setPlans(in1);
+        lr1.setIndex(0);
+
+        List<PhysicalOperator> inputs2 = new LinkedList<PhysicalOperator>();
+        inputs2.add(poread2);
+
+        POProject prj2 = new POProject(new OperatorKey("", r.nextLong()), -1, 
0);
+        prj2.setResultType(DataType.INTEGER);
+        PhysicalPlan p2 = new PhysicalPlan();
+        p2.add(prj2);
+        List<PhysicalPlan> in2 = new LinkedList<PhysicalPlan>();
+        in2.add(p2);
+        POLocalRearrange lr2 = new POLocalRearrange(new OperatorKey("", r
+                .nextLong()), -1, inputs2);
+        lr2.setPlans(in2);
+        lr2.setIndex(1);
+
+        List<PhysicalOperator> inputs = new LinkedList<PhysicalOperator>();
+        inputs.add(lr1);
+        inputs.add(lr2);
+
+        POCogroup poc = new POCogroup(new OperatorKey("", r.nextLong()), -1,
+                inputs);
+
+        List<Tuple> expected = new LinkedList<Tuple>();
+
+        Tuple t1 = TupleFactory.getInstance().newTuple();
+        t1.append(1);
+        DataBag b1 = BagFactory.getInstance().newDefaultBag();
+        Tuple temp = TupleFactory.getInstance().newTuple();
+        temp.append(1);
+        b1.add(temp);
+        b1.add(temp);
+        t1.append(b1);
+
+        DataBag b2 = BagFactory.getInstance().newDefaultBag();
+        b2.add(temp);
+        t1.append(b2);
+
+        expected.add(t1);
+
+        t1 = TupleFactory.getInstance().newTuple();
+        t1.append(2);
+        DataBag b3 = BagFactory.getInstance().newDefaultBag();
+        temp = TupleFactory.getInstance().newTuple();
+        temp.append(2);
+        b3.add(temp);
+        t1.append(b3);
+
+        DataBag b4 = BagFactory.getInstance().newDefaultBag();
+        b4.add(temp);
+        b4.add(temp);
+        t1.append(b4);
+
+        expected.add(t1);
+        // System.out.println(expected.get(0) + " " + expected.get(1));
+        List<Tuple> obtained = new LinkedList<Tuple>();
+        for (Result res = poc.getNext(t); res.returnStatus != 
POStatus.STATUS_EOP; res = poc
+                .getNext(t)) {
+            System.out.println(res.result);
+            obtained.add((Tuple) res.result);
+            assertTrue(expected.contains((Tuple) res.result));
+        }
+        assertEquals(expected.size(), obtained.size());
+    }
+
+    public void testCogroup1Input() throws ExecException {
+        DataBag input = BagFactory.getInstance().newDefaultBag();
+        Tuple t = TupleFactory.getInstance().newTuple();
+        t.append(1);
+        t.append(2);
+        input.add(t);
+        input.add(t);
+        Tuple t2 = TupleFactory.getInstance().newTuple();
+        t2.append(2);
+        t2.append(2);
+        Tuple t3 = TupleFactory.getInstance().newTuple();
+        t3.append(3);
+        t3.append(4);
+        input.add(t2);
+        input.add(t3);
+
+        PORead poread1 = new PORead(new OperatorKey("", r.nextLong()), input);
+        List<PhysicalOperator> inputs1 = new LinkedList<PhysicalOperator>();
+        inputs1.add(poread1);
+
+        POProject prj1 = new POProject(new OperatorKey("", r.nextLong()), -1, 
1);
+        prj1.setResultType(DataType.INTEGER);
+        PhysicalPlan p1 = new PhysicalPlan();
+        p1.add(prj1);
+        List<PhysicalPlan> in1 = new LinkedList<PhysicalPlan>();
+        in1.add(p1);
+        POLocalRearrange lr1 = new POLocalRearrange(new OperatorKey("", r
+                .nextLong()), -1, inputs1);
+        lr1.setPlans(in1);
+        lr1.setIndex(0);
+
+        List<PhysicalOperator> inputs = new LinkedList<PhysicalOperator>();
+        inputs.add(lr1);
+
+        List<Tuple> expected = new LinkedList<Tuple>();
+        Tuple tOut1 = TupleFactory.getInstance().newTuple();
+        tOut1.append(2);
+        DataBag t11 = BagFactory.getInstance().newDefaultBag();
+        t11.add(t);
+        t11.add(t);
+        t11.add(t2);
+        tOut1.append(t11);
+        expected.add(tOut1);
+        Tuple tOut2 = TupleFactory.getInstance().newTuple();
+        tOut2.append(4);
+        DataBag t22 = BagFactory.getInstance().newDefaultBag();
+        t22.add(t3);
+        tOut2.append(t22);
+        expected.add(tOut2);
+        POCogroup poc = new POCogroup(new OperatorKey("", r.nextLong()), -1,
+                inputs);
+        int count = 0;
+        for (Result res = poc.getNext(t); res.returnStatus != 
POStatus.STATUS_EOP; res = poc
+                .getNext(t)) {
+            System.out.println(res.result);
+            count++;
+            assertTrue(expected.contains(res.result));
+        }
+        assertEquals(expected.size(), count);
+    }
+
+}


Reply via email to