Author: daijy
Date: Wed Nov 18 19:01:21 2009
New Revision: 881879

URL: http://svn.apache.org/viewvc?rev=881879&view=rev
Log:
PIG-1060: MultiQuery optimization throws error for multi-level splits

Modified:
    hadoop/pig/branches/branch-0.6/CHANGES.txt
    hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
    hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java
    hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODemux.java
    hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMultiQueryPackage.java
    hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/TestMultiQuery.java

Modified: hadoop/pig/branches/branch-0.6/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/CHANGES.txt?rev=881879&r1=881878&r2=881879&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.6/CHANGES.txt (original)
+++ hadoop/pig/branches/branch-0.6/CHANGES.txt Wed Nov 18 19:01:21 2009
@@ -184,7 +184,9 @@
 
 PIG-1001: Generate more meaningful error message when one input file does not exist (daijy)
 
-Release 0.5.0 - Unreleased
+PIG-1060: MultiQuery optimization throws error for multi-level splits (rding via daijy)
+
+Release 0.5.0
 
 INCOMPATIBLE CHANGES
 

Modified: hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=881879&r1=881878&r2=881879&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java (original)
+++ hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java Wed Nov 18 19:01:21 2009
@@ -407,11 +407,16 @@
         NoopFilterRemover fRem = new NoopFilterRemover(plan);
         fRem.visit();
         
-        // reduces the number of MROpers in the MR plan generated 
-        // by multi-query (multi-store) script.
-        MultiQueryOptimizer mqOptimizer = new MultiQueryOptimizer(plan);
-        mqOptimizer.visit();
-
+        boolean isMultiQuery = 
+            "true".equalsIgnoreCase(pc.getProperties().getProperty("opt.multiquery","true"));
+        
+        if (isMultiQuery) {
+            // reduces the number of MROpers in the MR plan generated 
+            // by multi-query (multi-store) script.
+            MultiQueryOptimizer mqOptimizer = new MultiQueryOptimizer(plan);
+            mqOptimizer.visit();
+        }
+        
         // removes unnecessary stores (as can happen with splits in
         // some cases.). This has to run after the MultiQuery and
         // NoopFilterRemover.
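
A note on the guard added above: the launcher now reads the "opt.multiquery"
property (default "true") from the PigContext and skips the MultiQueryOptimizer
pass when the property is false. A minimal, self-contained sketch of the same
check, using a plain java.util.Properties as a stand-in for pc.getProperties()
(the class name and the property-source remark are illustrative assumptions,
not Pig code):

    import java.util.Properties;

    public class MultiQuerySwitchSketch {
        public static void main(String[] args) {
            Properties props = new Properties();           // stand-in for pc.getProperties()
            props.setProperty("opt.multiquery", "false");  // could come from pig.properties or a command-line override
            boolean isMultiQuery =
                "true".equalsIgnoreCase(props.getProperty("opt.multiquery", "true"));
            // false here means the MultiQueryOptimizer visit is skipped entirely
            System.out.println("run MultiQueryOptimizer? " + isMultiQuery);
        }
    }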

Modified: hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java?rev=881879&r1=881878&r2=881879&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java (original)
+++ hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java Wed Nov 18 19:01:21 2009
@@ -621,58 +621,7 @@
         // in inner plans of any POSplit operators
         return curIndex;
     }
-    
-    private int setBaseIndexOnDemux(int initial, PODemux demuxOp) 
-            throws VisitorException {
-        int index = initial;
-        demuxOp.setBaseIndex(index++);
-
-        List<PhysicalPlan> pls = demuxOp.getPlans();
-        for (PhysicalPlan pl : pls) {
-            PhysicalOperator leaf = pl.getLeaves().get(0);
-            if (leaf instanceof POLocalRearrange) {
-                POLocalRearrange lr = (POLocalRearrange)leaf;
-                try {
-                    // if the baseindex is set on the demux, then
-                    // POLocalRearranges in its inner plan should really
-                    // be sending an index out by adding the base index
-                    // This is because we would be replicating the demux
-                    // as many times as there are inner plans in the demux
-                    // hence the index coming out of POLocalRearranges
-                    // needs to be adjusted accordingly
-                    lr.setMultiQueryIndex(initial + lr.getIndex());
-                } catch (ExecException e) {                   
-                    int errCode = 2136;
-                    String msg = "Internal Error. Unable to set multi-query index for optimization.";
-                    throw new OptimizerException(msg, errCode, PigException.BUG, e);
-                }   
-            }
-            PhysicalOperator root = pl.getRoots().get(0);
-            if (root instanceof PODemux) {                
-                index = setBaseIndexOnDemux(index, (PODemux)root);
-            } else {
-                index++;
-            }
-        }
-        return index;
-    }
-    
-    private int setBaseIndexOnPackage(int initial, POMultiQueryPackage pkgOp) {
-        int index = initial;
-        pkgOp.setBaseIndex(index++);
-        
-        List<POPackage> pkgs = pkgOp.getPackages();
-        for (POPackage pkg : pkgs) {            
-            if (pkg instanceof POMultiQueryPackage) {
-                POMultiQueryPackage mpkg = (POMultiQueryPackage)pkg;
-                index = setBaseIndexOnPackage(index, mpkg);
-            } else {
-                index++;
-            }
-        }
-        return index;
-    }
-    
+        
     private void mergeOneReducePlanWithIndex(PhysicalPlan from, 
             PhysicalPlan to, int initial, int current, byte mapKeyType) throws VisitorException {
         POPackage pk = (POPackage)from.getRoots().get(0);
@@ -684,41 +633,55 @@
             // with the new indexed key
             addShiftedKeyInfoIndex(initial, pk); 
         }
+         
+        int total = current - initial;
         
+        POMultiQueryPackage pkg = (POMultiQueryPackage)to.getRoots().get(0);
+        int pkCount = 0;
         if (pk instanceof POMultiQueryPackage) {
-            POMultiQueryPackage mpkg = (POMultiQueryPackage)pk;
-            setBaseIndexOnPackage(initial, mpkg);
-            // we should update the keyinfo map of the 
-            // POPackage objects in the POMultiQueryPackage to
-            // have the shifted index - The index now will be
-            // starting from "initial" going up to "current"
-            // ORed with the multi query bit mask
-            int retIndex = addShiftedKeyInfoIndex(initial, current, mpkg);
-            if(retIndex != current) {
-                int errCode = 2146;
-                String msg = "Internal Error. Inconsistency in key index found during optimization.";
-                throw new OptimizerException(msg, errCode, PigException.BUG);
+            List<POPackage> pkgs = ((POMultiQueryPackage)pk).getPackages();
+            for (POPackage p : pkgs) {
+                pkg.addPackage(p);
+                pkCount++;
             }
-        }
-                                
-        PhysicalOperator root = from.getRoots().get(0);
-        if (root instanceof PODemux) {
-            PODemux demux = (PODemux)root;
-            setBaseIndexOnDemux(initial, demux);
-        }
-                    
-        POMultiQueryPackage pkg = (POMultiQueryPackage)to.getRoots().get(0);
-        for (int i=initial; i<current; i++) {
+            addShiftedKeyInfoIndex(initial, current, (POMultiQueryPackage)pk);
+        } else {
             pkg.addPackage(pk);
+            pkCount = 1;
         }
         
+        if (pkCount != total) {
+            int errCode = 2146;
+            String msg = "Internal Error. Inconsistency in key index found during optimization.";
+            throw new OptimizerException(msg, errCode, PigException.BUG);
+        }
+
         boolean[] keyPos = pk.getKeyPositionsInTuple();
         
         PODemux demux = (PODemux)to.getLeaves().get(0);
-        for (int i=initial; i<current; i++) {
+        int plCount = 0;
+        PhysicalOperator root = from.getRoots().get(0);
+        if (root instanceof PODemux) {
+            // flattening the inner plans of the demux operator.
+            // This is based on the fact that if a plan has a demux
+            // operator, then it's the only operator in the plan.
+            List<PhysicalPlan> pls = ((PODemux)root).getPlans();
+            for (PhysicalPlan pl : pls) {
+                demux.addPlan(pl, keyPos);
+                plCount++;
+            }
+            demux.addIsKeyWrappedList(((PODemux)root).getIsKeyWrappedList());
+        } else {
             demux.addPlan(from, mapKeyType, keyPos);
+            plCount = 1;
+        }
+        
+        if (plCount != total) {
+            int errCode = 2146;
+            String msg = "Internal Error. Inconsistency in key index found during optimization.";
+            throw new OptimizerException(msg, errCode, PigException.BUG);
         }
-               
+
         if (demux.isSameMapKeyType()) {
             pkg.setKeyType(pk.getKeyType());
         } else {
@@ -772,11 +735,7 @@
      */
     private int addShiftedKeyInfoIndex(int initialIndex, int onePastEndIndex,
             POMultiQueryPackage mpkg) throws OptimizerException {
-        // recursively iterate over the packages in the
-        // POMultiQueryPackage adding a shifted keyInfoIndex entry
-        // in the packages in order going from initialIndex upto
-        // onePastEndIndex (exclusive) flattening out any nested
-        // packages in nested POMultiqueryPackages as we traverse
+        
         List<POPackage> pkgs = mpkg.getPackages();
         // if we have lesser pkgs than (onePastEndIndex - initialIndex)
         // its because one or more of the pkgs is a POMultiQueryPackage which
@@ -794,12 +753,8 @@
         int curIndex = initialIndex;
         while (i < end) {
             POPackage pkg = pkgs.get(i);
-            if(pkg instanceof POMultiQueryPackage) {
-                curIndex = addShiftedKeyInfoIndex(curIndex, onePastEndIndex, (POMultiQueryPackage)pkg);
-            } else {
-                addShiftedKeyInfoIndex(curIndex, pkg);
-                curIndex++;
-            }
+            addShiftedKeyInfoIndex(curIndex, pkg);
+            curIndex++;
             i++;
         }
         return curIndex; // could be used in a caller who recursively called this function
@@ -810,37 +765,11 @@
             PhysicalPlan to, int initial, int current, byte mapKeyType) throws VisitorException {
         POPackage cpk = (POPackage)from.getRoots().get(0);
         from.remove(cpk);
-       
-        if (cpk instanceof POMultiQueryPackage) {
-            POMultiQueryPackage mpkg = (POMultiQueryPackage)cpk;
-            setBaseIndexOnPackage(initial, mpkg);
-        }
         
         PODemux demux = (PODemux)to.getLeaves().get(0);
         
         boolean isSameKeyType = demux.isSameMapKeyType();
         
-        PhysicalOperator leaf = from.getLeaves().get(0);
-        if (leaf instanceof POLocalRearrange) {
-            POLocalRearrange clr = (POLocalRearrange)leaf;
-            try {
-                clr.setMultiQueryIndex(initial);            
-            } catch (ExecException e) {                                        
-                int errCode = 2136;
-                String msg = "Internal Error. Unable to set multi-query index for optimization.";
-                throw new OptimizerException(msg, errCode, PigException.BUG, e);
-            }
-            
-            // change the map key type to tuple when 
-            // multiple splittees have different map key types
-            if (!isSameKeyType) {
-                clr.setKeyType(DataType.TUPLE);
-            }
-        } else if (leaf instanceof PODemux) {
-            PODemux locDemux = (PODemux)leaf;
-            setBaseIndexOnDemux(initial, locDemux);
-        } 
-       
         POMultiQueryPackage pkg = (POMultiQueryPackage)to.getRoots().get(0);
         
         // if current > initial + 1, it means we had
@@ -848,17 +777,35 @@
         // merge. In that case we would have changed the indices
         // of the POLocalRearranges in the split to be in the
         // range initial to current. To handle key, value pairs
-        // coming out of those POLocalRearranges, we replicate
-        // the Package as many times (in this case, the package
-        // would have to be a POMultiQueryPackage since we had
-        // a POSplit in the map). That Package would have a baseindex
-        // correctly set (in the beginning of this method) and would
-        // be able to handle the outputs from the different
+        // coming out of those POLocalRearranges, we add
+        // the Packages in the 'from' POMultiQueryPackage (in this case, 
+        // it has to be a POMultiQueryPackage since we had
+        // a POSplit in the map) to the 'to' POMultiQueryPackage. 
+        // These Packages would have correct positions in the package 
+        // list and would be able to handle the outputs from the different
         // POLocalRearranges.
-        for (int i=initial; i<current; i++) {
+        int total = current - initial;
+        int pkCount = 0;
+        if (cpk instanceof POMultiQueryPackage) {
+            List<POPackage> pkgs = ((POMultiQueryPackage)cpk).getPackages();
+            for (POPackage p : pkgs) {
+                pkg.addPackage(p);
+                if (!isSameKeyType) {
+                    p.setKeyType(DataType.TUPLE);
+                }
+                pkCount++;
+            }
+        } else {
             pkg.addPackage(cpk);
+            pkCount = 1;
         }
-        
+
+        if (pkCount != total) {
+            int errCode = 2146;
+            String msg = "Internal Error. Inconsistency in key index found during optimization.";
+            throw new OptimizerException(msg, errCode, PigException.BUG);
+        }
+
         // all packages should have the same key type
         if (!isSameKeyType) {
             cpk.setKeyType(DataType.TUPLE);          
@@ -868,11 +815,52 @@
         
         boolean[] keyPos = cpk.getKeyPositionsInTuple();
         
-        // See comment above for why we replicated the Package
-        // in the from plan - for the same reason, we replicate
-        // the Demux operators now.
-        for (int i=initial; i<current; i++) {
+        // See comment above for why we flatten the Packages
+        // in the from plan - for the same reason, we flatten
+        // the inner plans of Demux operator now.
+        int plCount = 0;
+        PhysicalOperator leaf = from.getLeaves().get(0);
+        if (leaf instanceof PODemux) {
+            List<PhysicalPlan> pls = ((PODemux)leaf).getPlans();
+            for (PhysicalPlan pl : pls) {
+                demux.addPlan(pl, mapKeyType, keyPos);
+                POLocalRearrange lr = (POLocalRearrange)pl.getLeaves().get(0);
+                try {
+                    lr.setMultiQueryIndex(initial + plCount++);            
+                } catch (ExecException e) {
+                    int errCode = 2136;
+                    String msg = "Internal Error. Unable to set multi-query index for optimization.";
+                    throw new OptimizerException(msg, errCode, PigException.BUG, e);
+                }
+                
+                // change the map key type to tuple when 
+                // multiple splittees have different map key types
+                if (!isSameKeyType) {
+                    lr.setKeyType(DataType.TUPLE);
+                }
+            }
+        } else {
             demux.addPlan(from, mapKeyType, keyPos);
+            POLocalRearrange lr = (POLocalRearrange)from.getLeaves().get(0);
+            try {
+                lr.setMultiQueryIndex(initial + plCount++);            
+            } catch (ExecException e) {                                        
+                int errCode = 2136;
+                String msg = "Internal Error. Unable to set multi-query index for optimization.";
+                throw new OptimizerException(msg, errCode, PigException.BUG, e);
+            }
+                
+            // change the map key type to tuple when 
+            // multiple splittees have different map key types
+            if (!isSameKeyType) {
+                lr.setKeyType(DataType.TUPLE);
+            }
+        }
+        
+        if (plCount != total) {
+            int errCode = 2146;
+            String msg = "Internal Error. Inconsistency in key index found during optimization.";
+            throw new OptimizerException(msg, errCode, PigException.BUG);
         }
     }
     
@@ -933,73 +921,7 @@
             // > index + 1
             int incIndex = mergeOneMapPlanWithIndex(
                     mrOp.mapPlan, splitOp, index, sameKeyType);
-            
-            // In the combine and reduce plans the Demux and 
POMultiQueryPackage
-            // operators' baseIndex is set whenever the incIndex above is > 
index + 1
-            // What does this 'baseIndex' mean - here is an attempt to explain 
it:
-            // Consider a map - reduce plan layout as shown below (the comments
-            // apply even if a combine plan was present) - An explanation of 
the "index"
-            // and "baseIndex" fields follows:
-            // The numbers in parenthesis () are "index" values - Note that in 
multiquery
-            // optimizations these indices are actually ORed with a bitmask 
(0x80) on the
-            // map side in the LocalRearrange. The POMultiQueryPackage and 
PODemux operators
-            // then remove this bitmask to restore the original index values. 
In the commentary
-            // below, indices will be referred to without this bitmask - the 
bitmask is only
-            // to identify the multiquery optimization during comparsion - for 
details see the comments
-            // in POLocalRearrange.setIndex().
-            // The numbers in brackets [] are "baseIndex" values. These 
baseIndex values are
-            // used by POMultiQueryPackage and PODemux to calculate the an 
arrayList index which
-            // they use to pick the right package or inner plan respectively. 
All this is needed
-            // since on the map side the indices are assigned after flattening 
all POLocalRearranges
-            // including those nested in Splits into one flat space (as can be 
noticed by the
-            // numbering of the indices under the split in the example below). 
The optimizer then
-            // duplicates the POMultiQueryPackage and Demux inner plan 
corresponding to the split
-            // in the reduce plan (the entities with * in the figure below). 
Now if a key with index '1'
-            // is emitted, it goes to the first POMultiQueryPackage in the 
reduce plan below, which 
-            // then picks the second package in its arraylist of packages 
which is a 
-            // POMultiQueryPackage (the first with a * below). This 
POMultiQueryPackage then picks 
-            // the first package in its list (it arrives at this arraylist 
index of 0 by subtracting
-            // the baseIndex (1) from the index coming in (1)). So the record 
emitted by LocalRearrange(1)
-            // reaches the right package. Likewise, if LocalRearrange(2) emits 
a key,value they would have
-            // an index 2. The first POMultiQueryPackage (with baseIndex 0 
below) would pick the 3rd package
-            // (arraylist index 2 arrived at by doing index - baseIndex which 
is 2 - 0). This is the
-            // duplicated POMultiQueryPackage with baseIndex 1. This would 
inturn pick the second package
-            // in its arraylist (arraylist index 1 arrived at by doing index - 
baseIndex which is 2 - 1)
-            // The idea is that through duplication we make it easier to 
determine which POPackage to pick.
-            // The exact same logic is used by PODemux to pick the right inner 
plan from its arraylist of
-            // inner plans.
-            
-            // The arrows in the figure below show the correspondence between 
the different operators
-            // and plans .i.e the end of an arrow points to the operator or 
plan which consumes
-            // data coming from the root of the arrow
-            
-            // If you look at the layout below column-wise, each "column" is a 
MROper
-            // which is being merged into the parent MROper - the Split 
represents a
-            // MROper which inside of it has 2 MROpers merged in.
-            // A star (*) next to an operator means it is the same object as 
the
-            // other operator with a star(*) - Essentially the same object 
reference
-            // is copied twice.
-            // LocalRearrange(0)           Split                           
LocalRearrange(3)
-            //     |                       /     \                             
  |
-            //     |         LocalRearrange(1)  LocalRearrange(2)              
  |
-            //     |             |     MAP PLAN              |                 
  |
-            // 
----|-------------|---------------------------|-------------------|--------------------
-            //     |             |     REDUCE PLAN           |                 
  |
-            //     |             |   POMultiQueryPackage[0]  |                 
  |
-            //     V             V         | contains        V                 
  V
-            // [ POPackage, POMultiQueryPackage[1]*,POMultiQueryPackage[1]*,   
POPackage]
-            //      |           /    \               /      \                  
  |
-            //      |       POPackage POPackage  POPackage  POPackage          
  |
-            //      |              \                          |                
  |
-            //      |               \   Demux[0]              |                
  |
-            //      V                V    | contains          V                
  V
-            //  [ planOfRedOperators,planWithDemux*, planWithDemux*,      
planOfRedOperators]             
-            //                       /    |              |        \
-            //                      /   Demux[1]        Demux[1]   \
-            //                     V      |               |         V
-            //            
[planOfRedOps,planOfRedOps][planOfRedOps,planOfRedOps]
-            // 
-            
+                        
             // merge the combiner plan
             if (comPl != null) {
                 if (!mrOp.combinePlan.isEmpty()) {                    
@@ -1011,6 +933,7 @@
                     throw new OptimizerException(msg, errCode, PigException.BUG);
                 }
             }
+            
             // merge the reducer plan
             mergeOneReducePlanWithIndex(
                     mrOp.reducePlan, redPl, index, incIndex, mrOp.mapKeyType);
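
Taken together, the reducer-side changes above replace the old replicate-and-offset
scheme (setBaseIndexOnDemux / setBaseIndexOnPackage) with flattening: the packages
and inner plans of a nested POMultiQueryPackage or PODemux are appended directly to
the merged operators, and the pkCount/plCount checks verify that exactly
(current - initial) slots were added. A toy, self-contained model of that
bookkeeping follows; plain strings stand in for POPackage objects, and the class
and variable names are illustrative assumptions, not Pig API:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class FlattenSketch {
        public static void main(String[] args) {
            int initial = 0, current = 4;   // index range handed out to the map-side POLocalRearranges

            List<String> merged = new ArrayList<>();             // the 'to' package list
            merged.add("pkg-0");                                  // plain splittee
            merged.addAll(Arrays.asList("pkg-1", "pkg-2"));       // splittee that was itself a split: flattened, not nested
            merged.add("pkg-3");

            if (merged.size() != current - initial) {             // mirrors the pkCount/plCount consistency check
                throw new IllegalStateException("Inconsistency in key index");
            }

            int index = 2;                                        // index carried by an incoming key
            System.out.println(merged.get(index));                // prints pkg-2: direct lookup, no baseIndex math
        }
    }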

Modified: hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODemux.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODemux.java?rev=881879&r1=881878&r2=881879&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODemux.java (original)
+++ hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODemux.java Wed Nov 18 19:01:21 2009
@@ -19,6 +19,7 @@
 
 import java.util.ArrayList;
 import java.util.BitSet;
+import java.util.Collections;
 import java.util.List;
 
 import org.apache.commons.logging.Log;
@@ -61,14 +62,7 @@
     private static Result eop = new Result(POStatus.STATUS_EOP, null);
     
     transient private Log log = LogFactory.getLog(getClass());
-    
-    /*
-     * The base index of this demux. In the case of
-     * a demux contained in another demux, the index
-     * passed in must be shifted before it can be used.
-     */
-    private int baseIndex = 0;
-    
+        
     /*
      * The list of sub-plans the inner plan is composed of
      */
@@ -178,7 +172,7 @@
 
     @Override
     public String name() {
-        return "Demux" + isKeyWrapped + "[" + baseIndex +"] - " + mKey.toString();
+        return "Demux" + isKeyWrapped + " - " + mKey.toString();
     }
 
     @Override
@@ -190,32 +184,35 @@
     public boolean supportsMultipleOutputs() {
         return false;
     }
-
+    
     /**
-     * Sets the base index of this demux. 
-     * 
-     * @param idx the base index
+     * Returns the list of inner plans.
+     *  
+     * @return the list of the nested plans
      */
-    public void setBaseIndex(int idx) {
-        baseIndex = idx;
+    public List<PhysicalPlan> getPlans() {
+        return myPlans;
     }
     
     /**
-     * Returns the base index of this demux
+     * Returns the list of booleans that indicates if the 
+     * key needs to unwrapped for the corresponding plan.
      * 
-     * @return the base index
+     * @return the list of isKeyWrapped boolean values
      */
-    public int getBaseIndex() {
-        return baseIndex;
+    public List<Boolean> getIsKeyWrappedList() {
+        return Collections.unmodifiableList(isKeyWrapped);
     }
     
     /**
-     * Returns the list of inner plans.
-     *  
-     * @return the list of the nested plans
+     * Adds a list of IsKeyWrapped boolean values
+     * 
+     * @param lst the list of boolean values to add
      */
-    public List<PhysicalPlan> getPlans() {
-        return myPlans;
+    public void addIsKeyWrappedList(List<Boolean> lst) {
+        for (Boolean b : lst) {
+            isKeyWrapped.add(b);
+        }
     }
     
     /**
@@ -232,6 +229,12 @@
         isKeyWrapped.add(mapKeyType == DataType.TUPLE ? false : true);
         keyPositions.add(keyPos);
     }
+    
+    public void addPlan(PhysicalPlan inPlan, boolean[] keyPos) {  
+        myPlans.add(inPlan);
+        processedSet.set(myPlans.size()-1);
+        keyPositions.add(keyPos);
+    }
    
     @Override
     public Result getNext(Tuple t) throws ExecException {
@@ -357,8 +360,7 @@
         // the POLocalRearrange operator and passed to this operator
         // by POMultiQueryPackage
         int index = fld.getIndex();
-        index &= idxPart;
-        index -= baseIndex;                         
+        index &= idxPart;                      
         
         PhysicalPlan pl = myPlans.get(index);
         if (!(pl.getRoots().get(0) instanceof PODemux)) {

Modified: hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMultiQueryPackage.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMultiQueryPackage.java?rev=881879&r1=881878&r2=881879&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMultiQueryPackage.java (original)
+++ hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMultiQueryPackage.java Wed Nov 18 19:01:21 2009
@@ -64,8 +64,6 @@
     private List<POPackage> packages = new ArrayList<POPackage>();
 
     transient private PigNullableWritable myKey;
-    
-    private int baseIndex = 0;      
 
     /**
      * Constructs an operator with the specified key.
@@ -111,7 +109,7 @@
 
     @Override
     public String name() {
-        return "MultiQuery Package[" + baseIndex +"] - " +  getOperatorKey().toString();
+        return "MultiQuery Package  - " +  getOperatorKey().toString();
     }
 
     @Override
@@ -174,7 +172,6 @@
 
         int index = (int)origIndex;
         index &= idxPart;
-        index -= baseIndex;
         
         if (index >= packages.size() || index < 0) {
             int errCode = 2140;
@@ -221,21 +218,4 @@
         return res;
     }
 
-    /**
-     * Sets the base index of this operator
-     * 
-     * @param baseIndex the base index of this operator
-     */
-    public void setBaseIndex(int baseIndex) {
-        this.baseIndex = baseIndex;
-    }
-
-    /**
-     * Returns the base index of this operator
-     * 
-     * @return the base index of this operator
-     */
-    public int getBaseIndex() {
-        return baseIndex;
-    }      
 }
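
With baseIndex removed from both PODemux and POMultiQueryPackage, reduce-side
routing becomes: mask off the multi-query bit, then index the flat list directly.
A small illustrative sketch of that arithmetic; the 0x80 flag matches the bitmask
described in the optimizer comments, while treating 0x7F as the value of idxPart
is an assumption of this example, not a quote of the Pig source:

    public class IndexRoutingSketch {
        private static final int MQ_FLAG  = 0x80;  // multi-query bit set by POLocalRearrange
        private static final int IDX_PART = 0x7F;  // assumed idxPart: everything except the flag bit

        public static void main(String[] args) {
            String[] plans = {"plan-0", "plan-1", "plan-2", "plan-3"};  // flattened inner plans / packages
            int rawIndex = MQ_FLAG | 2;             // what arrives with a key from splittee #2
            int slot = rawIndex & IDX_PART;         // new code: mask only, no 'slot -= baseIndex'
            System.out.println(plans[slot]);        // prints plan-2
        }
    }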

Modified: hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/TestMultiQuery.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/TestMultiQuery.java?rev=881879&r1=881878&r2=881879&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/TestMultiQuery.java (original)
+++ hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/TestMultiQuery.java Wed Nov 18 19:01:21 2009
@@ -34,22 +34,28 @@
 import junit.framework.Assert;
 import junit.framework.TestCase;
 
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.util.Progressable;
 import org.apache.pig.ExecType;
 import org.apache.pig.PigException;
 import org.apache.pig.PigServer;
 import org.apache.pig.StoreConfig;
 import org.apache.pig.StoreFunc;
-import org.apache.pig.backend.executionengine.util.ExecTools;
 import org.apache.pig.backend.executionengine.ExecJob;
+import org.apache.pig.backend.executionengine.util.ExecTools;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
-import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
@@ -57,20 +63,12 @@
 import org.apache.pig.impl.logicalLayer.LogicalPlan;
 import org.apache.pig.impl.plan.Operator;
 import org.apache.pig.impl.plan.OperatorPlan;
-import org.apache.pig.tools.grunt.GruntParser;
 import org.apache.pig.impl.util.LogUtils;
+import org.apache.pig.tools.grunt.GruntParser;
 import org.apache.pig.tools.pigscript.parser.ParseException;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.OutputFormat;
-import org.apache.hadoop.mapred.RecordWriter;
-import org.apache.hadoop.mapred.jobcontrol.Job;
-import org.apache.hadoop.util.Progressable;
 
 public class TestMultiQuery extends TestCase {
 
@@ -91,6 +89,137 @@
     }
     
     @Test
+    public void testMultiQueryJiraPig1060() {
+
+        // test case: 
+
+        String INPUT_FILE = "pig-1060.txt";
+
+        try {
+
+            PrintWriter w = new PrintWriter(new FileWriter(INPUT_FILE));
+            w.println("apple\t2");
+            w.println("apple\t12");
+            w.println("orange\t3");
+            w.println("orange\t23");
+            w.println("strawberry\t10");
+            w.println("strawberry\t34");
+
+            w.close();
+
+            Util.copyFromLocalToCluster(cluster, INPUT_FILE, INPUT_FILE);
+
+            myPig.setBatchOn();
+
+            myPig.registerQuery("data = load '" + INPUT_FILE +
+                                "' as (name:chararray, gid:int);");
+            myPig.registerQuery("f1 = filter data by gid < 5;");
+            myPig.registerQuery("g1 = group f1 by name;");
+            myPig.registerQuery("p1 = foreach g1 generate group, COUNT(f1.gid);");
+            myPig.registerQuery("store p1 into '/tmp/output1';");
+
+            myPig.registerQuery("f2 = filter data by gid > 5;");
+            myPig.registerQuery("g2 = group f2 by name;");
+            myPig.registerQuery("p2 = foreach g2 generate group, COUNT(f2.gid);");
+            myPig.registerQuery("store p2 into '/tmp/output2';");
+
+            myPig.registerQuery("f3 = filter f2 by gid > 10;");
+            myPig.registerQuery("g3 = group f3 by name;");
+            myPig.registerQuery("p3 = foreach g3 generate group, COUNT(f3.gid);");
+            myPig.registerQuery("store p3 into '/tmp/output3';");
+
+            myPig.registerQuery("f4 = filter f3 by gid < 20;");
+            myPig.registerQuery("g4 = group f4 by name;");
+            myPig.registerQuery("p4 = foreach g4 generate group, COUNT(f4.gid);");
+            myPig.registerQuery("store p4 into '/tmp/output4';");
+
+            LogicalPlan lp = checkLogicalPlan(1, 4, 27);
+
+            PhysicalPlan pp = checkPhysicalPlan(lp, 1, 4, 35);
+
+            checkMRPlan(pp, 1, 1, 1);
+
+        } catch (Exception e) {
+            e.printStackTrace();
+            Assert.fail();
+        } finally {
+            new File(INPUT_FILE).delete();
+            try {
+                Util.deleteFile(cluster, INPUT_FILE);
+            } catch (IOException e) {
+                e.printStackTrace();
+                Assert.fail();
+            }
+        }
+    }
+     
+    @Test
+    public void testMultiQueryJiraPig1060_2() {
+
+        // test case: 
+
+        String INPUT_FILE = "pig-1060.txt";
+
+        try {
+
+            PrintWriter w = new PrintWriter(new FileWriter(INPUT_FILE));
+            w.println("apple\t2");
+            w.println("apple\t12");
+            w.println("orange\t3");
+            w.println("orange\t23");
+            w.println("strawberry\t10");
+            w.println("strawberry\t34");
+
+            w.close();
+
+            Util.copyFromLocalToCluster(cluster, INPUT_FILE, INPUT_FILE);
+
+            myPig.setBatchOn();
+
+            myPig.registerQuery("data = load '" + INPUT_FILE +
+            "' as (name:chararray, gid:int);");
+            myPig.registerQuery("f1 = filter data by gid < 5;");
+            myPig.registerQuery("g1 = group f1 by name;");
+            myPig.registerQuery("p1 = foreach g1 generate group, COUNT(f1.gid);");
+            myPig.registerQuery("store p1 into '/tmp/output1';");
+
+            myPig.registerQuery("f2 = filter data by gid > 5;");
+            myPig.registerQuery("g2 = group f2 by name;");
+            myPig.registerQuery("p2 = foreach g2 generate group, COUNT(f2.gid);");
+            myPig.registerQuery("store p2 into '/tmp/output2';");
+
+            myPig.registerQuery("f3 = filter f2 by gid > 10;");
+            myPig.registerQuery("g3 = group f3 by name;");
+            myPig.registerQuery("p3 = foreach g3 generate group, COUNT(f3.gid);");
+            myPig.registerQuery("store p3 into '/tmp/output3';");
+
+            myPig.registerQuery("f4 = filter f3 by gid < 20;");
+            myPig.registerQuery("g4 = group f4 by name;");
+            myPig.registerQuery("p4 = foreach g4 generate group, COUNT(f4.gid);");
+            myPig.registerQuery("store p4 into '/tmp/output4';");
+
+            List<ExecJob> jobs = myPig.executeBatch();
+            assertEquals(4, jobs.size());
+
+            for (ExecJob job : jobs) {
+                assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
+            }
+
+        } catch (Exception e) {
+            e.printStackTrace();
+            Assert.fail();
+        } finally {
+            new File(INPUT_FILE).delete();
+            try {
+                Util.deleteFile(cluster, INPUT_FILE);
+            } catch (IOException e) {
+                e.printStackTrace();
+                Assert.fail();
+            }
+        }
+    }
+
+    @Test
     public void testMultiQueryJiraPig920() {
 
         // test case: a simple diamond query
@@ -485,6 +614,10 @@
             List<ExecJob> jobs = myPig.executeBatch();
             assertTrue(jobs.size() == 2);
 
+            for (ExecJob job : jobs) {
+                assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
+            }
+
         } catch (Exception e) {
             e.printStackTrace();
             Assert.fail();
@@ -607,7 +740,11 @@
             myPig.registerQuery("d2 = foreach d1 generate group, AVG(d.uid);");
             myPig.registerQuery("store d2 into '/tmp/output3';");
              
-            myPig.executeBatch();
+            List<ExecJob> jobs = myPig.executeBatch();
+            
+            for (ExecJob job : jobs) {
+                assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
+            }
             
         } catch (Exception e) {
             e.printStackTrace();
@@ -740,8 +877,12 @@
             myPig.registerQuery("d2 = foreach d1 generate group, MAX(d.uid) - MIN(d.uid);");
             myPig.registerQuery("store d2 into '/tmp/output3';");
              
-            myPig.executeBatch();
-            
+            List<ExecJob> jobs = myPig.executeBatch();
+
+            for (ExecJob job : jobs) {
+                assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
+            }
+  
         } catch (Exception e) {
             e.printStackTrace();
             Assert.fail();
@@ -808,7 +949,12 @@
             myPig.registerQuery("d2 = foreach d1 generate group, MAX(d.uid) - MIN(d.uid);");
             myPig.registerQuery("store d2 into '/tmp/output3';");
              
-            myPig.executeBatch();
+            List<ExecJob> jobs = myPig.executeBatch();
+            assertEquals(3, jobs.size());
+
+            for (ExecJob job : jobs) {
+                assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
+            }
             
         } catch (Exception e) {
             e.printStackTrace();
@@ -876,7 +1022,12 @@
             myPig.registerQuery("d2 = foreach d1 generate group, COUNT(d.uid);");
             myPig.registerQuery("store d2 into '/tmp/output3';");
              
-            myPig.executeBatch();
+            List<ExecJob> jobs = myPig.executeBatch();
+            assertEquals(3, jobs.size());
+
+            for (ExecJob job : jobs) {
+                assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
+            }
             
         } catch (Exception e) {
             e.printStackTrace();
@@ -989,7 +1140,12 @@
             myPig.registerQuery("H = foreach G generate group, COUNT(A1);");
             myPig.registerQuery("store H into '/tmp/output3';");
              
-            myPig.executeBatch();
+            List<ExecJob> jobs = myPig.executeBatch();
+            assertEquals(3, jobs.size());
+
+            for (ExecJob job : jobs) {
+                assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
+            }
             
         } catch (Exception e) {
             e.printStackTrace();
@@ -1055,7 +1211,12 @@
             myPig.registerQuery("g1 = foreach g generate group, COUNT(d2);");
             myPig.registerQuery("store g1 into '/tmp/output3';");
 
-            myPig.executeBatch();
+            List<ExecJob> jobs = myPig.executeBatch();
+            assertEquals(3, jobs.size());
+
+            for (ExecJob job : jobs) {
+                assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
+            }
             
         } catch (Exception e) {
             e.printStackTrace();
@@ -1111,7 +1272,12 @@
             myPig.registerQuery("e = foreach d generate flatten(b), flatten(c);");
             myPig.registerQuery("store e into '/tmp/output2';");
 
-            myPig.executeBatch();
+            List<ExecJob> jobs = myPig.executeBatch();
+            assertTrue(jobs.size() == 2);
+
+            for (ExecJob job : jobs) {
+                assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
+            }
 
         } catch (Exception e) {
             e.printStackTrace();
@@ -1169,7 +1335,12 @@
             myPig.registerQuery("e = join c by gid, d by gid using \"repl\";");
             myPig.registerQuery("store e into '/tmp/output3';");
 
-            myPig.executeBatch();
+            List<ExecJob> jobs = myPig.executeBatch();
+            assertEquals(3, jobs.size());
+
+            for (ExecJob job : jobs) {
+                assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
+            }
 
         } catch (Exception e) {
             e.printStackTrace();
@@ -1278,7 +1449,12 @@
             myPig.registerQuery("b = load '/tmp/output1' using PigStorage(':'); ");
             myPig.registerQuery("store b into '/tmp/output2';");
 
-            myPig.executeBatch();
+            List<ExecJob> jobs = myPig.executeBatch();
+            assertTrue(jobs.size() == 2);
+
+            for (ExecJob job : jobs) {
+                assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
+            }
 
         } catch (Exception e) {
             e.printStackTrace();
@@ -1596,7 +1772,7 @@
         }
         
     }
-    
+   
     @Test
     public void testMultiQueryWithTwoStores() {
 
@@ -2067,7 +2243,7 @@
             Assert.fail();
         } 
     }
-    
+   
     /**
      * Test that pig calls checkOutputSpecs() method of the OutputFormat (if the
      * StoreFunc defines an OutputFormat as the return value of 
@@ -2314,6 +2490,9 @@
 
         showPlanOperators(mrp);
         
+        System.out.println("===== Display map-reduce Plan =====");
+        System.out.println(mrp.toString());
+        
         Assert.assertEquals(expectedRoots, mrp.getRoots().size());
         Assert.assertEquals(expectedLeaves, mrp.getLeaves().size());
         Assert.assertEquals(expectedSize, mrp.size());

