Author: pradeepkth
Date: Wed Mar 11 23:03:46 2009
New Revision: 752683

URL: http://svn.apache.org/viewvc?rev=752683&view=rev
Log:
 PIG-627: multiquery support incremental patch (Richard Ding via pradeepkth)

Modified:
    hadoop/pig/branches/multiquery/CHANGES.txt
    hadoop/pig/branches/multiquery/src/org/apache/pig/PigServer.java
    
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
    
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java
    
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSplit.java
    
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java
    hadoop/pig/branches/multiquery/src/org/apache/pig/builtin/BinStorage.java
    
hadoop/pig/branches/multiquery/src/org/apache/pig/impl/plan/DependencyOrderWalker.java

Modified: hadoop/pig/branches/multiquery/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/CHANGES.txt?rev=752683&r1=752682&r2=752683&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/CHANGES.txt (original)
+++ hadoop/pig/branches/multiquery/CHANGES.txt Wed Mar 11 23:03:46 2009
@@ -410,3 +410,5 @@
     PIG-627: multiquery support M1 (hagleitn via olgan)
 
     PIG-627: multiquery support M2 (hagleitn via pradeepkth)
+    
+    PIG-627: multiquery support incremental patch (Richard Ding via pradeepkth)

Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/PigServer.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/PigServer.java?rev=752683&r1=752682&r2=752683&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/PigServer.java (original)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/PigServer.java Wed Mar 11 
23:03:46 2009
@@ -28,10 +28,12 @@
 import java.util.Date;
 import java.util.Enumeration;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
 import java.util.Stack;
 
 import org.apache.commons.logging.Log;
@@ -49,6 +51,9 @@
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.LOCogroup;
+import org.apache.pig.impl.logicalLayer.LOFRJoin;
+import org.apache.pig.impl.logicalLayer.LOLoad;
 import org.apache.pig.impl.logicalLayer.LogicalOperator;
 import org.apache.pig.impl.logicalLayer.LogicalPlan;
 import org.apache.pig.impl.logicalLayer.LogicalPlanBuilder;
@@ -62,6 +67,7 @@
 import org.apache.pig.impl.plan.CompilationMessageCollector;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.streaming.StreamingCommand;
+import org.apache.pig.impl.util.MultiMap;
 import org.apache.pig.impl.util.PropertiesUtil;
 import org.apache.pig.impl.logicalLayer.LODefine;
 import org.apache.pig.impl.logicalLayer.LOStore;
@@ -838,9 +844,11 @@
         
         private Map<String, LogicalOperator> aliasOp = new HashMap<String, 
LogicalOperator>();
        
-        private ArrayList<String> scriptCache = new ArrayList<String>();       
+        private List<String> scriptCache = new ArrayList<String>();    
     
-        private Map<LogicalOperator, LogicalPlan> storeOpTable = new 
HashMap<LogicalOperator, LogicalPlan>();
+        private Map<LOStore, LogicalPlan> storeOpTable = new HashMap<LOStore, 
LogicalPlan>();
+        
+        private Set<LOLoad> loadOps = new HashSet<LOLoad>();
 
         private String jobName;
         
@@ -866,9 +874,7 @@
         
         Map<String, LogicalOperator> getAliasOp() { return aliasOp; }
         
-        ArrayList<String> getScriptCache() { return scriptCache; }
-        
-        Map<LogicalOperator, LogicalPlan> getStoreOpTable() { return 
storeOpTable; }
+        List<String> getScriptCache() { return scriptCache; }
         
         boolean isBatchOn() { return batchMode; };
         
@@ -922,8 +928,15 @@
                         }
                     } else {
                         if (0 == ignoreNumStores) {
-                            storeOpTable.put(op, tmpLp);
+                            storeOpTable.put((LOStore)op, tmpLp);
                             lp.mergeSharedPlan(tmpLp);
+                            List<LogicalOperator> roots = tmpLp.getRoots();
+                            for (LogicalOperator root : roots) {
+                                if (root instanceof LOLoad) {
+                                    loadOps.add((LOLoad)root);
+                                }
+                            }
+
                         } else {
                             --ignoreNumStores;
                         }
@@ -987,10 +1000,51 @@
                         graph.lp = graph.parseQuery(it.next(), lineNumber);
                     }
                 }
