Author: daijy
Date: Wed Sep  2 23:49:54 2009
New Revision: 810735

URL: http://svn.apache.org/viewvc?rev=810735&view=rev
Log:
PIG-922: Logical optimizer: push up project part 1

Added:
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/RelationalOperator.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestRelevantFields.java
Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOCogroup.java
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOCross.java
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LODistinct.java
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOFilter.java
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOForEach.java
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOJoin.java
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOLimit.java
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOLoad.java
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOSort.java
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOSplit.java
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOSplitOutput.java
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOStore.java
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOStream.java
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOUnion.java
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LogicalOperator.java
    hadoop/pig/trunk/src/org/apache/pig/impl/plan/RequiredFields.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=810735&r1=810734&r2=810735&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Wed Sep  2 23:49:54 2009
@@ -28,6 +28,8 @@
 
 IMPROVEMENTS
 
+PIG-922: Logical optimizer: push up project part 1 (daijy)
+
 PIG-812: COUNT(*) does not work (breed)
 
 PIG-923: Allow specifying log file location through pig.properties (dvryaboy through daijy)

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOCogroup.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOCogroup.java?rev=810735&r1=810734&r2=810735&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOCogroup.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOCogroup.java Wed Sep  2 23:49:54 2009
@@ -34,16 +34,14 @@
 import org.apache.pig.impl.plan.ProjectionMap;
 import org.apache.pig.impl.plan.RequiredFields;
 import org.apache.pig.impl.plan.VisitorException;
-import org.apache.pig.impl.logicalLayer.parser.ParseException;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.logicalLayer.optimizer.SchemaRemover;
-import org.apache.pig.impl.logicalLayer.schema.SchemaMergeException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.pig.impl.util.MultiMap;
 import org.apache.pig.impl.util.Pair;
 
