Author: pradeepkth
Date: Mon Mar 23 23:36:10 2009
New Revision: 757598

URL: http://svn.apache.org/viewvc?rev=757598&view=rev
Log:
 PIG-627: multiquery support incremental patch (Richard Ding via pradeepkth)

Added:
    
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java
    
hadoop/pig/branches/multiquery/src/org/apache/pig/impl/plan/ReverseDependencyOrderWalker.java
Modified:
    hadoop/pig/branches/multiquery/CHANGES.txt
    
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
    
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
    
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java
    
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSplit.java
    
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java
    hadoop/pig/branches/multiquery/src/org/apache/pig/builtin/BinStorage.java
    
hadoop/pig/branches/multiquery/src/org/apache/pig/impl/plan/DependencyOrderWalker.java
    hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestMultiQuery.java

Modified: hadoop/pig/branches/multiquery/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/CHANGES.txt?rev=757598&r1=757597&r2=757598&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/CHANGES.txt (original)
+++ hadoop/pig/branches/multiquery/CHANGES.txt Mon Mar 23 23:36:10 2009
@@ -414,3 +414,5 @@
     PIG-627: multiquery support incremental patch (Richard Ding via pradeepkth)
 
     PIG-627: multiquery support incremental patch (hagleitn via pradeepkth)
+
+    PIG-627: multiquery support incremental patch (Richard Ding via pradeepkth)

Modified: 
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=757598&r1=757597&r2=757598&view=diff
==============================================================================
--- 
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
 (original)
+++ 
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
 Mon Mar 23 23:36:10 2009
@@ -64,7 +64,6 @@
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
-import org.apache.pig.impl.plan.DependencyOrderWalker;
 import org.apache.pig.impl.plan.DepthFirstWalker;
 import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.impl.plan.Operator;
@@ -240,9 +239,6 @@
         la.visit();
         la.adjust();
         
-        MultiQueryAdjuster ma = new MultiQueryAdjuster(MRPlan);
-        ma.visit();
-        
         return MRPlan;
     }
     
@@ -309,11 +305,6 @@
         return st;
     }
     