+                graph.postProcess();
             } catch (IOException ioe) {
                 graph = null;
-            }
+            }          
             return graph;
         }
+        
+        private void postProcess() throws IOException {
+
+            // The following code deals with store/load combination of 
+            // intermediate files. In this case we replace the load operator
+            // with an (implicit) split operator.
+            for (LOLoad load : loadOps) {
+                String ifile = load.getInputFile().getFileName();
+                for (LOStore store : storeOpTable.keySet()) {
+                    String ofile = store.getOutputFile().getFileName();
+                    if (ofile.equals(ifile)) {
+                        LogicalOperator storePred = lp.getPredecessors(store).get(0);
+
+                        lp.disconnect(store, load);
+                        lp.replace(load, storePred);
+
+                        List<LogicalOperator> succs = lp.getSuccessors(storePred);
+
+                        for (LogicalOperator succ : succs) {
+                            MultiMap<LogicalOperator, LogicalPlan> innerPls = null;
+                            // fix inner plans for cogroup and frjoin operators
+                            if (succ instanceof LOCogroup) {
+                                innerPls = ((LOCogroup)succ).getGroupByPlans();
+                            } else if (succ instanceof LOFRJoin) {
+                                innerPls = ((LOFRJoin)succ).getJoinColPlans();
+                            }
+                            if (innerPls != null) {
+                                if (innerPls.containsKey(load)) {
+                                    Collection<LogicalPlan> pls = innerPls.get(load);
+                                    innerPls.removeKey(load);
+                                    innerPls.put(storePred, pls);
+                                }
+                            }
+                        }
+                        // load is no longer in lp; stop matching it against other stores
+                        break;
+                    }
+                }
+            }
+        }
     }
 }

Modified: 
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=752683&r1=752682&r2=752683&view=diff
==============================================================================
--- 
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
 (original)
+++ 
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
 Wed Mar 11 23:03:46 2009
@@ -64,6 +64,7 @@
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
+import org.apache.pig.impl.plan.DependencyOrderWalker;
 import org.apache.pig.impl.plan.DepthFirstWalker;
 import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.impl.plan.Operator;
@@ -239,6 +240,9 @@
         la.visit();
         la.adjust();
         
+        MultiQueryAdjuster ma = new MultiQueryAdjuster(MRPlan);
+        ma.visit();
+        
         return MRPlan;
     }
     
@@ -305,6 +309,11 @@
         return st;
     }
     