-public class LOCogroup extends LogicalOperator {
+public class LOCogroup extends RelationalOperator {
     private static final long serialVersionUID = 2L;
 
     /**
@@ -722,5 +720,46 @@
             }
         }
     }
 
+    /**
+     * Maps an output column of this cogroup back to the input
+     * columns it is derived from.
+     *
+     * @param output index of the output; cogroup has one output (0)
+     * @param column output column: 0 is the group key, and column
+     *        {@code i >= 1} is the bag built from predecessor
+     *        {@code i - 1}
+     * @return for column 0, the result of {@link #getRequiredFields()};
+     *         for a bag column, one entry per predecessor where only
+     *         predecessor {@code column - 1} is set (marked as needing
+     *         all fields) and the rest are null; null if the output or
+     *         column is out of range or there are no predecessors
+     */
+    @Override
+    public List<RequiredFields> getRelevantInputs(int output, int column) {
+        if (output != 0 || column < 0)
+            return null;
+
+        List<LogicalOperator> predecessors = mPlan.getPredecessors(this);
+        if (predecessors == null)
+            return null;
+
+        // one bag column per input follows the group key, so valid
+        // columns are 0..predecessors.size()
+        if (column > predecessors.size())
+            return null;
+
+        if (column == 0)
+            return getRequiredFields();
+
+        List<RequiredFields> result = new ArrayList<RequiredFields>();
+        for (int inputNum = 0; inputNum < predecessors.size(); ++inputNum) {
+            if (inputNum == column - 1) {
+                result.add(new RequiredFields(true));
+            } else {
+                result.add(null);
+            }
+        }
+        return result;
+    }
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOCross.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOCross.java?rev=810735&r1=810734&r2=810735&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOCross.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOCross.java Wed Sep  2 23:49:54 2009
@@ -18,20 +18,17 @@
 package org.apache.pig.impl.logicalLayer;
 
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.HashMap;
 import java.util.Set;
 import java.util.HashSet;
 import java.util.Iterator;
-import java.io.IOException;
 
 import org.apache.pig.PigException;
 import org.apache.pig.data.DataType;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.plan.OperatorKey;
-import org.apache.pig.impl.plan.PlanVisitor;
 import org.apache.pig.impl.plan.ProjectionMap;
 import org.apache.pig.impl.plan.RequiredFields;
 import org.apache.pig.impl.plan.VisitorException;
@@ -40,7 +37,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
-public class LOCross extends LogicalOperator {
+public class LOCross extends RelationalOperator {
 
     private static final long serialVersionUID = 2L;
     private static Log log = LogFactory.getLog(LOCross.class);
@@ -283,4 +280,52 @@
         return (requiredFields.size() == 0? null: requiredFields);
     }
 
+    /**
+     * Maps an output column of this cross back to the single input
+     * column it comes from. The column is located by walking the
+     * predecessors in order and subtracting each one's schema width.
+     *
+     * @param output index of the output; cross has one output (0)
+     * @param column zero-based output column
+     * @return a list with one entry per predecessor in which only the
+     *         predecessor producing the column is non-null; null if the
+     *         output or column is out of range, there are no
+     *         predecessors, or a predecessor's schema needed to locate
+     *         the column is unavailable
+     */
+    @Override
+    public List<RequiredFields> getRelevantInputs(int output, int column) {
+        if (output != 0 || column < 0)
+            return null;
+
+        List<LogicalOperator> predecessors = mPlan.getPredecessors(this);
+        if (predecessors == null)
+            return null;
+
+        List<RequiredFields> result = new ArrayList<RequiredFields>();
+        for (int i = 0; i < predecessors.size(); i++)
+            result.add(null);
+
+        for (int inputNum = 0; inputNum < predecessors.size(); ++inputNum) {
+            Schema inputSchema;
+            try {
+                inputSchema = predecessors.get(inputNum).getSchema();
+            } catch (FrontendException fee) {
+                return null;
+            }
+            // without this input's width the column cannot be located
+            if (inputSchema == null)
+                return null;
+
+            if (column < inputSchema.size()) {
+                ArrayList<Pair<Integer, Integer>> inputList = new ArrayList<Pair<Integer, Integer>>();
+                inputList.add(new Pair<Integer, Integer>(inputNum, column));
+                result.set(inputNum, new RequiredFields(inputList));
+                return result;
+            }
+            column -= inputSchema.size();
+        }
+        // column lies beyond the combined width of all predecessors
+        return null;
+    }
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LODistinct.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LODistinct.java?rev=810735&r1=810734&r2=810735&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LODistinct.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LODistinct.java Wed Sep  2 23:49:54 2009
@@ -34,7 +34,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
-public class LODistinct extends LogicalOperator {
+public class LODistinct extends RelationalOperator {
 
     private static final long serialVersionUID = 2L;
     private static Log log = LogFactory.getLog(LODistinct.class);
@@ -167,4 +167,31 @@
         return requiredFields;
     }
 
+    /**
+     * Maps an output column of this distinct back to its input column.
+     * Output column {@code column} corresponds one-to-one to column
+     * {@code column} of input 0.
+     *
+     * @param output index of the output; distinct has one output (0)
+     * @param column zero-based output column
+     * @return a single-element list requiring input column
+     *         (0, {@code column}); null if the output or column is out
+     *         of range. The upper bound is only checked when the schema
+     *         is already available.
+     */
+    @Override
+    public List<RequiredFields> getRelevantInputs(int output, int column) {
+        if (output != 0 || column < 0)
+            return null;
+
+        // if we have schema information, check if output column is valid
+        if (mSchema != null && column >= mSchema.size())
+            return null;
+
+        ArrayList<Pair<Integer, Integer>> inputList = new ArrayList<Pair<Integer, Integer>>();
+        inputList.add(new Pair<Integer, Integer>(0, column));
+        List<RequiredFields> result = new ArrayList<RequiredFields>();
+        result.add(new RequiredFields(inputList));
+        return result;
+    }
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOFilter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOFilter.java?rev=810735&r1=810734&r2=810735&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOFilter.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOFilter.java Wed Sep  2 23:49:54 2009
@@ -37,7 +37,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
-public class LOFilter extends LogicalOperator {
+public class LOFilter extends RelationalOperator {
 
     private static final long serialVersionUID = 2L;
     private LogicalPlan mComparisonPlan;
@@ -226,4 +226,31 @@
         }
     }
     
+    /**
+     * Maps an output column of this filter back to its input column.
+     * Output column {@code column} corresponds one-to-one to column
+     * {@code column} of input 0.
+     *
+     * @param output index of the output; filter has one output (0)
+     * @param column zero-based output column
+     * @return a single-element list requiring input column
+     *         (0, {@code column}); null if the output or column is out
+     *         of range. The upper bound is only checked when the schema
+     *         is already available.
+     */
+    @Override
+    public List<RequiredFields> getRelevantInputs(int output, int column) {
+        if (output != 0 || column < 0)
+            return null;
+
+        // if we have schema information, check if output column is valid
+        if (mSchema != null && column >= mSchema.size())
+            return null;
+
+        ArrayList<Pair<Integer, Integer>> inputList = new ArrayList<Pair<Integer, Integer>>();
+        inputList.add(new Pair<Integer, Integer>(0, column));
+        List<RequiredFields> result = new ArrayList<RequiredFields>();
+        result.add(new RequiredFields(inputList));
+        return result;
+    }
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOForEach.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOForEach.java?rev=810735&r1=810734&r2=810735&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOForEach.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOForEach.java Wed Sep  2 23:49:54 2009
@@ -18,7 +18,6 @@
 package org.apache.pig.impl.logicalLayer;
 
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -43,7 +42,7 @@
 import org.apache.commons.logging.LogFactory;
 
 
-public class LOForEach extends LogicalOperator {
+public class LOForEach extends RelationalOperator {
 
     private static final long serialVersionUID = 2L;
 
@@ -57,6 +56,10 @@
     private ArrayList<Boolean> mFlatten;
     private ArrayList<Schema> mUserDefinedSchema = null;
     private static Log log = LogFactory.getLog(LOForEach.class);
+
+    // For each field of the output schema, the inner plan that generates it.
+    // Filled in while the schema is computed and read by getRelevantInputs.
+    private List<LogicalPlan> mSchemaPlanMapping = new ArrayList<LogicalPlan>();
 
     /**
      * @param plan
@@ -147,7 +150,7 @@
         if (!mIsSchemaComputed) {
             List<Schema.FieldSchema> fss = new ArrayList<Schema.FieldSchema>(
                     mForEachPlans.size());
-
+            mSchemaPlanMapping.clear(); // rebuilt below, one entry per fss field
             for (LogicalPlan plan : mForEachPlans) {
                 log.debug("Number of leaves in " + plan + " = " + plan.getLeaves().size());
                 for(int i = 0; i < plan.getLeaves().size(); ++i) {
@@ -262,14 +265,16 @@
                                                                                newFs = new Schema.FieldSchema(disambiguatorAlias, fs.schema, fs.type);
                                         newFs.setParent(s.getField(i).canonicalName, op);
                                         fss.add(newFs);
+                                        mSchemaPlanMapping.add(plan);
                                         updateAliasCount(aliases, disambiguatorAlias);
-                                                                               //it's fine if there are duplicates
+                                        //it's fine if there are duplicates
                                                                                //we just need to record if its due to
                                                                                //flattening
                                                                        } else {
                                                                                newFs = new Schema.FieldSchema(fs);
                                         newFs.setParent(s.getField(i).canonicalName, op);
                                                                                fss.add(newFs);
+                                                                               mSchemaPlanMapping.add(plan);
                                                                        }
                                     updateAliasCount(aliases, innerCanonicalAlias);
                                                                        flattenAlias.put(newFs, innerCanonicalAlias);
@@ -294,12 +299,14 @@
                                         }
                                         updateAliasCount(aliases, newFs.alias);
                                         fss.add(newFs);
+                                        mSchemaPlanMapping.add(plan);
                                         newFs.setParent(null, op);
                                     } else {
                                         for(Schema.FieldSchema ufs: userDefinedSchema.getFields()) {
                                             Schema.FieldSchema.setFieldSchemaDefaultType(ufs, DataType.BYTEARRAY);
                                             newFs = new Schema.FieldSchema(ufs);
                                             fss.add(newFs);
+                                            mSchemaPlanMapping.add(plan);
                                             newFs.setParent(null, op);
                                             updateAliasCount(aliases, ufs.alias);
                                         }
@@ -311,6 +318,7 @@
                                                                        newFs = new Schema.FieldSchema(null, DataType.BYTEARRAY);
                                     }
                                     fss.add(newFs);
+                                    mSchemaPlanMapping.add(plan);
                                     newFs.setParent(null, op);
                                 }
                                                        }
@@ -331,6 +339,7 @@
                             }
                             newFs.setParent(planFs.canonicalName, op);
                             fss.add(newFs);
+                            mSchemaPlanMapping.add(plan);
                                                }
                                        } else {
                                                //did not get a valid list of field schemas
@@ -338,6 +347,7 @@
                         if(null != userDefinedSchema) {
                             Schema.FieldSchema userDefinedFieldSchema = new Schema.FieldSchema(userDefinedSchema.getField(0));
                             fss.add(userDefinedFieldSchema);
+                            mSchemaPlanMapping.add(plan);
                             userDefinedFieldSchema.setParent(null, op);
                             updateAliasCount(aliases, userDefinedFieldSchema.alias);
                         } else {
@@ -393,7 +403,8 @@
                        }
             mSchema = new Schema(fss);
                        //add the aliases that are unique after flattening
-                       for(Schema.FieldSchema fs: mSchema.getFields()) {
+            for(int i=0;i<mSchema.getFields().size();i++) {
+                Schema.FieldSchema fs = mSchema.getFields().get(i);
                                String alias = flattenAlias.get(fs);
                                Integer count = aliases.get(alias);
                                if (null == count) count = 1;
@@ -774,5 +785,59 @@
         }
         return new Pair<Boolean, List<Integer>>(hasFlatten, flattenedColumns);
     }
 
+    /**
+     * Maps an output column of this foreach back to the input columns
+     * projected by the inner plan that generates that column.
+     *
+     * @param output index of the output; foreach has one output (0)
+     * @param column zero-based output column
+     * @return a single-element list that requires all fields if the
+     *         generating plan has a top-level project-star, or else the
+     *         input columns that plan projects; null if the output or
+     *         column is out of range, the schema (and with it the
+     *         column-to-plan mapping) has not been computed, the plan
+     *         cannot be visited, or the plan projects no input column
+     */
+    @Override
+    public List<RequiredFields> getRelevantInputs(int output, int column) {
+        if (output != 0 || column < 0)
+            return null;
+
+        if (mSchema == null || column >= mSchema.size())
+            return null;
+
+        // The mapping is filled while computing the schema; guard against
+        // a schema that was set without it.
+        if (column >= mSchemaPlanMapping.size())
+            return null;
+
+        // Each output field records the inner plan that produced it, so the
+        // lookup stays correct even if some plan contributes no fields.
+        LogicalPlan plan = mSchemaPlanMapping.get(column);
+        TopLevelProjectFinder projectFinder = new TopLevelProjectFinder(plan);
+        try {
+            projectFinder.visit();
+        } catch (VisitorException ve) {
+            return null;
+        }
+
+        List<RequiredFields> result = new ArrayList<RequiredFields>();
+        if (projectFinder.getProjectStarSet() != null) {
+            result.add(new RequiredFields(true));
+            return result;
+        }
+
+        ArrayList<Pair<Integer, Integer>> inputList = new ArrayList<Pair<Integer, Integer>>();
+        for (LOProject project : projectFinder.getProjectSet()) {
+            for (int inputColumn : project.getProjection()) {
+                inputList.add(new Pair<Integer, Integer>(0, inputColumn));
+            }
+        }
+        if (inputList.isEmpty())
+            return null;
+
+        result.add(new RequiredFields(inputList));
+        return result;
+    }
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOJoin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOJoin.java?rev=810735&r1=810734&r2=810735&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOJoin.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOJoin.java Wed Sep  2 23:49:54 2009
@@ -42,7 +42,7 @@
 import org.apache.pig.impl.plan.RequiredFields;
 import org.apache.pig.impl.plan.ProjectionMap;
 
-public class LOJoin extends LogicalOperator {
+public class LOJoin extends RelationalOperator {
     private static final long serialVersionUID = 2L;
 
     /**
@@ -498,4 +498,53 @@
             }
         }
     }
+
+    /**
+     * Maps an output column of this join back to the single input
+     * column it comes from. The column is located by walking the
+     * predecessors in order and subtracting each one's schema width.
+     *
+     * @param output index of the output; join has one output (0)
+     * @param column zero-based output column
+     * @return a list with one entry per predecessor in which only the
+     *         predecessor producing the column is non-null; null if the
+     *         output or column is out of range, there are no
+     *         predecessors, or a predecessor's schema needed to locate
+     *         the column is unavailable
+     */
+    @Override
+    public List<RequiredFields> getRelevantInputs(int output, int column) {
+        if (output != 0 || column < 0)
+            return null;
+
+        List<LogicalOperator> predecessors = mPlan.getPredecessors(this);
+        if (predecessors == null)
+            return null;
+
+        List<RequiredFields> result = new ArrayList<RequiredFields>();
+        for (int i = 0; i < predecessors.size(); i++)
+            result.add(null);
+
+        for (int inputNum = 0; inputNum < predecessors.size(); ++inputNum) {
+            Schema inputSchema;
+            try {
+                inputSchema = predecessors.get(inputNum).getSchema();
+            } catch (FrontendException fee) {
+                return null;
+            }
+            // without this input's width the column cannot be located
+            if (inputSchema == null)
+                return null;
+
+            if (column < inputSchema.size()) {
+                ArrayList<Pair<Integer, Integer>> inputList = new ArrayList<Pair<Integer, Integer>>();
+                inputList.add(new Pair<Integer, Integer>(inputNum, column));
+                result.set(inputNum, new RequiredFields(inputList));
+                return result;
+            }
+            column -= inputSchema.size();
+        }
+        // column lies beyond the combined width of all predecessors
+        return null;
+    }
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOLimit.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOLimit.java?rev=810735&r1=810734&r2=810735&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOLimit.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOLimit.java Wed Sep  2 23:49:54 2009
@@ -28,8 +28,9 @@
 import org.apache.pig.impl.plan.ProjectionMap;
 import org.apache.pig.impl.plan.RequiredFields;
 import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.Pair;
 
-public class LOLimit extends LogicalOperator {
+public class LOLimit extends RelationalOperator {
     private static final long serialVersionUID = 2L;
     private long mLimit;
     /**
@@ -160,5 +161,33 @@
         requiredFields.add(new RequiredFields(false, true));
         return requiredFields;
     }
+
+    /**
+     * Maps an output column of this limit back to its input column.
+     * Output column {@code column} corresponds one-to-one to column
+     * {@code column} of input 0.
+     *
+     * @param output index of the output; limit has one output (0)
+     * @param column zero-based output column
+     * @return a single-element list requiring input column
+     *         (0, {@code column}); null if the output or column is out
+     *         of range. The upper bound is only checked when the schema
+     *         is already available.
+     */
+    @Override
+    public List<RequiredFields> getRelevantInputs(int output, int column) {
+        if (output != 0 || column < 0)
+            return null;
+
+        // if we have schema information, check if output column is valid
+        if (mSchema != null && column >= mSchema.size())
+            return null;
+
+        ArrayList<Pair<Integer, Integer>> inputList = new ArrayList<Pair<Integer, Integer>>();
+        inputList.add(new Pair<Integer, Integer>(0, column));
+        List<RequiredFields> result = new ArrayList<RequiredFields>();
+        result.add(new RequiredFields(inputList));
+        return result;
+    }
 
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOLoad.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOLoad.java?rev=810735&r1=810734&r2=810735&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOLoad.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOLoad.java Wed Sep  2 23:49:54 2009
@@ -45,7 +45,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
-public class LOLoad extends LogicalOperator {
+public class LOLoad extends RelationalOperator {
     private static final long serialVersionUID = 2L;
     protected boolean splittable = true;
 
@@ -293,4 +293,29 @@
         return requiredFields;
     }
 
+    /**
+     * Returns the input fields that an output column of this load
+     * depends on. For every valid column the answer is a single entry
+     * that requires all fields; no per-column mapping is attempted.
+     *
+     * @param output index of the output; load has one output (0)
+     * @param column zero-based output column
+     * @return a single-element list holding {@code new RequiredFields(true)},
+     *         or null if the output or column is out of range. The upper
+     *         bound is only checked when the schema is already available.
+     */
+    @Override
+    public List<RequiredFields> getRelevantInputs(int output, int column) {
+        if (output != 0 || column < 0)
+            return null;
+
+        // if we have schema information, check if output column is valid
+        if (mSchema != null && column >= mSchema.size())
+            return null;
+
+        List<RequiredFields> result = new ArrayList<RequiredFields>();
+        result.add(new RequiredFields(true));
+        return result;
+    }
+
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOSort.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOSort.java?rev=810735&r1=810734&r2=810735&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOSort.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOSort.java Wed Sep  2 23:49:54 2009
@@ -40,7 +40,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
-public class LOSort extends LogicalOperator {
+public class LOSort extends RelationalOperator {
     private static final long serialVersionUID = 2L;
 
     private List<Boolean> mAscCols;
@@ -311,4 +311,32 @@
             }
         }
     }
+
+    /**
+     * Maps an output column of this sort back to its input column.
+     * Output column {@code column} corresponds one-to-one to column
+     * {@code column} of input 0.
+     *
+     * @param output index of the output; sort has one output (0)
+     * @param column zero-based output column
+     * @return a single-element list requiring input column
+     *         (0, {@code column}); null if the output or column is out
+     *         of range. The upper bound is only checked when the schema
+     *         is already available.
+     */
+    @Override
+    public List<RequiredFields> getRelevantInputs(int output, int column) {
+        if (output != 0 || column < 0)
+            return null;
+
+        // if we have schema information, check if output column is valid
+        if (mSchema != null && column >= mSchema.size())
+            return null;
+
+        ArrayList<Pair<Integer, Integer>> inputList = new ArrayList<Pair<Integer, Integer>>();
+        inputList.add(new Pair<Integer, Integer>(0, column));
+        List<RequiredFields> result = new ArrayList<RequiredFields>();
+        result.add(new RequiredFields(inputList));
+        return result;
+    }
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOSplit.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOSplit.java?rev=810735&r1=810734&r2=810735&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOSplit.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOSplit.java Wed Sep  2 23:49:54 2009
@@ -32,11 +32,12 @@
 import org.apache.pig.impl.plan.ProjectionMap;
 import org.apache.pig.impl.plan.RequiredFields;
 import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.Pair;
 import org.apache.pig.data.DataType;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
-public class LOSplit extends LogicalOperator {
+public class LOSplit extends RelationalOperator {
     private static final long serialVersionUID = 2L;
 
     private ArrayList<LogicalOperator> mOutputs;
@@ -182,5 +183,40 @@
            output.rewire(oldPred, oldPredIndex, newPred, useOldPred);
        }
    }
+
+   /**
+    * Maps a column of one of this split's outputs back to its input
+    * column. Every output is validated against this operator's own
+    * schema, and output column {@code column} corresponds one-to-one
+    * to column {@code column} of input 0.
+    *
+    * @param output index of the successor (split output) being asked
+    *        about
+    * @param column zero-based column of that output
+    * @return a single-element list requiring input column
+    *         (0, {@code column}); null if the output or column is out
+    *         of range or no successors are registered. The column's
+    *         upper bound is only checked when the schema is available.
+    */
+   @Override
+   public List<RequiredFields> getRelevantInputs(int output, int column) {
+       if (output < 0 || column < 0)
+           return null;
+
+       // the plan may have no successors recorded for this operator
+       List<LogicalOperator> successors = mPlan.getSuccessors(this);
+       if (successors == null || output >= successors.size())
+           return null;
+
+       // if we have schema information, check if output column is valid
+       if (mSchema != null && column >= mSchema.size())
+           return null;
+
+       ArrayList<Pair<Integer, Integer>> inputList = new ArrayList<Pair<Integer, Integer>>();
+       inputList.add(new Pair<Integer, Integer>(0, column));
+       List<RequiredFields> result = new ArrayList<RequiredFields>();
+       result.add(new RequiredFields(inputList));
+       return result;
+   }
 
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOSplitOutput.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOSplitOutput.java?rev=810735&r1=810734&r2=810735&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOSplitOutput.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOSplitOutput.java Wed Sep  2 23:49:54 2009
@@ -40,7 +40,7 @@
 import org.apache.pig.impl.logicalLayer.optimizer.SchemaRemover;
 
 
-public class LOSplitOutput extends LogicalOperator {
+public class LOSplitOutput extends RelationalOperator {
     private static final long serialVersionUID = 2L;
 
     protected int mIndex;
@@ -239,5 +239,32 @@
         //be treated as one operator. Any operations on split will imply an operation on
         //split output
     }
 
+    /**
+     * Maps an output column of this split output back to its input column.
+     * Output column {@code column} corresponds one-to-one to column
+     * {@code column} of input 0.
+     *
+     * @param output index of the output; a split output has one output (0)
+     * @param column zero-based output column
+     * @return a single-element list requiring input column
+     *         (0, {@code column}); null if the output or column is out
+     *         of range. The upper bound is only checked when the schema
+     *         is already available.
+     */
+    @Override
+    public List<RequiredFields> getRelevantInputs(int output, int column) {
+        if (output != 0 || column < 0)
+            return null;
+
+        // if we have schema information, check if output column is valid
+        if (mSchema != null && column >= mSchema.size())
+            return null;
+
+        ArrayList<Pair<Integer, Integer>> inputList = new ArrayList<Pair<Integer, Integer>>();
+        inputList.add(new Pair<Integer, Integer>(0, column));
+        List<RequiredFields> result = new ArrayList<RequiredFields>();
+        result.add(new RequiredFields(inputList));
+        return result;
+    }
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOStore.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOStore.java?rev=810735&r1=810734&r2=810735&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOStore.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOStore.java Wed Sep  2 23:49:54 2009
@@ -37,7 +37,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
-public class LOStore extends LogicalOperator {
+public class LOStore extends RelationalOperator {
     private static final long serialVersionUID = 2L;
 
     private FileSpec mOutputFile;
@@ -175,5 +175,30 @@
         requiredFields.add(new RequiredFields(false, true));
         return requiredFields;
     }
+
+    /**
+     * Returns the input fields that an output column of this store
+     * depends on. For every valid column the answer is a single entry
+     * that requires all fields; no per-column mapping is attempted.
+     *
+     * @param output index of the output; store has one output (0)
+     * @param column zero-based output column
+     * @return a single-element list holding {@code new RequiredFields(true)},
+     *         or null if the output or column is out of range. The upper
+     *         bound is only checked when the schema is already available.
+     */
+    @Override
+    public List<RequiredFields> getRelevantInputs(int output, int column) {
+        if (output != 0 || column < 0)
+            return null;
+
+        // if we have schema information, check if output column is valid
+        if (mSchema != null && column >= mSchema.size())
+            return null;
+
+        List<RequiredFields> result = new ArrayList<RequiredFields>();
+        result.add(new RequiredFields(true));
+        return result;
+    }
 
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOStream.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOStream.java?rev=810735&r1=810734&r2=810735&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOStream.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOStream.java Wed Sep  2 23:49:54 2009
@@ -46,7 +46,7 @@
  * details such as input/output/error specifications and also files to be
  * shipped to the cluster and files to be cached.
  */
-public class LOStream extends LogicalOperator {
+public class LOStream extends RelationalOperator {
 
     /**
      * 
@@ -236,5 +236,29 @@
         requiredFields.add(new RequiredFields(true, false));
         return requiredFields;
     }
 
+    /**
+     * Returns the input fields that an output column of this stream
+     * depends on. For every valid column the answer is a single entry
+     * that requires all fields; no per-column mapping is attempted.
+     *
+     * @param output index of the output; stream has one output (0)
+     * @param column zero-based output column
+     * @return a single-element list holding {@code new RequiredFields(true)},
+     *         or null if the output or column is out of range. The upper
+     *         bound is only checked when the schema is already available.
+     */
+    @Override
+    public List<RequiredFields> getRelevantInputs(int output, int column) {
+        if (output != 0 || column < 0)
+            return null;
+
+        // if we have schema information, check if output column is valid
+        if (mSchema != null && column >= mSchema.size())
+            return null;
+
+        List<RequiredFields> result = new ArrayList<RequiredFields>();
+        result.add(new RequiredFields(true));
+        return result;
+    }
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOUnion.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOUnion.java?rev=810735&r1=810734&r2=810735&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOUnion.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOUnion.java Wed Sep  2 23:49:54 2009
@@ -37,7 +37,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
-public class LOUnion extends LogicalOperator {
+public class LOUnion extends RelationalOperator {
 
     private static final long serialVersionUID = 2L;
     private static Log log = LogFactory.getLog(LOUnion.class);
@@ -205,4 +205,35 @@
         return (requiredFields.size() == 0? null: requiredFields);
     }
 
+    @Override
+    public List<RequiredFields> getRelevantInputs(int output, int column) {
+        if (output!=0)
+            return null;
+
+        if (column<0)
+            return null;
+        
+        // if we have schema information, check if output column is valid
+        if (mSchema!=null)
+        {
+            if (column >= mSchema.size())
+                return null;
+        }
+                
+        List<LogicalOperator> predecessors = mPlan.getPredecessors(this);
+        if (predecessors == null)
+            return null;
+        
+        List<RequiredFields> result = new ArrayList<RequiredFields>();
+        for (int i=0;i<predecessors.size();i++)
+        {
+            ArrayList<Pair<Integer, Integer>> inputList = new 
ArrayList<Pair<Integer, Integer>>(); 
+            inputList.add(new Pair<Integer, Integer>(i, column));
+            result.add(new RequiredFields(inputList));
+        }
+        
+        
+        return result;
+    }
+
 }

Modified: 
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LogicalOperator.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LogicalOperator.java?rev=810735&r1=810734&r2=810735&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LogicalOperator.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LogicalOperator.java Wed Sep  2 23:49:54 2009
@@ -293,55 +293,4 @@
             loClone.mSchema = this.mSchema.clone();
         return loClone;
     }    
-
-
-    /**
-     * Produce a map describing how this operator modifies its projection.
-     * @return ProjectionMap null indicates it does not know how the projection
-     * changes, for example a join of two inputs where one input does not have
-     * a schema.
-     */
-    @Override
-    public ProjectionMap getProjectionMap() {
-        return null;
-    };
-    
-    /**
-     * Unset the projection map as if it had not been calculated.  This is used by
-     * anyone who reorganizes the tree and needs to have projection maps recalculated.
-     */
-    @Override
-    public void unsetProjectionMap() {
-        mIsProjectionMapComputed = false;
-        mProjectionMap = null;
-    }
-
-    /**
-     * Regenerate the projection map by unsetting and getting the projection map
-     */
-    @Override
-    public ProjectionMap regenerateProjectionMap() {
-        try {
-            regenerateSchema();
-        } catch (Exception e) {
-            
-        }
-        unsetProjectionMap();
-        return getProjectionMap();
-    }
-
-
-    /**
-        * Get a list of fields that this operator requires. This is not necessarily
-        * equivalent to the list of fields the operator projects. For example, a
-        * filter will project anything passed to it, but requires only the fields
-        * explicitly referenced in its filter expression.
-        * 
-        * @return list of RequiredFields null indicates that the operator does not need any
-        *         fields from its input.
-        */
-       public List<RequiredFields> getRequiredFields() {
-               return null;
-       }
-
 }

Added: 
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/RelationalOperator.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/RelationalOperator.java?rev=810735&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/RelationalOperator.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/RelationalOperator.java Wed Sep  2 23:49:54 2009
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.logicalLayer;
+
+import java.util.List;
+
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.ProjectionMap;
+import org.apache.pig.impl.plan.RequiredFields;
+
+/**
+ * Base class for logical operators that work on relations. It supplies
+ * default projection-map handling and the required-field and relevant-input
+ * queries that the logical optimizer uses.
+ */
+public abstract class RelationalOperator extends LogicalOperator {
+    private static final long serialVersionUID = 2L;
+
+    /**
+     * Creates a relational operator with an explicit requested parallelism.
+     *
+     * @param plan
+     *            Logical plan this operator is a part of.
+     * @param k
+     *            Operator key to assign to this node.
+     * @param rp
+     *            degree of requested parallelism with which to execute this
+     *            node.
+     */
+    public RelationalOperator(LogicalPlan plan, OperatorKey k, int rp) {
+        super(plan, k, rp);
+    }
+
+    /**
+     * Creates a relational operator.
+     *
+     * @param plan
+     *            Logical plan this operator is a part of.
+     * @param k
+     *            Operator key to assign to this node.
+     */
+    public RelationalOperator(LogicalPlan plan, OperatorKey k) {
+        super(plan, k);
+    }
+
+    /**
+     * Produce a map describing how this operator modifies its projection.
+     *
+     * @return ProjectionMap; null indicates it does not know how the projection
+     *         changes, for example a join of two inputs where one input does
+     *         not have a schema.
+     */
+    @Override
+    public ProjectionMap getProjectionMap() {
+        return null;
+    }
+
+    /**
+     * Unset the projection map as if it had not been calculated. This is used
+     * by anyone who reorganizes the tree and needs to have projection maps
+     * recalculated.
+     */
+    @Override
+    public void unsetProjectionMap() {
+        mIsProjectionMapComputed = false;
+        mProjectionMap = null;
+    }
+
+    /**
+     * Regenerate the projection map: regenerate the schema, unset the cached
+     * projection map, and compute it again.
+     * <p>
+     * Schema regeneration is best-effort. If it throws, the exception is
+     * deliberately ignored and the projection map is recomputed from whatever
+     * state remains, so no exception from the schema step reaches the caller.
+     *
+     * @return the freshly computed projection map, exactly as returned by
+     *         {@link #getProjectionMap()} (may be null)
+     */
+    @Override
+    public ProjectionMap regenerateProjectionMap() {
+        try {
+            regenerateSchema();
+        } catch (Exception ignored) {
+            // Best-effort: a schema that cannot be regenerated must not abort
+            // projection-map regeneration; getProjectionMap() decides what to
+            // report from the remaining state.
+        }
+        unsetProjectionMap();
+        return getProjectionMap();
+    }
+
+    /**
+     * Get a list of fields that this operator requires. This is not necessarily
+     * equivalent to the list of fields the operator projects. For example, a
+     * filter will project anything passed to it, but requires only the fields
+     * explicitly referenced in its filter expression.
+     *
+     * @return list of RequiredFields; null indicates that the operator does not
+     *         need any fields from its input.
+     */
+    public List<RequiredFields> getRequiredFields() {
+        return null;
+    }
+
+    /**
+     * Get the relevant input columns of a particular output column. Relevant
+     * input columns are those that directly make up the output column. Input
+     * columns that the relational operator needs as a whole, and that therefore
+     * only indirectly contribute to the output columns, are not counted here;
+     * those are required fields (see {@link #getRequiredFields()}).
+     * <p>
+     * eg1:
+     * <pre>
+     * A = load 'a' AS (a0, a1, a2);
+     * B = filter A by a0=='1';
+     * </pre>
+     * The relevant input column for B.$1 is A.a1, because A.a1 directly
+     * generates B.$1. A.a0 is needed by the filter operator itself, so it is
+     * considered a required field of the relational operator, not a relevant
+     * input.
+     * <p>
+     * eg2:
+     * <pre>
+     * A = load 'a' AS (a0, a1);
+     * B = load 'b' AS (b0, b1);
+     * C = join A by a0, B by b0;
+     * </pre>
+     * The relevant input column for C.$0 is A.a0. The relevant input column for
+     * C.$1 is A.a1.
+     * <p>
+     * eg3:
+     * <pre>
+     * A = load 'a' AS (a0, a1);
+     * B = load 'b' AS (b0, b1);
+     * C = cogroup A by a0, B by b0;
+     * </pre>
+     * The relevant input columns for C.$0 are A.a0 and B.b0. The relevant input
+     * columns for C.$1 are all of A (A.*). The relevant input columns for C.$2
+     * are all of B (B.*).
+     * <p>
+     * eg4:
+     * <pre>
+     * A = load 'a' AS (a0, a1, a2);
+     * B = foreach A generate a1, a0+a2;
+     * </pre>
+     * The relevant input column for B.$0 is A.a1. The relevant input columns
+     * for B.$1 are A.a0 and A.a2.
+     * <p>
+     * eg5:
+     * <pre>
+     * A = load 'a' AS (a0, a1, a2);
+     * B = foreach A generate a1, *;
+     * </pre>
+     * The relevant input column for B.$0 is A.a1. The relevant input columns
+     * for B.$1, B.$2 and B.$3 are A.a0, A.a1 and A.a2 respectively.
+     *
+     * @param output output index. Only LOSplit has outputs other than 0
+     *            currently.
+     * @param column output column index.
+     * @return list of relevant input columns; null if Pig cannot determine the
+     *         relevant inputs or any error occurs.
+     */
+    public abstract List<RequiredFields> getRelevantInputs(int output, int column);
+
+}

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/plan/RequiredFields.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/plan/RequiredFields.java?rev=810735&r1=810734&r2=810735&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/plan/RequiredFields.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/plan/RequiredFields.java Wed Sep  2 23:49:54 2009
@@ -19,6 +19,7 @@
 package org.apache.pig.impl.plan;
 
 import java.lang.StringBuilder;
+import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.pig.impl.util.Pair;
@@ -182,4 +183,5 @@
         sb.append(" fields: " + mFields);
         return sb.toString();
     }
+
 }
\ No newline at end of file


Reply via email to