-    private POSplit getSplit(){
-        POSplit sp = new POSplit(new 
OperatorKey(scope,nig.getNextNodeId(scope)));
-        return sp;
-    }    
-    
     /**
      * A map MROper is an MROper whose map plan is still open
      * for taking more non-blocking operators.
@@ -611,6 +602,7 @@
         try{
             FileSpec fSpec = op.getSplitStore();
             MapReduceOper mro = endSingleInputPlanWithStr(fSpec);
+            mro.setSplitter(true);
             splitsSeen.put(op.getOperatorKey(), mro);
             curMROp = startNew(fSpec, mro);
         }catch(Exception e){
@@ -1665,257 +1657,4 @@
         }
     }
 
-    /** 
-     * An adjuster that merges all or part splitee MapReduceOpers into 
-     * splitter MapReduceOper. The merge can produce a MROperPlan that has 
-     * fewer MapReduceOpers than MapReduceOpers in the original MROperPlan.
-     * 
-     * The MRCompler generates multiple MapReduceOpers whenever it encounters 
-     * a split operator and connects the single splitter MapReduceOper to 
-     * one or more splitee MapReduceOpers using store/load operators:  
-     *  
-     *     ---- POStore (in splitter) -... ----
-     *     |        |    ...    |
-     *     |        |    ...    |
-     *    POLoad  POLoad ...  POLoad (in splitees)
-     *      |        |           |
-     *      
-     *  This adjuster merges those MapReduceOpers by replacing POLoad/POStore 
-     *  combination with POSplit operator.    
-     */
-    private class MultiQueryAdjuster extends MROpPlanVisitor {
- 
-        MultiQueryAdjuster(MROperPlan plan) {
-            super(plan, new DependencyOrderWalker<MapReduceOper, 
MROperPlan>(plan, false));
-        }
-
-        @Override
-        public void visitMROp(MapReduceOper mr) throws VisitorException {
-            
-            if (!isSplitter(mr)) {
-                return;
-            }
-            
-            // find all the single load map-only MROpers in the splitees
-            List<MapReduceOper> mappers = new ArrayList<MapReduceOper>();
-            List<MapReduceOper> multiLoadMappers = new 
ArrayList<MapReduceOper>();
-            List<MapReduceOper> mapReducers = new ArrayList<MapReduceOper>();
-                        
-            List<MapReduceOper> successors = getPlan().getSuccessors(mr);
-            for (MapReduceOper successor : successors) {
-                if (isMapOnly(successor)) {
-                    if (isSingleLoadMapperPlan(successor.mapPlan)) { 
-                        mappers.add(successor);
-                    } else {
-                        multiLoadMappers.add(successor);
-                    }
-                } else {
-                    mapReducers.add(successor);
-                }                
-            }
-                
-            PhysicalPlan splitterPl = isMapOnly(mr) ? mr.mapPlan : 
mr.reducePlan;                            
-            PhysicalOperator storeOp = splitterPl.getLeaves().get(0);
-            List<PhysicalOperator> storePreds = 
splitterPl.getPredecessors(storeOp);             
-            
-            if (mappers.size() == 1 && mapReducers.size() == 0 && 
multiLoadMappers.size() == 0) {
-                                               
-                // In this case, just add splitee's map plan to the splitter's 
plan
-                MapReduceOper mapper = mappers.get(0);
-                
-                PhysicalPlan pl = mapper.mapPlan;
-                PhysicalOperator load = pl.getRoots().get(0);               
-                pl.remove(load);
-                                
-                // make a copy before removing the store operator
-                List<PhysicalOperator> predsCopy = new 
ArrayList<PhysicalOperator>(storePreds);
-                splitterPl.remove(storeOp);                
-                
-                // connect two plans
-                List<PhysicalOperator> roots = pl.getRoots();
-                try {
-                    splitterPl.merge(pl);
-                } catch (PlanException e) {
-                    throw new VisitorException(e);
-                }                
-                
-                for (PhysicalOperator pred : predsCopy) {
-                    for (PhysicalOperator root : roots) {
-                        try {
-                            splitterPl.connect(pred, root);
-                        } catch (PlanException e) {
-                            throw new VisitorException(e);
-                        }
-                    }
-                }
-                
-            } else if (mappers.size() > 0) {
-                
-                // merge splitee's map plans into nested plan of 
-                // the splitter operator
-                POSplit splitOp = getSplit();
-                for (MapReduceOper mapper : mappers) {
-                                        
-                    PhysicalPlan pl = mapper.mapPlan;
-                    PhysicalOperator load = pl.getRoots().get(0);
-                    pl.remove(load);
-                    try {
-                        splitOp.addPlan(pl);
-                    } catch (PlanException e) {
-                        throw new VisitorException(e);
-                    }
-                }
-                        
-                // add original store to the split operator 
-                // if there is at least one MapReduce splitee
-                if (mapReducers.size() + multiLoadMappers.size() > 0) {
-                    PhysicalPlan storePlan = new PhysicalPlan();
-                    try {
-                        storePlan.addAsLeaf(storeOp);
-                        splitOp.addPlan(storePlan);
-                    } catch (PlanException e) {
-                        throw new VisitorException(e);
-                    }                
-                }
-            
-                // replace store operator in the splitter with split operator
-                splitOp.setInputs(storePreds);
-                try {
-                    splitterPl.replace(storeOp, splitOp);;
-                } catch (PlanException e) {
-                    throw new VisitorException(e);
-                }                   
-            }                       
-            
-            // remove all the map-only splitees from the MROperPlan
-            for (MapReduceOper mapper : mappers) {
-                removeAndReconnect(mapper, mr);                  
-            }
-            
-            // TO-DO: merge the other splitees if possible
-            if (mapReducers.size() + multiLoadMappers.size() > 0 ) {
-                // XXX
-            }
-        }
-        
-        /**
-         * Removes the specified MR operator from the plan after the merge. 
-         * Connects its predecessors and successors to the merged MR operator
-         * @param mr the MR operator to remove
-         * @param newMR the MR operator to be connected to the predecessors 
and 
-         *              the successors of the removed operator
-         * @throws VisitorException if connect operation fails
-         */
-        private void removeAndReconnect(MapReduceOper mr, MapReduceOper newMR) 
throws VisitorException {
-            List<MapReduceOper> mapperSuccs = getPlan().getSuccessors(mr);
-            List<MapReduceOper> mapperPreds = getPlan().getPredecessors(mr);
-            
-            // make a copy before removing operator
-            ArrayList<MapReduceOper> succsCopy = null;
-            ArrayList<MapReduceOper> predsCopy = null;
-            if (mapperSuccs != null) {
-                succsCopy = new ArrayList<MapReduceOper>(mapperSuccs);
-            }
-            if (mapperPreds != null) {
-                predsCopy = new ArrayList<MapReduceOper>(mapperPreds);
-            }
-            getPlan().remove(mr);   
-            
-            // reconnect the mapper's successors
-            if (succsCopy != null) {
-                for (MapReduceOper succ : succsCopy) {
-                    try {
-                        getPlan().connect(newMR, succ);
-                    } catch (PlanException e) {
-                        throw new VisitorException(e);
-                    }
-                }
-            }
-            
-            // reconnect the mapper's predecessors
-            if (predsCopy != null) {
-                for (MapReduceOper pred : predsCopy) {
-                    if (newMR.getOperatorKey().equals(pred.getOperatorKey())) {
-                        continue;
-                    }
-                    try {
-                        getPlan().connect(pred, newMR);
-                    } catch (PlanException e) {
-                        throw new VisitorException(e);
-                    }
-                }
-            }                   
-        }
-        
-        /*
-         * Checks whether the specified MapReduce operator is a splitter 
-         */
-        private boolean isSplitter(MapReduceOper mr) {
-  
-            List<MapReduceOper> successors = getPlan().getSuccessors(mr);
-            if (successors == null || successors.size() == 0) {
-                return false;
-            }
-                                   
-            PhysicalPlan pl = isMapOnly(mr) ? mr.mapPlan : mr.reducePlan;
-            
-            List<PhysicalOperator> mapLeaves = pl.getLeaves();
-            if (mapLeaves == null || mapLeaves.size() != 1) {
-                return false;
-            }
-           
-            PhysicalOperator mapLeaf = mapLeaves.get(0);
-            if (!(mapLeaf instanceof POStore)) {
-                return false;
-            }
-            
-            POStore store = (POStore)mapLeaf;
-            String fileName = store.getSFile().getFileName(); 
-            
-            for (MapReduceOper mro : successors) {
-                List<PhysicalOperator> roots = mro.mapPlan.getRoots();
-                boolean splitee = false;
-                for (PhysicalOperator root : roots) {
-                    if (root instanceof POLoad) {
-                        POLoad load = (POLoad)root;
-                        if (fileName.compareTo(load.getLFile().getFileName()) 
== 0) {
-                            splitee = true;
-                            break;
-                        }
-                    }
-                }
-                if (!splitee) return false;
-            }
-            
-            return true;
-        }
-        
-        private boolean isMapOnly(MapReduceOper mr) {
-            return mr.reducePlan.isEmpty();
-        }
-        
-        private boolean isSingleLoadMapperPlan(PhysicalPlan pl) {
-            List<PhysicalOperator> roots = pl.getRoots();
-            if (roots.size() != 1) {
-                return false;
-            }
-            
-            PhysicalOperator root = roots.get(0);
-            if (!(root instanceof POLoad)) {
-                throw new IllegalStateException("Invalid root operator in 
mapper Splitee: " 
-                        +  root.getClass().getName());
-            }           
-            List<PhysicalOperator> successors = pl.getSuccessors(root);
-            if (successors == null || successors.size() != 1) {
-                throw new IllegalStateException("Root in mapper Splitee has no 
successor: "
-                        + successors.size());
-            }
-            PhysicalOperator op = successors.get(0);
-            if (!(op instanceof POFilter)) {
-                throw new IllegalStateException("Invalid successor of load in 
mapper Splitee: "
-                        + op.getClass().getName());
-            }
-            return true;
-        }
-    }    
 }

