Author: daijy
Date: Fri Dec 25 00:28:09 2009
New Revision: 893827

URL: http://svn.apache.org/viewvc?rev=893827&view=rev
Log:
PIG-1146: Inconsistent column pruning in LOUnion

Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/ColumnPruner.java
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOForEach.java
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOUnion.java
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/RelationalOperator.java
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalOptimizer.java
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PruneColumns.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestPruneColumn.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=893827&r1=893826&r2=893827&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Fri Dec 25 00:28:09 2009
@@ -126,6 +126,8 @@
 
 PIG-1086: Nested sort by * throw exception (rding via daijy)
 
+PIG-1146: Inconsistent column pruning in LOUnion (daijy)
+
 Release 0.6.0 - Unreleased
 
 INCOMPATIBLE CHANGES

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/ColumnPruner.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/ColumnPruner.java?rev=893827&r1=893826&r2=893827&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/ColumnPruner.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/ColumnPruner.java Fri Dec 25 00:28:09 2009
@@ -23,13 +23,10 @@
 import java.util.Map;
 
 import org.apache.pig.PigException;
-import org.apache.pig.impl.plan.NodeIdGenerator;
-import org.apache.pig.impl.plan.OperatorKey;
-import org.apache.pig.impl.plan.PlanWalker;
+import org.apache.pig.impl.plan.DependencyOrderWalker;
 import org.apache.pig.impl.plan.RequiredFields;
 import org.apache.pig.impl.plan.VisitorException;
-import org.apache.pig.impl.plan.ProjectionMap.Column;
-import org.apache.pig.impl.util.MultiMap;
+import org.apache.pig.impl.plan.optimizer.OptimizerException;
 import org.apache.pig.impl.util.Pair;
 import org.apache.pig.impl.logicalLayer.RelationalOperator;
 
@@ -37,14 +34,20 @@
     private Map<LogicalOperator, List<Pair<Integer,Integer>>> prunedColumnsMap;
     LogicalPlan plan;
     
