Author: gates
Date: Mon Apr  7 11:45:04 2008
New Revision: 645643

URL: http://svn.apache.org/viewvc?rev=645643&view=rev
Log:
Shubham's POGenerate work.


Added:
    
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POGenerate.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestPOGenerate.java
Modified:
    incubator/pig/branches/types/build.xml
    
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/PhyPlanVisitor.java

Modified: incubator/pig/branches/types/build.xml
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/build.xml?rev=645643&r1=645642&r2=645643&view=diff
==============================================================================
--- incubator/pig/branches/types/build.xml (original)
+++ incubator/pig/branches/types/build.xml Mon Apr  7 11:45:04 2008
@@ -139,7 +139,7 @@
                 **/test/TestAdd.java, **/test/TestSubtract.java, 
**/test/TestMultiply.java,
                 **/test/TestDivide.java, **/test/TestMod.java, 
**/test/TestGreaterThan.java,
                    
**/test/TestGTOrEqual.java,**/test/TestLessThan.java,**/test/TestLTOrEqual.java,
-                   **/test/TestEqualTo.java,**/test/TestNotEqualTo.java
+                   **/test/TestEqualTo.java,**/test/TestNotEqualTo.java, 
**/test/TestPOGenerate.java,
                 **/test/TestProject.java, **/test/utils/*.java, 
**/logicalLayer/*.java,
                 **/logicalLayer/schema/*.java, 
**/physicalLayer/topLevelOperators/*.java,
                 **/physicalLayer/topLevelOperator/**/*.java, 
**/physicalLayer/plans/*.java,
@@ -243,6 +243,7 @@
                        <include name="**/TestLTOrEqual.java" />
                        <include name="**/TestEqualTo.java" />
                        <include name="**/TestNotEqualTo.java" />
+                       <include name="**/TestPOGenerate.java" />
                     <!--
                     <include name="**/*Test*.java" />
                     <exclude name="**/TestLargeFile.java" />

Modified: 
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/PhyPlanVisitor.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/PhyPlanVisitor.java?rev=645643&r1=645642&r2=645643&view=diff
==============================================================================
--- 
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/PhyPlanVisitor.java
 (original)
+++ 
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/PhyPlanVisitor.java
 Mon Apr  7 11:45:04 2008
@@ -19,6 +19,7 @@
 
 import org.apache.pig.impl.logicalLayer.parser.ParseException;
 import org.apache.pig.impl.physicalLayer.topLevelOperators.POFilter;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POGenerate;
 //import org.apache.pig.impl.physicalLayer.topLevelOperators.POGenerate;
 //import org.apache.pig.impl.physicalLayer.topLevelOperators.POGlobalRearrange;
 //import org.apache.pig.impl.physicalLayer.topLevelOperators.POLoad;
@@ -83,8 +84,8 @@
 //        //do nothing
 //    }
     
-//    public void visitGenerate(POGenerate pogen) {
-//        //do nothing
-//    }
+    public void visitGenerate(POGenerate pogen) {
+        //do nothing: this visitor performs no work for a POGenerate node
+    }
 
 }

Added: 
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POGenerate.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POGenerate.java?rev=645643&view=auto
==============================================================================
--- 
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POGenerate.java
 (added)