Modified: 
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=757598&r1=757597&r2=757598&view=diff
==============================================================================
--- 
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
 (original)
+++ 
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
 Mon Mar 23 23:36:10 2009
@@ -183,6 +183,10 @@
         // an appropriate NullableXXXWritable object
         KeyTypeDiscoveryVisitor kdv = new KeyTypeDiscoveryVisitor(plan);
         kdv.visit();
+        
+        MultiQueryOptimizer mqOptimizer = new MultiQueryOptimizer(plan);
+        mqOptimizer.visit();
+        
         return plan;
     }
  

Modified: 
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java?rev=757598&r1=757597&r2=757598&view=diff
==============================================================================
--- 
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java
 (original)
+++ 
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java
 Mon Mar 23 23:36:10 2009
@@ -109,6 +109,10 @@
     // to add additional map reduce operator with 1 reducer after this
     long limit = -1;
 
+    // Indicates that this MROper is a splitter MROper. 
+    // That is, this MROper ends due to a POSplit operator.
+    private boolean splitter = false;
+
     public MapReduceOper(OperatorKey k) {
         super(k);
         mapPlan = new PhysicalPlan();
@@ -327,4 +331,12 @@
     public int getRequestedParallelism() {
         return requestedParallelism;
     }
+
+    public void setSplitter(boolean spl) {
+        splitter = spl;
+    }
+
+    public boolean isSplitter() {
+        return splitter;
+    }
 }

Added: 
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java?rev=757598&view=auto
==============================================================================
--- 
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java
 (added)
+++ 
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java
 Mon Mar 23 23:36:10 2009
