Author: daijy
Date: Sun Sep 26 21:22:57 2010
New Revision: 1001523

URL: http://svn.apache.org/viewvc?rev=1001523&view=rev
Log:
PIG-1644: New logical plan: Plan.connect with position is misused in some places

Modified:
    hadoop/pig/branches/branch-0.8/CHANGES.txt
    hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/BaseOperatorPlan.java
    hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/OperatorPlan.java
    hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/OperatorSubPlan.java
    hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/ForeachInnerPlanVisitor.java
    hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/Util.java
    hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/ColumnPruneVisitor.java
    hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/FilterAboveForeach.java
    hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/LimitOptimizer.java
    hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/MergeFilter.java
    hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/MergeForEach.java
    hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/PartitionFilterOptimizer.java
    hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/PushDownForEachFlatten.java
    hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/PushUpFilter.java
    hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/SplitFilter.java
    hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/TypeCastInserter.java
    hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/TestNewPlanOperatorPlan.java
    hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/TestPruneColumn.java

Modified: hadoop/pig/branches/branch-0.8/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/CHANGES.txt?rev=1001523&r1=1001522&r2=1001523&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.8/CHANGES.txt (original)
+++ hadoop/pig/branches/branch-0.8/CHANGES.txt Sun Sep 26 21:22:57 2010
@@ -198,6 +198,9 @@ PIG-1309: Map-side Cogroup (ashutoshc)
 
 BUG FIXES
 
+PIG-1644: New logical plan: Plan.connect with position is misused in some
+places (daijy)
+
 PIG-1643: join fails for a query with input having 'load using pigstorage
 without schema' + 'foreach' (daijy)
 

Modified: 
hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/BaseOperatorPlan.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/BaseOperatorPlan.java?rev=1001523&r1=1001522&r2=1001523&view=diff
==============================================================================
--- 
hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/BaseOperatorPlan.java 
(original)
+++ 
hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/BaseOperatorPlan.java 
Sun Sep 26 21:22:57 2010
@@ -316,5 +316,118 @@ public abstract class BaseOperatorPlan i
             return "";
         }
         return os.toString();
-    }   
+    }
+    
+    @Override
+    public void replace(Operator oldOperator, Operator newOperator) throws 
FrontendException {
+        add(newOperator);
+        
+        List<Operator> preds = getPredecessors(oldOperator);
+        if (preds!=null) {
+            List<Operator> predsCopy = new ArrayList<Operator>();
+            predsCopy.addAll(preds);
+            for (int i=0;i<predsCopy.size();i++) {
+                Operator pred = predsCopy.get(i);
+                Pair<Integer, Integer> pos = disconnect(pred, oldOperator);
+                connect(pred, pos.first, newOperator, i);
+            }
+        }
+        
+        List<Operator> succs = getSuccessors(oldOperator);
+        if (succs!=null) {
+            List<Operator> succsCopy = new ArrayList<Operator>();
+            succsCopy.addAll(succs);
+            for (int i=0;i<succsCopy.size();i++) {
+                Operator succ = succsCopy.get(i);
+                Pair<Integer, Integer> pos = disconnect(oldOperator, succ);
+                connect(newOperator, i, succ, pos.second);
+            }
+        }
+        
+        remove(oldOperator);
+    }
+    
+    // We assume that if a node has multiple inputs, it has only one output, and
+    // if it has multiple outputs, it has only one input. Otherwise we don't know
+    // how to connect inputs to outputs. This assumption holds for logical and
+    // physical plans, and for most MR plans.
+    @Override
+    public void removeAndReconnect(Operator operatorToRemove) throws 
FrontendException {
+        List<Operator> predsCopy = null;
+        if (getPredecessors(operatorToRemove)!=null && 
getPredecessors(operatorToRemove).size()!=0) {
+            predsCopy = new ArrayList<Operator>();
+            predsCopy.addAll(getPredecessors(operatorToRemove));
+        }
+        
+        List<Operator> succsCopy = null;
+        if (getSuccessors(operatorToRemove)!=null && 
getSuccessors(operatorToRemove).size()!=0) {
+            succsCopy = new ArrayList<Operator>();
+            succsCopy.addAll(getSuccessors(operatorToRemove));
+        }
+        
+        if (predsCopy!=null && predsCopy.size()>1 && succsCopy!=null && 
succsCopy.size()>1) {
+            throw new FrontendException("Cannot remove and reconnect node with 
multiple inputs/outputs", 2256);
+        }
+        
+        if (predsCopy!=null && predsCopy.size()>1) {
+            // node has multiple inputs, so it has at most one output
+            // reconnect the inputs to that output
+            Operator succ = null;
+            Pair<Integer, Integer> pos2 = null;
+            if (succsCopy!=null) {
+                succ = succsCopy.get(0);
+                pos2 = disconnect(operatorToRemove, succ);
+            }
+            for (Operator pred : predsCopy) {
+                Pair<Integer, Integer> pos1 = disconnect(pred, 
operatorToRemove);
+                if (succ!=null) {
+                    connect(pred, pos1.first, succ, pos2.second);
+                }
+            }
+        } else if (succsCopy!=null && succsCopy.size()>1) {
+            // node has multiple outputs, so it has at most one input
+            // reconnect that input to the outputs
+            Operator pred = null;
+            Pair<Integer, Integer> pos1 = null;
+            if (predsCopy!=null) {
+                pred = predsCopy.get(0);
+                pos1 = disconnect(pred, operatorToRemove);
+            }
+            for (Operator succ : succsCopy) {
+                Pair<Integer, Integer> pos2 = disconnect(operatorToRemove, 
succ);
+                if (pred!=null) {
+                    connect(pred, pos1.first, succ, pos2.second);
+                }
+            }
+        } else {
+            // Only have one input/output
+            Operator pred = null;
+            Pair<Integer, Integer> pos1 = null;
+            if (predsCopy!=null) {
+                pred = predsCopy.get(0);
+                pos1 = disconnect(pred, operatorToRemove);
+            }
+            
+            Operator succ = null;
+            Pair<Integer, Integer> pos2 = null;
+            if (succsCopy!=null) {
+                succ = succsCopy.get(0);
+                pos2 = disconnect(operatorToRemove, succ);
+            }
+            
+            if (pred!=null && succ!=null) {
+                connect(pred, pos1.first, succ, pos2.second);
+            }
+        }
+        
+        remove(operatorToRemove);
+    }
+    
+    @Override
+    public void insertBetween(Operator pred, Operator operatorToInsert, 
Operator succ) throws FrontendException {
+        add(operatorToInsert);
+        Pair<Integer, Integer> pos = disconnect(pred, succ);
+        connect(pred, pos.first, operatorToInsert, 0);
+        connect(operatorToInsert, 0, succ, pos.second);
+    }
 }
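
The three helpers added above centralize edge bookkeeping that call sites used to
do by hand. In the old pattern the Pair returned by disconnect(a, b) describes the
a->b edge, but its positions were sometimes reused when wiring in a different
operator, where they are not valid slots. A minimal sketch of the before/after,
assuming hypothetical operators a, b and a newly created operator x in some
OperatorPlan named plan:

    // Old call-site pattern (compare ForeachInnerPlanVisitor below): pos describes
    // the a->b edge, yet pos.first is reused as the output slot of the new operator x.
    Pair<Integer, Integer> pos = plan.disconnect(a, b);
    plan.connect(a, x);
    plan.connect(x, pos.first, b, pos.second);  // misuse: x's only output is slot 0
    
    // New pattern: one call; a keeps its old output slot and b its old input slot.
    plan.insertBetween(a, x, b);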

Modified: 
hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/OperatorPlan.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/OperatorPlan.java?rev=1001523&r1=1001522&r2=1001523&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/OperatorPlan.java 
(original)
+++ hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/OperatorPlan.java 
Sun Sep 26 21:22:57 2010
@@ -160,4 +160,30 @@ public interface OperatorPlan {
      * @throws FrontendException
      */
     public boolean isEqual( OperatorPlan other ) throws FrontendException;
+    
+    /**
+     * This method replaces oldOperator with newOperator, making every connection
+     * to the old operator point to the new operator instead.
+     * @param oldOperator operator to be replaced
+     * @param newOperator operator to replace it with
+     * @throws FrontendException
+     */
+    public void replace(Operator oldOperator, Operator newOperator) throws 
FrontendException;
+    
+    /**
+     * This method removes the node operatorToRemove, connecting all of its
+     * successors to its predecessor and all of its predecessors to its successor.
+     * @param operatorToRemove operator to remove
+     * @throws FrontendException
+     */
+    public void removeAndReconnect(Operator operatorToRemove) throws 
FrontendException;
+
+    /**
+     * This method inserts the node operatorToInsert between pred and succ.
+     * Neither pred nor succ may be null.
+     * @param pred predecessor of the inserted node after this method
+     * @param operatorToInsert operator to insert
+     * @param succ successor of the inserted node after this method
+     * @throws FrontendException
+     */
+    public void insertBetween(Operator pred, Operator operatorToInsert, 
Operator succ) throws FrontendException;
 }
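
As a rough feel for the contract, here is a minimal usage sketch built on the
SillyPlan/SillyOperator test helpers from TestNewPlanOperatorPlan further down;
the operator names are purely illustrative:

    SillyPlan plan = new SillyPlan();
    Operator load = new SillyOperator("load", plan);
    Operator filter = new SillyOperator("filter", plan);
    Operator store = new SillyOperator("store", plan);
    plan.add(load);
    plan.add(filter);
    plan.add(store);
    plan.connect(load, filter);
    plan.connect(filter, store);
    
    // Drop the filter: load is reconnected straight to store, reusing the
    // edge slots the removed edges occupied.
    plan.removeAndReconnect(filter);
    
    // Swap store for a new operator: every remaining edge of store moves over.
    Operator store2 = new SillyOperator("store2", plan);
    plan.replace(store, store2);
    
    // Put a new operator back between load and store2.
    Operator filter2 = new SillyOperator("filter2", plan);
    plan.insertBetween(load, filter2, store2);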

Modified: 
hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/OperatorSubPlan.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/OperatorSubPlan.java?rev=1001523&r1=1001522&r2=1001523&view=diff
==============================================================================
--- 
hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/OperatorSubPlan.java 
(original)
+++ 
hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/OperatorSubPlan.java 
Sun Sep 26 21:22:57 2010
@@ -162,7 +162,7 @@ public class OperatorSubPlan implements 
     
     @Override
     public void removeSoftLink(Operator from, Operator to) {
-        throw new UnsupportedOperationException("connect() can not be called 
on OperatorSubPlan");
+        throw new UnsupportedOperationException("removeSoftLink() can not be 
called on OperatorSubPlan");
     }
 
     @Override
@@ -173,5 +173,26 @@ public class OperatorSubPlan implements 
     @Override
     public List<Operator> getSoftLinkSuccessors(Operator op) {
         return basePlan.getSoftLinkSuccessors(op);
-    }    
+    }
+
+    @Override
+    public void insertBetween(Operator pred, Operator operatorToInsert, 
Operator succ)
+            throws FrontendException {
+        throw new UnsupportedOperationException("insertBetween() can not be 
called on OperatorSubPlan");
+        
+    }
+
+    @Override
+    public void removeAndReconnect(Operator operatorToRemove)
+            throws FrontendException {
+        throw new UnsupportedOperationException("removeAndReconnect() can not 
be called on OperatorSubPlan");
+        
+    }
+
+    @Override
+    public void replace(Operator oldOperator, Operator newOperator)
+            throws FrontendException {
+        throw new UnsupportedOperationException("replace() can not be called 
on OperatorSubPlan");
+        
+    }
 }

Modified: 
hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/ForeachInnerPlanVisitor.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/ForeachInnerPlanVisitor.java?rev=1001523&r1=1001522&r2=1001523&view=diff
==============================================================================
--- 
hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/ForeachInnerPlanVisitor.java
 (original)
+++ 
hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/ForeachInnerPlanVisitor.java
 Sun Sep 26 21:22:57 2010
@@ -86,9 +86,7 @@ public class ForeachInnerPlanVisitor ext
                 org.apache.pig.newplan.Operator newPred = 
innerOpsMap.get(pred);
                 if (newPred.getPlan().getSuccessors(newPred)!=null) {
                     org.apache.pig.newplan.Operator newSucc = 
newOp.getPlan().getSuccessors(newPred).get(0);
-                    Pair<Integer, Integer> pair = 
newOp.getPlan().disconnect(newPred, newSucc);
-                    newOp.getPlan().connect(newPred, newOp);
-                    newOp.getPlan().connect(newOp, pair.first, newSucc, 
pair.second);
+                    newOp.getPlan().insertBetween(newPred, newOp, newSucc);
                 }
                 else {
                     newOp.getPlan().connect(newPred, newOp);

Modified: 
hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/Util.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/Util.java?rev=1001523&r1=1001522&r2=1001523&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/Util.java 
(original)
+++ hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/Util.java 
Sun Sep 26 21:22:57 2010
@@ -115,11 +115,11 @@ public class Util {
         List<Operator> next = plan.getSuccessors(op);
         if (next != null) {
             LogicalRelationalOperator nextOp = 
(LogicalRelationalOperator)next.get(branch);
-            Pair<Integer, Integer> pos = plan.disconnect(op, nextOp);
-            plan.connect(foreach, pos.first, nextOp, pos.second);
+            plan.insertBetween(op, foreach, nextOp);
+        }
+        else {
+            plan.connect(op, foreach);
         }
-        
-        plan.connect(op, foreach);
         
         LogicalPlan innerPlan = new LogicalPlan();
         foreach.setInnerPlan(innerPlan);

Modified: 
hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/ColumnPruneVisitor.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/ColumnPruneVisitor.java?rev=1001523&r1=1001522&r2=1001523&view=diff
==============================================================================
--- 
hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/ColumnPruneVisitor.java
 (original)
+++ 
hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/ColumnPruneVisitor.java
 Sun Sep 26 21:22:57 2010
@@ -171,9 +171,7 @@ public class ColumnPruneVisitor extends 
                 // add foreach to the base plan                       
                 p.add(foreach);
                                
-                Pair<Integer,Integer> disconnectedPos = p.disconnect(load, 
next);
-                p.connect(load, disconnectedPos.first.intValue(), foreach, 0 );
-                p.connect(foreach, 0, next, disconnectedPos.second.intValue());
+                p.insertBetween(load, foreach, next);
                 
                 LogicalPlan innerPlan = new LogicalPlan();
                 foreach.setInnerPlan(innerPlan);

Modified: 
hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/FilterAboveForeach.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/FilterAboveForeach.java?rev=1001523&r1=1001522&r2=1001523&view=diff
==============================================================================
--- 
hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/FilterAboveForeach.java
 (original)
+++ 
hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/FilterAboveForeach.java
 Sun Sep 26 21:22:57 2010
@@ -260,13 +260,8 @@ public class FilterAboveForeach extends 
              *  -- And ForEach is FilterPred 
              */
             
-            Pair<Integer, Integer> forEachPredPlaces = 
currentPlan.disconnect(forEachPred, foreach);
-            Pair<Integer, Integer> filterPredPlaces = 
currentPlan.disconnect(filterPred, filter);
-            Pair<Integer, Integer> filterSucPlaces = 
currentPlan.disconnect(filter, filterSuc);
-            
-            currentPlan.connect(forEachPred, forEachPredPlaces.first, filter, 
filterPredPlaces.second);
-            currentPlan.connect(filter, filterSucPlaces.first, foreach, 
forEachPredPlaces.second);
-            currentPlan.connect(filterPred, filterPredPlaces.first, filterSuc, 
filterSucPlaces.second);
+            currentPlan.removeAndReconnect(filter);
+            currentPlan.insertBetween(forEachPred, filter, foreach);
             
             subPlan.add(forEachPred);
             subPlan.add(foreach);

Modified: 
hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/LimitOptimizer.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/LimitOptimizer.java?rev=1001523&r1=1001522&r2=1001523&view=diff
==============================================================================
--- 
hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/LimitOptimizer.java
 (original)
+++ 
hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/LimitOptimizer.java
 Sun Sep 26 21:22:57 2010
@@ -22,6 +22,7 @@ import java.util.Iterator;
 import java.util.List;
 
 import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.util.Pair;
 import org.apache.pig.newplan.logical.relational.LOCogroup;
 import org.apache.pig.newplan.logical.relational.LOCross;
 import org.apache.pig.newplan.logical.relational.LODistinct;
@@ -118,16 +119,11 @@ public class LimitOptimizer extends Rule
 
             if (pred instanceof LOForEach) {
                 // We can safely move LOLimit up
-                // Get operator before LOFilter
+                // Get operator before LOForEach
                 Operator prepredecessor = currentPlan.getPredecessors(pred)
-                        .get(0);
-                Operator succ = currentPlan.getSuccessors(limit).get(0);
-                currentPlan.disconnect(prepredecessor, pred);
-                currentPlan.disconnect(pred, limit);
-                currentPlan.disconnect(limit, succ);
-                currentPlan.connect(prepredecessor, limit);
-                currentPlan.connect(limit, pred);
-                currentPlan.connect(pred, succ);
+                    .get(0);
+                currentPlan.removeAndReconnect(limit);
+                currentPlan.insertBetween(prepredecessor, limit, pred);
             } else if (pred instanceof LOCross || pred instanceof LOUnion) {
                 // Limit can be duplicated, and the new instance pushed in 
front
                 // of an operator for the following operators
@@ -146,10 +142,7 @@ public class LimitOptimizer extends Rule
                     } else {
                         newLimit = new LOLimit((LogicalPlan) currentPlan, limit
                                 .getLimit());
-                        currentPlan.add(newLimit);
-                        currentPlan.disconnect(prepredecessor, pred);
-                        currentPlan.connect(prepredecessor, newLimit);
-                        currentPlan.connect(newLimit, pred);
+                        currentPlan.insertBetween(prepredecessor, newLimit, 
pred);
                     }
                 }
             } else if (pred instanceof LOSort) {
@@ -161,11 +154,7 @@ public class LimitOptimizer extends Rule
                             .getLimit() : limit.getLimit());
 
                 // remove the limit
-                Operator succ = currentPlan.getSuccessors(limit).get(0);
-                currentPlan.disconnect(sort, limit);
-                currentPlan.disconnect(limit, succ);
-                currentPlan.connect(sort, succ);
-                currentPlan.remove(limit);
+                currentPlan.removeAndReconnect(limit);
             } else if (pred instanceof LOLimit) {
                 // Limit is merged into another LOLimit
                 LOLimit beforeLimit = (LOLimit) pred;
@@ -174,11 +163,7 @@ public class LimitOptimizer extends Rule
                                 .getLimit()
                                 : limit.getLimit());
                 // remove the limit
-                Operator succ = currentPlan.getSuccessors(limit).get(0);
-                currentPlan.disconnect(beforeLimit, limit);
-                currentPlan.disconnect(limit, succ);
-                currentPlan.connect(beforeLimit, succ);
-                currentPlan.remove(limit);
+                currentPlan.removeAndReconnect(limit);
             } else if (pred instanceof LOSplitOutput) {
                 // Limit and OrderBy (LOSort) can be separated by split
                 List<Operator> grandparants = 
currentPlan.getPredecessors(pred);
@@ -197,13 +182,7 @@ public class LimitOptimizer extends Rule
                                 sort.getUserFunc());
                         newSort.setLimit(limit.getLimit());
 
-                        Operator succ = 
currentPlan.getSuccessors(limit).get(0);
-                        currentPlan.disconnect(pred, limit);
-                        currentPlan.disconnect(limit, succ);
-                        currentPlan.add(newSort);
-                        currentPlan.connect(pred, newSort);
-                        currentPlan.connect(newSort, succ);
-                        currentPlan.remove(limit);
+                        currentPlan.replace(limit, newSort);
                     }
                 }
             }

Modified: 
hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/MergeFilter.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/MergeFilter.java?rev=1001523&r1=1001522&r2=1001523&view=diff
==============================================================================
--- 
hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/MergeFilter.java
 (original)
+++ 
hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/MergeFilter.java
 Sun Sep 26 21:22:57 2010
@@ -75,24 +75,21 @@ public class MergeFilter extends Rule {
             if (succeds != null && succeds.size()== 1 && (succeds.get(0) 
instanceof LOFilter)) {
                 LOFilter next = (LOFilter)succeds.get(0);
                 combineFilterCond(filter, next);
-                Pair<Integer, Integer> p1 = currentPlan.disconnect(filter, 
next);
-                List<Operator> ll = currentPlan.getSuccessors(next);
-                if (ll!= null && ll.size()>0) {
-                    Operator op = ll.get(0);
-                    Pair<Integer, Integer> p2 = currentPlan.disconnect(next, 
op);
-                    currentPlan.connect(filter, p1.first, op, p2.second);
-                    subPlan.add(op);
-                }
                 
+                List<Operator> succs = currentPlan.getSuccessors(next);
+                if (succs!=null && succs.size()>0) {
+                    subPlan.add(succs.get(0));
+                }
+
                 // Since we remove next, we need to merge soft link into filter
                 List<Operator> nextSoftPreds = 
currentPlan.getSoftLinkPredecessors(next);
                 if (nextSoftPreds!=null) {
                     for (Operator softPred : nextSoftPreds) {
+                        currentPlan.removeSoftLink(softPred, next);
                         currentPlan.createSoftLink(softPred, filter);
                     }
                 }
-                
-                currentPlan.remove(next);
+                currentPlan.removeAndReconnect(next);
             }
             
             Iterator<Operator> iter = filter.getFilterPlan().getOperators();

Modified: 
hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/MergeForEach.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/MergeForEach.java?rev=1001523&r1=1001522&r2=1001523&view=diff
==============================================================================
--- 
hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/MergeForEach.java
 (original)
+++ 
hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/MergeForEach.java
 Sun Sep 26 21:22:57 2010
@@ -179,7 +179,7 @@ public class MergeForEach extends Rule {
                             Operator exp2NextToSink = 
newExpPlan.getPredecessors(exp2Sink).get(0);
                             Pair<Integer, Integer> pos = 
newExpPlan.disconnect(exp2NextToSink, exp2Sink);
                             newExpPlan.remove(exp2Sink);
-                            newExpPlan.connect(exp2NextToSink, pos.first, 
exp1Source, pos.second);
+                            newExpPlan.connect(exp2NextToSink, pos.first, 
exp1Source, 0);
                         }
                         else {
                             newExpPlan.remove(exp2Sink);
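
The single-slot change in this hunk is the same class of bug the patch targets:
pos.second described exp2Sink's input slot on the edge being removed, while the
operator actually being wired in, exp1Source, presumably has no incoming edges in
newExpPlan at this point, so its input slot is 0. A sketch of the corrected wiring,
reusing names from the hunk above (the surrounding merge logic is elided):

    // exp2Sink is dropped from the merged expression plan; exp1Source has no
    // predecessors in newExpPlan yet, so it is connected at input position 0.
    Pair<Integer, Integer> pos = newExpPlan.disconnect(exp2NextToSink, exp2Sink);
    newExpPlan.remove(exp2Sink);
    newExpPlan.connect(exp2NextToSink, pos.first, exp1Source, 0);  // not pos.second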

Modified: 
hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/PartitionFilterOptimizer.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/PartitionFilterOptimizer.java?rev=1001523&r1=1001522&r2=1001523&view=diff
==============================================================================
--- 
hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/PartitionFilterOptimizer.java
 (original)
+++ 
hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/PartitionFilterOptimizer.java
 Sun Sep 26 21:22:57 2010
@@ -42,6 +42,7 @@ import org.apache.pig.newplan.PColFilter
 import org.apache.pig.newplan.optimizer.Rule;
 import org.apache.pig.newplan.optimizer.Transformer;
 import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.util.Pair;
 
 public class PartitionFilterOptimizer extends Rule {
     private String[] partitionKeys;
@@ -157,17 +158,8 @@ public class PartitionFilterOptimizer ex
                                } catch (IOException e) {
                                        throw new FrontendException( e );
                                }
-                       if(pColFilterFinder.isFilterRemovable()) {
-                               // remove this filter from the plan  
-                               Operator from = currentPlan.getPredecessors( 
loFilter ).get( 0 );
-                               currentPlan.disconnect( from, loFilter );
-                               List<Operator> succs = 
currentPlan.getSuccessors( loFilter );
-                               if( succs != null ) {
-                                       Operator to = succs.get( 0 );
-                                       currentPlan.disconnect( loFilter, to );
-                                       currentPlan.connect( from, to );
-                               }
-                               currentPlan.remove( loFilter );
+                       if(pColFilterFinder.isFilterRemovable()) {  
+                               currentPlan.removeAndReconnect( loFilter );
                        }
                }
         }

Modified: 
hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/PushDownForEachFlatten.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/PushDownForEachFlatten.java?rev=1001523&r1=1001522&r2=1001523&view=diff
==============================================================================
--- 
hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/PushDownForEachFlatten.java
 (original)
+++ 
hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/PushDownForEachFlatten.java
 Sun Sep 26 21:22:57 2010
@@ -211,17 +211,17 @@ public class PushDownForEachFlatten exte
             LOForEach foreach = (LOForEach)matched.getSources().get(0);
             Operator next = currentPlan.getSuccessors( foreach ).get(0);
             if( next instanceof LOSort ) {
-                Operator pred = currentPlan.getPredecessors( foreach ).get( 0 
);
+                currentPlan.removeAndReconnect(foreach);
+                
                 List<Operator> succs = currentPlan.getSuccessors( next );
-                currentPlan.disconnect( pred, foreach );
-                currentPlan.disconnect( foreach, next );
-                currentPlan.connect( pred, next );
-                currentPlan.connect( next, foreach );
                 if( succs != null ) {
-                    for( Operator succ : succs ) {
-                        currentPlan.disconnect( next, succ );
-                        currentPlan.connect( foreach, succ );
+                    List<Operator> succsCopy = new ArrayList<Operator>();
+                    succsCopy.addAll(succs);
+                    for( Operator succ : succsCopy ) {
+                        currentPlan.insertBetween(next, foreach, succ);
                     }
+                } else {
+                    currentPlan.connect( next, foreach );
                 }
             } else if( next instanceof LOCross || next instanceof LOJoin ) {
                 List<Operator> preds = currentPlan.getPredecessors( next );
@@ -275,9 +275,7 @@ public class PushDownForEachFlatten exte
                     currentPlan.connect( next, newForeach );
                 } else {
                     opAfterX = succs.get( 0 );
-                    Pair<Integer, Integer> pos = currentPlan.disconnect( next, 
opAfterX );
-                    currentPlan.connect( next, pos.first, newForeach, 
pos.second );
-                    currentPlan.connect( newForeach, opAfterX );
+                    currentPlan.insertBetween(next, newForeach, opAfterX);
                 }
                 
                 // Finally remove flatten flags from the original foreach and 
regenerate schemas for those impacted.

Modified: 
hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/PushUpFilter.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/PushUpFilter.java?rev=1001523&r1=1001522&r2=1001523&view=diff
==============================================================================
--- 
hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/PushUpFilter.java
 (original)
+++ 
hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/PushUpFilter.java
 Sun Sep 26 21:22:57 2010
@@ -207,8 +207,6 @@ public class PushUpFilter extends Rule {
             Operator predecessor = this.findNonFilterPredecessor( filter );
             subPlan.add( predecessor) ;
             
-            // Disconnect the filter in the plan without removing it from the 
plan.
-            Operator predec = currentPlan.getPredecessors( filter ).get( 0 );
             Operator succed;
             
             if (currentPlan.getSuccessors(filter)!=null)
@@ -216,14 +214,12 @@ public class PushUpFilter extends Rule {
             else
                 succed = null;
             
-            Pair<Integer, Integer> p1 = currentPlan.disconnect(predec, filter);
-            
             if (succed!=null) {
                 subPlan.add(succed);
-                Pair<Integer, Integer> p2 = currentPlan.disconnect(filter, 
succed);
-                currentPlan.connect(predec, p1.first, succed, p2.second);
             }
             
+            currentPlan.removeAndReconnect(filter);
+            
             if( predecessor instanceof LOSort || predecessor instanceof 
LODistinct ||
                 ( predecessor instanceof LOCogroup && 
currentPlan.getPredecessors( predecessor ).size() == 1 ) ) {
                 // For sort, put the filter in front of it.
@@ -322,9 +318,7 @@ public class PushUpFilter extends Rule {
         // Insert the filter in between the given two operators.
         private void insertFilter(Operator prev, Operator predecessor, 
LOFilter filter)
         throws FrontendException {
-            Pair<Integer, Integer> p3 = currentPlan.disconnect( prev, 
predecessor );
-            currentPlan.connect( prev, p3.first, filter, 0 );
-            currentPlan.connect( filter, 0, predecessor, p3.second );
+            currentPlan.insertBetween(prev, filter, predecessor);
         }
         
         // Identify those among preds that will need to have a filter between 
it and the predecessor.

Modified: 
hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/SplitFilter.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/SplitFilter.java?rev=1001523&r1=1001522&r2=1001523&view=diff
==============================================================================
--- 
hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/SplitFilter.java
 (original)
+++ 
hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/SplitFilter.java
 Sun Sep 26 21:22:57 2010
@@ -89,9 +89,7 @@ public class SplitFilter extends Rule { 
             if (succeds != null) {
                 succed = succeds.get(0);
                 subPlan.add(succed);
-                Pair<Integer, Integer> p = currentPlan.disconnect(filter, 
succed);
-                currentPlan.connect(filter2, 0, succed, p.second);
-                currentPlan.connect(filter, p.first, filter2, 0); 
+                currentPlan.insertBetween(filter, filter2, succed);
             } else {
                 currentPlan.connect(filter, 0, filter2, 0); 
             }

Modified: 
hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/TypeCastInserter.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/TypeCastInserter.java?rev=1001523&r1=1001522&r2=1001523&view=diff
==============================================================================
--- 
hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/TypeCastInserter.java
 (original)
+++ 
hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/TypeCastInserter.java
 Sun Sep 26 21:22:57 2010
@@ -120,10 +120,7 @@ public abstract class TypeCastInserter e
             
             // Insert the foreach into the plan and patch up the plan.
             Operator next = currentPlan.getSuccessors(op).get(0);
-            Pair<Integer,Integer> disconnectedPos = currentPlan.disconnect(op, 
next);
-            currentPlan.add(foreach);
-            currentPlan.connect(op, disconnectedPos.first.intValue(), foreach, 
0 );
-            currentPlan.connect(foreach, 0, next, 
disconnectedPos.second.intValue());
+            currentPlan.insertBetween(op, foreach, next);
             
             List<LogicalExpressionPlan> exps = new 
ArrayList<LogicalExpressionPlan>();
             LOGenerate gen = new LOGenerate(innerPlan, exps, new 
boolean[s.size()]);

Modified: 
hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/TestNewPlanOperatorPlan.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/TestNewPlanOperatorPlan.java?rev=1001523&r1=1001522&r2=1001523&view=diff
==============================================================================
--- 
hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/TestNewPlanOperatorPlan.java
 (original)
+++ 
hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/TestNewPlanOperatorPlan.java
 Sun Sep 26 21:22:57 2010
@@ -46,6 +46,7 @@ import org.apache.pig.newplan.logical.ex
 import org.apache.pig.newplan.logical.relational.LOFilter;
 import org.apache.pig.newplan.logical.relational.LOJoin;
 import org.apache.pig.newplan.logical.relational.LOLoad;
+import org.apache.pig.newplan.logical.relational.LOSplit;
 import org.apache.pig.newplan.logical.relational.LogicalPlan;
 import org.apache.pig.newplan.logical.relational.LogicalRelationalNodesVisitor;
 import org.apache.pig.newplan.logical.relational.LogicalSchema;
@@ -1538,5 +1539,351 @@ public class TestNewPlanOperatorPlan ext
         assertTrue(D1.isEqual(D2));
     }
     
- 
+    @Test
+    public void testReplace1() throws FrontendException {
+        // has multiple inputs
+        SillyPlan plan = new SillyPlan();
+        Operator load1 = new SillyOperator("load1", plan);
+        Operator load2 = new SillyOperator("load2", plan);
+        Operator filter1 = new SillyOperator("filter1", plan);
+        Operator join1 = new SillyOperator("join1", plan);
+        Operator filter2 = new SillyOperator("filter2", plan);
+        plan.add(load1);
+        plan.add(load2);
+        plan.add(filter1);
+        plan.add(filter2);
+        plan.add(join1);
+        plan.connect(load1, join1);
+        plan.connect(load2, filter1);
+        plan.connect(filter1, join1);
+        plan.connect(join1, filter2);
+        
+        Operator join2 = new SillyOperator("join2", plan);
+        plan.replace(join1, join2);
+        
+        List<Operator> preds = plan.getPredecessors(join2);
+        assert(preds.size()==2);
+        assert(preds.contains(load1));
+        assert(preds.contains(filter1));
+        
+        List<Operator> succs = plan.getSuccessors(join2);
+        assert(succs.size()==1);
+        assert(succs.contains(filter2));
+    }
+    
+    @Test
+    public void testReplace2() throws FrontendException {
+        // has multiple outputs
+        SillyPlan plan = new SillyPlan();
+        Operator load1 = new SillyOperator("load1", plan);
+        Operator split1 = new SillyOperator("split1", plan);
+        Operator filter1 = new SillyOperator("filter1", plan);
+        Operator filter2 = new SillyOperator("filter2", plan);
+        plan.add(load1);
+        plan.add(split1);
+        plan.add(filter1);
+        plan.add(filter2);
+        plan.connect(load1, split1);
+        plan.connect(split1, filter1);
+        plan.connect(split1, filter2);
+        
+        Operator split2 = new SillyOperator("split2", plan);
+        plan.replace(split1, split2);
+        
+        List<Operator> preds = plan.getPredecessors(split2);
+        assert(preds.size()==1);
+        assert(preds.contains(load1));
+        
+        List<Operator> succs = plan.getSuccessors(split2);
+        assert(succs.size()==2);
+        assert(succs.contains(filter1));
+        assert(succs.contains(filter2));
+    }
+    
+    @Test
+    public void testReplace3() throws FrontendException {
+        // single input/output
+        SillyPlan plan = new SillyPlan();
+        Operator load1 = new SillyOperator("load1", plan);
+        Operator filter1 = new SillyOperator("filter1", plan);
+        Operator filter2 = new SillyOperator("filter2", plan);
+        plan.add(load1);
+        plan.add(filter1);
+        plan.add(filter2);
+        plan.connect(load1, filter1);
+        plan.connect(filter1, filter2);
+        
+        Operator filter3 = new SillyOperator("filter3", plan);
+        plan.replace(filter1, filter3);
+        
+        List<Operator> preds = plan.getPredecessors(filter3);
+        assert(preds.size()==1);
+        assert(preds.contains(load1));
+        
+        List<Operator> succs = plan.getSuccessors(filter3);
+        assert(succs.size()==1);
+        assert(succs.contains(filter2));
+    }
+
+    @Test
+    public void testReplace4() throws FrontendException {
+        // output is null
+        SillyPlan plan = new SillyPlan();
+        Operator load1 = new SillyOperator("load1", plan);
+        Operator filter1 = new SillyOperator("filter1", plan);
+        Operator filter2 = new SillyOperator("filter2", plan);
+        plan.add(load1);
+        plan.add(filter1);
+        plan.add(filter2);
+        plan.connect(load1, filter1);
+        plan.connect(filter1, filter2);
+        
+        Operator filter3 = new SillyOperator("filter3", plan);
+        plan.replace(filter2, filter3);
+        
+        List<Operator> preds = plan.getPredecessors(filter3);
+        assert(preds.size()==1);
+        assert(preds.contains(filter1));
+        
+        List<Operator> succs = plan.getSuccessors(filter3);
+        assert(succs==null);
+    }
+    
+    @Test
+    public void testReplace5() throws FrontendException {
+        // input is null
+        SillyPlan plan = new SillyPlan();
+        Operator load1 = new SillyOperator("load1", plan);
+        Operator filter1 = new SillyOperator("filter1", plan);
+        Operator filter2 = new SillyOperator("filter2", plan);
+        plan.add(load1);
+        plan.add(filter1);
+        plan.add(filter2);
+        plan.connect(load1, filter1);
+        plan.connect(filter1, filter2);
+        
+        Operator load2 = new SillyOperator("load2", plan);
+        plan.replace(load1, load2);
+        
+        List<Operator> preds = plan.getPredecessors(load2);
+        assert(preds==null);
+        
+        List<Operator> succs = plan.getSuccessors(load2);
+        assert(succs.size()==1);
+        assert(succs.contains(filter1));
+    }
+    
+    @Test
+    public void testReplace6() throws FrontendException {
+        // has multiple inputs/outputs
+        SillyPlan plan = new SillyPlan();
+        Operator load1 = new SillyOperator("load1", plan);
+        Operator load2 = new SillyOperator("load2", plan);
+        Operator filter1 = new SillyOperator("filter1", plan);
+        // fake operator to take multiple inputs/outputs
+        Operator fake1 = new SillyOperator("fake1", plan);
+        Operator filter2 = new SillyOperator("filter2", plan);
+        Operator filter3 = new SillyOperator("filter3", plan);
+        plan.add(load1);
+        plan.add(load2);
+        plan.add(filter1);
+        plan.add(filter2);
+        plan.add(filter3);
+        plan.add(fake1);
+        plan.connect(load1, fake1);
+        plan.connect(load2, filter1);
+        plan.connect(filter1, fake1);
+        plan.connect(fake1, filter2);
+        plan.connect(fake1, filter3);
+        
+        Operator fake2 = new SillyOperator("fake2", plan);
+        plan.replace(fake1, fake2);
+        
+        List<Operator> preds = plan.getPredecessors(fake2);
+        assert(preds.size()==2);
+        assert(preds.contains(load1));
+        assert(preds.contains(filter1));
+        
+        List<Operator> succs = plan.getSuccessors(fake2);
+        assert(succs.size()==2);
+        assert(succs.contains(filter2));
+        assert(succs.contains(filter3));
+    }
+    
+    @Test
+    public void testRemove1() throws FrontendException {
+        // single input/output
+        SillyPlan plan = new SillyPlan();
+        Operator load1 = new SillyOperator("load1", plan);
+        Operator load2 = new SillyOperator("load2", plan);
+        Operator filter1 = new SillyOperator("filter1", plan);
+        Operator join1 = new SillyOperator("join1", plan);
+        Operator filter2 = new SillyOperator("filter2", plan);
+        plan.add(load1);
+        plan.add(load2);
+        plan.add(filter1);
+        plan.add(filter2);
+        plan.add(join1);
+        plan.connect(load1, join1);
+        plan.connect(load2, filter1);
+        plan.connect(filter1, join1);
+        plan.connect(join1, filter2);
+        
+        plan.removeAndReconnect(filter1);
+        
+        List<Operator> preds = plan.getPredecessors(join1);
+        assert(preds.size()==2);
+        assert(preds.contains(load2));
+    }
+    
+    @Test
+    public void testRemove2() throws FrontendException {
+        // input is null
+        SillyPlan plan = new SillyPlan();
+        Operator load1 = new SillyOperator("load1", plan);
+        Operator load2 = new SillyOperator("load2", plan);
+        Operator filter1 = new SillyOperator("filter1", plan);
+        Operator join1 = new SillyOperator("join1", plan);
+        Operator filter2 = new SillyOperator("filter2", plan);
+        plan.add(load1);
+        plan.add(load2);
+        plan.add(filter1);
+        plan.add(filter2);
+        plan.add(join1);
+        plan.connect(load1, join1);
+        plan.connect(load2, filter1);
+        plan.connect(filter1, join1);
+        plan.connect(join1, filter2);
+        
+        plan.removeAndReconnect(load1);
+        
+        List<Operator> preds = plan.getPredecessors(join1);
+        assert(preds.size()==1);
+        assert(preds.contains(filter1));
+        
+        plan.removeAndReconnect(filter1);
+        preds = plan.getPredecessors(join1);
+        assert(preds.size()==1);
+        assert(preds.contains(load2));
+    }
+    
+    @Test
+    public void testRemove3() throws FrontendException {
+        // output is null
+        SillyPlan plan = new SillyPlan();
+        Operator load1 = new SillyOperator("load1", plan);
+        Operator filter1 = new SillyOperator("filter1", plan);
+        Operator filter2 = new SillyOperator("filter2", plan);
+        plan.add(load1);
+        plan.add(filter1);
+        plan.add(filter2);
+        plan.connect(load1, filter1);
+        plan.connect(filter1, filter2);
+        
+        plan.removeAndReconnect(filter2);
+        
+        List<Operator> succs = plan.getSuccessors(filter2);
+        assert(succs==null);
+    }
+    
+    @Test
+    public void testRemove4() throws FrontendException {
+        // has multiple inputs
+        SillyPlan plan = new SillyPlan();
+        Operator load1 = new SillyOperator("load1", plan);
+        Operator load2 = new SillyOperator("load2", plan);
+        Operator filter1 = new SillyOperator("filter1", plan);
+        Operator join1 = new SillyOperator("join1", plan);
+        Operator fake1 = new SillyOperator("fake1", plan);
+        plan.add(load1);
+        plan.add(load2);
+        plan.add(filter1);
+        plan.add(join1);
+        plan.connect(load1, join1);
+        plan.connect(load2, filter1);
+        plan.connect(filter1, join1);
+        plan.connect(join1, fake1);
+        
+        plan.removeAndReconnect(join1);
+        
+        List<Operator> preds = plan.getPredecessors(fake1);
+        assert(preds.size()==2);
+        assert(preds.contains(load1));
+        assert(preds.contains(filter1));        
+    }
+    
+    @Test
+    public void testRemove5() throws FrontendException {
+        // has multiple outputs
+        SillyPlan plan = new SillyPlan();
+        Operator load1 = new SillyOperator("load1", plan);
+        Operator split1 = new SillyOperator("split1", plan);
+        Operator split2 = new SillyOperator("split2", plan);
+        Operator filter1 = new SillyOperator("filter1", plan);
+        Operator filter2 = new SillyOperator("filter2", plan);
+        plan.add(load1);
+        plan.add(split1);
+        plan.add(filter1);
+        plan.add(filter2);
+        plan.connect(load1, split1);
+        plan.connect(split1, split2);
+        plan.connect(split2, filter1);
+        plan.connect(split2, filter2);
+        
+        plan.removeAndReconnect(split2);
+        
+        List<Operator> succs = plan.getSuccessors(split1);
+        assert(succs.size()==2);
+        assert(succs.contains(filter1));
+        assert(succs.contains(filter2));
+    }
+    
+    @Test
+    public void testRemove6() throws FrontendException {
+        // has multiple inputs/outputs
+        SillyPlan plan = new SillyPlan();
+        Operator load1 = new SillyOperator("load1", plan);
+        Operator load2 = new SillyOperator("load2", plan);
+        Operator fake1 = new SillyOperator("fake1", plan);
+        Operator filter1 = new SillyOperator("filter1", plan);
+        Operator filter2 = new SillyOperator("filter2", plan);
+        plan.add(load1);
+        plan.add(load2);
+        plan.add(fake1);
+        plan.add(filter1);
+        plan.add(filter2);
+        plan.connect(load1, fake1);
+        plan.connect(load2, fake1);
+        plan.connect(fake1, filter1);
+        plan.connect(fake1, filter2);
+        
+        try {
+            plan.removeAndReconnect(fake1);
+            fail();
+        } catch (FrontendException e) {
+            assertTrue(e.getErrorCode()==2256);
+        }
+    }
+    
+    @Test
+    public void testInsertBetween1() throws FrontendException {
+        // single input
+        SillyPlan plan = new SillyPlan();
+        Operator load1 = new SillyOperator("load1", plan);
+        Operator filter1 = new SillyOperator("filter1", plan);
+        plan.add(load1);
+        plan.add(filter1);
+        plan.connect(load1, filter1);
+        
+        Operator filter2 = new SillyOperator("filter2", plan);
+        plan.insertBetween(load1, filter2, filter1);
+        
+        List<Operator> succs = plan.getSuccessors(filter2);
+        assert(succs.size()==1);
+        assert(succs.contains(filter1));
+        
+        List<Operator> preds = plan.getPredecessors(filter2);
+        assert(preds.size()==1);
+        assert(preds.contains(load1));
+    }
 }

Modified: 
hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/TestPruneColumn.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/TestPruneColumn.java?rev=1001523&r1=1001522&r2=1001523&view=diff
==============================================================================
--- 
hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/TestPruneColumn.java 
(original)
+++ 
hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/TestPruneColumn.java 
Sun Sep 26 21:22:57 2010
@@ -22,6 +22,7 @@ import java.io.File;
 import java.io.FileOutputStream;
 import java.io.FileReader;
 import java.io.IOException;
+import java.io.InputStreamReader;
 import java.io.PrintStream;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -29,6 +30,7 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 
+import org.apache.hadoop.fs.Path;
 import org.apache.log4j.FileAppender;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
@@ -38,6 +40,7 @@ import org.apache.pig.FilterFunc;
 import org.apache.pig.PigServer;
 import org.apache.pig.builtin.PigStorage;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.newplan.logical.rules.ColumnPruneVisitor;
 import org.junit.After;
@@ -1872,5 +1875,44 @@ public class TestPruneColumn extends Tes
 
         assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: 
$1"}));
     }
+    
+    // See PIG-1644
+    @Test
+    public void testSplitOutputWithForEach() throws Exception {
+        Path output1 = 
FileLocalizer.getTemporaryPath(pigServer.getPigContext());
+        Path output2 = 
FileLocalizer.getTemporaryPath(pigServer.getPigContext());
+        pigServer.setBatchOn();
+        pigServer.registerQuery("A = load '"+ 
Util.generateURI(tmpFile5.toString(), pigServer.getPigContext()) + "' AS (a0, 
a1, a2, a3);");
+        pigServer.registerQuery("B = foreach A generate a0, a1, a2;");
+        pigServer.registerQuery("store B into '" + 
Util.generateURI(output1.toString(), pigServer.getPigContext()) + "';");
+        pigServer.registerQuery("C = order B by a2;");
+        pigServer.registerQuery("D = foreach C generate a2;");
+        pigServer.registerQuery("store D into '" + 
Util.generateURI(output2.toString(), pigServer.getPigContext()) + "';");
+        pigServer.executeBatch();
+
+        BufferedReader reader1 = new BufferedReader(new 
InputStreamReader(FileLocalizer.openDFSFile(output1.toString())));
+        String line = reader1.readLine();
+        assertTrue(line.equals("1\t2\t3"));
+        
+        line = reader1.readLine();
+        assertTrue(line.equals("2\t3\t4"));
+        
+        assertTrue(reader1.readLine()==null);
+        
+        BufferedReader reader2 = new BufferedReader(new 
InputStreamReader(FileLocalizer.openDFSFile(output2.toString())));
+        line = reader2.readLine();
+        assertTrue(line.equals("3"));
+        
+        line = reader2.readLine();
+        assertTrue(line.equals("4"));
+        
+        assertTrue(reader2.readLine()==null);
+
+        assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: 
$3"}));
+        
+        reader1.close();
+        reader2.close();
+    }
+
 
 }