+++ 
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POGenerate.java
 Mon Apr  7 11:45:04 2008
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.physicalLayer.topLevelOperators;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.logicalLayer.OperatorKey;
+import org.apache.pig.impl.logicalLayer.parser.ParseException;
+import org.apache.pig.impl.physicalLayer.POStatus;
+import org.apache.pig.impl.physicalLayer.Result;
+import org.apache.pig.impl.physicalLayer.plans.ExprPlan;
+import org.apache.pig.impl.physicalLayer.plans.PhyPlanVisitor;
+import 
org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.ExpressionOperator;
+import org.apache.pig.impl.plan.PlanVisitor;
+
+/**
+ * Physical operator that evaluates a list of expression plans against the
+ * attached input and emits one output tuple per call to
+ * {@link #getNext(Tuple)}. Expressions marked in <code>isToBeFlattened</code>
+ * whose result is a bag contribute one tuple at a time, so several flattened
+ * bags produce their cross product.
+ */
+public class POGenerate extends PhysicalOperator {
+
+    //One flag per input expression: true if its result must be flattened
+    private List<Boolean> isToBeFlattened;
+    //Not referenced anywhere in this class at present
+    private List<Tuple> outputBuffer = new LinkedList<Tuple>();
+    private List<ExprPlan> inputPlans;
+    //The first leaf of every input plan, filled in by getLeaves()
+    private List<ExpressionOperator> inputs =
+        new LinkedList<ExpressionOperator>();
+
+    //its holds the iterators of the databags given by the input
+    //expressions which need flattening; entries are null otherwise.
+    Iterator<Tuple> [] its = null;
+
+    //This holds the outputs given out by the input expressions of any
+    //datatype
+    Object [] bags = null;
+
+    //This is the template which contains tuples and is flattened out in
+    //CreateTuple() to generate the final output. It is non-null only
+    //while a cross product is being enumerated.
+    Object[] data = null;
+
+    /**
+     * Creates an operator with no input plans and the parallelism hint -1.
+     *
+     * @param k key identifying this operator
+     */
+    public POGenerate(OperatorKey k) {
+        this(k, -1, null, null);
+    }
+
+    /**
+     * Creates an operator with no input plans.
+     *
+     * @param k key identifying this operator
+     * @param rp value passed through to the PhysicalOperator constructor
+     */
+    public POGenerate(OperatorKey k, int rp) {
+        this(k, rp, null, null);
+    }
+
+    /**
+     * Creates an operator over the given plans, passing -1 as the second
+     * PhysicalOperator constructor argument.
+     *
+     * @param k key identifying this operator
+     * @param inp one expression plan per generated item
+     * @param isToBeFlattened one flag per plan, true if it is flattened
+     */
+    public POGenerate(OperatorKey k, List<ExprPlan> inp,
+            List<Boolean> isToBeFlattened) {
+        this(k, -1, inp, isToBeFlattened);
+    }
+
+    /**
+     * Creates an operator over the given plans.
+     *
+     * @param k key identifying this operator
+     * @param rp value passed through to the PhysicalOperator constructor
+     * @param inp one expression plan per generated item, may be null
+     * @param isToBeFlattened one flag per plan, true if it is flattened
+     */
+    public POGenerate(OperatorKey k, int rp, List<ExprPlan> inp,
+            List<Boolean> isToBeFlattened) {
+        super(k, rp);
+        this.isToBeFlattened = isToBeFlattened;
+        this.inputPlans = inp;
+        getLeaves();
+    }
+
+    @Override
+    public void visit(PhyPlanVisitor v) throws ParseException {
+        v.visitGenerate(this);
+    }
+
+    @Override
+    public String name() {
+        return "POGenerate - " + mKey.toString();
+    }
+
+    /**
+     * Returns the next generated tuple.
+     *
+     * The idea is to generate all combinations of the flattened bags,
+     * much like counting through all n digit numbers: we iterate through
+     * the units place, then advance the tens place by one and iterate
+     * through the units place again, and so on until every place has
+     * exhausted its digits. Once the current cross product is exhausted
+     * (or on the first call) a fresh set of values is read from the input
+     * expressions.
+     *
+     * @param tIn not used by this implementation
+     * @return a result with STATUS_OK and the generated tuple; the input's
+     *         own result when an input reports STATUS_EOP; or a result
+     *         with STATUS_NULL and no tuple when a flattened bag is empty
+     * @throws ExecException if an input fails or has an unsupported
+     *         result type
+     */
+    @Override
+    public Result getNext(Tuple tIn) throws ExecException{
+        Result res = new Result();
+
+        if(data != null) {
+            //We are in the middle of a cross product: move to the next
+            //combination if there is one.
+            if(advanceCrossProduct()) {
+                res.result = CreateTuple(data);
+                res.returnStatus = POStatus.STATUS_OK;
+                return res;
+            }
+            //Every combination has been produced, force reading of
+            //fresh data from the inputs.
+            its = null;
+            bags = null;
+            data = null;
+        }
+
+        //getNext being called for the first time OR starting with a set
+        //of new data from inputs. State is only committed to the fields
+        //once a first tuple can actually be produced, so an early return
+        //below never leaves half-initialised state behind.
+        int noItems = inputs.size();
+        @SuppressWarnings("unchecked")
+        Iterator<Tuple>[] freshIts = new Iterator[noItems];
+        Object[] freshBags = new Object[noItems];
+
+        for(int i = 0; i < noItems; ++i) {
+            Result inputData = fetchInput((PhysicalOperator)inputs.get(i));
+
+            if(inputData.returnStatus == POStatus.STATUS_EOP) {
+                //we are done with all the elements. Time to return.
+                return inputData;
+            }
+
+            freshBags[i] = inputData.result;
+            if(inputData.result instanceof DataBag
+                    && isToBeFlattened.get(i)) {
+                freshIts[i] = ((DataBag)inputData.result).iterator();
+            }
+        }
+
+        //Flattening an empty bag yields no output for this input set.
+        for(int i = 0; i < noItems; ++i) {
+            if(freshIts[i] != null && !freshIts[i].hasNext()) {
+                res.returnStatus = POStatus.STATUS_NULL;
+                return res;
+            }
+        }
+
+        //Instantiate the template array with the first combination.
+        its = freshIts;
+        bags = freshBags;
+        data = new Object[noItems];
+        for(int i = 0; i < noItems; ++i) {
+            data[i] = (its[i] != null) ? its[i].next() : bags[i];
+        }
+
+        res.result = CreateTuple(data);
+        res.returnStatus = POStatus.STATUS_OK;
+        return res;
+    }
+
+    /**
+     * Moves the template array to the next combination of the flattened
+     * bags. The last flattened bag varies fastest; a bag that runs out is
+     * rewound to its first tuple and the bag before it is advanced.
+     *
+     * @return true if data now holds a new combination, false if every
+     *         combination had already been produced
+     */
+    private boolean advanceCrossProduct() {
+        for(int i = its.length - 1; i >= 0; --i) {
+            if(its[i] == null) {
+                //not a flattened bag, its value never changes
+                continue;
+            }
+            if(its[i].hasNext()) {
+                data[i] = its[i].next();
+                return true;
+            }
+            //rewind the iterator for this particular databag; it is
+            //known to be non-empty because a first tuple was read from it
+            its[i] = ((DataBag)bags[i]).iterator();
+            data[i] = its[i].next();
+        }
+        return false;
+    }
+
+    /**
+     * Reads the next value from an input operator using the getNext
+     * overload that matches the operator's declared result type.
+     *
+     * @param op the input operator to read from
+     * @return the result reported by the operator
+     * @throws ExecException if the operator fails or declares a result
+     *         type that is not handled here
+     */
+    private Result fetchInput(PhysicalOperator op) throws ExecException {
+        Byte resultType = op.resultType;
+        switch(resultType) {
+        case DataType.BAG :
+            return op.getNext((DataBag)null);
+        case DataType.TUPLE :
+            return op.getNext((Tuple)null);
+        case DataType.BYTEARRAY :
+            return op.getNext((DataByteArray)null);
+        case DataType.MAP :
+            return op.getNext((Map)null);
+        case DataType.BOOLEAN :
+            return op.getNext((Boolean)null);
+        case DataType.INTEGER :
+            return op.getNext((Integer)null);
+        case DataType.DOUBLE :
+            return op.getNext((Double)null);
+        case DataType.LONG :
+            return op.getNext((Long)null);
+        case DataType.FLOAT :
+            return op.getNext((Float)null);
+        case DataType.CHARARRAY :
+            return op.getNext((String)null);
+        default :
+            throw new ExecException("Unsupported result type "
+                    + resultType + " for input of " + name());
+        }
+    }
+
+    /**
+     * Builds the output tuple from the template array. A Tuple entry
+     * whose expression is marked for flattening has its fields appended
+     * one by one; every other entry is appended as a single field.
+     *
+     * @param data array that is the template for the final flattened tuple
+     * @return the final flattened tuple
+     * @throws ExecException if a field of a flattened tuple cannot be read
+     */
+    private Tuple CreateTuple(Object[] data) throws ExecException {
+        TupleFactory tf = TupleFactory.getInstance();
+        Tuple out = tf.newTuple();
+        for(int i = 0; i < data.length; ++i) {
+            Object in = data[i];
+
+            if(in instanceof Tuple && isToBeFlattened.get(i)) {
+                Tuple t = (Tuple)in;
+                for(int j = 0; j < t.size(); ++j) {
+                    try {
+                        out.append(t.get(j));
+                    } catch (IOException e) {
+                        ExecException ee =
+                            new ExecException("Unable to reference field "
+                            + j + " in tuple " + t);
+                        ee.initCause(e);
+                        throw ee;
+                    }
+                }
+            } else {
+                out.append(in);
+            }
+        }
+        return out;
+    }
+
+    /**
+     * Attaches the given tuple to every input plan. Does nothing when the
+     * operator was built without plans.
+     *
+     * @param t the tuple the input expressions are evaluated against
+     */
+    @Override
+    public void attachInput(Tuple t) {
+        if(inputPlans == null) {
+            return;
+        }
+        for(ExprPlan p : inputPlans) {
+            p.attachInput(t);
+        }
+    }
+
+    /**
+     * Records the first leaf of every input plan as an input expression.
+     * Does nothing when the operator was built without plans, which is
+     * what the constructors without a plan list do.
+     */
+    private void getLeaves() {
+        if(inputPlans == null) {
+            return;
+        }
+        for(ExprPlan p : inputPlans) {
+            inputs.add(p.getLeaves().get(0));
+        }
+    }
+
+    @Override
+    public void visit(PlanVisitor v) throws ParseException {
+        ((PhyPlanVisitor)v).visitGenerate(this);
+    }
+
+    @Override
+    public boolean supportsMultipleInputs() {
+        return false;
+    }
+
+    @Override
+    public boolean supportsMultipleOutputs() {
+        return false;
+    }
+
+}