-    public ColumnPruner(LogicalPlan plan, LogicalOperator op, 
List<Pair<Integer, Integer>> prunedColumns, 
-            PlanWalker<LogicalOperator, LogicalPlan> walker) {
-        super(plan, walker);
+    public ColumnPruner(LogicalPlan plan) {
+        super(plan, new DependencyOrderWalker<LogicalOperator, 
LogicalPlan>(plan));
         prunedColumnsMap = new HashMap<LogicalOperator, 
List<Pair<Integer,Integer>>>();
-        prunedColumnsMap.put(op, prunedColumns);
         this.plan = plan;
     }
 
+    public void addPruneMap(LogicalOperator op, List<Pair<Integer,Integer>> 
prunedColumns) {
+        prunedColumnsMap.put(op, prunedColumns);
+    }
+    
+    public boolean isEmpty() {
+        return prunedColumnsMap.isEmpty();
+    }
+    
     protected void prune(RelationalOperator lOp) throws VisitorException {
         List<LogicalOperator> predecessors = plan.getPredecessors(lOp);
         if (predecessors==null)
@@ -79,7 +82,7 @@
             }
             
             // For every input column, check if it is pruned
-            nextOutput:for (int i=0;i<lOp.getSchema().size();i++)
+            for (int i=0;i<lOp.getSchema().size();i++)
             {
                 List<RequiredFields> relevantFieldsList = 
lOp.getRelevantInputs(0, i);
                 
@@ -101,125 +104,73 @@
                 if (needNoInputs)
                     continue;
                 
-                boolean allPruned = true;
+                boolean columnPruned = false;
                 
-                // For LOUnion, we treat it differently. LOUnion is the only 
operator that cannot be pruned independently.
-                // For every pruned input column, we will prune. LOUnion 
(Contrary to other operators, unless all relevant
-                // fields are pruned, we then prune the output field. Inside 
LOUnion, we have a counter, the output columns 
-                // is actually pruned only after all corresponding input 
columns have been pruned
-                if (lOp instanceof LOUnion)
-                {
-                    allPruned = false;
-                    checkAllPrunedUnion: for (RequiredFields relevantFields: 
relevantFieldsList)
-                    {
-                        for (Pair<Integer, Integer> relevantField: 
relevantFields.getFields())
-                        {
-                            if (columnsPruned.contains(relevantField))
-                            {
-                                allPruned = true;
-                                break checkAllPrunedUnion;
-                            }
-                        }
-                    }
-                }
                 // For LOCogroup, one output can be pruned if all its relevant 
input are pruned except for "key" fields 
-                else if (lOp instanceof LOCogroup)
+                if (lOp instanceof LOCogroup)
                 {
                     List<RequiredFields> requiredFieldsList = 
lOp.getRequiredFields();
-                    boolean sawInputPruned = false;
                     for (Pair<Integer, Integer> column : columnsPruned)
                     {
                         if (column.first == i-1)  // Saw at least one input 
pruned
                         {
-                            sawInputPruned = true;
-                            // Further check if requiredFields of the 
LOCogroup contains these fields.
-                            // If not, we can safely prune this output column
                             if 
(requiredFieldsList.get(i-1).getFields().contains(column))
                             {
-                                allPruned = false;
+                                columnPruned = true;
                                 break;
                             }
                         }
                     }
-                    if (!sawInputPruned)
-                        allPruned = false;
                 }
                 else
                 {
-                    nextRelevantFields:for (RequiredFields relevantFields: 
relevantFieldsList)
+                    // If we see any of the relevant field of this column get 
pruned, 
+                    // then we prune this column for this operator
+                    for (RequiredFields relevantFields: relevantFieldsList)
                     {
-                        if (relevantFields==null)
+                        if (relevantFields == null)
                             continue;
-                        
-                        if (relevantFields.needAllFields())
-                        {
-                            allPruned = false;
+                        if (relevantFields.getNeedAllFields())
                             break;
-                        }
-                        if (relevantFields.needNoFields())
-                            continue;
                         for (Pair<Integer, Integer> relevantField: 
relevantFields.getFields())
                         {
-                            if (relevantField==null)
-                                continue;
-                            
-                            if (lOp instanceof LOUnion)
+                            if (columnsPruned.contains(relevantField))
                             {
-                                if (columnsPruned.contains(relevantField))
-                                    break nextRelevantFields;
+                                columnPruned = true;
                             }
-                            else if (!columnsPruned.contains(relevantField))
-                            {
-                                allPruned = false;
-                                break nextRelevantFields;
+                            else {
+                                // For union, inconsistent pruning is possible 
(See PIG-1146)
+                                // We shall allow inconsistent pruning for 
union, and the pruneColumns method
+                                // in LOUnion will handle this inconsistency
+                                if (!(lOp instanceof LOUnion) && 
columnPruned==true) {
+                                    int errCode = 2185;
+                                    String msg = "Column $"+i+" of "+lOp+" 
inconsistent pruning";
+                                    throw new OptimizerException(msg, errCode, 
PigException.BUG);
+                                }
                             }
                         }
                     }
                 }
-                if (allPruned)
+                if (columnPruned)
                     columnsToPrune.add(new Pair<Integer, Integer>(0, i));
             }
     
-            if (columnsPruned.size()!=0)
+            LogicalOperator currentOp = lOp;
+            
+            // If it is LOCogroup, insert foreach to mimic pruning, because we 
have no way to prune
+            // LOCogroup output only by pruning the inputs
+            if (columnsPruned.size()!=0 && lOp instanceof LOCogroup)
             {
-                MultiMap<Integer, Column> mappedFields = new MultiMap<Integer, 
Column>();
-                List<Column> columns = new ArrayList<Column>();
-                columns.add(new Column(new Pair<Integer, Integer>(0, 0)));
-                mappedFields.put(0, columns);
-                LogicalOperator nextOp = lOp;
-                if (lOp instanceof LOCogroup)
-                {
-                    ArrayList<Boolean> flattenList = new ArrayList<Boolean>();
-                    ArrayList<LogicalPlan> generatingPlans = new 
ArrayList<LogicalPlan>();
-                    String scope = lOp.getOperatorKey().scope;
-                    for (int i=0;i<=predecessors.size();i++) {
-                        if (!columnsToPrune.contains(new Pair<Integer, 
Integer>(0, i)))
-                        {
-                            LogicalPlan projectPlan = new LogicalPlan();
-                            LogicalOperator projectInput = lOp;
-                            ExpressionOperator column = new 
LOProject(projectPlan, new OperatorKey(scope, 
NodeIdGenerator.getGenerator().getNextNodeId(scope)), projectInput, i);
-                            flattenList.add(false);
-                            projectPlan.add(column);
-                            generatingPlans.add(projectPlan);
-                        }
-                        columns = new ArrayList<Column>();
-                        columns.add(new Column(new Pair<Integer, Integer>(0, 
i+1)));
-                        mappedFields.put(i+1, columns);
-                    }
-                    LOForEach forEach = new LOForEach(mPlan, new 
OperatorKey(scope, NodeIdGenerator.getGenerator().getNextNodeId(scope)), 
generatingPlans, flattenList);
-                    LogicalOperator succ = mPlan.getSuccessors(lOp).get(0);
-                    mPlan.add(forEach);
-                    // Since the successor has not been pruned yet, so we 
cannot rewire directly because
-                    // rewire has the assumption that predecessor and 
successor is in consistent
-                    // state. The way we do the rewire is kind of hacky. We 
give a fake projection map in the 
-                    // new node to fool rewire
-                    mPlan.doInsertBetween(lOp, forEach, succ, false);
-                    forEach.getProjectionMap().setMappedFields(mappedFields);
-                    succ.rewire(lOp, 0, forEach, false);
-                    nextOp = forEach;
-                }
-                if (lOp.pruneColumns(columnsPruned))
-                    prunedColumnsMap.put(nextOp, columnsToPrune);
+                List<Integer> columnsToProject = new ArrayList<Integer>();
+                for (int i=0;i<=predecessors.size();i++) {
+                    if (!columnsToPrune.contains(new Pair<Integer, Integer>(0, 
i)))
+                        columnsToProject.add(i);
+                }                
+                currentOp = lOp.insertPlainForEachAfter(columnsToProject);
+            }
+            
+            if (lOp.pruneColumns(columnsPruned)) {
+                prunedColumnsMap.put(currentOp, columnsToPrune);
             }
         } catch (FrontendException e) {
             int errCode = 2188;
@@ -244,7 +195,11 @@
     }
     
     protected void visit(LOForEach foreach) throws VisitorException {
-        prune(foreach);
+        // The only case we should skip foreach is when this is the foreach
+        // inserted after LOLoad to mimic pruning, then we put the 
prunedColumns entry
+        // for that foreach, and we do not need to further visit this foreach 
here
+        if (!prunedColumnsMap.containsKey(foreach))
+            prune(foreach);
     }
     
     protected void visit(LOJoin join) throws VisitorException {

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOForEach.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOForEach.java?rev=893827&r1=893826&r2=893827&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOForEach.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOForEach.java Fri Dec 25 00:28:09 2009
@@ -863,7 +863,7 @@
         
         return result;
     }
-     @Override
+    @Override
     public boolean pruneColumns(List<Pair<Integer, Integer>> columns)
             throws FrontendException {
         if (!mIsSchemaComputed)

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOUnion.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOUnion.java?rev=893827&r1=893826&r2=893827&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOUnion.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOUnion.java Fri Dec 25 00:28:09 2009
@@ -42,8 +42,6 @@
     private static final long serialVersionUID = 2L;
     private static Log log = LogFactory.getLog(LOUnion.class);
     
-    List<Pair<Integer, Integer>> stagingPrunedColumns = new 
ArrayList<Pair<Integer, Integer>>(); 
-
     /**
      * @param plan
      *            Logical plan this operator is a part of.
@@ -240,27 +238,60 @@
             result.add(new RequiredFields(inputList));
         }
         
-        
         return result;
     }
-
+    @Override
     public boolean pruneColumns(List<Pair<Integer, Integer>> columns)
         throws FrontendException {
-        stagingPrunedColumns.addAll(columns);
-        boolean allPruned = true;
+        if (!mIsSchemaComputed)
+            getSchema();
+        if (mSchema == null) {
+            log.warn("Cannot prune columns in union, no schema information 
found");
+            return false;
+        }
+
+        // Find maximum pruning among all inputs
+        boolean[] maximumPruned = new boolean[mSchema.size()];
         for (Pair<Integer, Integer>pair : columns)
         {
-            for (int i=0;i<mPlan.getPredecessors(this).size();i++)
+            maximumPruned[pair.second] = true;
+        }
+        int maximumNumPruned = 0;
+        for (int i=0;i<maximumPruned.length;i++) {
+            if (maximumPruned[i])
+                maximumNumPruned++;
+        }
+        
+        List<LogicalOperator> preds = getInputs();
+        for (int i=0;i<preds.size();i++) {
+            // Build a list of pruned columns for this predecessor
+            boolean[] actualPruned = new boolean[mSchema.size()];
+            for (Pair<Integer, Integer>pair : columns)
             {
-                if (!stagingPrunedColumns.contains(new Pair<Integer, 
Integer>(i, pair.second)))
-                    allPruned = false;
+                if (pair.first==i)
+                    actualPruned[pair.second] = true;
+            }
+            int actualNumPruned = 0;
+            for (int j=0;j<actualPruned.length;j++) {
+                if (actualPruned[j])
+                    actualNumPruned++;
+            }
+            if (actualNumPruned!=maximumNumPruned) { // We need to prune some 
columns before LOUnion
+                List<Integer> columnsToProject = new ArrayList<Integer>();
+                int index=0;
+                for (int j=0;j<actualPruned.length;j++) {
+                    if (!maximumPruned[j]) {
+                        columnsToProject.add(index); 
+                        index++;
+                    } else {
+                        if (!actualPruned[j])
+                            index++;
+                    }
+                }
+                
((RelationalOperator)preds.get(i)).insertPlainForEachAfter(columnsToProject);
             }
         }
-        if (allPruned)
-        {
-            super.pruneColumns(columns);
-            return true;
-        }
-        return false;
+        super.pruneColumns(columns);
+        return true;
     }
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/RelationalOperator.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/RelationalOperator.java?rev=893827&r1=893826&r2=893827&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/RelationalOperator.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/RelationalOperator.java Fri Dec 25 00:28:09 2009
@@ -17,13 +17,17 @@
  */
 package org.apache.pig.impl.logicalLayer;
 
+import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.pig.PigException;
+import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.ProjectionMap;
 import org.apache.pig.impl.plan.RequiredFields;
 import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.plan.ProjectionMap.Column;
+import org.apache.pig.impl.util.MultiMap;
 import org.apache.pig.impl.util.Pair;
 
 public abstract class RelationalOperator extends LogicalOperator {
@@ -177,4 +181,33 @@
             }
         }
     }
+    
+    // insert a forEach after the operator. This forEach map columns in 
columnsToProject directly, and remove the rest
+    public LogicalOperator insertPlainForEachAfter(List<Integer> 
columnsToProject) throws FrontendException {
+        ArrayList<Boolean> flattenList = new ArrayList<Boolean>();
+        ArrayList<LogicalPlan> generatePlans = new ArrayList<LogicalPlan>();
+        String scope = getOperatorKey().scope;
+        for (int pos : columnsToProject) {
+            LogicalPlan projectPlan = new LogicalPlan();
+            ExpressionOperator column = new LOProject(projectPlan, new 
OperatorKey(scope, NodeIdGenerator.getGenerator().getNextNodeId(scope)), this, 
pos);
+            flattenList.add(false);
+            projectPlan.add(column);
+            generatePlans.add(projectPlan);
+        }
+        LOForEach forEach = new LOForEach(mPlan, new OperatorKey(scope, 
NodeIdGenerator.getGenerator().getNextNodeId(scope)), generatePlans, 
flattenList);
+        LogicalOperator succ = mPlan.getSuccessors(this).get(0);
+
+        MultiMap<Integer, Column> mappedFields = new MultiMap<Integer, 
Column>();
+        List<Column> columns;
+        for (int i=0;i<=getSchema().size();i++) {
+            columns = new ArrayList<Column>();
+            columns.add(new Column(new Pair<Integer, Integer>(0, i)));
+            mappedFields.put(i, columns);
+        }
+        mPlan.add(forEach);
+        mPlan.doInsertBetween(this, forEach, succ, false);
+        forEach.getProjectionMap().setMappedFields(mappedFields);
+        succ.rewire(this, 0, forEach, false);
+        return forEach;
+    }
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalOptimizer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalOptimizer.java?rev=893827&r1=893826&r2=893827&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalOptimizer.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalOptimizer.java Fri Dec 25 00:28:09 2009
@@ -232,6 +232,7 @@
                         pruneRule.getTransformer().transform(match);
                     }
                 }