@@ -0,0 +1,375 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROpPlanVisitor;
+import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.data.DataType;
+import org.apache.pig.impl.plan.NodeIdGenerator;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.PlanException;
+import org.apache.pig.impl.plan.ReverseDependencyOrderWalker;
+import org.apache.pig.impl.plan.VisitorException;
+
+/** 
+ * An optimizer that merges all or part splittee MapReduceOpers into 
+ * splitter MapReduceOper. The merge can produce a MROperPlan that has 
+ * fewer MapReduceOpers than MapReduceOpers in the original MROperPlan.
+ * 
+ * The MRCompiler generates multiple MapReduceOpers whenever it encounters 
+ * a split operator and connects the single splitter MapReduceOper to 
+ * one or more splittee MapReduceOpers using store/load operators:  
+ *  
+ *     ---- POStore (in splitter) -... ----
+ *     |        |    ...    |
+ *     |        |    ...    |
+ *    POLoad  POLoad ...  POLoad (in splittees)
+ *      |        |           |
+ *      
+ *  This optimizer merges those MapReduceOpers by replacing POLoad/POStore 
+ *  combination with POSplit operator.    
+ */
+class MultiQueryOptimizer extends MROpPlanVisitor {
+    
+    private Log log = LogFactory.getLog(getClass());
+    
+    private NodeIdGenerator nig;
+    
+    private String scope;
+    
+    MultiQueryOptimizer(MROperPlan plan) {
+        super(plan, new ReverseDependencyOrderWalker<MapReduceOper, 
MROperPlan>(plan));
+        nig = NodeIdGenerator.getGenerator();
+        List<MapReduceOper> roots = plan.getRoots();
+        scope = roots.get(0).getOperatorKey().getScope();
+    }
+
+    @Override
+    public void visitMROp(MapReduceOper mr) throws VisitorException {
+       
+        if (!mr.isSplitter()) {
+            return;
+        }
+       
+        // first classify all the splittees
+        List<MapReduceOper> mappers = new ArrayList<MapReduceOper>();
+        List<MapReduceOper> multiLoadMROpers = new ArrayList<MapReduceOper>();
+        List<MapReduceOper> mapReducers = new ArrayList<MapReduceOper>();
+                    
+        List<MapReduceOper> successors = getPlan().getSuccessors(mr);
+        for (MapReduceOper successor : successors) {
+            if (isMapOnly(successor)) {
+                if (isSingleLoadMapperPlan(successor.mapPlan)) {               
     
+                    mappers.add(successor);
+                } else {                    
+                    multiLoadMROpers.add(successor);
+                }
+            } else {
+                if (isSingleLoadMapperPlan(successor.mapPlan)) {               
      
+                    mapReducers.add(successor);
+                } else {                    
+                    multiLoadMROpers.add(successor);
+                }
+            }                
+        }
+              
+        // case 1: exactly one splittee and it's map-only
+        if (mappers.size() == 1 && mapReducers.size() == 0 
+                && multiLoadMROpers.size() == 0 ) {            
+            mergeOnlyMapperSplittee(mappers.get(0), mr);
+            return;              
+        }
+        
+        // case 2: exactly one splittee and it has reducer
+        if (isMapOnly(mr) && mapReducers.size() == 1 
+                && mappers.size() == 0 && multiLoadMROpers.size() == 0) {      
      
+            mergeOnlyMapReduceSplittee(mapReducers.get(0), mr);
+            return;
+        } 
+        
+        PhysicalPlan splitterPl = isMapOnly(mr) ? mr.mapPlan : mr.reducePlan;  
                          
+        POStore storeOp = (POStore)splitterPl.getLeaves().get(0); 
+        
+        POSplit splitOp = null;
+        
+        // case 3: multiple splittees and at least one of them is map-only
+        if (mappers.size() > 0) {                
+            splitOp = getSplit();                
+            mergeAllMapOnlySplittees(mappers, mr, splitOp);
+        }            
+               
+        boolean splitterMapOnly = isMapOnly(mr);
+        
+        // case 4: multiple splittees and at least one of them has reducer  
+        if (splitterMapOnly && mapReducers.size() > 0) {
+           
+            // pick one to merge, prefer one that has a combiner
+            MapReduceOper mapReducer= mapReducers.get(0);
+            for (MapReduceOper mro : mapReducers) {
+                if (!mro.combinePlan.isEmpty()) {
+                    mapReducer = mro;
+                    break;
+                }
+            }
+              
+            PhysicalOperator leaf = splitterPl.getLeaves().get(0);
+                                                            
+            splitOp = (leaf instanceof POStore) ? 
+                    getSplit() : (POSplit)leaf;
+                    
+            mergeSingleMapReduceSplittee(mapReducer, mr, splitOp);             
   
+        }
+        
+        // finally, add original store to the split operator 
+        // if there is a splittee that hasn't been merged
+        if (splitOp != null 
+                && ((multiLoadMROpers.size() > 0)
+                        || (mapReducers.size() > 1) 
+                        || (!splitterMapOnly && mapReducers.size() > 0))) {
+
+            PhysicalPlan storePlan = new PhysicalPlan();
+            try {
+                storePlan.addAsLeaf(storeOp);
+                splitOp.addPlan(storePlan);
+            } catch (PlanException e) {
+                throw new VisitorException(e);
+            }                               
+        }
+    }                
+   
+    private void mergeOneMapPart(MapReduceOper mapper, MapReduceOper splitter)
+    throws VisitorException {
+        PhysicalPlan splitterPl = isMapOnly(splitter) ? splitter.mapPlan : 
splitter.reducePlan;
+        POStore storeOp = (POStore)splitterPl.getLeaves().get(0);
+        List<PhysicalOperator> storePreds = 
splitterPl.getPredecessors(storeOp);           
+        
+        PhysicalPlan pl = mapper.mapPlan;
+        PhysicalOperator load = pl.getRoots().get(0);               
+        pl.remove(load);
+                        
+        // make a copy before removing the store operator
+        List<PhysicalOperator> predsCopy = new 
ArrayList<PhysicalOperator>(storePreds);
+        splitterPl.remove(storeOp);                
+        
+        try {
+            splitterPl.merge(pl);
+        } catch (PlanException e) {
+            throw new VisitorException(e);
+        }                
+        
+        // connect two plans   
+        List<PhysicalOperator> roots = pl.getRoots();
+        for (PhysicalOperator pred : predsCopy) {
+            for (PhysicalOperator root : roots) {
+                try {
+                    splitterPl.connect(pred, root);
+                } catch (PlanException e) {
+                    throw new VisitorException(e);
+                }
+            }
+        }
+    }
+
+    private void mergeOnlyMapperSplittee(MapReduceOper mapper, MapReduceOper 
splitter) 
+    throws VisitorException {
+        mergeOneMapPart(mapper, splitter);       
+        removeAndReconnect(mapper, splitter);
+    }
+    
+    private void mergeOnlyMapReduceSplittee(MapReduceOper mapReducer, 
MapReduceOper splitter)
+    throws VisitorException {
+        mergeOneMapPart(mapReducer, splitter);
+
+        splitter.setMapDone(true);
+        splitter.reducePlan = mapReducer.reducePlan;
+        splitter.setReduceDone(true);
+
+        removeAndReconnect(mapReducer, splitter);          
+    }
+    
+    private void mergeAllMapOnlySplittees(List<MapReduceOper> mappers, 
+            MapReduceOper splitter, POSplit splitOp) throws VisitorException {
+        
+        PhysicalPlan splitterPl = isMapOnly(splitter) ? 
+                splitter.mapPlan : splitter.reducePlan;                        
    
+        PhysicalOperator storeOp = splitterPl.getLeaves().get(0);
+        List<PhysicalOperator> storePreds = 
splitterPl.getPredecessors(storeOp);  
+
+        // merge splittee's map plans into nested plan of 
+        // the splitter operator
+        for (MapReduceOper mapper : mappers) {                                
+            PhysicalPlan pl = mapper.mapPlan;
+            PhysicalOperator load = pl.getRoots().get(0);
+            pl.remove(load);
+            try {
+                splitOp.addPlan(pl);
+            } catch (PlanException e) {
+                throw new VisitorException(e);
+            }
+        }
+                           
+        // replace store operator in the splitter with split operator
+        splitOp.setInputs(storePreds);
+        try {
+            splitterPl.replace(storeOp, splitOp);;
+        } catch (PlanException e) {
+            throw new VisitorException(e);
+        }    
+        
+        // remove all the map-only splittees from the MROperPlan
+        for (MapReduceOper mapper : mappers) {
+            removeAndReconnect(mapper, splitter);                  
+        }
+    }
+    
+    private void mergeSingleMapReduceSplittee(MapReduceOper mapReduce, 
+            MapReduceOper splitter, POSplit splitOp) throws VisitorException {
+        
+        PhysicalPlan splitterPl = splitter.mapPlan;
+        PhysicalOperator leaf = splitterPl.getLeaves().get(0);
+        PhysicalOperator storeOp = splitterPl.getLeaves().get(0);
+        List<PhysicalOperator> storePreds = 
splitterPl.getPredecessors(storeOp);  
+                        
+        PhysicalPlan pl = mapReduce.mapPlan;
+        PhysicalOperator load = pl.getRoots().get(0);
+        pl.remove(load);
+        try {
+            splitOp.addPlan(pl);
+        } catch (PlanException e) {
+            throw new VisitorException(e);
+        }
+                              
+        splitter.setMapDone(true);
+        splitter.reducePlan = mapReduce.reducePlan;
+        splitter.setReduceDone(true);
+        splitter.combinePlan = mapReduce.combinePlan;
+                
+        // replace store operator in the splitter with split operator
+        if (leaf instanceof POStore) {                            
+            splitOp.setInputs(storePreds);
+            try {
+                splitterPl.replace(storeOp, splitOp);;
+            } catch (PlanException e) {
+                throw new VisitorException(e);
+            }  
+        }
+        
+        removeAndReconnect(mapReduce, splitter);           
+    }
+    
+    /**
+     * Removes the specified MR operator from the plan after the merge. 
+     * Connects its predecessors and successors to the merged MR operator
+     * @param mr the MR operator to remove
+     * @param newMR the MR operator to be connected to the predecessors and 
+     *              the successors of the removed operator
+     * @throws VisitorException if connect operation fails
+     */
+    private void removeAndReconnect(MapReduceOper mr, MapReduceOper newMR) 
throws VisitorException {
+        List<MapReduceOper> mapperSuccs = getPlan().getSuccessors(mr);
+        List<MapReduceOper> mapperPreds = getPlan().getPredecessors(mr);
+        
+        // make a copy before removing operator
+        ArrayList<MapReduceOper> succsCopy = null;
+        ArrayList<MapReduceOper> predsCopy = null;
+        if (mapperSuccs != null) {
+            succsCopy = new ArrayList<MapReduceOper>(mapperSuccs);
+        }
+        if (mapperPreds != null) {
+            predsCopy = new ArrayList<MapReduceOper>(mapperPreds);
+        }
+        getPlan().remove(mr);   
+        
+        // reconnect the mapper's successors
+        if (succsCopy != null) {
+            for (MapReduceOper succ : succsCopy) {
+                try {                   
+                    getPlan().connect(newMR, succ);
+                } catch (PlanException e) {
+                    throw new VisitorException(e);
+                }
+            }
+        }
+        
+        // reconnect the mapper's predecessors
+        if (predsCopy != null) {
+            for (MapReduceOper pred : predsCopy) {
+                if (newMR.getOperatorKey().equals(pred.getOperatorKey())) {
+                    continue;
+                }
+                try {                    
+                    getPlan().connect(pred, newMR);
+                } catch (PlanException e) {
+                    throw new VisitorException(e);
+                }
+            }
+        }     
+        
+        mergeMROperProperties(mr, newMR);
+    }
+    
+    private void mergeMROperProperties(MapReduceOper from, MapReduceOper to) {
+
+        if (from.isStreamInMap()) {
+            to.setStreamInMap(true);
+        }
+
+        if (from.isStreamInReduce()) {
+            to.setStreamInReduce(true);
+        }
+        
+        if (from.getRequestedParallelism() > to.getRequestedParallelism()) {
+            to.requestedParallelism = from.requestedParallelism;
+        }
+
+        if (!from.UDFs.isEmpty()) {
+            to.UDFs.addAll(from.UDFs);
+        }
+
+        if (from.needsDistinctCombiner()) {
+            to.setNeedsDistinctCombiner(true);
+        }
+
+        if (to.mapKeyType == DataType.UNKNOWN) {
+            to.mapKeyType = from.mapKeyType;
+        }
+    }
+
+    private boolean isMapOnly(MapReduceOper mr) {
+        return mr.reducePlan.isEmpty();
+    }
+    
+    private boolean isSingleLoadMapperPlan(PhysicalPlan pl) {
+        List<PhysicalOperator> roots = pl.getRoots();
+        return (roots.size() == 1);
+    }
+    
+    private POSplit getSplit(){
+        POSplit sp = new POSplit(new OperatorKey(scope, 
nig.getNextNodeId(scope)));
+        return sp;
+    } 
+}

