Author: gates
Date: Mon Nov  3 17:45:47 2008
New Revision: 711158

URL: http://svn.apache.org/viewvc?rev=711158&view=rev
Log:
PIG-350: Redo join to stream one side of the join instead of materializing
the keys from all inputs into memory. Patch by Pradeep, building on earlier
work done by Daniel.
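In data-flow terms: for each key on the reduce side, all join inputs except the
last are still materialized into bags, but the last (ideally the biggest) input
is fed to the join's foreach in fixed-size chunks instead of being collected
whole. A minimal sketch of the idea in plain Java, with hypothetical names - it
models the technique only, not Pig's actual POJoinPackage:

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;

    public class StreamedJoinSketch {
        // Models one reduce-side key group of a two-way join: 'leftBag' is
        // materialized, 'rightStream' (the biggest input) is consumed in
        // chunks, so at most one chunk of it is held in memory at a time.
        static void joinForKey(List<String> leftBag, Iterator<String> rightStream, int chunkSize) {
            List<String> chunk = new ArrayList<String>(chunkSize);
            while (rightStream.hasNext()) {
                chunk.add(rightStream.next());
                if (chunk.size() == chunkSize) {
                    emitCrossProduct(leftBag, chunk);
                    chunk.clear();               // drop the chunk before reading more
                }
            }
            if (!chunk.isEmpty()) emitCrossProduct(leftBag, chunk);
        }

        static void emitCrossProduct(List<String> left, List<String> rightChunk) {
            for (String l : left)
                for (String r : rightChunk)
                    System.out.println("(" + l + ", " + r + ")");
        }

        public static void main(String[] args) {
            joinForKey(List.of("l1", "l2"),
                       List.of("r1", "r2", "r3", "r4", "r5").iterator(), 2);
        }
    }

Memory per key is then bounded by the n-1 smaller inputs plus one chunk, rather
than by the full size of every input.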


Added:
    hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POJoinPackage.java
    hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POOptimizedForEach.java
Modified:
    hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/LocalLauncher.java
    hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
    hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
    hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java
    hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java
    hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
    hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java
    hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt

Modified: hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/LocalLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/LocalLauncher.java?rev=711158&r1=711157&r2=711158&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/LocalLauncher.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/LocalLauncher.java Mon Nov  3 17:45:47 2008
@@ -18,11 +18,13 @@
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler.CoGroupStreamingOptimizerVisitor;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MRPrinter;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MRStreamHandler;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.POPackageAnnotator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POJoinPackage;
 import org.apache.pig.impl.plan.PlanException;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.util.ConfigurationValidator;
@@ -127,7 +129,12 @@
         // check whether stream operator is present
         MRStreamHandler checker = new MRStreamHandler(plan);
         checker.visit();
-        
+
+        // optimize joins
+        CoGroupStreamingOptimizerVisitor cgso = new MRCompiler.CoGroupStreamingOptimizerVisitor(plan,
+                pc.getProperties().getProperty("join.biggest.input.chunksize", POJoinPackage.DEFAULT_CHUNK_SIZE));
+        cgso.visit();
+
         // figure out the type of the key for the map plan
         // this is needed when the key is null to create
         // an appropriate NullableXXXWritable object

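Both launchers (this one and MapReduceLauncher below) wire the optimizer up the
same way: the chunk size is read from the property join.biggest.input.chunksize,
falling back to POJoinPackage.DEFAULT_CHUNK_SIZE ("1000") when unset. How that
property reaches PigContext (pig.properties vs. a -D system property) is outside
this patch; the lookup-with-default pattern itself is just the standard
java.util.Properties idiom:

    import java.util.Properties;

    public class ChunkSizeLookup {
        public static void main(String[] args) {
            Properties props = new Properties();   // stands in for pc.getProperties()
            props.setProperty("join.biggest.input.chunksize", "5000");  // hypothetical override
            String chunkSize = props.getProperty(
                    "join.biggest.input.chunksize", "1000");  // "1000" is DEFAULT_CHUNK_SIZE
            System.out.println(Long.parseLong(chunkSize));    // setChunkSize() parses it to long
        }
    }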
Modified: hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=711158&r1=711157&r2=711158&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Mon Nov  3 17:45:47 2008
@@ -51,6 +51,7 @@
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODistinct;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POGlobalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POJoinPackage;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
@@ -1175,6 +1176,181 @@
         return mro;
     }
 
+    static class CoGroupStreamingOptimizerVisitor extends MROpPlanVisitor {
+        
+        Log log = LogFactory.getLog(this.getClass());
+        String chunkSize;
+        CoGroupStreamingOptimizerVisitor(MROperPlan plan, String chunkSize) {
+            super(plan, new DepthFirstWalker<MapReduceOper, MROperPlan>(plan));
+            this.chunkSize = chunkSize;
+        }
+        
+        /**
+         * Look for the pattern POPackage->POForEach (with the last input flattened)
+         * and change it to POJoinPackage. That way we can avoid materializing the
+         * last input and construct the result of the join on the fly.
+         * 
+         * @param mr - map-reduce plan to optimize
+         */ 
+        @Override
+        public void visitMROp(MapReduceOper mr) throws VisitorException {
+            // Only optimize:
+            // 1. POPackage->POForEach is the root of reduce plan
+            // 2. POUnion is the leaf of map plan (so that we exclude distinct, sort...)
+            // 3. No combiner plan
+            // 4. POForEach nested plan only contains POProject in any depth
+            // 5. Inside POForEach, all occurrences of the last input are flattened
+            if (mr.mapPlan.isEmpty()) return;
+            if (mr.reducePlan.isEmpty()) return;
+
+            // Check combiner plan
+            if (!mr.combinePlan.isEmpty())
+                return;
+            
+            // Check map plan
+            List<PhysicalOperator> mpLeaves = mr.mapPlan.getLeaves();
+            if (mpLeaves.size()!=1) {
+                return;
+            }
+            PhysicalOperator op = mpLeaves.get(0);
+            
+            if (!(op instanceof POUnion)) {
+                return;
+            }
+            
+            // Check reduce plan
+            List<PhysicalOperator> mrRoots = mr.reducePlan.getRoots();
+            if (mrRoots.size()!=1) {
+                return;
+            }
+            
+            op = mrRoots.get(0);
+            if (!(op instanceof POPackage)) {
+                return;
+            }
+            POPackage pack = (POPackage)op;
+            
+            List<PhysicalOperator> sucs = mr.reducePlan.getSuccessors(pack);
+            if (sucs.size()!=1) {
+                return;
+            }
+            
+            op = sucs.get(0);
+            boolean lastInputFlattened = true;
+            boolean allSimple = true;
+            if (op instanceof POForEach)
+            {
+                POForEach forEach = (POForEach)op;
+                List<PhysicalPlan> planList = forEach.getInputPlans();
+                List<Boolean> flatten = forEach.getToBeFlattened();
+                POProject projOfLastInput = null;
+                int i = 0;
+                // check all nested foreach plans
+                // 1. If it is simple projection
+                // 2. If last input is all flattened
+                for (PhysicalPlan p:planList)
+                {
+                    PhysicalOperator opProj = p.getRoots().get(0);
+                    if (!(opProj instanceof POProject))
+                    {
+                        allSimple = false;
+                        break;
+                    }
+                    POProject proj = (POProject)opProj;
+                    // the project should just be for one column
+                    // from the input
+                    if(proj.getColumns().size() != 1) {
+                        allSimple = false;
+                        break;
+                    }
+                    
+                    // if input to project is the last input
+                    if (proj.getColumn() == pack.getNumInps())
+                    {
+                        // if we had already seen another project
+                        // which was also for the last input, then
+                        // we might be trying to flatten twice on the
+                        // last input in which case we can't optimize by
+                        // just streaming the tuple to those projects
+                        // IMPORTANT NOTE: THIS WILL NEED TO CHANGE WHEN WE
+                        // OPTIMIZE BUILTINS LIKE SUM() AND COUNT() TO
+                        // TAKE IN STREAMING INPUT
+                        if(projOfLastInput != null) {
+                            allSimple = false;
+                            break;
+                        }
+                        projOfLastInput = proj;
+                        // make sure the project is on a bag which needs to be
+                        // flattened
+                        if (!flatten.get(i) || proj.getResultType() != DataType.BAG)
+                        {
+                            lastInputFlattened = false;
+                            break;
+                        }
+                    }
+                    
+                    // if all deeper operators are all project
+                    PhysicalOperator succ = p.getSuccessors(proj)!=null?p.getSuccessors(proj).get(0):null;
+                    while (succ!=null)
+                    {
+                        if (!(succ instanceof POProject))
+                        {
+                            allSimple = false;
+                            break;
+                        }
+                        // make sure successors of the last project also project bags
+                        // we will be changing it to project tuples
+                        if(proj == projOfLastInput && ((POProject)succ).getResultType() != DataType.BAG) {
+                            allSimple = false;
+                            break;
+                        }
+                        succ = p.getSuccessors(succ)!=null?p.getSuccessors(succ).get(0):null;
+                    }
+                    i++;
+                    if (!allSimple)
+                        break;
+                }
+                
+                if (lastInputFlattened && allSimple && projOfLastInput != null)
+                {
+                    // Now we can optimize the map-reduce plan
+                    
+                    // Replace POPackage->POForEach with POJoinPackage
+                    String scope = pack.getOperatorKey().scope;
+                    NodeIdGenerator nig = NodeIdGenerator.getGenerator();
+                    POJoinPackage joinPackage;
+                    joinPackage = new POJoinPackage(
+                                new OperatorKey(scope, nig.getNextNodeId(scope)), 
+                                -1, pack, forEach);
+                    joinPackage.setChunkSize(Long.parseLong(chunkSize));
+                    PhysicalOperator nextOp = null;
+                    List<PhysicalOperator> succs = mr.reducePlan.getSuccessors(forEach);
+                    if (succs!=null)
+                    {
+                        if (succs.size()!=1)
+                        {
+                            String msg = new String("forEach can only have one 
successor");
+                            log.error(msg);
+                            throw new VisitorException(msg);
+                        }
+                        nextOp = succs.get(0);
+                    }
+                    mr.reducePlan.remove(pack);
+                    
+                    try {
+                        mr.reducePlan.replace(forEach, joinPackage);
+                    } catch (PlanException e) {
+                        String msg = new String("Error rewrite POJoinPackage");
+                        log.error(msg);
+                        throw new VisitorException(msg, e);
+                    }
+                    
+                    log.info("Rewrite: POPackage->POForEach to POJoinPackage");
+                }
+            }
+        }
+
+    }
+    
     private class RearrangeAdjuster extends MROpPlanVisitor {
 
         RearrangeAdjuster(MROperPlan plan) {

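Putting the checks in visitMROp together, the shape the visitor accepts, and the
rewrite it performs, can be pictured roughly as follows (the map-plan side is
shown only for orientation):

    map plan:     ... -> POUnion                    (single leaf, empty combine plan)
    reduce plan:  POPackage -> POForEach(flatten)   (single root, simple projects only,
                                                     last input flattened exactly once)

    after the rewrite:

    reduce plan:  POJoinPackage -> ...              (package and foreach fused)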
Modified: hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=711158&r1=711157&r2=711158&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java Mon Nov  3 17:45:47 2008
@@ -33,12 +33,14 @@
 import org.apache.pig.backend.hadoop.datastorage.HConfiguration;
 import org.apache.pig.backend.hadoop.executionengine.HExecutionEngine;
 import org.apache.pig.impl.PigContext;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler.CoGroupStreamingOptimizerVisitor;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MRPrinter;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MRStreamHandler;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.POPackageAnnotator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POJoinPackage;
 import org.apache.pig.impl.plan.PlanException;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.util.ConfigurationValidator;
@@ -145,6 +147,11 @@
         MRStreamHandler checker = new MRStreamHandler(plan);
         checker.visit();
         
+        // optimize joins
+        CoGroupStreamingOptimizerVisitor cgso = new MRCompiler.CoGroupStreamingOptimizerVisitor(plan, 
+                pc.getProperties().getProperty("join.biggest.input.chunksize", POJoinPackage.DEFAULT_CHUNK_SIZE));
+        cgso.visit();
+        
         // figure out the type of the key for the map plan
         // this is needed when the key is null to create
         // an appropriate NullableXXXWritable object

Modified: hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java?rev=711158&r1=711157&r2=711158&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java Mon Nov  3 17:45:47 2008
@@ -40,6 +40,7 @@
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POJoinPackage;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.TargetedTuple;
@@ -201,8 +202,32 @@
             pigReporter.setRep(reporter);
             PhysicalOperator.setReporter(pigReporter);
 
-            pack.attachInput(key, tupIter);
-            
+            // In the case where we optimize the join, we combine
+            // POPackage and POForEach - so we could get many
+            // tuples out of the getNext() call of POJoinPackage.
+            // In this case, we process till we see EOP from 
+            // POJoinPackage.getNext()
+            if (pack instanceof POJoinPackage)
+            {
+                pack.attachInput(key, tupIter);
+                while (true)
+                {
+                    if (processOnePackageOutput(oc))
+                        break;
+                }
+            }
+            else {
+                // join is not optimized, so package will
+                // give only one tuple out for the key
+                pack.attachInput(key, tupIter);
+                processOnePackageOutput(oc);
+            }
+        }
+        
+        // return: false - more output to come
+        //         true  - end of processing
+        public boolean processOnePackageOutput(OutputCollector<PigNullableWritable, Writable> oc) throws IOException
+        {
             try {
                 Tuple t=null;
                 Result res = pack.getNext(t);
@@ -211,7 +236,7 @@
                     
                     if(rp.isEmpty()){
                         oc.collect(null, packRes);
-                        return;
+                        return false;
                     }
                     
                     rp.attachInput(packRes);
@@ -224,15 +249,19 @@
                 }
                 
                 if(res.returnStatus==POStatus.STATUS_NULL) {
-                    return;
+                    return false;
                 }
                 
                 if(res.returnStatus==POStatus.STATUS_ERR){
                     IOException ioe = new IOException("Packaging error while processing group");
                     throw ioe;
                 }
-                    
                 
+                if(res.returnStatus==POStatus.STATUS_EOP) {
+                    return true;
+                }
+                    
+                return false;
             } catch (ExecException e) {
                 IOException ioe = new IOException(e.getMessage());
                 ioe.initCause(e.getCause());

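The contract of the new method is worth spelling out: it returns false while the
current key may still yield output and true once POStatus.STATUS_EOP is seen,
which is what lets the POJoinPackage branch above pump it in a loop. A distilled
model of that driver loop, using a hypothetical interface rather than Pig's
classes:

    import java.io.IOException;

    interface PackageLike {
        // false: one result (possibly null) was handled, more may follow
        // true:  end of processing for the current key
        boolean processOnePackageOutput() throws IOException;
    }

    class ReduceDriverSketch {
        static void drainKey(PackageLike pack) throws IOException {
            while (true) {
                if (pack.processOnePackageOutput())
                    break;   // EOP: all output for this key has been emitted
            }
        }
    }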
Modified: hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java?rev=711158&r1=711157&r2=711158&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java Mon Nov  3 17:45:47 2008
@@ -213,7 +213,6 @@
     public Result processInput() throws ExecException {
         
         Result res = new Result();
-        Tuple inpValue = null;
         if (input == null && (inputs == null || inputs.size()==0)) {
 //            log.warn("No inputs found. Signaling End of Processing.");
             res.returnStatus = POStatus.STATUS_EOP;
@@ -224,7 +223,7 @@
         if(reporter!=null) reporter.progress();
             
         if (!isInputAttached()) {
-            return inputs.get(0).getNext(inpValue);
+            return inputs.get(0).getNext(dummyTuple);
         } else {
             res.result = input;
             res.returnStatus = (res.result == null ? POStatus.STATUS_NULL: POStatus.STATUS_OK);

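The dummyTuple used here, like the dummyBag, dummyInt and friends that POForEach
switches to below, relies on the getNext overloads inspecting only the static
type of their argument, never its value; a shared null placeholder therefore
works, and call sites no longer declare throwaway locals. A toy illustration of
that dispatch, with made-up names:

    public class OverloadDispatchSketch {
        static final String DUMMY_STRING = null;   // value is never read
        static final Integer DUMMY_INT = null;

        String getNext(String s)  { return "chararray result"; }
        String getNext(Integer i) { return "integer result"; }

        public static void main(String[] args) {
            OverloadDispatchSketch op = new OverloadDispatchSketch();
            System.out.println(op.getNext(DUMMY_STRING));  // overload picked by declared type
            System.out.println(op.getNext(DUMMY_INT));
        }
    }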
Modified: hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java?rev=711158&r1=711157&r2=711158&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java Mon Nov  3 17:45:47 2008
@@ -206,6 +206,10 @@
         // TODO Auto-generated method stub
         
     }
+    
+    public void visitJoinPackage(POJoinPackage joinPackage) throws VisitorException{
+        //do nothing
+    }
 
     public void visitCast(POCast cast) {
         // TODO Auto-generated method stub
@@ -249,5 +253,13 @@
         
     }
 
+    /**
+     * @param optimizedForEach
+     */
+    public void visitPOOptimizedForEach(POOptimizedForEach optimizedForEach) {
+        // TODO Auto-generated method stub
+        
+    }
+
 
 }

Modified: hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java?rev=711158&r1=711157&r2=711158&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java Mon Nov  3 17:45:47 2008
@@ -1,5 +1,7 @@
 package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
 
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -32,25 +34,37 @@
      */
     private static final long serialVersionUID = 1L;
     
-    private List<Boolean> isToBeFlattened;
-    private List<PhysicalPlan> inputPlans;
-    private List<PhysicalOperator> planLeaves = new LinkedList<PhysicalOperator>();
-    private Log log = LogFactory.getLog(getClass());
+    protected List<PhysicalPlan> inputPlans;
+    protected Log log = LogFactory.getLog(getClass());
+    protected static TupleFactory mTupleFactory = TupleFactory.getInstance();
     //Since the plan has a generate, this needs to be maintained
     //as the generate can potentially return multiple tuples for
     //same call.
-    private boolean processingPlan = false;
+    protected boolean processingPlan = false;
     
     //'its' holds the iterators of the databags given by the input expressions which need flattening.
-    Iterator<Tuple> [] its = null;
+    protected Iterator<Tuple> [] its = null;
     
     //This holds the outputs given out by the input expressions of any datatype
-    Object [] bags = null;
+    protected Object [] bags = null;
     
     //This is the template which contains tuples and is flattened out in CreateTuple() to generate the final output
-    Object[] data = null;
+    protected Object[] data = null;
+    
+    // store result types of the plan leaves
+    protected byte[] resultTypes = null;
+    
+    // array version of isToBeFlattened - this is purely
+    // for optimization - instead of calling isToBeFlattened.get(i)
+    // we can do the quicker array access - isToBeFlattenedArray[i].
+    // Also we can store "boolean" values rather than "Boolean" objects
+    // so we can also save on the Boolean.booleanValue() calls
+    protected boolean[] isToBeFlattenedArray;
     
     ExampleTuple tIn = null;
+    protected int noItems;
+
+    protected PhysicalOperator[] planLeafOps = null;
     
     public POForEach(OperatorKey k) {
         this(k,-1,null,null);
@@ -70,7 +84,7 @@
     
     public POForEach(OperatorKey k, int rp, List<PhysicalPlan> inp, List<Boolean>  isToBeFlattened){
         super(k, rp);
-        this.isToBeFlattened = isToBeFlattened;
+        setUpFlattens(isToBeFlattened);
         this.inputPlans = inp;
         getLeaves();
     }
@@ -86,11 +100,11 @@
         return "New For Each" + "(" + fString + ")" + "[" + 
DataType.findTypeName(resultType) + "]" +" - " + mKey.toString();
     }
     
-    private String getFlatStr() {
-        if(isToBeFlattened==null)
+    String getFlatStr() {
+        if(isToBeFlattenedArray ==null)
             return "";
         StringBuilder sb = new StringBuilder();
-        for (Boolean b : isToBeFlattened) {
+        for (Boolean b : isToBeFlattenedArray) {
             sb.append(b);
             sb.append(',');
         }
@@ -181,15 +195,14 @@
         }
     }
 
-    private Result processPlan() throws ExecException{
-        int noItems = planLeaves.size();
+    protected Result processPlan() throws ExecException{
         Result res = new Result();
         
         //We check if all the databags have exhausted the tuples. If so we enforce the reading of new data by setting data and its to null
         if(its != null) {
             boolean restartIts = true;
             for(int i = 0; i < noItems; ++i) {
-                if(its[i] != null && isToBeFlattened.get(i) == true)
+                if(its[i] != null && isToBeFlattenedArray[i] == true)
                     restartIts &= !its[i].hasNext();
             }
             //this means that all the databags have reached their last elements, so we need to force reading of fresh databags
@@ -208,44 +221,42 @@
                 //Getting the iterators
                 //populate the input data
                 Result inputData = null;
-                byte resultType = ((PhysicalOperator)planLeaves.get(i)).getResultType();
-                switch(resultType) {
+                switch(resultTypes[i]) {
                 case DataType.BAG:
-                    DataBag b = null;
-                    inputData = ((PhysicalOperator)planLeaves.get(i)).getNext(b);
+                    inputData = planLeafOps[i].getNext(dummyBag);
                     break;
 
-                case DataType.TUPLE : Tuple t = null;
-                inputData = ((PhysicalOperator)planLeaves.get(i)).getNext(t);
+                case DataType.TUPLE :
+                inputData = planLeafOps[i].getNext(dummyTuple);
                 break;
-                case DataType.BYTEARRAY : DataByteArray db = null;
-                inputData = ((PhysicalOperator)planLeaves.get(i)).getNext(db);
+                case DataType.BYTEARRAY :
+                inputData = planLeafOps[i].getNext(dummyDBA);
                 break; 
-                case DataType.MAP : Map map = null;
-                inputData = ((PhysicalOperator)planLeaves.get(i)).getNext(map);
+                case DataType.MAP :
+                inputData = planLeafOps[i].getNext(dummyMap);
                 break;
-                case DataType.BOOLEAN : Boolean bool = null;
-                inputData = ((PhysicalOperator)planLeaves.get(i)).getNext(bool);
+                case DataType.BOOLEAN :
+                inputData = planLeafOps[i].getNext(dummyBool);
                 break;
-                case DataType.INTEGER : Integer integer = null;
-                inputData = ((PhysicalOperator)planLeaves.get(i)).getNext(integer);
+                case DataType.INTEGER :
+                inputData = planLeafOps[i].getNext(dummyInt);
                 break;
-                case DataType.DOUBLE : Double d = null;
-                inputData = ((PhysicalOperator)planLeaves.get(i)).getNext(d);
+                case DataType.DOUBLE :
+                inputData = planLeafOps[i].getNext(dummyDouble);
                 break;
-                case DataType.LONG : Long l = null;
-                inputData = ((PhysicalOperator)planLeaves.get(i)).getNext(l);
+                case DataType.LONG :
+                inputData = planLeafOps[i].getNext(dummyLong);
                 break;
-                case DataType.FLOAT : Float f = null;
-                inputData = ((PhysicalOperator)planLeaves.get(i)).getNext(f);
+                case DataType.FLOAT :
+                inputData = planLeafOps[i].getNext(dummyFloat);
                 break;
-                case DataType.CHARARRAY : String str = null;
-                inputData = ((PhysicalOperator)planLeaves.get(i)).getNext(str);
+                case DataType.CHARARRAY :
+                inputData = planLeafOps[i].getNext(dummyString);
                 break;
 
                 default:
                     String msg = new String("Unknown type " +
-                        DataType.findTypeName(resultType));
+                        DataType.findTypeName(resultTypes[i]));
                     log.error(msg);
                     throw new ExecException(msg);
                 }
@@ -265,7 +276,7 @@
                 
                 bags[i] = inputData.result;
                 
-                if(inputData.result instanceof DataBag && isToBeFlattened.get(i)) 
+                if(inputData.result instanceof DataBag && isToBeFlattenedArray[i]) 
                     its[i] = ((DataBag)bags[i]).iterator();
                 else 
                     its[i] = null;
@@ -279,7 +290,7 @@
                 //we instantiate the template array and start populating it with data
                 data = new Object[noItems];
                 for(int i = 0; i < noItems; ++i) {
-                    if(isToBeFlattened.get(i) && bags[i] instanceof DataBag) {
+                    if(isToBeFlattenedArray[i] && bags[i] instanceof DataBag) {
                         if(its[i].hasNext()) {
                             data[i] = its[i].next();
                         } else {
@@ -305,7 +316,7 @@
                 //we try to find the last expression which needs flattening and start iterating over it
                 //we also try to update the template array
                 for(int index = noItems - 1; index >= 0; --index) {
-                    if(its[index] != null && isToBeFlattened.get(index)) {
+                    if(its[index] != null && isToBeFlattenedArray[index]) {
                         if(its[index].hasNext()) {
                             data[index] =  its[index].next();
                             res.result = CreateTuple(data);
@@ -313,6 +324,11 @@
                             return res;
                         }
                         else{
+                            // reset this index's iterator so cross product can be achieved
+                            // we would be resetting this way only for the indexes from the end
+                            // when the first index which needs to be flattened has reached the
+                            // last element in its iterator, we won't come here - instead, we reset
+                            // all iterators at the beginning of this method.
                             its[index] = ((DataBag)bags[index]).iterator();
                             data[index] = its[index].next();
                         }
@@ -329,15 +345,15 @@
      * @param data array that is the template for the final flattened tuple
      * @return the final flattened tuple
      */
-    private Tuple CreateTuple(Object[] data) throws ExecException {
-        TupleFactory tf = TupleFactory.getInstance();
-        Tuple out = tf.newTuple();
+    protected Tuple CreateTuple(Object[] data) throws ExecException {
+        Tuple out =  mTupleFactory.newTuple();
         for(int i = 0; i < data.length; ++i) {
             Object in = data[i];
             
-            if(isToBeFlattened.get(i) && in instanceof Tuple) {
+            if(isToBeFlattenedArray[i] && in instanceof Tuple) {
                 Tuple t = (Tuple)in;
-                for(int j = 0; j < t.size(); ++j) {
+                int size = t.size();
+                for(int j = 0; j < size; ++j) {
                     out.append(t.get(j));
                 }
             } else
@@ -352,25 +368,45 @@
     }
 
     
-    private void attachInputToPlans(Tuple t) {
+    protected void attachInputToPlans(Tuple t) {
         //super.attachInput(t);
         for(PhysicalPlan p : inputPlans) {
             p.attachInput(t);
         }
     }
     
-    private void getLeaves() {
+    protected void getLeaves() {
         if (inputPlans != null) {
             int i=-1;
+            if(isToBeFlattenedArray == null) {
+                isToBeFlattenedArray = new boolean[inputPlans.size()];
+            }
+            planLeafOps = new PhysicalOperator[inputPlans.size()];
             for(PhysicalPlan p : inputPlans) {
                 ++i;
                 PhysicalOperator leaf = (PhysicalOperator)p.getLeaves().get(0); 
-                planLeaves.add(leaf);
+                planLeafOps[i] = leaf;
                 if(leaf instanceof POProject &&
                         leaf.getResultType() == DataType.TUPLE &&
                         ((POProject)leaf).isStar())
-                    isToBeFlattened.set(i, true);
+                    isToBeFlattenedArray[i] = true;
+            }
+        }
+        // we are calculating plan leaves
+        // so lets reinitialize
+        reInitialize();
+    }
+    
+    private void reInitialize() {
+        if(planLeafOps != null) {
+            noItems = planLeafOps.length;
+            resultTypes = new byte[noItems];
+            for (int i = 0; i < resultTypes.length; i++) {
+                resultTypes[i] = planLeafOps[i].getResultType();
             }
+        } else {
+            noItems = 0;
+            resultTypes = null;
         }
     }
     
@@ -380,22 +416,49 @@
 
     public void setInputPlans(List<PhysicalPlan> plans) {
         inputPlans = plans;
-        planLeaves.clear();
+        planLeafOps = null;
         getLeaves();
     }
 
     public void addInputPlan(PhysicalPlan plan, boolean flatten) {
         inputPlans.add(plan);
-        planLeaves.add(plan.getLeaves().get(0));
-        isToBeFlattened.add(flatten);
+        // add to planLeafOps
+        // copy existing leaves
+        PhysicalOperator[] newPlanLeafOps = new PhysicalOperator[planLeafOps.length + 1];
+        for (int i = 0; i < planLeafOps.length; i++) {
+            newPlanLeafOps[i] = planLeafOps[i];
+        }
+        // add to the end
+        newPlanLeafOps[planLeafOps.length] = plan.getLeaves().get(0); 
+        planLeafOps = newPlanLeafOps;
+        
+        // add to isToBeFlattenedArray
+        // copy existing values
+        boolean[] newIsToBeFlattenedArray = new boolean[isToBeFlattenedArray.length + 1];
+        for(int i = 0; i < isToBeFlattenedArray.length; i++) {
+            newIsToBeFlattenedArray[i] = isToBeFlattenedArray[i];
+        }
+        // add to end
+        newIsToBeFlattenedArray[isToBeFlattenedArray.length] = flatten;
+        isToBeFlattenedArray = newIsToBeFlattenedArray;
+        
+        // we just added a leaf - reinitialize
+        reInitialize();
     }
 
     public void setToBeFlattened(List<Boolean> flattens) {
-        isToBeFlattened = flattens;
+        setUpFlattens(flattens);
     }
 
     public List<Boolean> getToBeFlattened() {
-        return isToBeFlattened;
+        List<Boolean> result = null;
+        if(isToBeFlattenedArray != null) {
+            result = new ArrayList<Boolean>();
+            for (int i = 0; i < isToBeFlattenedArray.length; i++) {
+                result.add(isToBeFlattenedArray[i]);
+            }
+        }
+        return result;
     }
 
     /**
@@ -409,17 +472,33 @@
         for (PhysicalPlan plan : inputPlans) {
             plans.add(plan.clone());
         }
-        List<Boolean> flattens = new
-            ArrayList<Boolean>(isToBeFlattened.size());
-        for (Boolean b : isToBeFlattened) {
-            // Boolean is immutable, so using same reference is ok
-            flattens.add(b);
+        List<Boolean> flattens = null;
+        if(isToBeFlattenedArray != null) {
+            flattens = new
+                ArrayList<Boolean>(isToBeFlattenedArray.length);
+            for (boolean b : isToBeFlattenedArray) {
+                flattens.add(b);
+            }
         }
         return new POForEach(new OperatorKey(mKey.scope, 
             NodeIdGenerator.getGenerator().getNextNodeId(mKey.scope)),
             requestedParallelism, plans, flattens);
     }
 
-
-
+    public boolean inProcessing()
+    {
+        return processingPlan;
+    }
+    
+    protected void setUpFlattens(List<Boolean> isToBeFlattened) {
+        if(isToBeFlattened == null) {
+            isToBeFlattenedArray = null;
+        } else {
+            isToBeFlattenedArray = new boolean[isToBeFlattened.size()];
+            int i = 0;
+            for (Iterator<Boolean> it = isToBeFlattened.iterator(); it.hasNext();) {
+                isToBeFlattenedArray[i++] = it.next();
+            }
+        }
+    }
 }

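The odometer-style iteration in processPlan (advance the rightmost flattened
iterator, reset it once exhausted, and carry over into the next one to its left)
is what yields the cross product across flattened bags. A self-contained model
with two bags of hypothetical data:

    import java.util.Iterator;
    import java.util.List;

    public class FlattenCrossProductSketch {
        public static void main(String[] args) {
            List<Integer> bag0 = List.of(1, 2);
            List<String> bag1 = List.of("a", "b");
            Iterator<String> it1 = bag1.iterator();
            for (Iterator<Integer> it0 = bag0.iterator(); it0.hasNext();) {
                Integer d0 = it0.next();
                if (!it1.hasNext())
                    it1 = bag1.iterator();   // reset the exhausted inner iterator
                while (it1.hasNext())
                    System.out.println("(" + d0 + "," + it1.next() + ")");
            }
            // prints (1,a) (1,b) (2,a) (2,b)
        }
    }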
Added: hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POJoinPackage.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POJoinPackage.java?rev=711158&view=auto
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POJoinPackage.java (added)
+++ hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POJoinPackage.java Mon Nov  3 17:45:47 2008
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
+
+import java.util.List;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.NullableTuple;
+import org.apache.pig.impl.plan.NodeIdGenerator;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.VisitorException;
+
+public class POJoinPackage extends POPackage {
+
+    private static final long serialVersionUID = 1L;
+    
+    private POOptimizedForEach forEach;
+    private boolean newKey = true;
+    private Tuple res = null;
+    private boolean lastInputTuple = false;
+    private static final Tuple t1 = null;
+    private static final Result eopResult = new Result(POStatus.STATUS_EOP, null);
+
+    public static final String DEFAULT_CHUNK_SIZE = "1000";
+
+    private long chunkSize = Long.parseLong(DEFAULT_CHUNK_SIZE);
+    private Result forEachResult;
+    private DataBag[] dbs = null;
+
+    private int lastBagIndex;
+    
+    public POJoinPackage(OperatorKey k, int rp, POPackage p, POForEach f) {
+        super(k, rp);
+        String scope = getOperatorKey().getScope();
+        NodeIdGenerator nig = NodeIdGenerator.getGenerator();
+        forEach = new POOptimizedForEach(new OperatorKey(scope,nig.getNextNodeId(scope)));
+        if (p!=null)
+        {
+            setKeyType(p.getKeyType());
+            setNumInps(p.getNumInps());
+            lastBagIndex = numInputs - 1;
+            setInner(p.getInner());
+            setKeyInfo(p.getKeyInfo());
+            this.isKeyTuple = p.isKeyTuple;
+        }
+        if (f!=null)
+        {
+            setInputPlans(f.getInputPlans());
+            setToBeFlattened(f.getToBeFlattened());
+        }
+    }
+
+    @Override
+    public void visit(PhyPlanVisitor v) throws VisitorException {
+        v.visitJoinPackage(this);
+    }
+
+    @Override
+    public String name() {
+        String fString = forEach.getFlatStr();
+        return "POJoinPackage" + "(" + fString + ")" + "[" + 
DataType.findTypeName(resultType) + "]" +" - " + mKey.toString();
+    }
+
+    /**
+     * Calls getNext to get the next ForEach result. The input for POJoinPackage is
+     * a (key, NullableTuple) pair. We materialize n-1 inputs into bags and feed
+     * input#n one tuple at a time to the delegated ForEach operator, so the input
+     * for ForEach is
+     * 
+     *     (input#1, input#2, input#3....input#n[i]), i=(1..k), 
+     * 
+     * supposing input#n consists of k tuples.
+     * For every ForEach input, pull all the results from ForEach.
+     * getNext will be called multiple times for a particular input; it returns one
+     * output tuple from ForEach on each call, so we need to maintain internal state
+     * to keep track of where we are.
+     */
+    @Override
+    public Result getNext(Tuple t) throws ExecException {
+        // if a previous call to foreach.getNext()
+        // has still not returned all output, process it
+        if (forEach.processingPlan)
+        {
+            forEachResult = forEach.getNext(t1);
+            switch (forEachResult.returnStatus)
+            {
+            case POStatus.STATUS_OK:
+            case POStatus.STATUS_NULL:
+            case POStatus.STATUS_ERR:
+                return forEachResult;
+            case POStatus.STATUS_EOP:
+                break;
+            }
+        }
+        
+        NullableTuple it = null;
+        
+        // If we see a new key, materialize n-1 inputs and construct the ForEach input
+        // tuple res = (key, input#1, input#2....input#n); the only missing value is
+        // input#n, which we get one tuple at a time, fill into res, and feed to ForEach.
+        // After this block, we have the first tuple of input#n in hand (kept in
+        // variable it)
+        if (newKey)
+        {
+            lastInputTuple = false;
+            //Put n-1 inputs into bags
+            dbs = new DataBag[numInputs];
+            for (int i = 0; i < numInputs; i++) {
+                dbs[i] = mBagFactory.newDefaultBag();
+            }
+            
+            //For each Nullable tuple in the input, put it
+            //into the corresponding bag based on the index,
+            // except for the last input, which we will stream
+            // The tuples will arrive in the order of the index,
+            // starting from index 0 and such that all tuples for
+            // a given index arrive before a tuple for the next
+            // index does.
+            while (tupIter.hasNext()) {
+                it = tupIter.next();
+                int itIndex = it.getIndex();
+                if (itIndex!= numInputs - 1)
+                {
+                    dbs[itIndex].add(getValueTuple(it, itIndex));
+                }
+                else
+                {
+                    lastInputTuple = true;
+                    break;
+                }
+                if(reporter!=null) reporter.progress();
+            }
+            // If we don't have any tuple for input#n
+            // we do not need any further processing, just return EOP
+            if (!lastInputTuple)
+            {
+                // we will return at this point because we ought
+                // to be having a flatten on this last input
+                // and we have an empty bag which should result
+                // in this key being taken out of the output
+                newKey = true;
+                return eopResult;
+            }
+            
+            res = mTupleFactory.newTuple(numInputs+1);
+            for (int i = 0; i < dbs.length; i++)
+                res.set(i+1,dbs[i]);
+
+            res.set(0,key);
+            // if we have an inner anywhere and the corresponding
+            // bag is empty, we can just return
+            for (int i = 0; i < dbs.length - 1; i++) {
+                if(inner[i]&&dbs[i].size()==0){
+                    detachInput();
+                    return eopResult;
+                }
+            }
+            newKey = false;
+            
+            // set up the bag with the last input to contain
+            // a chunk of CHUNKSIZE values OR the entire bag if
+            // it has less than CHUNKSIZE values - the idea is that in most
+            // cases the values are > CHUNKSIZE in number and in
+            // those cases we will be sending the last bag
+            // as a set of smaller chunked bags, thus holding less
+            // in memory
+            
+            // the first tuple can be directly retrieved from "it"
+            dbs[lastBagIndex].add(getValueTuple(it, it.getIndex()));
+            for(int i = 0; i < chunkSize - 1 && tupIter.hasNext(); i++) {
+                it = tupIter.next();
+                dbs[lastBagIndex].add(getValueTuple(it, it.getIndex()));
+            }
+
+            // Attach the input to forEach
+            forEach.attachInput(res);
+            
+            // pull output tuple from ForEach
+            Result forEachResult = forEach.getNext(t1);
+            {
+                switch (forEachResult.returnStatus)
+                {
+                case POStatus.STATUS_OK:
+                case POStatus.STATUS_NULL:
+                case POStatus.STATUS_ERR:
+                    return forEachResult;
+                case POStatus.STATUS_EOP:
+                    break;
+                }
+            }
+        }
+        
+        // Keep attaching input tuple to ForEach, until:
+        // 1. We can initialize ForEach.getNext();
+        // 2. There is no more input#n
+        while (true)
+        {
+            if (tupIter.hasNext()) {
+                // try setting up a bag of CHUNKSIZE OR
+                // the remainder of the bag of last input
+                // (if < CHUNKSIZE) to foreach
+                dbs[lastBagIndex].clear(); // clear last chunk
+                for(int i = 0; i < chunkSize && tupIter.hasNext(); i++) {
+                    it = tupIter.next();
+                    dbs[lastBagIndex].add(getValueTuple(it, it.getIndex()));
+                }
+            }
+            else
+            // if we do not have any more tuples for input#n, return EOP
+            {
+                detachInput();
+                newKey = true;
+                return eopResult;
+            }
+            // Attach the input to forEach
+            forEach.attachInput(res);
+            
+            // pull output tuple from ForEach
+            Result forEachResult = forEach.getNext(t1);
+            {
+                switch (forEachResult.returnStatus)
+                {
+                case POStatus.STATUS_OK:
+                case POStatus.STATUS_NULL:
+                case POStatus.STATUS_ERR:
+                    return forEachResult;
+                case POStatus.STATUS_EOP:
+                    break;
+                }
+            }
+        }
+    }
+    
+    public List<PhysicalPlan> getInputPlans() {
+        return forEach.getInputPlans();
+    }
+
+    public void setInputPlans(List<PhysicalPlan> plans) {
+        forEach.setInputPlans(plans);
+    }
+
+    public void setToBeFlattened(List<Boolean> flattens) {
+        forEach.setToBeFlattened(flattens);
+    }
+
+    /**
+     * @return the forEach
+     */
+    public POOptimizedForEach getForEach() {
+        return forEach;
+    }
+
+    /**
+     * @param chunkSize - the chunk size for the biggest input
+     */
+    public void setChunkSize(long chunkSize) {
+        this.chunkSize = chunkSize;
+    }
+}

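A concrete trace of the chunking in getNext, with hypothetical data: two inputs,
chunkSize = 2, and for key k the bag {l1,l2,l3} materialized from input #1 while
r1..r5 arrive from input #2, the streamed last input. The delegated foreach then
sees:

    res #1: (k, {l1,l2,l3}, {r1,r2})   -> flatten emits 6 tuples
    res #2: (k, {l1,l2,l3}, {r3,r4})   -> flatten emits 6 tuples
    res #3: (k, {l1,l2,l3}, {r5})      -> flatten emits 3 tuples

so the full 15-tuple cross product comes out while only two tuples of the
biggest input are ever held at once.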
Added: hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POOptimizedForEach.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POOptimizedForEach.java?rev=711158&view=auto
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POOptimizedForEach.java (added)
+++ hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POOptimizedForEach.java Mon Nov  3 17:45:47 2008
@@ -0,0 +1,154 @@
+package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.NodeIdGenerator;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.pen.util.ExampleTuple;
+
+/**
+ * A specialized version of POForEach with the difference
+ * that in getNext(), it knows that "input" has already been
+ * attached by its input operator which SHOULD be POJoinPackage
+ */
+public class POOptimizedForEach extends POForEach {
+
+    /**
+     * 
+     */
+    private static final long serialVersionUID = 1L;
+    
+    public POOptimizedForEach(OperatorKey k) {
+        this(k,-1,null,null);
+    }
+
+    public POOptimizedForEach(OperatorKey k, int rp, List inp) {
+        this(k,rp,inp,null);
+    }
+
+    public POOptimizedForEach(OperatorKey k, int rp) {
+        this(k,rp,null,null);
+    }
+
+    public POOptimizedForEach(OperatorKey k, List inp) {
+        this(k,-1,inp,null);
+    }
+    
+    public POOptimizedForEach(OperatorKey k, int rp, List<PhysicalPlan> inp, List<Boolean>  isToBeFlattened){
+        super(k, rp);
+        setUpFlattens(isToBeFlattened);
+        this.inputPlans = inp;
+        getLeaves();
+    }
+
+    @Override
+    public void visit(PhyPlanVisitor v) throws VisitorException {
+        v.visitPOOptimizedForEach(this);
+    }
+
+    @Override
+    public String name() {
+        String fString = getFlatStr();
+        return "Optimized For Each" + "(" + fString + ")" + "[" + 
DataType.findTypeName(resultType) + "]" +" - " + mKey.toString();
+    }
+    
+    /**
+     * Calls getNext on the generate operator inside the nested
+     * physical plan and returns it maintaining an additional state
+     * to denote the begin and end of the nested plan processing.
+     */
+    @Override
+    public Result getNext(Tuple t) throws ExecException {
+        Result res = null;
+        Result inp = null;
+        //The nested plan is under processing
+        //So return tuples that the generate oper
+        //returns
+        if(processingPlan){
+            while(true) {
+                res = processPlan();
+                if(res.returnStatus==POStatus.STATUS_OK) {
+                    if(lineageTracer !=  null && res.result != null) {
+                       ExampleTuple tOut = new ExampleTuple((Tuple) res.result);
+                       tOut.synthetic = tIn.synthetic;
+                       lineageTracer.insert(tOut);
+                       lineageTracer.union(tOut, tIn);
+                       res.result = tOut;
+                    }
+                    return res;
+                }
+                if(res.returnStatus==POStatus.STATUS_EOP) {
+                    processingPlan = false;
+                    return res;
+                }
+                if(res.returnStatus==POStatus.STATUS_ERR) {
+                    return res;
+                }
+                if(res.returnStatus==POStatus.STATUS_NULL) {
+                    continue;
+                }
+            }
+        }
+        //The nested plan processing is done or is
+        //yet to begin. So process the input and start
+        //nested plan processing on the input tuple
+        //read
+        while (true) {
+            
+            // we know that input has already been attached by POJoinPackage
+            Tuple inputTuple = input;   // keep a handle for the lineage code below
+            attachInputToPlans(input);
+            detachInput();
+            res = processPlan();
+            
+            processingPlan = true;
+
+            if(lineageTracer != null && res.result != null) {
+               //we check for res.result since that can also be null in the case of flatten
+               tIn = (ExampleTuple) inputTuple;
+               ExampleTuple tOut = new ExampleTuple((Tuple) res.result);
+               tOut.synthetic = tIn.synthetic;
+               lineageTracer.insert(tOut);
+               lineageTracer.union(tOut, tIn);
+               res.result = tOut;
+            }
+            
+            return res;
+        }
+    }
+
+    
+    /**
+     * Make a deep copy of this operator.  
+     * @throws CloneNotSupportedException
+     */
+    @Override
+    public POOptimizedForEach clone() throws CloneNotSupportedException {
+        List<PhysicalPlan> plans = new
+            ArrayList<PhysicalPlan>(inputPlans.size());
+        for (PhysicalPlan plan : inputPlans) {
+            plans.add(plan.clone());
+        }
+        List<Boolean> flattens = null;
+        if(isToBeFlattenedArray != null ) {
+            flattens = new 
+            ArrayList<Boolean>(isToBeFlattenedArray.length);
+            for (boolean b : isToBeFlattenedArray) {
+                flattens.add(b);
+            }
+        }
+        return new POOptimizedForEach(new OperatorKey(mKey.scope, 
+            NodeIdGenerator.getGenerator().getNextNodeId(mKey.scope)),
+            requestedParallelism, plans, flattens);
+    }
+
+    
+}

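The division of labour between the two new operators, written out as a call
sequence (names from the code above):

    PigMapReduce.reduce()
      -> POJoinPackage.getNext()
           builds res = (key, bag#1, ..., chunked bag#n)
           forEach.attachInput(res)      // forEach is the POOptimizedForEach
           forEach.getNext(...)          // consumes 'input' directly, skipping
                                         // the processInput() call POForEach makes
      <- one joined tuple per call, until STATUS_EOP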
Modified: hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=711158&r1=711157&r2=711158&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt Mon Nov  3 17:45:47 2008
@@ -239,6 +239,10 @@
 
        
                //Construct the cogroup operator and add it to the logical plan
+        // for join, inner is true for all the inputs involved in the join
+        for (int i = 0; i < n; i++) {
+            (gis.get(i)).isInner = true;
+        }
                LogicalOperator cogroup = parseCogroup(gis, lp);
                lp.add(cogroup);
                log.debug("Added operator " + cogroup.getClass().getName() + " to the logical plan");
@@ -249,7 +253,6 @@
                        ExpressionOperator column = new LOProject(projectPlan, new OperatorKey(scope, getNextId()), projectInput, i+1);
                        flattenList.add(true);
                        flattenedColumns.add(column);
-                       (gis.get(i)).isInner = true;
                        projectPlan.add(column);
             if(projectInput instanceof ExpressionOperator) {
                            projectPlan.add(projectInput);

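Marking every input of a join as inner at parse time is what gives JOIN its
inner-join semantics: a key missing from any input contributes nothing to the
output. For instance, with hypothetical relations A = {(1,x),(2,y)} and
B = {(2,p),(3,q)}, the query C = JOIN A BY $0, B BY $0; yields only (2,y,2,p);
keys 1 and 3 are dropped rather than padded with nulls.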