+                ((PruneColumns)pruneRule.getTransformer()).prune();
             }
         }
     }

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PruneColumns.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PruneColumns.java?rev=893827&r1=893826&r2=893827&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PruneColumns.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PruneColumns.java Fri Dec 25 00:28:09 2009
@@ -53,7 +53,6 @@
 import org.apache.pig.impl.logicalLayer.LogicalPlan;
 import org.apache.pig.impl.logicalLayer.RelationalOperator;
 import org.apache.pig.impl.logicalLayer.TopLevelProjectFinder;
-import org.apache.pig.impl.plan.DependencyOrderWalker;
 import org.apache.pig.impl.plan.MapKeysInfo;
 import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.impl.plan.OperatorKey;
@@ -77,9 +76,10 @@
 
     private static Log log = LogFactory.getLog(PruneColumns.class);
     Map<RelationalOperator, RequiredInfo> cachedRequiredInfo = new 
HashMap<RelationalOperator, RequiredInfo>();
-    
+    ColumnPruner pruner;
     public PruneColumns(LogicalPlan plan) {
         super(plan);
+        pruner = new ColumnPruner(plan);
     }
 
     @Override
@@ -510,7 +510,7 @@
                processNode(predecessors.get(i), new 
RequiredInfo(newRequiredOutputFieldsList));
             }
         } catch (FrontendException e) {
-            int errCode = 2185;
+            int errCode = 2211;
             String msg = "Unable to prune columns when processing node " + lo;
             throw new OptimizerException(msg, errCode, PigException.BUG, e);
         }