Modified: 
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSplit.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSplit.java?rev=757598&r1=757597&r2=757598&view=diff
==============================================================================
--- 
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSplit.java
 (original)
+++ 
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSplit.java
 Mon Mar 23 23:36:10 2009
@@ -18,6 +18,7 @@
 package 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
 
 import java.util.ArrayList;
+import java.util.BitSet;
 import java.util.List;
 
 import org.apache.commons.logging.Log;
@@ -84,17 +85,16 @@
      * which is the job containing the split
      */
     private FileSpec splitStore;
-    
-    /*
-     * The inner physical plan
-     */
-    private PhysicalPlan myPlan = new PhysicalPlan(); 
-    
+       
     /*
      * The list of sub-plans the inner plan is composed of
      */
     private List<PhysicalPlan> myPlans = new ArrayList<PhysicalPlan>();
     
+    private BitSet processedSet = new BitSet();
+    
+    private static Result empty = new Result(POStatus.STATUS_NULL, null);
+    
     /**
      * Constructs an operator with the specified key
      * @param k the operator key
@@ -170,16 +170,7 @@
     }
 
     /**
-     * Returns the inner physical plan associated with this operator
-     * @return the inner plan
-     */
-    public PhysicalPlan getPlan() {
-        return myPlan;
-    }
-
-    /**
-     * Returns the list of nested plans. This is used by 
-     * explain method to display the inner plans.
+     * Returns the list of nested plans. 
      * @return the list of the nested plans
      * @see 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PlanPrinter
      */