+    private POSplit getSplit(){
+        POSplit sp = new POSplit(new 
OperatorKey(scope,nig.getNextNodeId(scope)));
+        return sp;
+    }    
+    
     /**
      * A map MROper is an MROper whose map plan is still open
      * for taking more non-blocking operators.
@@ -491,7 +500,7 @@
         MRPlan.connect(old, ret);
         return ret;
     }
-    
+ 
     /**
      * Returns a temporary DFS Path
      * @return
@@ -1655,5 +1664,258 @@
             keyType = p.getResultType();
         }
     }
-    
+
+    /** 
+     * An adjuster that merges all or some of the splitee MapReduceOpers into
+     * the splitter MapReduceOper. The merge can produce an MROperPlan that has
+     * fewer MapReduceOpers than the original MROperPlan.
+     * 
+     * The MRCompiler generates multiple MapReduceOpers whenever it encounters
+     * a split operator and connects the single splitter MapReduceOper to
+     * one or more splitee MapReduceOpers using store/load operators:
+     *  
+     *     ---- POStore (in splitter) -... ----
+     *     |        |    ...    |
+     *     |        |    ...    |
+     *    POLoad  POLoad ...  POLoad (in splitees)
+     *      |        |           |
+     *      
+     *  This adjuster merges those MapReduceOpers by replacing POLoad/POStore 
+     *  combination with POSplit operator.    
+     */
+    private class MultiQueryAdjuster extends MROpPlanVisitor {
+ 
+        MultiQueryAdjuster(MROperPlan plan) {
+            super(plan, new DependencyOrderWalker<MapReduceOper, 
MROperPlan>(plan, false));
+        }
+
+        @Override
+        public void visitMROp(MapReduceOper mr) throws VisitorException {
+            
+            if (!isSplitter(mr)) {
+                return;
+            }
+            
+            // find all the single load map-only MROpers in the splitees
+            List<MapReduceOper> mappers = new ArrayList<MapReduceOper>();
+            List<MapReduceOper> multiLoadMappers = new 
ArrayList<MapReduceOper>();
+            List<MapReduceOper> mapReducers = new ArrayList<MapReduceOper>();
+                        
+            List<MapReduceOper> successors = getPlan().getSuccessors(mr);
+            for (MapReduceOper successor : successors) {
+                if (isMapOnly(successor)) {
+                    if (isSingleLoadMapperPlan(successor.mapPlan)) { 
+                        mappers.add(successor);
+                    } else {
+                        multiLoadMappers.add(successor);
+                    }
+                } else {
+                    mapReducers.add(successor);
+                }                
+            }
+                
+            PhysicalPlan splitterPl = isMapOnly(mr) ? mr.mapPlan : 
mr.reducePlan;                            
+            PhysicalOperator storeOp = splitterPl.getLeaves().get(0);
+            List<PhysicalOperator> storePreds = 
splitterPl.getPredecessors(storeOp);             
+            
+            if (mappers.size() == 1 && mapReducers.size() == 0 && 
multiLoadMappers.size() == 0) {
+                                               
+                // In this case, just add splitee's map plan to the splitter's 
plan
+                MapReduceOper mapper = mappers.get(0);
+                
+                PhysicalPlan pl = mapper.mapPlan;
+                PhysicalOperator load = pl.getRoots().get(0);               
+                pl.remove(load);
+                                
+                // make a copy before removing the store operator
+                List<PhysicalOperator> predsCopy = new 
ArrayList<PhysicalOperator>(storePreds);
+                splitterPl.remove(storeOp);                
+                
+                // connect two plans
+                List<PhysicalOperator> roots = pl.getRoots();
+                try {
+                    splitterPl.merge(pl);
+                } catch (PlanException e) {
+                    throw new VisitorException(e);
+                }                
+                
+                for (PhysicalOperator pred : predsCopy) {
+                    for (PhysicalOperator root : roots) {
+                        try {
+                            splitterPl.connect(pred, root);
+                        } catch (PlanException e) {
+                            throw new VisitorException(e);
+                        }
+                    }
+                }
+                
+            } else if (mappers.size() > 0) {
+                
+                // merge splitee's map plans into nested plan of 
+                // the splitter operator
+                POSplit splitOp = getSplit();
+                for (MapReduceOper mapper : mappers) {
+                                        
+                    PhysicalPlan pl = mapper.mapPlan;
+                    PhysicalOperator load = pl.getRoots().get(0);
+                    pl.remove(load);
+                    try {
+                        splitOp.addPlan(pl);
+                    } catch (PlanException e) {
+                        throw new VisitorException(e);
+                    }
+                }
+                        
+                // add original store to the split operator 
+                // if there is at least one MapReduce splitee
+                if (mapReducers.size() + multiLoadMappers.size() > 0) {
+                    PhysicalPlan storePlan = new PhysicalPlan();
+                    try {
+                        storePlan.addAsLeaf(storeOp);
+                        splitOp.addPlan(storePlan);
+                    } catch (PlanException e) {
+                        throw new VisitorException(e);
+                    }                
+                }
+            
+                // replace store operator in the splitter with split operator
+                splitOp.setInputs(storePreds);
+                try {
+                    splitterPl.replace(storeOp, splitOp);;
+                } catch (PlanException e) {
+                    throw new VisitorException(e);
+                }                   
+            }                       
+            
+            // remove all the map-only splitees from the MROperPlan
+            for (MapReduceOper mapper : mappers) {
+                removeAndReconnect(mapper, mr);                  
+            }
+            
+            // TO-DO: merge the other splitees if possible
+            if (mapReducers.size() + multiLoadMappers.size() > 0 ) {
+                // XXX
+            }
+        }
+        
+        /**
+         * Removes the specified MR operator from the plan after the merge. 
+         * Connects its predecessors and successors to the merged MR operator
+         * @param mr the MR operator to remove
+         * @param newMR the MR operator to be connected to the predecessors 
and 
+         *              the successors of the removed operator
+         * @throws VisitorException if connect operation fails
+         */
+        private void removeAndReconnect(MapReduceOper mr, MapReduceOper newMR) 
throws VisitorException {
+            List<MapReduceOper> mapperSuccs = getPlan().getSuccessors(mr);
+            List<MapReduceOper> mapperPreds = getPlan().getPredecessors(mr);
+            
+            // make a copy before removing operator
+            ArrayList<MapReduceOper> succsCopy = null;
+            ArrayList<MapReduceOper> predsCopy = null;
+            if (mapperSuccs != null) {
+                succsCopy = new ArrayList<MapReduceOper>(mapperSuccs);
+            }
+            if (mapperPreds != null) {
+                predsCopy = new ArrayList<MapReduceOper>(mapperPreds);
+            }
+            getPlan().remove(mr);   
+            
+            // reconnect the mapper's successors
+            if (succsCopy != null) {
+                for (MapReduceOper succ : succsCopy) {
+                    try {
+                        getPlan().connect(newMR, succ);
+                    } catch (PlanException e) {
+                        throw new VisitorException(e);
+                    }
+                }
+            }
+            
+            // reconnect the mapper's predecessors
+            if (predsCopy != null) {
+                for (MapReduceOper pred : predsCopy) {
+                    if (newMR.getOperatorKey().equals(pred.getOperatorKey())) {
+                        continue;
+                    }
+                    try {
+                        getPlan().connect(pred, newMR);
+                    } catch (PlanException e) {
+                        throw new VisitorException(e);
+                    }
+                }
+            }                   
+        }
+        
+        /*
+         * Checks whether the specified MapReduce operator is a splitter 
+         */
+        private boolean isSplitter(MapReduceOper mr) {
+  
+            List<MapReduceOper> successors = getPlan().getSuccessors(mr);
+            if (successors == null || successors.size() == 0) {
+                return false;
+            }
+                                   
+            PhysicalPlan pl = isMapOnly(mr) ? mr.mapPlan : mr.reducePlan;
+            
+            List<PhysicalOperator> mapLeaves = pl.getLeaves();
+            if (mapLeaves == null || mapLeaves.size() != 1) {
+                return false;
+            }
+           
+            PhysicalOperator mapLeaf = mapLeaves.get(0);
+            if (!(mapLeaf instanceof POStore)) {
+                return false;
+            }
+            
+            POStore store = (POStore)mapLeaf;
+            String fileName = store.getSFile().getFileName(); 
+            
+            for (MapReduceOper mro : successors) {
+                List<PhysicalOperator> roots = mro.mapPlan.getRoots();
+                boolean splitee = false;
+                for (PhysicalOperator root : roots) {
+                    if (root instanceof POLoad) {
+                        POLoad load = (POLoad)root;
+                        if (fileName.compareTo(load.getLFile().getFileName()) 
== 0) {
+                            splitee = true;
+                            break;
+                        }
+                    }
+                }
+                if (!splitee) return false;
+            }
+            
+            return true;
+        }
+        
+        private boolean isMapOnly(MapReduceOper mr) {
+            return mr.reducePlan.isEmpty();
+        }
+        
+        private boolean isSingleLoadMapperPlan(PhysicalPlan pl) {
+            List<PhysicalOperator> roots = pl.getRoots();
+            if (roots.size() != 1) {
+                return false;
+            }
+            // a single-root splitee map plan must start with POLoad -> POFilter
+            PhysicalOperator root = roots.get(0);
+            if (!(root instanceof POLoad)) {
+                throw new IllegalStateException("Invalid root operator in mapper Splitee: " 
+                        + root.getClass().getName());
+            }           
+            List<PhysicalOperator> successors = pl.getSuccessors(root);
+            if (successors == null || successors.size() != 1) {
+                throw new IllegalStateException("Root in mapper Splitee must have exactly one successor, found: "
+                        + (successors == null ? 0 : successors.size()));
+            }
+            PhysicalOperator op = successors.get(0);
+            if (!(op instanceof POFilter)) {
+                throw new IllegalStateException("Invalid successor of load in mapper Splitee: "
+                        + op.getClass().getName());
+            }
+            return true;
+        }
+    }    
 }