@@ -698,42 +698,14 @@
         }
         
         // Loader does not support column pruning, insert foreach
-        LOForEach forEach = null;
+        LogicalOperator forEach = null;
         if (response==null || !response.getRequiredFieldResponse())
         {
-            Set<Integer> columnsToProject = new TreeSet<Integer>();
+            List<Integer> columnsToProject = new ArrayList<Integer>();
             for (LoadFunc.RequiredField rf : requiredFieldList.getFields())
                 columnsToProject.add(rf.getIndex());
             
-            ArrayList<Boolean> flattenList = new ArrayList<Boolean>();
-            ArrayList<LogicalPlan> generatePlans = new 
ArrayList<LogicalPlan>();
-            String scope = load.getOperatorKey().scope;
-            for (int pos : columnsToProject) {
-                LogicalPlan projectPlan = new LogicalPlan();
-                LogicalOperator projectInput = load;
-                ExpressionOperator column = new LOProject(projectPlan, new 
OperatorKey(scope, NodeIdGenerator.getGenerator().getNextNodeId(scope)), 
projectInput, pos);
-                flattenList.add(false);
-                projectPlan.add(column);
-                generatePlans.add(projectPlan);
-            }
-            forEach = new LOForEach(mPlan, new OperatorKey(scope, 
NodeIdGenerator.getGenerator().getNextNodeId(scope)), generatePlans, 
flattenList);
-            LogicalOperator pred = mPlan.getSuccessors(load).get(0);
-            /*mPlan.disconnect(load, pred);
-            mPlan.add(forEach);
-            mPlan.connect(load, forEach);
-            mPlan.connect(forEach, pred);
-            forEach.getSchema();*/
-            MultiMap<Integer, Column> mappedFields = new MultiMap<Integer, 
Column>();
-            List<Column> columns;
-            for (int i=0;i<=load.getSchema().size();i++) {
-                columns = new ArrayList<Column>();
-                columns.add(new Column(new Pair<Integer, Integer>(0, i)));
-                mappedFields.put(i, columns);
-            }
-            mPlan.add(forEach);
-            mPlan.doInsertBetween(load, forEach, pred, false);
-            forEach.getProjectionMap().setMappedFields(mappedFields);
-            pred.rewire(load, 0, forEach, false);
+            forEach = load.insertPlainForEachAfter(columnsToProject);
         }
         
         // Begin to prune