@@ -193,67 +184,69 @@
      * @param plan plan to be appended to the list
      */
     public void addPlan(PhysicalPlan inPlan) throws PlanException {        
-        myPlan.merge(inPlan);
         myPlans.add(inPlan);
+        processedSet.set(myPlans.size()-1);
     }
    
     @Override
     public Result getNext(Tuple t) throws ExecException {
         
-        Result inp = processInput();
+        if (processedSet.cardinality() == myPlans.size()) {
             
-        if (inp.returnStatus == POStatus.STATUS_EOP) {
-            return inp;
-        }
+            Result inp = processInput();
+            
+            if (inp.returnStatus == POStatus.STATUS_EOP) {
+                return inp;
+            }
          
-        // process the nested plans
-        myPlan.attachInput((Tuple)inp.result);           
-        processPlan();
+            Tuple tuple = (Tuple)inp.result;
+            for (PhysicalPlan pl : myPlans) {
+                pl.attachInput(tuple);
+            }
             
-        return new Result(POStatus.STATUS_NULL, null);                        
+            processedSet.clear();
+        }
+        
+        return processPlan();                                       
     }
 
-    private void processPlan() throws ExecException {
+    private Result processPlan() throws ExecException {
    
-        List<PhysicalOperator> leaves = myPlan.getLeaves();
+        int idx = processedSet.nextClearBit(0);
+        PhysicalOperator leaf = myPlans.get(idx).getLeaves().get(0);
         
-        for (PhysicalOperator leaf : leaves) {
-            
-            // TO-DO: other types of leaves are possible later
-            if (!(leaf instanceof POStore) && !(leaf instanceof POSplit)) {
-                throw new ExecException("Invalid operator type in the split 
plan: "
-                        + leaf.getOperatorKey());
-            }            
-                   
-            runPipeline(leaf);     
+        Result res = runPipeline(leaf);
+        
+        if (res.returnStatus == POStatus.STATUS_EOP) {
+            processedSet.set(idx++);        
+            if (idx < myPlans.size()) {
+                res = processPlan();
+            }
         }
+        
+        return (res.returnStatus == POStatus.STATUS_OK) ? res : empty;
     }
     
-    private void runPipeline(PhysicalOperator leaf) throws ExecException {
+    private Result runPipeline(PhysicalOperator leaf) throws ExecException {
        
+        Result res = null;
+        
         while (true) {
             
-            Result res = leaf.getNext(dummyTuple);
+            res = leaf.getNext(dummyTuple);
             
-            if (res.returnStatus == POStatus.STATUS_OK ||
-                    res.returnStatus == POStatus.STATUS_NULL) {                
+            if (res.returnStatus == POStatus.STATUS_OK) {                
+                break;
+            } else if (res.returnStatus == POStatus.STATUS_NULL) {
                 continue;
-            }                 
-            
-            if (res.returnStatus == POStatus.STATUS_EOP) {
+            } else if (res.returnStatus == POStatus.STATUS_EOP) {
+                break;
+            } else if (res.returnStatus == POStatus.STATUS_ERR) {
                 break;
             }
-            
-            if (res.returnStatus == POStatus.STATUS_ERR) {
-
-                // if there is an err message use it
-                String errMsg = (res.result != null) ?
-                        "Received Error while processing the split plan: " + 
res.result :
-                        "Received Error while processing the split plan.";   
-                    
-                throw new ExecException(errMsg);
-            }
-        }        
+        }   
+        
+        return res;
     }
         
 }

Modified: 
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java?rev=757598&r1=757597&r2=757598&view=diff
==============================================================================
--- 
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java
 (original)
+++ 
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java
 Mon Mar 23 23:36:10 2009
@@ -53,11 +53,13 @@
                 stores.add((POStore)leaf);
             }
             if (leaf instanceof POSplit) {
-                PhysicalPlan pl = ((POSplit)leaf).getPlan();
-                List<POStore> nestedStores = getStores(pl);
-                stores.addAll(nestedStores);
+                List<PhysicalPlan> pls = ((POSplit)leaf).getPlans();
+                for (PhysicalPlan pl : pls) {
+                    List<POStore> nestedStores = getStores(pl);
+                    stores.addAll(nestedStores);
+                }
             }
-       }
+        }
         return stores;
     }
 