Modified: 
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java?rev=752683&r1=752682&r2=752683&view=diff
==============================================================================
--- 
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java
 (original)
+++ 
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java
 Wed Mar 11 23:03:46 2009
@@ -157,6 +157,9 @@
           else if(node instanceof POForEach){
             sb.append(planString(((POForEach)node).getInputPlans()));
           }
+          else if (node instanceof POSplit) {
+              sb.append(planString(((POSplit)node).getPlans()));
+          }
           else if(node instanceof POFRJoin){
             POFRJoin frj = (POFRJoin)node;
             List<List<PhysicalPlan>> joinPlans = frj.getJoinPlans();

Modified: 
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSplit.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSplit.java?rev=752683&r1=752682&r2=752683&view=diff
==============================================================================
--- 
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSplit.java
 (original)
+++ 
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSplit.java
 Wed Mar 11 23:03:46 2009
@@ -17,12 +17,21 @@
  */
 package 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
 
+import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.pig.impl.io.FileSpec;
-import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.PlanException;
 import org.apache.pig.impl.plan.VisitorException;
 
 /**
@@ -45,11 +54,11 @@
  *     ---- POSplit -... ----
  * This is different than the existing implementation
  * where the POSplit writes to sidefiles after filtering
- * and then loads the appropirate file.
+ * and then loads the appropriate file.
  * 
  * The approach followed here is as good as the old
  * approach if not better in many cases because
- * of the availablity of attachinInputs. An optimization
+ * of the availability of attachinInputs. An optimization
  * that can ensue is if there are multiple loads that
  * load the same file, they can be merged into one and 
  * then the operators that take input from the load 
@@ -65,27 +74,61 @@
  * job whenever necessary.
  */
 public class POSplit extends PhysicalOperator {
+
+    private static final long serialVersionUID = 1L;
+    
+    private Log log = LogFactory.getLog(getClass());
+    
+    /*
+     * The filespec that is used to store and load the output of the split job
+     * which is the job containing the split
+     */
+    private FileSpec splitStore;
+    
+    /*
+     * The inner physical plan
+     */
+    private PhysicalPlan myPlan = new PhysicalPlan(); 
+    
+    /*
+     * The list of sub-plans the inner plan is composed of
+     */
+    private List<PhysicalPlan> myPlans = new ArrayList<PhysicalPlan>();
+    
     /**
-     * 
+     * Constructs an operator with the specified key
+     * @param k the operator key
      */
-    private static final long serialVersionUID = 1L;
-    //The filespec that is used to store
-    //and load the output of the split job
-    //which is the job containing the split
-    FileSpec splitStore;
-
     public POSplit(OperatorKey k) {
         this(k,-1,null);
     }
 
+    /**
+     * Constructs an operator with the specified key
+     * and degree of parallelism 
+     * @param k the operator key
+     * @param rp the degree of parallelism requested
+     */
     public POSplit(OperatorKey k, int rp) {
         this(k,rp,null);
     }
 
+    /**
+     * Constructs an operator with the specified key and inputs 
+     * @param k the operator key
+     * @param inp the inputs that this operator will read data from
+     */
     public POSplit(OperatorKey k, List<PhysicalOperator> inp) {
-        this(k,-1,null);
+        this(k,-1,inp);
     }
 
+    /**
+     * Constructs an operator with the specified key,
+     * degree of parallelism and inputs
+     * @param k the operator key
+     * @param rp the degree of parallelism requested 
+     * @param inp the inputs that this operator will read data from
+     */
     public POSplit(OperatorKey k, int rp, List<PhysicalOperator> inp) {
         super(k, rp, inp);
     }
@@ -110,12 +153,107 @@
         return true;
     }
 
+    /**
+     * Returns the file spec associated with this operator
+     * @return the FileSpec associated with this operator
+     */
     public FileSpec getSplitStore() {
         return splitStore;
     }
 
+    /**
+     * Sets the file spec associated with this operator
+     * @param splitStore the FileSpec used to store the data
+     */
     public void setSplitStore(FileSpec splitStore) {
         this.splitStore = splitStore;
     }
 
+    /**
+     * Returns the inner physical plan associated with this operator
+     * @return the inner plan
+     */
+    public PhysicalPlan getPlan() {
+        return myPlan;
+    }
+
+    /**
+     * Returns the list of nested plans. This is used by the
+     * explain method to display the inner plans.
+     * @return the list of the nested plans
+     * @see org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PlanPrinter
+     */
+    public List<PhysicalPlan> getPlans() {
+        return myPlans;
+    }
+    
+    /**
+     * Merges the specified plan into the inner plan and appends it to the nested plan list
+     * @param inPlan the plan to be added
+     * @throws PlanException if merging it into the inner plan fails
+     */
+    public void addPlan(PhysicalPlan inPlan) throws PlanException {        
+        myPlan.merge(inPlan);
+        myPlans.add(inPlan);
+    }
+   
+    @Override
+    public Result getNext(Tuple t) throws ExecException {
+        
+        Result inp = processInput();
+            
+        if (inp.returnStatus == POStatus.STATUS_EOP) {
+            return inp;
+        }
+         
+        // process the nested plans
+        myPlan.attachInput((Tuple)inp.result);           
+        processPlan();
+            
+        return new Result(POStatus.STATUS_NULL, null);                        
+    }
+
+    private void processPlan() throws ExecException {
+   
+        List<PhysicalOperator> leaves = myPlan.getLeaves();
+        
+        for (PhysicalOperator leaf : leaves) {
+            
+            // TO-DO: other types of leaves are possible later
+            if (!(leaf instanceof POStore) && !(leaf instanceof POSplit)) {
+                throw new ExecException("Invalid operator type in the split 
plan: "
+                        + leaf.getOperatorKey());
+            }            
+                   
+            runPipeline(leaf);     
+        }
+    }
+    
+    private void runPipeline(PhysicalOperator leaf) throws ExecException {
+       
+        while (true) {
+            
+            Result res = leaf.getNext(dummyTuple);
+            
+            if (res.returnStatus == POStatus.STATUS_OK ||
+                    res.returnStatus == POStatus.STATUS_NULL) {                
+                continue;
+            }                 
+            
+            if (res.returnStatus == POStatus.STATUS_EOP) {
+                break;
+            }
+            
+            if (res.returnStatus == POStatus.STATUS_ERR) {
+
+                // if there is an err message use it
+                String errMsg = (res.result != null) ?
+                        "Received Error while processing the split plan: " + 
res.result :
+                        "Received Error while processing the split plan.";   
+                    
+                throw new ExecException(errMsg);
+            }
+        }        
+    }
+        
 }