@@ -750,16 +722,10 @@
         StringBuffer message = new StringBuffer();
         if (pruneList.size()!=0)
         {
-            
-            ColumnPruner columnPruner;
             if (forEach == null)
-                columnPruner = new ColumnPruner(mPlan, load, pruneList, 
-                    new DependencyOrderWalker<LogicalOperator, 
LogicalPlan>(mPlan));
+                pruner.addPruneMap(load, pruneList);
             else
-                columnPruner = new ColumnPruner(mPlan, forEach, pruneList, 
-                        new DependencyOrderWalker<LogicalOperator, 
LogicalPlan>(mPlan));
-            
-            columnPruner.visit();
+                pruner.addPruneMap(forEach, pruneList);
 
             message.append("Columns pruned for " + load.getAlias() + ": ");
             for (int i=0;i<pruneList.size();i++)
@@ -798,4 +764,16 @@
         else
             log.info("No map keys pruned for " + load.getAlias());
     }
+    
+    public void prune() throws OptimizerException {
+        try {
+            if (!pruner.isEmpty())
+                pruner.visit();
+        }
+        catch (FrontendException e) {
+            int errCode = 2212;
+            String msg = "Unable to prune plan";
+            throw new OptimizerException(msg, errCode, PigException.BUG, e);
+        }
+    }
 }

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestPruneColumn.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestPruneColumn.java?rev=893827&r1=893826&r2=893827&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestPruneColumn.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestPruneColumn.java Fri Dec 25 00:28:09 2009
@@ -1677,4 +1677,50 @@
         assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $0, 