Added: incubator/pig/branches/types/test/org/apache/pig/test/TestPOGenerate.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestPOGenerate.java?rev=645643&view=auto
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestPOGenerate.java 
(added)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestPOGenerate.java 
Mon Apr  7 11:45:04 2008
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.test;
+
+//import static org.apache.pig.PigServer.ExecType.LOCAL;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.PrintWriter;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Random;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.pig.LoadFunc;
+//import org.apache.pig.PigServer;
+import org.apache.pig.builtin.BinStorage;
+import org.apache.pig.builtin.PigStorage;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+//import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.BufferedPositionedInputStream;
+import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.logicalLayer.OperatorKey;
+import org.apache.pig.impl.physicalLayer.POStatus;
+import org.apache.pig.impl.physicalLayer.Result;
+import org.apache.pig.impl.physicalLayer.plans.ExprPlan;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POGenerate;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.PhysicalOperator;
+import 
org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.ExpressionOperator;
+import 
org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.POProject;
+import org.apache.pig.test.utils.GenRandomData;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests for POGenerate using two POProject expressions (columns 0 and 1)
+ * over small hand-built bags.
+ */
+public class TestPOGenerate extends TestCase {
+
+    DataBag cogroup;
+    DataBag partialFlatten;
+    DataBag simpleGenerate;
+    Random r = new Random();
+    BagFactory bf = BagFactory.getInstance();
+    TupleFactory tf = TupleFactory.getInstance();
+
+    /**
+     * Builds the three fixtures: cogroup holds pairs of bags,
+     * partialFlatten holds the tuples expected when only the first bag
+     * of each cogroup pair is flattened, and simpleGenerate holds plain
+     * two-field tuples.
+     */
+    @Before
+    public void setUp() throws Exception {
+        Tuple [] inputA = new Tuple[4];
+        Tuple [] inputB = new Tuple[4];
+        for(int i = 0; i < 4; i++) {
+            inputA[i] = tf.newTuple(2);
+            inputB[i] = tf.newTuple(1);
+        }
+        inputA[0].set(0, 'a');
+        inputA[0].set(1, '1');
+        inputA[1].set(0, 'b');
+        inputA[1].set(1, '1');
+        inputA[2].set(0, 'a');
+        inputA[2].set(1, '1');
+        inputA[3].set(0, 'c');
+        inputA[3].set(1, '1');
+        inputB[0].set(0, 'b');
+        inputB[1].set(0, 'b');
+        inputB[2].set(0, 'a');
+        inputB[3].set(0, 'd');
+
+        //bags of A-tuples and B-tuples grouped by their first field
+        DataBag cg11 = bf.newDefaultBag();
+        cg11.add(inputA[0]);
+        cg11.add(inputA[2]);
+        DataBag cg21 = bf.newDefaultBag();
+        cg21.add(inputA[1]);
+        DataBag cg31 = bf.newDefaultBag();
+        cg31.add(inputA[3]);
+        DataBag emptyBag = bf.newDefaultBag();
+        DataBag cg12 = bf.newDefaultBag();
+        cg12.add(inputB[2]);
+        DataBag cg22 = bf.newDefaultBag();
+        cg22.add(inputB[0]);
+        cg22.add(inputB[1]);
+        DataBag cg42 = bf.newDefaultBag();
+        cg42.add(inputB[3]);
+
+        Tuple [] tIn = new Tuple[4];
+        for(int i = 0; i < 4; ++i) {
+            tIn[i] = tf.newTuple(2);
+        }
+        tIn[0].set(0, cg11);
+        tIn[0].set(1, cg12);
+        tIn[1].set(0, cg21);
+        tIn[1].set(1, cg22);
+        tIn[2].set(0, cg31);
+        tIn[2].set(1, emptyBag);
+        tIn[3].set(0, emptyBag);
+        tIn[3].set(1, cg42);
+
+        cogroup = bf.newDefaultBag();
+        for(int i = 0; i < 4; ++i) {
+            cogroup.add(tIn[i]);
+        }
+
+        Tuple[] tPartial = new Tuple[4];
+        for(int i = 0; i < 4; ++i) {
+            tPartial[i] = tf.newTuple(2);
+            tPartial[i].set(0, inputA[i].get(0));
+            tPartial[i].set(1, inputA[i].get(1));
+        }
+        tPartial[0].append(cg12);
+        tPartial[1].append(cg22);
+        tPartial[2].append(cg12);
+        tPartial[3].append(emptyBag);
+
+        partialFlatten = bf.newDefaultBag();
+        for(int i = 0; i < 4; ++i) {
+            partialFlatten.add(tPartial[i]);
+        }
+
+        simpleGenerate = bf.newDefaultBag();
+        for(int i = 0; i < 4; ++i) {
+            simpleGenerate.add(inputA[i]);
+        }
+    }
+
+    /**
+     * Builds a POGenerate over projections of columns 0 and 1, feeds it
+     * every tuple of the given bag and collects everything it emits.
+     *
+     * @param input bag whose tuples are attached one after another
+     * @param flattenFirst flatten flag for the projection of column 0
+     * @param flattenSecond flatten flag for the projection of column 1
+     * @return the emitted tuples in the order they were produced
+     */
+    private List<Tuple> runGenerate(DataBag input, boolean flattenFirst,
+            boolean flattenSecond) throws Exception {
+        ExpressionOperator prj1 =
+            new POProject(new OperatorKey("", r.nextLong()), -1, 0);
+        ExpressionOperator prj2 =
+            new POProject(new OperatorKey("", r.nextLong()), -1, 1);
+        prj1.setResultType(DataType.BAG);
+        prj2.setResultType(DataType.BAG);
+        List<Boolean> toBeFlattened = new LinkedList<Boolean>();
+        toBeFlattened.add(flattenFirst);
+        toBeFlattened.add(flattenSecond);
+        ExprPlan plan1 = new ExprPlan();
+        plan1.add(prj1);
+        ExprPlan plan2 = new ExprPlan();
+        plan2.add(prj2);
+        List<ExprPlan> inputs = new LinkedList<ExprPlan>();
+        inputs.add(plan1);
+        inputs.add(plan2);
+        PhysicalOperator poGen = new POGenerate(
+            new OperatorKey("", r.nextLong()), inputs, toBeFlattened);
+
+        List<Tuple> produced = new LinkedList<Tuple>();
+        for(Iterator<Tuple> it = input.iterator(); it.hasNext(); ) {
+            Tuple t = it.next();
+            plan1.attachInput(t);
+            plan2.attachInput(t);
+            Result output = poGen.getNext(t);
+            while(output.result != null
+                    && output.returnStatus != POStatus.STATUS_EOP) {
+                produced.add((Tuple) output.result);
+                output = poGen.getNext(t);
+            }
+        }
+        return produced;
+    }
+
+    /**
+     * Asserts that the string form of every tuple in the expected bag
+     * occurs among the string forms of the produced tuples.
+     */
+    private void assertAllPresent(DataBag expected, List<Tuple> produced) {
+        List<String> obtained = new LinkedList<String>();
+        for(Tuple t : produced) {
+            obtained.add(t.toString());
+        }
+        int count = 0;
+        for(Iterator<Tuple> it = expected.iterator(); it.hasNext(); ) {
+            Tuple t = it.next();
+            assertTrue(obtained.contains(t.toString()));
+            ++count;
+        }
+        assertEquals(expected.size(), count);
+    }
+
+    /** Flattening both bags must pair tuples that share the same key. */
+    public void testJoin() throws Exception {
+        for(Tuple tObtained : runGenerate(cogroup, true, true)) {
+            assertTrue(tObtained.get(0).toString().equals(
+                tObtained.get(2).toString()));
+        }
+    }
+
+    /** Flattening only the first bag keeps the second bag nested. */
+    public void testPartialJoin() throws Exception {
+        assertAllPresent(partialFlatten,
+            runGenerate(cogroup, true, false));
+    }
+
+    /** Inputs without bags are passed through field by field. */
+    public void testSimpleGenerate() throws Exception {
+        assertAllPresent(simpleGenerate,
+            runGenerate(simpleGenerate, true, false));
+    }
+}


Reply via email to