Modified: 
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java?rev=752683&r1=752682&r2=752683&view=diff
==============================================================================
--- 
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java
 (original)
+++ 
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java
 Wed Mar 11 23:03:46 2009
@@ -52,7 +52,12 @@
             if (leaf instanceof POStore) {
                 stores.add((POStore)leaf);
             }
-        }
+            if (leaf instanceof POSplit) {
+                PhysicalPlan pl = ((POSplit)leaf).getPlan();
+                List<POStore> nestedStores = getStores(pl);
+                stores.addAll(nestedStores);
+            }
+       }
         return stores;
     }
 
@@ -88,4 +93,4 @@
             return new Path("rel/"+pathStr).toString();
         }
     }
-}
\ No newline at end of file
+}

Modified: 
hadoop/pig/branches/multiquery/src/org/apache/pig/builtin/BinStorage.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/builtin/BinStorage.java?rev=752683&r1=752682&r2=752683&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/builtin/BinStorage.java 
(original)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/builtin/BinStorage.java 
Wed Mar 11 23:03:46 2009
@@ -212,7 +212,18 @@
      */
     public Schema determineSchema(String fileName, ExecType execType,
             DataStorage storage) throws IOException {
-        InputStream is = FileLocalizer.open(fileName, execType, storage);
+
+        InputStream is = null;
+
+        try {
+            is = FileLocalizer.open(fileName, execType, storage);
+        } catch (IOException e) {
+            // At compile time in batch mode, the file may not exist
+            // (e.g. an intermediate file). Just return null - the
+            // same way as when we can't get a valid record from the input.
+            return null;
+        }
+        
         bindTo(fileName, new BufferedPositionedInputStream(is), 0, 
Long.MAX_VALUE);
         // get the first record from the input file
         // and figure out the schema from the data in

Modified: 
hadoop/pig/branches/multiquery/src/org/apache/pig/impl/plan/DependencyOrderWalker.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/impl/plan/DependencyOrderWalker.java?rev=752683&r1=752682&r2=752683&view=diff
==============================================================================
--- 
hadoop/pig/branches/multiquery/src/org/apache/pig/impl/plan/DependencyOrderWalker.java
 (original)
+++ 
hadoop/pig/branches/multiquery/src/org/apache/pig/impl/plan/DependencyOrderWalker.java
 Wed Mar 11 23:03:46 2009
@@ -28,21 +28,34 @@
  * DependencyOrderWalker traverses the graph in such a way that no node is 
visited
  * before all the nodes it depends on have been visited.  Beyond this, it does 
not
  * guarantee any particular order.  So, you have a graph with node 1 2 3 4, and
- * edges 1->3, 2->3, and 3->4, this walker guarnatees that 1 and 2 will be 
visited
+ * edges 1->3, 2->3, and 3->4, this walker guarantees that 1 and 2 will be 
visited
  * before 3 and 3 before 4, but it does not guarantee whether 1 or 2 will be
  * visited first.
  */
 public class DependencyOrderWalker <O extends Operator, P extends 
OperatorPlan<O>>
     extends PlanWalker<O, P> {
 
+    private boolean rootsFirst = true;
+    
     /**
      * @param plan Plan for this walker to traverse.
      */
     public DependencyOrderWalker(P plan) {
-        super(plan);
+        this(plan, true);
     }
 
     /**
+     * Constructs a walker with the specified plan and walk direction
+     * @param plan plan for this walker to traverse
+     * @param rootsFirst flag that indicates walking up (from roots 
+     *      to leaves) or walking down (from leaves to roots)
+     */   
+    public DependencyOrderWalker(P plan, boolean rootsFirst) {
+        super(plan);
+        this.rootsFirst = rootsFirst;
+    }
+    
+    /**
      * Begin traversing the graph.
      * @param visitor Visitor this walker is being used by.
      * @throws VisitorException if an error is encountered while walking.
@@ -58,12 +71,14 @@
 
         List<O> fifo = new ArrayList<O>();
         Set<O> seen = new HashSet<O>();
-        List<O> leaves = mPlan.getLeaves();
-        if (leaves == null) return;
-        for (O op : leaves) {
-            doAllPredecessors(op, seen, fifo);
+        List<O> nodes = rootsFirst ? mPlan.getLeaves() : mPlan.getRoots();
+       
+        if (nodes == null) return;
+            
+        for (O op : nodes) {
+            doAllDependencies(op, seen, fifo);
         }
-
+        
         for (O op: fifo) {
             op.visit(visitor);
         }
@@ -73,9 +88,28 @@
         return new DependencyOrderWalker<O, P>(plan);
     }
 
+    protected void doAllDependencies(O node,
+                                     Set<O> seen,
+                                     Collection<O> fifo) throws 
VisitorException {
+        if (!seen.contains(node)) {
+            // We haven't seen this one before.
+            Collection<O> nodes = rootsFirst ? 
+                    mPlan.getPredecessors(node) : mPlan.getSuccessors(node);
+            if (nodes != null) {
+                // Do our dependencies first: predecessors if rootsFirst, else successors
+                for (O op : nodes) {
+                    doAllDependencies(op, seen, fifo);
+                }
+            }
+            // Now do ourself
+            seen.add(node);
+            fifo.add(node);
+        }
+    }
+
     protected void doAllPredecessors(O node,
-                                   Set<O> seen,
-                                   Collection<O> fifo) throws VisitorException 
{
+                                     Set<O> seen,
+                                     Collection<O> fifo) throws 
VisitorException {
         if (!seen.contains(node)) {
             // We haven't seen this one before.
             Collection<O> preds = mPlan.getPredecessors(node);
@@ -86,8 +120,9 @@
                 }
             }
             // Now do ourself
-            seen.add(node);
+            seen.add(node);             
             fifo.add(node);
         }
     }
+
 }


Reply via email to