$2", 
             "No map keys pruned for A", "No column pruned for B", "No map keys 
pruned for B"}));
     }
+    
+    // See PIG-1146
+    @Test
+    public void testUnionMixedPruning() throws Exception {
+        pigServer.registerQuery("A = load '"+ 
Util.generateURI(tmpFile1.toString()) + "' AS (a0, a1:chararray, a2);");
+        pigServer.registerQuery("B = load '"+ 
Util.generateURI(tmpFile2.toString()) + "' AS (b0, b2);");
+        pigServer.registerQuery("C = foreach B generate b0, 'hello', b2;");
+        pigServer.registerQuery("D = union A, C;");
+        pigServer.registerQuery("E = foreach D generate $0, $2;");
+        Iterator<Tuple> iter = pigServer.openIterator("E");
+        Collection<String> results = new HashSet<String>();
+        results.add("(1,3)");
+        results.add("(2,2)");
+        results.add("(1,1)");
+        results.add("(2,2)");
+
+        assertTrue(iter.hasNext());
+        Tuple t = iter.next();
+
+        assertTrue(t.size()==2);
+        assertTrue(results.contains(t.toString()));
+
+        assertTrue(iter.hasNext());
+        t = iter.next();
+
+        assertTrue(t.size()==2);
+        assertTrue(results.contains(t.toString()));
+
+        assertTrue(iter.hasNext());
+        t = iter.next();
+
+        assertTrue(t.size()==2);
+        assertTrue(results.contains(t.toString()));
+
+        assertTrue(iter.hasNext());
+        t = iter.next();
+
+        assertTrue(t.size()==2);
+        assertTrue(results.contains(t.toString()));
+
+        assertFalse(iter.hasNext());
+
+        assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $1",
+            "No map keys pruned for A", "No column pruned for B",
+            "No map keys pruned for B"}));
+    }
 }


Reply via email to