Author: pradeepkth
Date: Mon Jan  4 22:29:14 2010
New Revision: 895804

URL: http://svn.apache.org/viewvc?rev=895804&view=rev
Log:
PIG-1090: Update sources to reflect recent changes in load-store interfaces - 
changes to implement interactions with LoadMetadata interface from pig runtime 
code (pradeepkth)

Modified:
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/LoadFunc.java
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/LoadMetadata.java
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/LoadPushDown.java
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/Main.java
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/BinStorage.java
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/PigStorage.java
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/TextLoader.java
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/LOLoad.java
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalOptimizer.java
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/optimizer/PruneColumns.java
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/util/UDFContext.java

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/LoadFunc.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/LoadFunc.java?rev=895804&r1=895803&r2=895804&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/LoadFunc.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/LoadFunc.java Mon Jan  4 22:29:14 2010
@@ -74,6 +74,7 @@
      * @param location Location as returned by 
      * {@link LoadFunc#relativeToAbsolutePath(String, Path)}
      * @param job the {@link Job} object
+     * store or retrieve earlier stored information from the {@link UDFContext}
      * @throws IOException if the location is not valid.
      */
     public abstract void setLocation(String location, Job job) throws IOException;
@@ -263,7 +264,7 @@
      * back end before returning tuples in {@link LoadFunc#getNext()}
      * @param signature a unique signature to identify this LoadFunc
      */
-    public void setSignature(String signature) {
+    public void setUDFContextSignature(String signature) {
         // default implementation is a no-op
     }
        

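For illustration, a minimal sketch of how a loader could use the renamed hook together with setLocation(). The class name MyLoader and the property key are hypothetical, and it assumes UDFContext.getUDFProperties(Class, String[]) keys storage off the signature as in the UDFContext change at the end of this commit:

import java.io.IOException;
import java.util.Properties;
import org.apache.hadoop.mapreduce.Job;
import org.apache.pig.LoadFunc;
import org.apache.pig.impl.util.UDFContext;

// Hypothetical loader sketching the setUDFContextSignature() contract.
public abstract class MyLoader extends LoadFunc {

    private String signature;

    @Override
    public void setUDFContextSignature(String signature) {
        // The Pig runtime calls this on both front end and back end; the
        // signature keys this loader's slot in the UDFContext.
        this.signature = signature;
    }

    @Override
    public void setLocation(String location, Job job) throws IOException {
        // Store (or retrieve earlier stored) information under the signature.
        Properties props = UDFContext.getUDFContext()
                .getUDFProperties(this.getClass(), new String[] { signature });
        props.setProperty("my.loader.location", location); // illustrative key
    }
}
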
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/LoadMetadata.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/LoadMetadata.java?rev=895804&r1=895803&r2=895804&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/LoadMetadata.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/LoadMetadata.java Mon Jan  4 22:29:14 2010
@@ -61,7 +61,8 @@
      * @param location Location as returned by 
      * {@link LoadFunc#relativeToAbsolutePath(String, org.apache.hadoop.fs.Path)}
      * @param conf The {@link Configuration} object
-     * @return array of field names of the partition keys.
+     * @return array of field names of the partition keys. Implementations 
+     * should return null to indicate that there are no partition keys
      * @throws IOException if an exception occurs while retrieving partition keys
      */
     String[] getPartitionKeys(String location, Configuration conf) 
@@ -70,11 +71,14 @@
     /**
      * Set the filter for partitioning.  It is assumed that this filter
      * will only contain references to fields given as partition keys in
-     * getPartitionKeys
-     * @param plan that describes filter for partitioning
+     * getPartitionKeys. So if the implementation returns null in 
+     * {@link #getPartitionKeys(String, Configuration)}, then this method is not
+     * called by the Pig runtime. This method is also not called by the Pig runtime
+     * if there are no partition filter conditions. 
+     * @param partitionFilter expression that describes the filter for partitioning
      * @throws IOException if the filter is not compatible with the storage
      * mechanism or contains non-partition fields.
      */
-    void setParitionFilter(OperatorPlan plan) throws IOException;
+    void setPartitionFilter(Expression partitionFilter) throws IOException;
 
 }
\ No newline at end of file

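A sketch of the revised contract from a loader's point of view. PartitionedLoader and the "my.partition.keys" property are made up for illustration, and the getSchema()/getStatistics() signatures are assumed to follow the (String, Configuration) style used elsewhere in this branch. Returning null from getPartitionKeys() tells the runtime the data is unpartitioned, so setPartitionFilter() is never called; the BinStorage change below takes exactly that null-returning route.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.pig.Expression;
import org.apache.pig.LoadMetadata;
import org.apache.pig.ResourceSchema;
import org.apache.pig.ResourceStatistics;

// Hypothetical loader following the revised LoadMetadata contract.
public class PartitionedLoader implements LoadMetadata {

    private Expression partitionFilter;

    @Override
    public String[] getPartitionKeys(String location, Configuration conf)
            throws IOException {
        String keys = conf.get("my.partition.keys");   // illustrative source
        // null means "no partition keys"; setPartitionFilter() is then
        // never called by the Pig runtime.
        return keys == null ? null : keys.split(",");
    }

    @Override
    public void setPartitionFilter(Expression partitionFilter) throws IOException {
        // Called only if getPartitionKeys() returned non-null and the query
        // actually contains partition filter conditions.
        this.partitionFilter = partitionFilter;
    }

    @Override
    public ResourceSchema getSchema(String location, Configuration conf)
            throws IOException {
        return null;   // schema discovery elided in this sketch
    }

    @Override
    public ResourceStatistics getStatistics(String location, Configuration conf)
            throws IOException {
        return null;   // no statistics available
    }
}
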
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/LoadPushDown.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/LoadPushDown.java?rev=895804&r1=895803&r2=895804&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/LoadPushDown.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/LoadPushDown.java Mon Jan  4 22:29:14 2010
@@ -48,8 +48,11 @@
    /**
     * Indicate to the loader fields that will be needed.  This can be useful for
     * loaders that access data that is stored in a columnar format where indicating
-     * columns to be accessed a head of time will save scans.  If the loader
-     * function cannot make use of this information, it is free to ignore it.
+     * columns to be accessed ahead of time will save scans.  This method will
+     * not be invoked by the Pig runtime if all fields are required. So implementations
+     * should assume that if this method is not invoked, then all fields from 
+     * the input are required. If the loader function cannot make use of this 
+     * information, it is free to ignore it by returning an appropriate Response.
     * @param requiredFieldList RequiredFieldList indicating which columns will be needed.
      */
     public RequiredFieldResponse pushProjection(RequiredFieldList 
@@ -70,13 +73,27 @@
        // In the initial implementation only one level of subfields will be populated.
         private List<RequiredField> subFields;
         
-        // true for atomic types like INTEGER, FLOAT, DOUBLE, CHARARRAY, BYTEARRAY, LONG and when all 
-        // subfields from complex types like BAG, TUPLE and MAP are required
-        private boolean allSubFieldsRequired;
-        
        // Type of this field - the value could be any current PIG DataType (as specified by the constants in DataType class).
         private byte type;
 
+        public RequiredField() {
+            // to allow piece-meal construction
+        }
+        
+        /**
+         * @param alias
+         * @param index
+         * @param subFields
+         * @param type
+         */
+        public RequiredField(String alias, int index,
+                List<RequiredField> subFields, byte type) {
+            this.alias = alias;
+            this.index = index;
+            this.subFields = subFields;
+            this.type = type;
+        }
+
         /**
          * @return the alias
          */
@@ -112,14 +129,6 @@
             return type;
         }
 
-        /**
-         * @return true if all sub fields are required, false otherwise
-         */
-        public boolean isAllSubFieldsRequired() {
-            return allSubFieldsRequired;
-        }
-
-
         public void setType(byte t) {
             type = t;
         }
@@ -144,18 +153,18 @@
     }
 
     public static class RequiredFieldList implements Serializable {
-        // Implementation of the private fields is subject to change but the
-        // getter() interface should remain
         
         private static final long serialVersionUID = 1L;
         
        // list of Required fields, this will be null if all fields are required
         private List<RequiredField> fields = new ArrayList<RequiredField>(); 
         
-        // flag to indicate if all fields are required. The Loader implementation should check this flag first and look at the fields ONLY if this is true
-        private boolean allFieldsRequired;
-        
-        private String signature;
+        /**
+         * @param fields
+         */
+        public RequiredFieldList(List<RequiredField> fields) {
+            this.fields = fields;
+        }
 
         /**
          * @return the required fields - this will be null if all fields are
@@ -165,29 +174,13 @@
             return fields;
         }
 
-        public RequiredFieldList(String signature) {
-            this.signature = signature;
-        }
-        
-        public String getSignature() {
-            return signature;
+        public RequiredFieldList() {
         }
         
-        /**
-         * @return true if all fields are required, false otherwise
-         */
-        public boolean isAllFieldsRequired() {
-            return allFieldsRequired;
-        }
-
-        public void setAllFieldsRequired(boolean allRequired) {
-            allFieldsRequired = allRequired;
-        }
-
         @Override
         public String toString() {
             StringBuffer result = new StringBuffer();
-            if (allFieldsRequired)
+            if (fields == null)
                 result.append("*");
             else {
                 result.append("[");

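To illustrate the reworked API: a null field list now encodes "all fields required", replacing the removed allFieldsRequired flag. A minimal sketch using the constructor added above (class and field names are illustrative):

import java.util.Arrays;
import org.apache.pig.LoadPushDown.RequiredField;
import org.apache.pig.LoadPushDown.RequiredFieldList;
import org.apache.pig.data.DataType;

public class ProjectionExample {
    public static void main(String[] args) {
        // Ask the loader for just fields 0 and 2 of the input.
        RequiredField year = new RequiredField("year", 0, null, DataType.INTEGER);
        RequiredField name = new RequiredField("name", 2, null, DataType.CHARARRAY);
        RequiredFieldList projection =
                new RequiredFieldList(Arrays.asList(year, name));

        // Inside a loader's pushProjection(), the null check replaces the
        // old isAllFieldsRequired() call:
        if (projection.getFields() == null) {
            System.out.println("all fields required");   // toString() prints "*"
        } else {
            System.out.println(projection.getFields().size() + " fields required");
        }
    }
}
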
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/Main.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/Main.java?rev=895804&r1=895803&r2=895804&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/Main.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/Main.java Mon Jan  4 22:29:14 2010
@@ -39,6 +39,7 @@
 import org.apache.pig.impl.util.JarManager;
 import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.pig.impl.util.PropertiesUtil;
+import org.apache.pig.impl.util.UDFContext;
 import org.apache.pig.tools.cmdline.CmdLineParser;
 import org.apache.pig.tools.grunt.Grunt;
 import org.apache.pig.impl.util.LogUtils;
@@ -129,6 +130,9 @@
 
         //by default we keep going on error on the backend
         properties.setProperty("stop.on.failure", ""+false);
+        
+        // set up client side system properties in UDF context
+        UDFContext.getUDFContext().setClientSystemProps();
 
         char opt;
         while ((opt = opts.getNextOpt()) != CmdLineParser.EndOfOpts) {

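The effect of the new call is that client-side System properties become visible to UDFs on the back end through getClientSystemProps(). A sketch, where ThresholdFunc and the my.threshold property are hypothetical:

import java.io.IOException;
import java.util.Properties;
import org.apache.pig.EvalFunc;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.util.UDFContext;

// Hypothetical UDF reading a property passed on the client, e.g.
//   pig -Dmy.threshold=3 script.pig
public class ThresholdFunc extends EvalFunc<Boolean> {
    @Override
    public Boolean exec(Tuple input) throws IOException {
        Properties clientProps =
                UDFContext.getUDFContext().getClientSystemProps();
        int threshold =
                Integer.parseInt(clientProps.getProperty("my.threshold", "0"));
        return input != null && input.size() > threshold;
    }
}
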
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java?rev=895804&r1=895803&r2=895804&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java Mon Jan  4 22:29:14 2010
@@ -89,12 +89,14 @@
         LoadFunc loadFunc = getLoadFunc(pigSplit.getInputIndex(), conf);
         // Pass loader signature to LoadFunc and to InputFormat through
         // the conf
-        passLoadSignature(loadFunc, pigSplit.getInputIndex(), conf, true);
+        passLoadSignature(loadFunc, pigSplit.getInputIndex(), conf);
+        
         // merge entries from split specific conf into the conf we got
         PigInputFormat.mergeSplitSpecificConf(loadFunc, pigSplit, conf);
         InputFormat inputFormat = loadFunc.getInputFormat();
         // now invoke the createRecordReader() with this "adjusted" conf
         RecordReader reader = inputFormat.createRecordReader(split, context);
+        
         return new PigRecordReader(reader, loadFunc, conf);
     }
     
@@ -108,7 +110,6 @@
      */
    static void mergeSplitSpecificConf(LoadFunc loadFunc, PigSplit pigSplit, Configuration originalConf) 
     throws IOException {
-     
         // set up conf with entries from input specific conf
         Job job = new Job(originalConf);
         loadFunc.setLocation(getLoadLocation(pigSplit.getInputIndex(), 
@@ -149,27 +150,22 @@
      * @param inputIndex the index of the input corresponding to the loadfunc
      * @param conf the Configuration object into which the signature should be
      * set
-     * @param initializeUDFContext flag to indicate if UDFContext also should
-     * be initialized
      * @throws IOException on failure
      */
     @SuppressWarnings("unchecked")
     static void passLoadSignature(LoadFunc loadFunc, int inputIndex, 
-            Configuration conf, boolean initializeUDFContext) throws IOException {
+            Configuration conf) throws IOException {
         List<String> inpSignatureLists = 
             (ArrayList<String>)ObjectSerializer.deserialize(
                     conf.get("pig.inpSignatures"));
         // signature can be null for intermediate jobs where it will not
         // be required to be passed down
         if(inpSignatureLists.get(inputIndex) != null) {
-            loadFunc.setSignature(inpSignatureLists.get(inputIndex));
+            loadFunc.setUDFContextSignature(inpSignatureLists.get(inputIndex));
            conf.set("pig.loader.signature", inpSignatureLists.get(inputIndex));
         }
         
-        if(initializeUDFContext) {
-            MapRedUtil.setupUDFContext(conf);
-        }
-           
+        MapRedUtil.setupUDFContext(conf);
     }
     
     /* (non-Javadoc)
@@ -237,8 +233,7 @@
                 Job inputSpecificJob = new Job(confClone);
                 // Pass loader signature to LoadFunc and to InputFormat through
                 // the conf
-                passLoadSignature(loadFunc, i, 
-                        inputSpecificJob.getConfiguration(), false);
+                passLoadSignature(loadFunc, i, inputSpecificJob.getConfiguration());
                 loadFunc.setLocation(inputs.get(i).getFileName(), 
                         inputSpecificJob);
                 // The above setLocation call could write to the conf within

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java?rev=895804&r1=895803&r2=895804&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java Mon Jan  4 22:29:14 2010
@@ -113,7 +113,7 @@
         // Pass loader signature to LoadFunc and to InputFormat through
         // the conf
         PigInputFormat.passLoadSignature(loadfunc, pigSplit.getInputIndex(), 
-                context.getConfiguration(), true);
+                context.getConfiguration());
         // now invoke initialize() on underlying RecordReader with
         // the "adjusted" conf
         wrappedReader.initialize(pigSplit.getWrappedSplit(), context);

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java?rev=895804&r1=895803&r2=895804&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java Mon Jan  4 22:29:14 2010
@@ -100,8 +100,6 @@
         String filename = lFile.getFileName();
         LoadFunc origloader = 
             (LoadFunc)PigContext.instantiateFuncFromSpec(lFile.getFuncSpec());
-        if (loader instanceof PigStorage)
-            ((PigStorage)loader).setSignature(signature);
         loader = new ReadToEndLoader(origloader, 
                 ConfigurationUtil.toConfiguration(pc.getProperties()), 
                 filename,

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/BinStorage.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/BinStorage.java?rev=895804&r1=895803&r2=895804&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/BinStorage.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/BinStorage.java Mon Jan  4 22:29:14 2010
@@ -37,6 +37,7 @@
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.pig.Expression;
 import org.apache.pig.FileInputLoadFunc;
 import org.apache.pig.LoadCaster;
 import org.apache.pig.LoadFunc;
@@ -382,7 +383,7 @@
     @Override
     public String[] getPartitionKeys(String location, Configuration conf)
             throws IOException {
-        throw new UnsupportedOperationException();
+        return null;
     }
 
     @Override
@@ -429,7 +430,7 @@
     }
 
     @Override
-    public void setParitionFilter(OperatorPlan plan) throws IOException {
+    public void setPartitionFilter(Expression plan) throws IOException {
         throw new UnsupportedOperationException();
     }
 }

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/PigStorage.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/PigStorage.java?rev=895804&r1=895803&r2=895804&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/PigStorage.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/PigStorage.java Mon Jan  4 22:29:14 2010
@@ -191,8 +191,7 @@
    public RequiredFieldResponse pushProjection(RequiredFieldList requiredFieldList) throws FrontendException {
         if (requiredFieldList == null)
             return null;
-        signature = requiredFieldList.getSignature();
-        if (!requiredFieldList.isAllFieldsRequired())
+        if (requiredFieldList.getFields() != null)
         {
             int lastColumn = -1;
             for (RequiredField rf: requiredFieldList.getFields())
@@ -289,7 +288,7 @@
 
     
     @Override
-    public void setSignature(String signature) {
+    public void setUDFContextSignature(String signature) {
         this.signature = signature; 
     }
 

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/TextLoader.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/TextLoader.java?rev=895804&r1=895803&r2=895804&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/TextLoader.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/TextLoader.java Mon Jan  4 22:29:14 2010
@@ -45,6 +45,7 @@
     protected RecordReader in = null;
     private TupleFactory mTupleFactory = TupleFactory.getInstance();
 
+    @Override
     public Tuple getNext() throws IOException {
         try {
             boolean notDone = in.nextKeyValue();

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/LOLoad.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/LOLoad.java?rev=895804&r1=895803&r2=895804&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/LOLoad.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/LOLoad.java Mon Jan  4 22:29:14 2010
@@ -44,6 +44,9 @@
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.util.MultiMap;
 import org.apache.pig.impl.util.Pair;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 
 public class LOLoad extends RelationalOperator {
     private static final long serialVersionUID = 2L;
@@ -56,7 +59,7 @@
     private static Log log = LogFactory.getLog(LOLoad.class);
     private Schema mDeterminedSchema = null;
     private RequiredFieldList requiredFieldList;
-
+    
     /**
      * @param plan
      *            LogicalPlan this operator is a part of.
@@ -84,6 +87,7 @@
          try { 
             mLoadFunc = (LoadFunc)
                  PigContext.instantiateFuncFromSpec(inputFileSpec.getFuncSpec());
+             mLoadFunc.setUDFContextSignature(getAlias());
         }catch (ClassCastException cce) {
            log.error(inputFileSpec.getFuncSpec() + " should implement the LoadFunc interface.");
             throw new IOException(cce);
@@ -324,7 +328,7 @@
         if (mSchema == null)
             return response;
         
-        if (requiredFieldList.isAllFieldsRequired())
+        if (requiredFieldList.getFields() == null)
             return response;
         
         if (requiredFieldList.getFields()==null)
@@ -375,6 +379,12 @@
         return response;
 
     }
+    
+    @Override
+    public void setAlias(String newAlias) {
+        super.setAlias(newAlias);
+        mLoadFunc.setUDFContextSignature(getAlias());
+    }
 
     @Override
     public boolean pruneColumns(List<Pair<Integer, Integer>> columns)

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalOptimizer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalOptimizer.java?rev=895804&r1=895803&r2=895804&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalOptimizer.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalOptimizer.java Mon Jan  4 22:29:14 2010
@@ -27,15 +27,18 @@
 import org.apache.pig.impl.logicalLayer.LOForEach;
 import org.apache.pig.impl.logicalLayer.LOLimit;
 import org.apache.pig.impl.logicalLayer.LOLoad;
-import org.apache.pig.impl.logicalLayer.LOPrinter;
 import org.apache.pig.impl.logicalLayer.LOStream;
 import org.apache.pig.impl.logicalLayer.LogicalOperator;
 import org.apache.pig.impl.logicalLayer.LogicalPlan;
 import org.apache.pig.impl.logicalLayer.RelationalOperator;
 import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.impl.plan.OperatorKey;
-import org.apache.pig.impl.plan.VisitorException;
-import org.apache.pig.impl.plan.optimizer.*;
+import org.apache.pig.impl.plan.optimizer.OptimizerException;
+import org.apache.pig.impl.plan.optimizer.PlanOptimizer;
+import org.apache.pig.impl.plan.optimizer.Rule;
+import org.apache.pig.impl.plan.optimizer.RuleMatcher;
+import org.apache.pig.impl.plan.optimizer.RuleOperator;
+import org.apache.pig.impl.plan.optimizer.RulePlan;
 
 /**
  * An optimizer for logical plans.
@@ -68,13 +71,6 @@
         RulePlan rulePlan;
 
         // List of rules for the logical optimizer
-
-        // This one has to be first, as the type cast inserter expects the
-        // load to only have one output.
-        // Find any places in the plan that have an implicit split and make
-        // it explicit. Since the RuleMatcher doesn't handle trees properly,
-        // we cheat and say that we match any node. Then we'll do the actual
-        // test in the transformers check method.
         
         boolean turnAllRulesOff = false;
         if (mRulesOff != null) {
@@ -86,6 +82,13 @@
             }
         }
         
+        // This one has to be before the type cast inserter as it expects the
+        // load to only have one output.
+        // Find any places in the plan that have an implicit split and make
+        // it explicit. Since the RuleMatcher doesn't handle trees properly,
+        // we cheat and say that we match any node. Then we'll do the actual
+        // test in the transformers check method.
+        
         rulePlan = new RulePlan();
        RuleOperator anyLogicalOperator = new RuleOperator(LogicalOperator.class, RuleOperator.NodeType.ANY_NODE, 
                 new OperatorKey(SCOPE, nodeIdGen.getNextNodeId(SCOPE)));
@@ -93,10 +96,22 @@
         mRules.add(new Rule<LogicalOperator, LogicalPlan>(rulePlan,
                 new ImplicitSplitInserter(plan), "ImplicitSplitInserter"));
 
+        
+        // this one is ordered to be before other optimizations since later 
+        // optimizations may move the LOFilter that it looks for just after a 
+        // LOLoad
+        rulePlan = new RulePlan();
+        RuleOperator loLoad = new RuleOperator(LOLoad.class, 
+                new OperatorKey(SCOPE, nodeIdGen.getNextNodeId(SCOPE)));
+        rulePlan.add(loLoad);
+        mRules.add(new Rule<LogicalOperator, LogicalPlan>(rulePlan,
+                new PartitionFilterOptimizer(plan), "PartitionFilterOptimizer"));
+
+
         // Add type casting to plans where the schema has been declared (by
         // user, data, or data catalog).
         rulePlan = new RulePlan();
-        RuleOperator loLoad = new RuleOperator(LOLoad.class, 
+        loLoad = new RuleOperator(LOLoad.class, 
                 new OperatorKey(SCOPE, nodeIdGen.getNextNodeId(SCOPE)));
         rulePlan.add(loLoad);
         mRules.add(new Rule<LogicalOperator, LogicalPlan>(rulePlan,

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/optimizer/PruneColumns.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/optimizer/PruneColumns.java?rev=895804&r1=895803&r2=895804&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/optimizer/PruneColumns.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/optimizer/PruneColumns.java Mon Jan  4 22:29:14 2010
@@ -57,6 +57,7 @@
 import org.apache.pig.impl.logicalLayer.LogicalPlan;
 import org.apache.pig.impl.logicalLayer.RelationalOperator;
 import org.apache.pig.impl.logicalLayer.TopLevelProjectFinder;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.plan.DependencyOrderWalker;
 import org.apache.pig.impl.plan.MapKeysInfo;
 import org.apache.pig.impl.plan.NodeIdGenerator;
@@ -644,18 +645,19 @@
    // Prune fields of LOLoad, and use ColumePruner to prune all the downstream logical operators
    private void pruneLoader(LOLoad load, RequiredFields loaderRequiredFields) throws FrontendException
     {
-        RequiredFieldList requiredFieldList = new RequiredFieldList(load.getAlias());
-        requiredFieldList.setAllFieldsRequired(false);
+        RequiredFieldList requiredFieldList = new RequiredFieldList();
 
         if (loaderRequiredFields==null || loaderRequiredFields.needAllFields())
             return;
+        Schema loadSchema = load.getSchema();
         for (int i=0;i<loaderRequiredFields.size();i++)
         {
             Pair<Integer, Integer> pair = loaderRequiredFields.getField(i);
             MapKeysInfo mapKeysInfo = loaderRequiredFields.getMapKeysInfo(i);
             RequiredField requiredField = new RequiredField();
             requiredField.setIndex(pair.second);
-            requiredField.setType(load.getSchema().getField(pair.second).type);
+            requiredField.setAlias(loadSchema.getField(pair.second).alias);
+            requiredField.setType(loadSchema.getField(pair.second).type);
             if (mapKeysInfo!=null && !mapKeysInfo.needAllKeys())
             {
                List<RequiredField> subFieldList = new ArrayList<RequiredField>();

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/util/UDFContext.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/util/UDFContext.java?rev=895804&r1=895803&r2=895804&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/util/UDFContext.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/util/UDFContext.java Mon Jan  4 22:29:14 2010
@@ -30,9 +30,10 @@
     
     private Configuration jconf = null;
     private HashMap<Integer, Properties> udfConfs;
-
+    private Properties clientSysProps;
+    private static final String CLIENT_SYS_PROPS = "pig.client.sys.props";
+    private static final String UDF_CONTEXT = "pig.udf.context"; 
     private static UDFContext self = null;
-
     private UDFContext() {
         udfConfs = new HashMap<Integer, Properties>();
     }
@@ -44,6 +45,20 @@
         return self;
     }
 
+    // internal pig use only - should NOT be called from user code
+    public void setClientSystemProps() {
+        clientSysProps = System.getProperties();        
+    }
+    
+    /**
+     * Get the System Properties (Read only) as on the client machine from where Pig
+     * was launched. This will include command line properties passed at launch
+     * time
+     * @return client side System Properties including command line properties
+     */
+    public Properties getClientSystemProps() {
+        return clientSysProps;
+    }
     /**
      * Adds the JobConf to this singleton.  Will be 
      * called on the backend by the Map and Reduce 
@@ -145,7 +160,8 @@
      * @throws IOException if underlying serialization throws it
      */
     public void serialize(Configuration conf) throws IOException {
-        conf.set("pig.UDFContext", ObjectSerializer.serialize(udfConfs));
+        conf.set(UDF_CONTEXT, ObjectSerializer.serialize(udfConfs));
+        conf.set(CLIENT_SYS_PROPS, ObjectSerializer.serialize(clientSysProps));
     }
     
     /**
@@ -156,7 +172,9 @@
      */
     @SuppressWarnings("unchecked")
     public void deserialize() throws IOException {  
-        udfConfs = (HashMap<Integer, Properties>)ObjectSerializer.deserialize(jconf.get("pig.UDFContext"));
+        udfConfs = (HashMap<Integer, Properties>)ObjectSerializer.deserialize(jconf.get(UDF_CONTEXT));
+        clientSysProps = (Properties)ObjectSerializer.deserialize(
+                jconf.get(CLIENT_SYS_PROPS));
     }
     
     @SuppressWarnings("unchecked")

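Taken together with the Main.java and PigInputFormat.java changes above, this serialize()/deserialize() pair is what lets a loader hand state across the front-end/back-end boundary. A minimal sketch, where the helper class and property names are hypothetical and getUDFProperties(Class, String[]) is assumed to key storage off the signature:

import java.util.Properties;
import org.apache.pig.impl.util.UDFContext;

public class ColumnState {

    // Front end (e.g. from pushProjection()): remember the pruned columns.
    static void store(String signature, String columns) {
        Properties p = UDFContext.getUDFContext()
                .getUDFProperties(ColumnState.class, new String[] { signature });
        p.setProperty("columns", columns);
    }

    // Back end (e.g. from prepareToRead()): after MapRedUtil.setupUDFContext()
    // has deserialized the UDFContext from the job conf, the same properties
    // are visible again under the same signature.
    static String load(String signature) {
        Properties p = UDFContext.getUDFContext()
                .getUDFProperties(ColumnState.class, new String[] { signature });
        return p.getProperty("columns");
    }
}
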