Modified: 
hadoop/pig/branches/multiquery/src/org/apache/pig/builtin/BinStorage.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/builtin/BinStorage.java?rev=757598&r1=757597&r2=757598&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/builtin/BinStorage.java 
(original)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/builtin/BinStorage.java 
Mon Mar 23 23:36:10 2009
@@ -32,6 +32,7 @@
 import org.apache.pig.ExecType;
 import org.apache.pig.ReversibleLoadStoreFunc;
 import org.apache.pig.backend.datastorage.DataStorage;
+import org.apache.pig.backend.datastorage.ElementDescriptor;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataReaderWriter;
@@ -213,17 +214,15 @@
     public Schema determineSchema(String fileName, ExecType execType,
             DataStorage storage) throws IOException {
 
-        InputStream is = null;
-
-        try {
-            is = FileLocalizer.open(fileName, execType, storage);
-        } catch (IOException e) {
+        if (!FileLocalizer.fileExists(fileName, storage)) {
             // At compile time in batch mode, the file may not exist
             // (such as intermediate file). Just return null - the
-            // same way as we could's get a valid record from the input.
+            // same way as we would if we did not get a valid record
             return null;
         }
         
+        InputStream is = FileLocalizer.open(fileName, execType, storage);
+       
         bindTo(fileName, new BufferedPositionedInputStream(is), 0, 
Long.MAX_VALUE);
         // get the first record from the input file
         // and figure out the schema from the data in

Modified: 
hadoop/pig/branches/multiquery/src/org/apache/pig/impl/plan/DependencyOrderWalker.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/impl/plan/DependencyOrderWalker.java?rev=757598&r1=757597&r2=757598&view=diff
==============================================================================
--- 
hadoop/pig/branches/multiquery/src/org/apache/pig/impl/plan/DependencyOrderWalker.java
 (original)
+++ 
hadoop/pig/branches/multiquery/src/org/apache/pig/impl/plan/DependencyOrderWalker.java
 Mon Mar 23 23:36:10 2009
@@ -28,33 +28,20 @@
  * DependencyOrderWalker traverses the graph in such a way that no node is 
visited
  * before all the nodes it depends on have been visited.  Beyond this, it does 
not
  * guarantee any particular order.  So, you have a graph with node 1 2 3 4, and
- * edges 1->3, 2->3, and 3->4, this walker guarantees that 1 and 2 will be 
visited
+ * edges 1->3, 2->3, and 3->4, this walker guarnatees that 1 and 2 will be 
visited
  * before 3 and 3 before 4, but it does not guarantee whether 1 or 2 will be
  * visited first.
  */
 public class DependencyOrderWalker <O extends Operator, P extends 
OperatorPlan<O>>
     extends PlanWalker<O, P> {
 
-    private boolean rootsFirst = true;
-    
     /**
      * @param plan Plan for this walker to traverse.
      */
     public DependencyOrderWalker(P plan) {
-        this(plan, true);
-    }
-
-    /**
-     * Constructs a walker with the specified plan and walk direction
-     * @param plan plan for this walker to traverse
-     * @param rootsFirst flag that indicates walking up (from roots 
-     *      to leaves) or walking down (from leaves to roots)
-     */   
-    public DependencyOrderWalker(P plan, boolean rootsFirst) {
         super(plan);
-        this.rootsFirst = rootsFirst;
     }
-    
+
     /**
      * Begin traversing the graph.
      * @param visitor Visitor this walker is being used by.
@@ -71,14 +58,12 @@
 
         List<O> fifo = new ArrayList<O>();
         Set<O> seen = new HashSet<O>();
-        List<O> nodes = rootsFirst ? mPlan.getLeaves() : mPlan.getRoots();
-       
-        if (nodes == null) return;
-            
-        for (O op : nodes) {
-            doAllDependencies(op, seen, fifo);
+        List<O> leaves = mPlan.getLeaves();
+        if (leaves == null) return;
+        for (O op : leaves) {
+            doAllPredecessors(op, seen, fifo);
         }
-        
+
         for (O op: fifo) {
             op.visit(visitor);
         }
@@ -88,28 +73,9 @@
         return new DependencyOrderWalker<O, P>(plan);
     }
 
-    protected void doAllDependencies(O node,
-                                     Set<O> seen,
-                                     Collection<O> fifo) throws 
VisitorException {
-        if (!seen.contains(node)) {
-            // We haven't seen this one before.
-            Collection<O> nodes = rootsFirst ? 
-                    mPlan.getPredecessors(node) : mPlan.getSuccessors(node);
-            if (nodes != null) {
-                // Do all our predecessors before ourself
-                for (O op : nodes) {
-                    doAllDependencies(op, seen, fifo);
-                }
-            }
-            // Now do ourself
-            seen.add(node);
-            fifo.add(node);
-        }
-    }
-
     protected void doAllPredecessors(O node,
-                                     Set<O> seen,
-                                     Collection<O> fifo) throws 
VisitorException {
+                                   Set<O> seen,
+                                   Collection<O> fifo) throws VisitorException 
{
         if (!seen.contains(node)) {
             // We haven't seen this one before.
             Collection<O> preds = mPlan.getPredecessors(node);
@@ -120,9 +86,8 @@
                 }
             }
             // Now do ourself
-            seen.add(node);             
+            seen.add(node);
             fifo.add(node);
         }
     }
-
 }

Added: 
hadoop/pig/branches/multiquery/src/org/apache/pig/impl/plan/ReverseDependencyOrderWalker.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/impl/plan/ReverseDependencyOrderWalker.java?rev=757598&view=auto
==============================================================================
--- 
hadoop/pig/branches/multiquery/src/org/apache/pig/impl/plan/ReverseDependencyOrderWalker.java
 (added)
+++ 
hadoop/pig/branches/multiquery/src/org/apache/pig/impl/plan/ReverseDependencyOrderWalker.java
 Mon Mar 23 23:36:10 2009
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.plan;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+
+/**
+ * ReverseDependencyOrderWalker traverses the graph in such a way that no node 
is visited
+ * before all the nodes that are its successors have been visited.  Beyond 
this, it does not
+ * guarantee any particular order.  So, if you have a graph with nodes 1 2 3 4, and
+ * edges 1->3, 2->3, and 3->4, this walker guarantees that 4 will be visited
+ * before 3 and 3 before 1 and 2, but it does not guarantee whether 1 or 2 
will be
+ * visited first.
+ */
+public class ReverseDependencyOrderWalker <O extends Operator, P extends 
OperatorPlan<O>>
+    extends PlanWalker<O, P> {
+
+    /**
+     * @param plan Plan for this walker to traverse.
+     */
+    public ReverseDependencyOrderWalker(P plan) {
+        super(plan);
+    }
+
+    /**
+     * Begin traversing the graph.
+     * @param visitor Visitor this walker is being used by.
+     * @throws VisitorException if an error is encountered while walking.
+     */
+    public void walk(PlanVisitor<O, P> visitor) throws VisitorException {
+        // This is highly inefficient, but our graphs are small so it should 
be okay.
+        // The algorithm works by starting at any node in the graph, finding 
its
+        // successors and calling itself for each of those successors.  When it
+        // finds a node that has no unfinished successors it puts that node in 
the
+        // list.  It then unwinds itself putting each of the other nodes in 
the list.
+        // It keeps track of what nodes it's seen as it goes so it doesn't put 
any
+        // nodes in the graph twice.
+
+        List<O> fifo = new ArrayList<O>();
+        Set<O> seen = new HashSet<O>();
+        List<O> roots = mPlan.getRoots();
+        if (roots == null) return;
+        for (O op : roots) {
+            doAllSuccessors(op, seen, fifo);
+        }
+
+        for (O op: fifo) {
+            op.visit(visitor);
+        }
+    }
+
+    public PlanWalker<O, P> spawnChildWalker(P plan) { 
+        return new ReverseDependencyOrderWalker<O, P>(plan);
+    }
+
+    protected void doAllSuccessors(O node,
+                                   Set<O> seen,
+                                   Collection<O> fifo) throws VisitorException 
{
+        if (!seen.contains(node)) {
+            // We haven't seen this one before.
+            Collection<O> succs = mPlan.getSuccessors(node);
+            if (succs != null && succs.size() > 0) {
+                // Do all our successors before ourself
+                for (O op : succs) {
+                    doAllSuccessors(op, seen, fifo);
+                }
+            }
+            // Now do ourself
+            seen.add(node);
+            fifo.add(node);
+        }
+    }
+}


Reply via email to