Author: sms
Date: Mon May 18 21:39:09 2009
New Revision: 776106

URL: http://svn.apache.org/viewvc?rev=776106&view=rev
Log:
PIG-697: Proposed improvements to pig's optimizer

Added:
    hadoop/pig/trunk/src/org/apache/pig/impl/plan/ProjectionMap.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestProjectionMap.java
Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOCogroup.java
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOCross.java
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LODistinct.java
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOFRJoin.java
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOFilter.java
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOForEach.java
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOLoad.java
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOSort.java
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOStore.java
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOUnion.java
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LogicalOperator.java
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LogicalPlan.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=776106&r1=776105&r2=776106&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Mon May 18 21:39:09 2009
@@ -28,7 +28,7 @@
          so that hadoop runs maps and creates output for the next
                 job (gates).
 
-PIG-693: Proposed improvements to pig's optimizer (sms)
+PIG-697: Proposed improvements to pig's optimizer (sms)
 
 PIG-700: To automate the pig patch test process (gkesavan via sms)
 

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOCogroup.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOCogroup.java?rev=776106&r1=776105&r2=776106&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOCogroup.java 
(original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOCogroup.java Mon 
May 18 21:39:09 2009
@@ -29,6 +29,7 @@
 import org.apache.pig.PigException;
 import org.apache.pig.data.DataType;
 import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.ProjectionMap;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.logicalLayer.parser.ParseException;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
@@ -37,6 +38,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.pig.impl.util.MultiMap;
+import org.apache.pig.impl.util.Pair;
 
 public class LOCogroup extends LogicalOperator {
     private static final long serialVersionUID = 2L;
@@ -511,5 +513,103 @@
         
         return cogroupClone;
     }
+    
+    @Override
+    public ProjectionMap getProjectionMap() {
+        Schema outputSchema;
+        
+        try {
+            outputSchema = getSchema();
+        } catch (FrontendException fee) {
+            return null;
+        }
+        
+        if(outputSchema == null) {
+            return null;
+        }
+        
+        List<LogicalOperator> predecessors = 
(ArrayList<LogicalOperator>)mPlan.getPredecessors(this);
+        if(predecessors == null) {
+            return null;
+        }
+        
+        //the column with the alias 'group' can be mapped in several ways
+        //1. group A by $0;
+        //Here the mapping is 0 -> (0, 0)
+        //2. group A by ($0, $1);
+        //Here there is no direct mapping and 'group' is an added column
+        //3. cogroup A by $0, B by $0;
+        //Here the mapping is 0 -> ((0, 0), (1, 0))
+        //4. cogroup A by ($0, $1), B by ($0, $1);
+        //Here there is no direct mapping and 'group' is an added column
+        //For anything other than a simple project 'group' is an added column
+        
+        MultiMap<LogicalOperator, LogicalPlan> groupByPlans = 
getGroupByPlans();
+        
+        boolean groupByAdded = false;
+        MultiMap<Integer, Pair<Integer, Integer>> mapFields = new 
MultiMap<Integer, Pair<Integer, Integer>>();
+        List<Pair<Integer, Integer>> removedFields = new 
ArrayList<Pair<Integer, Integer>>();
+        
+        for(int inputNum = 0; (inputNum < predecessors.size()) && 
(!groupByAdded); ++inputNum) {
+            LogicalOperator predecessor = predecessors.get(inputNum);
+
+            
+            List<LogicalPlan> predecessorPlans = (ArrayList<LogicalPlan>) 
groupByPlans.get(predecessor);
+
+            int inputColumn = -1;
+            for(LogicalPlan predecessorPlan: predecessorPlans) {               
 
+                List<LogicalOperator> leaves = predecessorPlan.getLeaves();
+                if(leaves == null || leaves.size() > 1) {
+                    groupByAdded = true;
+                    break;
+                }
+                
+                if(leaves.get(0) instanceof LOProject) {
+                    //find out if this project is a chain of projects
+                    if(LogicalPlan.chainOfProjects(predecessorPlan)) {
+                        LOProject rootProject = 
(LOProject)predecessorPlan.getRoots().get(0);
+                        inputColumn = rootProject.getCol();
+                        mapFields.put(0, new Pair<Integer, Integer>(inputNum, 
inputColumn));
+                    }
+                } else {
+                    groupByAdded = true;
+                }                
+            }
+            
+            Schema inputSchema;            
+            try {
+                inputSchema = predecessor.getSchema();
+            } catch (FrontendException fee) {
+                return null;
+            }
+            
+            if(inputSchema != null) {
+                for(int column = 0; column < inputSchema.size(); ++column) {
+                    if(!groupByAdded && inputColumn != column) {
+                        removedFields.add(new Pair<Integer, Integer>(inputNum, 
column));
+                    }
+                }
+            }
+
+        }
+
+        List<Integer> addedFields = new ArrayList<Integer>();
+
+        if(groupByAdded) {
+            addedFields.add(0); //for the column 'group'
+            mapFields = null; //since 'group' is an added column there is no 
mapping            
+        }
+        
+        //the columns 1 through n - 1 are generated by cogroup
+        for(int i = 0; i < groupByPlans.keySet().size(); ++i) {
+            addedFields.add(i+ 1);
+        }
+        
+        if(removedFields.size() == 0) {
+            removedFields = null;
+        }
+
+        return new ProjectionMap(mapFields, removedFields, addedFields);
+    }
 
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOCross.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOCross.java?rev=776106&r1=776105&r2=776106&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOCross.java 
(original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOCross.java Mon May 
18 21:39:09 2009
@@ -32,7 +32,10 @@
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.PlanVisitor;
+import org.apache.pig.impl.plan.ProjectionMap;
 import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.MultiMap;
+import org.apache.pig.impl.util.Pair;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -183,5 +186,73 @@
     public byte getType() {
         return DataType.BAG ;
     }
+    
+    @Override
+    public ProjectionMap getProjectionMap() {
+        Schema outputSchema;
+        
+        try {
+            outputSchema = getSchema();
+        } catch (FrontendException fee) {
+            return null;
+        }
+        
+        if(outputSchema == null) {
+            return null;
+        }
+        
+        List<LogicalOperator> predecessors = 
(ArrayList<LogicalOperator>)mPlan.getPredecessors(this);
+        if(predecessors == null) {
+            return null;
+        }
+        
+        MultiMap<Integer, Pair<Integer, Integer>> mapFields = new 
MultiMap<Integer, Pair<Integer, Integer>>();
+        List<Integer> addedFields = new ArrayList<Integer>();
+        boolean[] unknownSchema = new boolean[predecessors.size()];
+        boolean anyUnknownInputSchema = false;
+        int outputColumnNum = 0;
+        
+        for(int inputNum = 0; inputNum < predecessors.size(); ++inputNum) {
+            LogicalOperator predecessor = predecessors.get(inputNum);
+            Schema inputSchema = null;        
+            
+            try {
+                inputSchema = predecessor.getSchema();
+            } catch (FrontendException fee) {
+                return null;
+            }
+            
+            if(inputSchema == null) {
+                unknownSchema[inputNum] = true;
+                outputColumnNum++;
+                addedFields.add(inputNum);
+                anyUnknownInputSchema = true;
+            } else {
+                unknownSchema[inputNum] = false;
+                for(int inputColumn = 0; inputColumn < inputSchema.size(); 
++inputColumn) {
+                    mapFields.put(outputColumnNum++, new Pair<Integer, 
Integer>(inputNum, inputColumn));
+                }
+            }
+        }
+        
+        //TODO
+        /*
+         * For now, if there is any input that has an unknown schema
+         * flag it and return a null ProjectionMap.
+         * In the future, when unknown schemas are handled
+         * mark inputs that have unknown schemas as output columns
+         * that have been added.
+         */
+
+        if(anyUnknownInputSchema) {
+            return null;
+        }
+        
+        if(addedFields.size() == 0) {
+            addedFields = null;
+        }
+
+        return new ProjectionMap(mapFields, null, addedFields);
+    }
 
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LODistinct.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LODistinct.java?rev=776106&r1=776105&r2=776106&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LODistinct.java 
(original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LODistinct.java Mon 
May 18 21:39:09 2009
@@ -26,6 +26,7 @@
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.PlanVisitor;
+import org.apache.pig.impl.plan.ProjectionMap;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.data.DataType;
 import org.apache.commons.logging.Log;
@@ -116,5 +117,44 @@
         LODistinct distinctClone = (LODistinct)super.clone();
         return distinctClone;
     }
+    
+    @Override
+    public ProjectionMap getProjectionMap() {
+        Schema outputSchema;
+        try {
+            outputSchema = getSchema();
+        } catch (FrontendException fee) {
+            return null;
+        }
+        
+        if(outputSchema == null) {
+            return null;
+        }
+        
+        Schema inputSchema = null;        
+        
+        List<LogicalOperator> predecessors = 
(ArrayList<LogicalOperator>)mPlan.getPredecessors(this);
+        if(predecessors != null) {
+            try {
+                inputSchema = predecessors.get(0).getSchema();
+            } catch (FrontendException fee) {
+                return null;
+            }
+        } else {
+            return null;
+        }
+        
+        if(inputSchema == null) {
+            return null;
+        }
+        
+        if(Schema.equals(inputSchema, outputSchema, false, true)) {
+            //there is a one to one mapping between input and output schemas
+            return new ProjectionMap(false);
+        } else {
+            //problem - input and output schemas for a distinct have to match!
+            return null;
+        }
+    }
 
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOFRJoin.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOFRJoin.java?rev=776106&r1=776105&r2=776106&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOFRJoin.java 
(original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOFRJoin.java Mon May 
18 21:39:09 2009
@@ -31,8 +31,10 @@
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
 import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.ProjectionMap;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.util.MultiMap;
+import org.apache.pig.impl.util.Pair;
 
 /**
  * This is the logical operator for the Fragment Replicate Join
@@ -268,6 +270,73 @@
 
         return new Schema(fsList) ;
     }
-    
+
+    @Override
+    public ProjectionMap getProjectionMap() {
+        Schema outputSchema;
+        
+        try {
+            outputSchema = getSchema();
+        } catch (FrontendException fee) {
+            return null;
+        }
+        
+        if(outputSchema == null) {
+            return null;
+        }
+        
+        List<LogicalOperator> predecessors = 
(ArrayList<LogicalOperator>)mPlan.getPredecessors(this);
+        if(predecessors == null) {
+            return null;
+        }
+        
+        MultiMap<Integer, Pair<Integer, Integer>> mapFields = new 
MultiMap<Integer, Pair<Integer, Integer>>();
+        List<Integer> addedFields = new ArrayList<Integer>();
+        boolean[] unknownSchema = new boolean[predecessors.size()];
+        boolean anyUnknownInputSchema = false;
+        int outputColumnNum = 0;
+        
+        for(int inputNum = 0; inputNum < predecessors.size(); ++inputNum) {
+            LogicalOperator predecessor = predecessors.get(inputNum);
+            Schema inputSchema = null;        
+            
+            try {
+                inputSchema = predecessor.getSchema();
+            } catch (FrontendException fee) {
+                return null;
+            }
+            
+            if(inputSchema == null) {
+                unknownSchema[inputNum] = true;
+                outputColumnNum++;
+                addedFields.add(inputNum);
+                anyUnknownInputSchema = true;
+            } else {
+                unknownSchema[inputNum] = false;
+                for(int inputColumn = 0; inputColumn < inputSchema.size(); 
++inputColumn) {
+                    mapFields.put(outputColumnNum++, new Pair<Integer, 
Integer>(inputNum, inputColumn));
+                }
+            }
+        }
+        
+        //TODO
+        /*
+         * For now, if there is any input that has an unknown schema
+         * flag it and return a null ProjectionMap.
+         * In the future, when unknown schemas are handled
+         * mark inputs that have unknown schemas as output columns
+         * that have been added.
+         */
+
+        if(anyUnknownInputSchema) {
+            return null;
+        }
+        
+        if(addedFields.size() == 0) {
+            addedFields = null;
+        }
+
+        return new ProjectionMap(mapFields, null, addedFields);
+    }
 
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOFilter.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOFilter.java?rev=776106&r1=776105&r2=776106&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOFilter.java 
(original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOFilter.java Mon May 
18 21:39:09 2009
@@ -23,6 +23,7 @@
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.PlanVisitor;
+import org.apache.pig.impl.plan.ProjectionMap;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.data.DataType;
 import org.apache.pig.impl.logicalLayer.optimizer.SchemaRemover;
@@ -124,5 +125,44 @@
         filterClone.mComparisonPlan = lpCloner.getClonedPlan();
         return filterClone;
     }
+    
+    @Override
+    public ProjectionMap getProjectionMap() {
+        Schema outputSchema;
+        try {
+            outputSchema = getSchema();
+        } catch (FrontendException fee) {
+            return null;
+        }
+        
+        if(outputSchema == null) {
+            return null;
+        }
+        
+        Schema inputSchema = null;        
+        
+        List<LogicalOperator> predecessors = 
(ArrayList<LogicalOperator>)mPlan.getPredecessors(this);
+        if(predecessors != null) {
+            try {
+                inputSchema = predecessors.get(0).getSchema();
+            } catch (FrontendException fee) {
+                return null;
+            }
+        } else {
+            return null;
+        }
+        
+        if(inputSchema == null) {
+            return null;
+        }
+        
+        if(Schema.equals(inputSchema, outputSchema, false, true)) {
+            //there is a one to one mapping between input and output schemas
+            return new ProjectionMap(false);
+        } else {
+            //problem - input and output schemas for a filter have to match!
+            return null;
+        }
+    }
 
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOForEach.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOForEach.java?rev=776106&r1=776105&r2=776106&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOForEach.java 
(original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOForEach.java Mon 
May 18 21:39:09 2009
@@ -18,6 +18,7 @@
 package org.apache.pig.impl.logicalLayer;
 
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.HashMap;
@@ -30,7 +31,10 @@
 import org.apache.pig.impl.logicalLayer.optimizer.SchemaRemover;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.PlanVisitor;
+import org.apache.pig.impl.plan.ProjectionMap;
 import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.MultiMap;
+import org.apache.pig.impl.util.Pair;
 import org.apache.pig.impl.logicalLayer.parser.QueryParser ;
 import org.apache.pig.impl.logicalLayer.parser.ParseException;
 import org.apache.pig.data.DataType;
@@ -174,7 +178,7 @@
                     //In the above script, the generate a1, will translate to 
                     //project(a1) -> project(*) and will not be translated to 
a sequence of projects
                     //As a result the project(*) will remain but the return 
type is a bag
-                    //project*) with a data type set to tuple indicataes a 
project(*) from an input
+                    //project(*) with a data type set to tuple indicates a 
project(*) from an input
                     //that has no schema
                     if( (((LOProject)op).isStar() ) && 
(((LOProject)op).getType() == DataType.TUPLE) ) {
                         mSchema = null;
@@ -447,4 +451,197 @@
         return forEachClone;
     }
 
+    @Override
+    public ProjectionMap getProjectionMap() {
+        Schema outputSchema;
+        
+        try {
+            outputSchema = getSchema();
+        } catch (FrontendException fee) {
+            return null;
+        }
+        
+        if(outputSchema == null) {
+            return null;
+        }
+        
+        List<LogicalOperator> predecessors = 
(ArrayList<LogicalOperator>)mPlan.getPredecessors(this);
+        if(predecessors == null) {
+            return null;
+        }
+        
+        LogicalOperator predecessor = predecessors.get(0);
+        
+        Schema inputSchema;
+        
+        try {
+            inputSchema = predecessor.getSchema();
+        } catch (FrontendException fee) {
+            return null;
+        }
+        
+        List<LogicalPlan> foreachPlans = getForEachPlans();
+        List<Boolean> flattenList = getFlatten();
+        
+        MultiMap<Integer, Pair<Integer, Integer>> mapFields = new 
MultiMap<Integer, Pair<Integer, Integer>>();
+        List<Integer> addedFields = new ArrayList<Integer>();
+        int outputColumn = 0;
+        
+        for(int i = 0; i < foreachPlans.size(); ++i) {
+            LogicalPlan foreachPlan = foreachPlans.get(i);
+            List<LogicalOperator> leaves = foreachPlan.getLeaves();
+            if(leaves == null || leaves.size() > 1) {
+                return null;
+            }
+            
+            int inputColumn = -1;
+            boolean mapped = false;
+            
+            if(leaves.get(0) instanceof LOProject) {
+                //find out if this project is a chain of projects
+                if(LogicalPlan.chainOfProjects(foreachPlan)) {
+                    LOProject rootProject = 
(LOProject)foreachPlan.getRoots().get(0);
+                    inputColumn = rootProject.getCol();
+                    if(inputSchema != null) {
+                        mapped = true;
+                    }
+                }
+            }
+            
+            Schema.FieldSchema leafFS;
+            try {
+                leafFS = ((ExpressionOperator)leaves.get(0)).getFieldSchema();
+            } catch (FrontendException fee) {
+                return null;
+            }
+            
+            if(leafFS == null) {
+                return null;
+            }
+            
+            if(flattenList.get(i)) {
+                Schema innerSchema = leafFS.schema;
+                
+                if(innerSchema != null) {                    
+                    if(innerSchema.isTwoLevelAccessRequired()) {
+                        // this is the case where the schema is that of
+                        // a bag which has just one tuple fieldschema which
+                        // in turn has a list of fieldschemas. The schema
+                        // after flattening would consist of the fieldSchemas
+                        // present in the tuple
+                        
+                        // check that indeed we only have one field schema
+                        // which is that of a tuple
+                        if(innerSchema.getFields().size() != 1) {
+                            return null;
+                        }
+                        Schema.FieldSchema tupleFS;
+                        try {
+                            tupleFS = innerSchema.getField(0);
+                        } catch (FrontendException fee) {
+                            return null;
+                        }
+                        
+                        if(tupleFS.type != DataType.TUPLE) {
+                            return null;
+                        }
+                        innerSchema = tupleFS.schema;
+                    }
+                    
+                    //innerSchema could be modified and hence the second check
+                    if(innerSchema != null) {
+                        for(int j = 0; j < innerSchema.size(); ++j) {
+                            if(mapped) {
+                                //map each flattened column to the original 
column
+                                mapFields.put(outputColumn++, new 
Pair<Integer, Integer>(0, inputColumn));
+                            } else {
+                                addedFields.add(outputColumn++);
+                            }
+                        }
+                    } else {
+                        //innerSchema is null; check for schema type
+                        if(DataType.isSchemaType(leafFS.type)) {
+                            //flattening a null schema results in a bytearray
+                            if(mapped) {
+                                //map each flattened column to the original 
column
+                                mapFields.put(outputColumn++, new 
Pair<Integer, Integer>(0, inputColumn));
+                            } else {
+                                addedFields.add(outputColumn++);
+                            }
+                        } else {
+                            mapFields.put(outputColumn++, new Pair<Integer, 
Integer>(0, inputColumn));
+                        }
+                    }
+                } else {
+                    //innerSchema is null; check for schema type
+                    if(DataType.isSchemaType(leafFS.type)) {
+                        //flattening a null schema results in a bytearray
+                        if(mapped) {
+                            //map each flattened column to the original column
+                            mapFields.put(outputColumn++, new Pair<Integer, 
Integer>(0, inputColumn));
+                        } else {
+                            addedFields.add(outputColumn++);
+                        }
+                    } else {
+                        mapFields.put(outputColumn++, new Pair<Integer, 
Integer>(0, inputColumn));
+                    }
+                }
+            } else {
+                //not a flattened column
+                if(mapped) {
+                    mapFields.put(outputColumn++, new Pair<Integer, 
Integer>(0, inputColumn));
+                } else {
+                    addedFields.add(outputColumn++);
+                }
+            }
+        }
+        
+        List<Pair<Integer, Integer>> removedFields = new 
ArrayList<Pair<Integer, Integer>>();
+       
+        if(inputSchema == null) {
+            //if input schema is null then there are no mappedFields and 
removedFields
+            mapFields = null;
+            removedFields = null;
+        } else {
+            
+            //if the size of the map is zero then set it to null
+            if(mapFields.size() == 0) {
+                mapFields = null;
+            }
+            
+            if(addedFields.size() == 0) {
+                addedFields = null;
+            }
+            
+            //input schema is not null. Need to compute the removedFields
+            //compute the set difference between the input schema and mapped 
fields
+            
+            Set<Integer> removedSet = new HashSet<Integer>();
+            for(int i = 0; i < inputSchema.size(); ++i) {
+                removedSet.add(i);
+            }
+            
+            if(mapFields != null) {
+                Set<Integer> mappedSet = new HashSet<Integer>();
+                for(Integer key: mapFields.keySet()) {
+                    List<Pair<Integer, Integer>> values = 
(ArrayList<Pair<Integer, Integer>>)mapFields.get(key);
+                    for(Pair<Integer, Integer> value: values) {
+                        mappedSet.add(value.second);
+                    }
+                }
+                removedSet.removeAll(mappedSet);
+            }
+            
+            if(removedSet.size() == 0) {
+                removedFields = null;
+            } else {
+                for(Integer i: removedSet) {
+                    removedFields.add(new Pair<Integer, Integer>(0, i));
+                }
+            }
+        }
+
+        return new ProjectionMap(mapFields, removedFields, addedFields);
+    }
+
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOLoad.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOLoad.java?rev=776106&r1=776105&r2=776106&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOLoad.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOLoad.java Mon May 
18 21:39:09 2009
@@ -19,6 +19,10 @@
 
 import java.io.IOException;
 import java.net.URL;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
 import org.apache.pig.ExecType;
 import org.apache.pig.LoadFunc;
@@ -28,7 +32,10 @@
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.ProjectionMap;
 import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.MultiMap;
+import org.apache.pig.impl.util.Pair;
 import org.apache.pig.impl.util.WrappedIOException;
 import org.apache.pig.impl.logicalLayer.parser.ParseException;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
@@ -130,9 +137,6 @@
         if (!mIsSchemaComputed) {
             // get the schema of the load function
             try {
-                //DEBUG
-                //System.out.println("Schema file: " + mSchema);
-                
                 if (mEnforcedSchema != null) {
                     mSchema = mEnforcedSchema ;
                     return mSchema ;
@@ -220,5 +224,53 @@
     public Schema getDeterminedSchema() {
         return mDeterminedSchema;
     }
+    
+    @Override
+    public ProjectionMap getProjectionMap() {
+        Schema outputSchema;
+        
+        try {
+            outputSchema = getSchema();
+        } catch (FrontendException fee) {
+            return null;
+        }
+        
+        if(outputSchema == null) {
+            return null;
+        }
+        
+        Schema inputSchema = null;        
+        
+        List<LogicalOperator> predecessors = 
(ArrayList<LogicalOperator>)mPlan.getPredecessors(this);
+        if(predecessors != null) {
+            try {
+                inputSchema = predecessors.get(0).getSchema();
+            } catch (FrontendException fee) {
+                return null;
+            }
+        } else {
+            try {
+                inputSchema = mLoadFunc.determineSchema(mSchemaFile, 
mExecType, mStorage);
+            } catch (IOException ioe) {
+                return null;
+            }
+        }
+        
+        if(inputSchema == null) {
+            return null;
+        }
+        
+        if(Schema.equals(inputSchema, outputSchema, false, true)) {
+            //there is a one to one mapping between input and output schemas
+            return new ProjectionMap(false);
+        } else {
+            MultiMap<Integer, Pair<Integer, Integer>> mapFields = new 
MultiMap<Integer, Pair<Integer, Integer>>();
+            //compute the mapping assuming it's a prefix projection
+            for(int i = 0; i < inputSchema.size(); ++i) {
+                mapFields.put(i, new Pair<Integer, Integer>(0, i));
+            }
+            return new ProjectionMap(mapFields, null, null);
+        }
+    }
 
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOSort.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOSort.java?rev=776106&r1=776105&r2=776106&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOSort.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOSort.java Mon May 
18 21:39:09 2009
@@ -27,6 +27,7 @@
 import org.apache.pig.PigException;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.ProjectionMap;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.plan.PlanVisitor;
 import org.apache.pig.data.DataType;
@@ -202,5 +203,44 @@
         }
         return clone;
     }
+    
+    @Override
+    public ProjectionMap getProjectionMap() {
+        Schema outputSchema;
+        try {
+            outputSchema = getSchema();
+        } catch (FrontendException fee) {
+            return null;
+        }
+        
+        if(outputSchema == null) {
+            return null;
+        }
+        
+        Schema inputSchema = null;        
+        
+        List<LogicalOperator> predecessors = 
(ArrayList<LogicalOperator>)mPlan.getPredecessors(this);
+        if(predecessors != null) {
+            try {
+                inputSchema = predecessors.get(0).getSchema();
+            } catch (FrontendException fee) {
+                return null;
+            }
+        } else {
+            return null;
+        }
+        
+        if(inputSchema == null) {
+            return null;
+        }
+        
+        if(Schema.equals(inputSchema, outputSchema, false, true)) {
+            //there is a one-to-one mapping between input and output schemas
+            return new ProjectionMap(false);
+        } else {
+            //problem - input and output schemas for a sort have to match!
+            return null;
+        }
+    }
 
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOStore.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOStore.java?rev=776106&r1=776105&r2=776106&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOStore.java 
(original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOStore.java Mon May 
18 21:39:09 2009
@@ -20,6 +20,7 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -27,9 +28,11 @@
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.ProjectionMap;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.plan.PlanVisitor;
+import org.apache.pig.impl.util.Pair;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -126,4 +129,41 @@
     public FileSpec getInputSpec() {
         return mInputSpec;
     }
+    
+    @Override
+    public ProjectionMap getProjectionMap() {
+        Schema outputSchema;
+        try {
+            outputSchema = getSchema();
+        } catch (FrontendException fee) {
+            return null;
+        }
+        if(outputSchema == null) {
+            return null;
+        }
+        
+        Schema inputSchema = null;        
+        
+        List<LogicalOperator> predecessors = 
(ArrayList<LogicalOperator>)mPlan.getPredecessors(this);
+        if(predecessors != null) {
+            try {
+                inputSchema = predecessors.get(0).getSchema();
+            } catch (FrontendException fee) {
+                return null;
+            }
+        }
+        
+        if(inputSchema == null) {
+            return null;
+        }
+        
+        if(Schema.equals(inputSchema, outputSchema, false, true)) {
+            //there is a one-to-one mapping between input and output schemas
+            return new ProjectionMap(false);
+        } else {
+            //problem - input and output schemas for a store have to match!
+            return null;
+        }
+    }
+
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOUnion.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOUnion.java?rev=776106&r1=776105&r2=776106&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOUnion.java 
(original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOUnion.java Mon May 
18 21:39:09 2009
@@ -18,15 +18,20 @@
 package org.apache.pig.impl.logicalLayer;
 
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Collection;
 import java.util.Iterator;
+import java.util.Set;
 
 import org.apache.pig.PigException;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.PlanVisitor;
+import org.apache.pig.impl.plan.ProjectionMap;
 import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.MultiMap;
+import org.apache.pig.impl.util.Pair;
 import org.apache.pig.data.DataType;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -69,7 +74,8 @@
                     if(null != mSchema) {
                         mSchema = mSchema.merge(op.getSchema(), false);
                     } else {
-                        mSchema = op.getSchema();
+                        mSchema = null;
+                        break;
                     }
                 }
                 if(null != mSchema) {
@@ -78,7 +84,7 @@
                         while(iter.hasNext()) {
                             op = iter.next();
                             Schema opSchema = op.getSchema();
-                            if(null != s) {
+                            if(null != opSchema) {
                                 for(Schema.FieldSchema opFs: 
opSchema.getFields()) {
                                     fs.setParent(opFs.canonicalName, op);
                                 }
@@ -127,5 +133,49 @@
         LOUnion unionClone = (LOUnion)super.clone();
         return unionClone;
     }
+    
+    @Override
+    public ProjectionMap getProjectionMap() {
+        Schema outputSchema;
+        
+        try {
+            outputSchema = getSchema();
+        } catch (FrontendException fee) {
+            return null;
+        }
+        
+        if(outputSchema == null) {
+            return null;
+        }
+        
+        List<LogicalOperator> predecessors = 
(ArrayList<LogicalOperator>)mPlan.getPredecessors(this);
+        if(predecessors == null) {
+            return null;
+        }
+        
+        MultiMap<Integer, Pair<Integer, Integer>> mapFields = new 
MultiMap<Integer, Pair<Integer, Integer>>();
+        
+        for(int inputNum = 0; inputNum < predecessors.size(); ++inputNum) {
+            LogicalOperator predecessor = predecessors.get(inputNum);
+            Schema inputSchema = null;        
+            
+            try {
+                inputSchema = predecessor.getSchema();
+            } catch (FrontendException fee) {
+                return null;
+            }
+            
+            if(inputSchema == null) {
+                return null;
+            } else {
+                for(int inputColumn = 0; inputColumn < inputSchema.size(); 
++inputColumn) {
+                    mapFields.put(inputColumn, new Pair<Integer, 
Integer>(inputNum, inputColumn));
+                    //removedFields.add(new Pair<Integer, Integer>(inputNum, 
inputColumn));
+                }
+            }
+        }
+        
+        return new ProjectionMap(mapFields, null, null);
+    }
 
 }

Modified: 
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LogicalOperator.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LogicalOperator.java?rev=776106&r1=776105&r2=776106&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LogicalOperator.java 
(original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LogicalOperator.java 
Mon May 18 21:39:09 2009
@@ -20,6 +20,7 @@
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.io.IOException;
 
 import org.apache.pig.data.DataType;
@@ -27,7 +28,9 @@
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.plan.Operator;
 import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.ProjectionMap;
 import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.Pair;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -69,12 +72,6 @@
      */
     protected LogicalPlan mPlan;
 
-    /**
-     * A boolean variable to remember if input has to be flattened Used only in
-     * the context of generate
-     */
-    //private boolean mIsFlatten = false;
-    
     private static Log log = LogFactory.getLog(LogicalOperator.class);
 
     /**
@@ -128,11 +125,8 @@
             // It's fine, it just means we don't have a schema yet.
         }
         if (mSchema == null) {
-            log.debug("Operator schema is null; Setting it to new schema");
             mSchema = schema;
         } else {
-            log.debug("Reconciling schema");
-            log.debug("mSchema: " + mSchema + " schema: " + schema);
             mSchema.reconcile(schema);
         }
     }
@@ -284,6 +278,29 @@
         if(mSchema != null)
             loClone.mSchema = this.mSchema.clone();
         return loClone;
+    }    
+
+
+    /**
+     * Produce a map describing how this operator modifies its projection.
+     * @return the ProjectionMap; a null value indicates that the operator
+     * does not know how its projection changes, for example a join of two
+     * inputs where one input does not have a schema.
+     */
+    public ProjectionMap getProjectionMap() {
+        return null;
+    };
+
+    /**
+     * Get a list of fields that this operator requires.  This is not 
necessarily
+     * equivalent to the list of fields the operator projects.  For example,
+     * a filter will project anything passed to it, but requires only the 
fields
+     * explicitly referenced in its filter expression.
+     * @return list of fields, numbered from 0.
+     */
+    public List<Pair<Integer, Integer>> getRequiredFields()
+    {
+        return null;
     }
 
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LogicalPlan.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LogicalPlan.java?rev=776106&r1=776105&r2=776106&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LogicalPlan.java 
(original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LogicalPlan.java Mon 
May 18 21:39:09 2009
@@ -150,4 +150,52 @@
         return clone;
     }
     
+    /**
+     * A utility method to check if a plan contains a chain of projection
+     * operators
+     * 
+     * @param plan
+     *            input plan
+     * @return true if there is a chain of projection operators; false 
otherwise
+     */
+    public static boolean chainOfProjects(LogicalPlan plan) {
+        
+        if (plan == null) {
+            return false;
+        }
+        
+        List<LogicalOperator> leaves = plan.getLeaves();
+
+        if (leaves == null) {
+            return false;
+        }
+
+        if (leaves.size() > 1) {
+            return false;
+        }
+
+        LogicalOperator node = leaves.get(0);
+
+        while (true) {
+            if ((node == null) || !(node instanceof LOProject)) {
+                //not a projection operator
+                return false;
+            }
+
+            List<LogicalOperator> predecessors = plan.getPredecessors(node);
+
+            if (predecessors == null) {
+                //we have reached the root
+                return true;
+            }
+
+            if (predecessors.size() > 1) {
+                //a project cannot have multiple inputs
+                return false;
+            }
+
+            node = predecessors.get(0);
+        }
+    }
+    
 }

Added: hadoop/pig/trunk/src/org/apache/pig/impl/plan/ProjectionMap.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/plan/ProjectionMap.java?rev=776106&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/plan/ProjectionMap.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/plan/ProjectionMap.java Mon May 18 
21:39:09 2009
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.impl.plan;
+
+import java.io.Serializable;
+import java.lang.StringBuilder;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.util.MultiMap;
+import org.apache.pig.impl.util.Pair;
+
+/**
+ * A struct detailing how a projection is altered by an operator.
+ */
+public class ProjectionMap {
+    /**
+     * Quick way for an operator to note that its input and output are the 
same.
+     */
+    private boolean mChanges = true;
+
+    /**
+     * Map of field changes, with keys being the output fields of the operator
+     * and values being the input fields. Fields are numbered from 0. So for a
+     * foreach operator derived from 'B = foreach A generate $0, $2, $3,
+     * udf($1)' would produce a mapping of 0->(0, 0), 2->(0, 1), 3->(0, 2)
+     */
+    private MultiMap<Integer, Pair<Integer, Integer>> mMappedFields;
+
+    /**
+     * List of fields removed from the input. This includes fields that were
+     * transformed, and thus are no longer the same fields. Using the example
+     * foreach given under mappedFields, this list would contain '(0,1)'.
+     */
+    private List<Pair<Integer, Integer>> mRemovedFields;
+
+    /**
+     * List of fields in the output of this operator that were created by this
+     * operator. Using the example foreach given under mappedFields, this list
+     * would contain '3'.
+     */
+    private List<Integer> mAddedFields;
+
+    /**
+     * 
+     * @param changes
+     *            to indicate if this projection map changes its input or not
+     */
+    public ProjectionMap(boolean changes) {
+        this(null, null, null, changes);
+    }
+
+    /**
+     * 
+     * @param mapFields
+     *            the mapping of output column to input column
+     * @param removedFields
+     *            the list of input columns that are removed
+     * @param addedFields
+     *            the list of columns that are added to the output
+     */
+    public ProjectionMap(MultiMap<Integer, Pair<Integer, Integer>> mapFields,
+            List<Pair<Integer, Integer>> removedFields,
+            List<Integer> addedFields) {
+        this(mapFields, removedFields, addedFields, true);
+    }
+
+    /**
+     * 
+     * @param mapFields
+     *            the mapping of output column to input column
+     * @param removedFields
+     *            the list of input columns that are removed
+     * @param addedFields
+     *            the list of columns that are added to the output
+     * @param changes
+     *            to indicate if this projection map changes its input or not
+     */
+    private ProjectionMap(MultiMap<Integer, Pair<Integer, Integer>> mapFields,
+            List<Pair<Integer, Integer>> removedFields,
+            List<Integer> addedFields, boolean changes) {
+        mMappedFields = mapFields;
+        mAddedFields = addedFields;
+        mRemovedFields = removedFields;
+        mChanges = changes;
+    }
+
+    /**
+     * 
+     * @return the mapping of output column to input column
+     */
+    public MultiMap<Integer, Pair<Integer, Integer>> getMappedFileds() {
+        return mMappedFields;
+    }
+
+    /**
+     * 
+     * @param fields
+     *            the mapping of output column to input column
+     */
+    public void setMappedFileds(MultiMap<Integer, Pair<Integer, Integer>> 
fields) {
+        mMappedFields = fields;
+    }
+
+    /**
+     * 
+     * @return the list of input columns that are removed
+     */
+    public List<Pair<Integer, Integer>> getRemovedFileds() {
+        return mRemovedFields;
+    }
+
+    /**
+     * 
+     * @param fields
+     *            the list of input columns that are removed
+     */
+    public void setRemovedFileds(List<Pair<Integer, Integer>> fields) {
+        mRemovedFields = fields;
+    }
+
+    /**
+     * 
+     * @return the list of columns that are added to the output
+     */
+    public List<Integer> getAddedFileds() {
+        return mAddedFields;
+    }
+
+    /**
+     * 
+     * @param fields
+     *            the list of columns that are added to the output
+     */
+    public void setAddedFileds(List<Integer> fields) {
+        mAddedFields = fields;
+    }
+
+    /**
+     * 
+     * @return if this projection map changes its input or not
+     */
+    public boolean changes() {
+        return getChanges();
+    }
+
+
+    /**
+     * 
+     * @return if this projection map changes its input or not
+     */
+    public boolean getChanges() {
+        return mChanges;
+    }
+
+    /**
+     * 
+     * @param changes
+     *            if this projection map changes its input or not
+     */
+    public void setChanges(boolean changes) {
+        mChanges = changes;
+    }
+    
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("changes: " + mChanges);
+        sb.append(" mapped fields: " + mMappedFields);
+        sb.append(" added fields: " + mAddedFields);
+        sb.append(" removed fields: " + mRemovedFields);
+        return sb.toString();
+    }
+}
\ No newline at end of file


Reply via email to