Author: pradeepkth
Date: Mon Jan 4 22:29:14 2010
New Revision: 895804
URL: http://svn.apache.org/viewvc?rev=895804&view=rev
Log:
PIG-1090: Update sources to reflect recent changes in load-store interfaces -
changes to implement interactions with LoadMetadata interface from pig runtime
code (pradeepkth)
Modified:
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/LoadFunc.java
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/LoadMetadata.java
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/LoadPushDown.java
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/Main.java
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/BinStorage.java
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/PigStorage.java
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/TextLoader.java
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/LOLoad.java
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalOptimizer.java
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/optimizer/PruneColumns.java
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/util/UDFContext.java
Modified:
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/LoadFunc.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/LoadFunc.java?rev=895804&r1=895803&r2=895804&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/LoadFunc.java
(original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/LoadFunc.java
Mon Jan 4 22:29:14 2010
@@ -74,6 +74,7 @@
* @param location Location as returned by
* {@link LoadFunc#relativeToAbsolutePath(String, Path)}
* @param job the {@link Job} object
+ * store or retrieve earlier stored information from the {@link
UDFContext}
* @throws IOException if the location is not valid.
*/
public abstract void setLocation(String location, Job job) throws
IOException;
@@ -263,7 +264,7 @@
* back end before returning tuples in {@link LoadFunc#getNext()}
* @param signature a unique signature to identify this LoadFunc
*/
- public void setSignature(String signature) {
+ public void setUDFContextSignature(String signature) {
// default implementation is a no-op
}
Modified:
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/LoadMetadata.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/LoadMetadata.java?rev=895804&r1=895803&r2=895804&view=diff
==============================================================================
---
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/LoadMetadata.java
(original)
+++
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/LoadMetadata.java
Mon Jan 4 22:29:14 2010
@@ -61,7 +61,8 @@
* @param location Location as returned by
* {@link LoadFunc#relativeToAbsolutePath(String,
org.apache.hadoop.fs.Path)}
* @param conf The {@link Configuration} object
- * @return array of field names of the partition keys.
+ * @return array of field names of the partition keys. Implementations
+ * should return null to indicate that there are no partition keys
* @throws IOException if an exception occurs while retrieving partition
keys
*/
String[] getPartitionKeys(String location, Configuration conf)
@@ -70,11 +71,14 @@
/**
* Set the filter for partitioning. It is assumed that this filter
* will only contain references to fields given as partition keys in
- * getPartitionKeys
- * @param plan that describes filter for partitioning
+ * getPartitionKeys. So if the implementation returns null in
+ * {@link #getPartitionKeys(String, Configuration)}, then this method
is not
+ * called by pig runtime. This method is also not called by the pig runtime
+ * if there are no partition filter conditions.
+ * @param partitionFilter that describes filter for partitioning
* @throws IOException if the filter is not compatible with the storage
* mechanism or contains non-partition fields.
*/
- void setParitionFilter(OperatorPlan plan) throws IOException;
+ void setPartitionFilter(Expression partitionFilter) throws IOException;
}
\ No newline at end of file
Modified:
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/LoadPushDown.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/LoadPushDown.java?rev=895804&r1=895803&r2=895804&view=diff
==============================================================================
---
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/LoadPushDown.java
(original)
+++
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/LoadPushDown.java
Mon Jan 4 22:29:14 2010
@@ -48,8 +48,11 @@
/**
* Indicate to the loader fields that will be needed. This can be useful
for
* loaders that access data that is stored in a columnar format where
indicating
- * columns to be accessed a head of time will save scans. If the loader
- * function cannot make use of this information, it is free to ignore it.
+ * columns to be accessed ahead of time will save scans. This method will
+ * not be invoked by the Pig runtime if all fields are required. So
implementations
+ * should assume that if this method is not invoked, then all fields from
+ * the input are required. If the loader function cannot make use of this
+ * information, it is free to ignore it by returning an appropriate
Response
* @param requiredFieldList RequiredFieldList indicating which columns
will be needed.
*/
public RequiredFieldResponse pushProjection(RequiredFieldList
@@ -70,13 +73,27 @@
// In the initial implementation only one level of subfields will be
populated.
private List<RequiredField> subFields;
- // true for atomic types like INTEGER, FLOAT, DOUBLE, CHARARRAY,
BYTEARRAY, LONG and when all
- // subfields from complex types like BAG, TUPLE and MAP are required
- private boolean allSubFieldsRequired;
-
// Type of this field - the value could be any current PIG DataType
(as specified by the constants in DataType class).
private byte type;
+ public RequiredField() {
+ // to allow piece-meal construction
+ }
+
+ /**
+ * @param alias
+ * @param index
+ * @param subFields
+ * @param type
+ */
+ public RequiredField(String alias, int index,
+ List<RequiredField> subFields, byte type) {
+ this.alias = alias;
+ this.index = index;
+ this.subFields = subFields;
+ this.type = type;
+ }
+
/**
* @return the alias
*/
@@ -112,14 +129,6 @@
return type;
}
- /**
- * @return true if all sub fields are required, false otherwise
- */
- public boolean isAllSubFieldsRequired() {
- return allSubFieldsRequired;
- }
-
-
public void setType(byte t) {
type = t;
}
@@ -144,18 +153,18 @@
}
public static class RequiredFieldList implements Serializable {
- // Implementation of the private fields is subject to change but the
- // getter() interface should remain
private static final long serialVersionUID = 1L;
// list of Required fields, this will be null if all fields are
required
private List<RequiredField> fields = new ArrayList<RequiredField>();
- // flag to indicate if all fields are required. The Loader
implementation should check this flag first and look at the fields ONLY if this
is true
- private boolean allFieldsRequired;
-
- private String signature;
+ /**
+ * @param fields
+ */
+ public RequiredFieldList(List<RequiredField> fields) {
+ this.fields = fields;
+ }
/**
* @return the required fields - this will be null if all fields are
@@ -165,29 +174,13 @@
return fields;
}
- public RequiredFieldList(String signature) {
- this.signature = signature;
- }
-
- public String getSignature() {
- return signature;
+ public RequiredFieldList() {
}
- /**
- * @return true if all fields are required, false otherwise
- */
- public boolean isAllFieldsRequired() {
- return allFieldsRequired;
- }
-
- public void setAllFieldsRequired(boolean allRequired) {
- allFieldsRequired = allRequired;
- }
-
@Override
public String toString() {
StringBuffer result = new StringBuffer();
- if (allFieldsRequired)
+ if (fields == null)
result.append("*");
else {
result.append("[");
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/Main.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/Main.java?rev=895804&r1=895803&r2=895804&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/Main.java
(original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/Main.java Mon
Jan 4 22:29:14 2010
@@ -39,6 +39,7 @@
import org.apache.pig.impl.util.JarManager;
import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.impl.util.PropertiesUtil;
+import org.apache.pig.impl.util.UDFContext;
import org.apache.pig.tools.cmdline.CmdLineParser;
import org.apache.pig.tools.grunt.Grunt;
import org.apache.pig.impl.util.LogUtils;
@@ -129,6 +130,9 @@
//by default we keep going on error on the backend
properties.setProperty("stop.on.failure", ""+false);
+
+ // set up client side system properties in UDF context
+ UDFContext.getUDFContext().setClientSystemProps();
char opt;
while ((opt = opts.getNextOpt()) != CmdLineParser.EndOfOpts) {
Modified:
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java?rev=895804&r1=895803&r2=895804&view=diff
==============================================================================
---
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java
(original)
+++
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java
Mon Jan 4 22:29:14 2010
@@ -89,12 +89,14 @@
LoadFunc loadFunc = getLoadFunc(pigSplit.getInputIndex(), conf);
// Pass loader signature to LoadFunc and to InputFormat through
// the conf
- passLoadSignature(loadFunc, pigSplit.getInputIndex(), conf, true);
+ passLoadSignature(loadFunc, pigSplit.getInputIndex(), conf);
+
// merge entries from split specific conf into the conf we got
PigInputFormat.mergeSplitSpecificConf(loadFunc, pigSplit, conf);
InputFormat inputFormat = loadFunc.getInputFormat();
// now invoke the createRecordReader() with this "adjusted" conf
RecordReader reader = inputFormat.createRecordReader(split, context);
+
return new PigRecordReader(reader, loadFunc, conf);
}
@@ -108,7 +110,6 @@
*/
static void mergeSplitSpecificConf(LoadFunc loadFunc, PigSplit pigSplit,
Configuration originalConf)
throws IOException {
-
// set up conf with entries from input specific conf
Job job = new Job(originalConf);
loadFunc.setLocation(getLoadLocation(pigSplit.getInputIndex(),
@@ -149,27 +150,22 @@
* @param inputIndex the index of the input corresponding to the loadfunc
* @param conf the Configuration object into which the signature should be
* set
- * @param initializeUDFContext flag to indicate if UDFContext also should
- * be initialized
* @throws IOException on failure
*/
@SuppressWarnings("unchecked")
static void passLoadSignature(LoadFunc loadFunc, int inputIndex,
- Configuration conf, boolean initializeUDFContext) throws
IOException {
+ Configuration conf) throws IOException {
List<String> inpSignatureLists =
(ArrayList<String>)ObjectSerializer.deserialize(
conf.get("pig.inpSignatures"));
// signature can be null for intermediate jobs where it will not
// be required to be passed down
if(inpSignatureLists.get(inputIndex) != null) {
- loadFunc.setSignature(inpSignatureLists.get(inputIndex));
+ loadFunc.setUDFContextSignature(inpSignatureLists.get(inputIndex));
conf.set("pig.loader.signature",
inpSignatureLists.get(inputIndex));
}
- if(initializeUDFContext) {
- MapRedUtil.setupUDFContext(conf);
- }
-
+ MapRedUtil.setupUDFContext(conf);
}
/* (non-Javadoc)
@@ -237,8 +233,7 @@
Job inputSpecificJob = new Job(confClone);
// Pass loader signature to LoadFunc and to InputFormat through
// the conf
- passLoadSignature(loadFunc, i,
- inputSpecificJob.getConfiguration(), false);
+ passLoadSignature(loadFunc, i,
inputSpecificJob.getConfiguration());
loadFunc.setLocation(inputs.get(i).getFileName(),
inputSpecificJob);
// The above setLocation call could write to the conf within
Modified:
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java?rev=895804&r1=895803&r2=895804&view=diff
==============================================================================
---
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java
(original)
+++
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java
Mon Jan 4 22:29:14 2010
@@ -113,7 +113,7 @@
// Pass loader signature to LoadFunc and to InputFormat through
// the conf
PigInputFormat.passLoadSignature(loadfunc, pigSplit.getInputIndex(),
- context.getConfiguration(), true);
+ context.getConfiguration());
// now invoke initialize() on underlying RecordReader with
// the "adjusted" conf
wrappedReader.initialize(pigSplit.getWrappedSplit(), context);
Modified:
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java?rev=895804&r1=895803&r2=895804&view=diff
==============================================================================
---
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java
(original)
+++
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java
Mon Jan 4 22:29:14 2010
@@ -100,8 +100,6 @@
String filename = lFile.getFileName();
LoadFunc origloader =
(LoadFunc)PigContext.instantiateFuncFromSpec(lFile.getFuncSpec());
- if (loader instanceof PigStorage)
- ((PigStorage)loader).setSignature(signature);
loader = new ReadToEndLoader(origloader,
ConfigurationUtil.toConfiguration(pc.getProperties()),
filename,
Modified:
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/BinStorage.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/BinStorage.java?rev=895804&r1=895803&r2=895804&view=diff
==============================================================================
---
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/BinStorage.java
(original)
+++
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/BinStorage.java
Mon Jan 4 22:29:14 2010
@@ -37,6 +37,7 @@
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.pig.Expression;
import org.apache.pig.FileInputLoadFunc;
import org.apache.pig.LoadCaster;
import org.apache.pig.LoadFunc;
@@ -382,7 +383,7 @@
@Override
public String[] getPartitionKeys(String location, Configuration conf)
throws IOException {
- throw new UnsupportedOperationException();
+ return null;
}
@Override
@@ -429,7 +430,7 @@
}
@Override
- public void setParitionFilter(OperatorPlan plan) throws IOException {
+ public void setPartitionFilter(Expression plan) throws IOException {
throw new UnsupportedOperationException();
}
}
Modified:
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/PigStorage.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/PigStorage.java?rev=895804&r1=895803&r2=895804&view=diff
==============================================================================
---
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/PigStorage.java
(original)
+++
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/PigStorage.java
Mon Jan 4 22:29:14 2010
@@ -191,8 +191,7 @@
public RequiredFieldResponse pushProjection(RequiredFieldList
requiredFieldList) throws FrontendException {
if (requiredFieldList == null)
return null;
- signature = requiredFieldList.getSignature();
- if (!requiredFieldList.isAllFieldsRequired())
+ if (requiredFieldList.getFields() != null)
{
int lastColumn = -1;
for (RequiredField rf: requiredFieldList.getFields())
@@ -289,7 +288,7 @@
@Override
- public void setSignature(String signature) {
+ public void setUDFContextSignature(String signature) {
this.signature = signature;
}
Modified:
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/TextLoader.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/TextLoader.java?rev=895804&r1=895803&r2=895804&view=diff
==============================================================================
---
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/TextLoader.java
(original)
+++
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/TextLoader.java
Mon Jan 4 22:29:14 2010
@@ -45,6 +45,7 @@
protected RecordReader in = null;
private TupleFactory mTupleFactory = TupleFactory.getInstance();
+ @Override
public Tuple getNext() throws IOException {
try {
boolean notDone = in.nextKeyValue();
Modified:
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/LOLoad.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/LOLoad.java?rev=895804&r1=895803&r2=895804&view=diff
==============================================================================
---
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/LOLoad.java
(original)
+++
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/LOLoad.java
Mon Jan 4 22:29:14 2010
@@ -44,6 +44,9 @@
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.util.MultiMap;
import org.apache.pig.impl.util.Pair;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
public class LOLoad extends RelationalOperator {
private static final long serialVersionUID = 2L;
@@ -56,7 +59,7 @@
private static Log log = LogFactory.getLog(LOLoad.class);
private Schema mDeterminedSchema = null;
private RequiredFieldList requiredFieldList;
-
+
/**
* @param plan
* LogicalPlan this operator is a part of.
@@ -84,6 +87,7 @@
try {
mLoadFunc = (LoadFunc)
PigContext.instantiateFuncFromSpec(inputFileSpec.getFuncSpec());
+ mLoadFunc.setUDFContextSignature(getAlias());
}catch (ClassCastException cce) {
log.error(inputFileSpec.getFuncSpec() + " should implement the
LoadFunc interface.");
throw new IOException(cce);
@@ -324,7 +328,7 @@
if (mSchema == null)
return response;
- if (requiredFieldList.isAllFieldsRequired())
+ if (requiredFieldList.getFields() == null)
return response;
if (requiredFieldList.getFields()==null)
@@ -375,6 +379,12 @@
return response;
}
+
+ @Override
+ public void setAlias(String newAlias) {
+ super.setAlias(newAlias);
+ mLoadFunc.setUDFContextSignature(getAlias());
+ }
@Override
public boolean pruneColumns(List<Pair<Integer, Integer>> columns)
Modified:
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalOptimizer.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalOptimizer.java?rev=895804&r1=895803&r2=895804&view=diff
==============================================================================
---
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalOptimizer.java
(original)
+++
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalOptimizer.java
Mon Jan 4 22:29:14 2010
@@ -27,15 +27,18 @@
import org.apache.pig.impl.logicalLayer.LOForEach;
import org.apache.pig.impl.logicalLayer.LOLimit;
import org.apache.pig.impl.logicalLayer.LOLoad;
-import org.apache.pig.impl.logicalLayer.LOPrinter;
import org.apache.pig.impl.logicalLayer.LOStream;
import org.apache.pig.impl.logicalLayer.LogicalOperator;
import org.apache.pig.impl.logicalLayer.LogicalPlan;
import org.apache.pig.impl.logicalLayer.RelationalOperator;
import org.apache.pig.impl.plan.NodeIdGenerator;
import org.apache.pig.impl.plan.OperatorKey;
-import org.apache.pig.impl.plan.VisitorException;
-import org.apache.pig.impl.plan.optimizer.*;
+import org.apache.pig.impl.plan.optimizer.OptimizerException;
+import org.apache.pig.impl.plan.optimizer.PlanOptimizer;
+import org.apache.pig.impl.plan.optimizer.Rule;
+import org.apache.pig.impl.plan.optimizer.RuleMatcher;
+import org.apache.pig.impl.plan.optimizer.RuleOperator;
+import org.apache.pig.impl.plan.optimizer.RulePlan;
/**
* An optimizer for logical plans.
@@ -68,13 +71,6 @@
RulePlan rulePlan;
// List of rules for the logical optimizer
-
- // This one has to be first, as the type cast inserter expects the
- // load to only have one output.
- // Find any places in the plan that have an implicit split and make
- // it explicit. Since the RuleMatcher doesn't handle trees properly,
- // we cheat and say that we match any node. Then we'll do the actual
- // test in the transformers check method.
boolean turnAllRulesOff = false;
if (mRulesOff != null) {
@@ -86,6 +82,13 @@
}
}
+ // This one has to be before the type cast inserter as it expects the
+ // load to only have one output.
+ // Find any places in the plan that have an implicit split and make
+ // it explicit. Since the RuleMatcher doesn't handle trees properly,
+ // we cheat and say that we match any node. Then we'll do the actual
+ // test in the transformers check method.
+
rulePlan = new RulePlan();
RuleOperator anyLogicalOperator = new
RuleOperator(LogicalOperator.class, RuleOperator.NodeType.ANY_NODE,
new OperatorKey(SCOPE, nodeIdGen.getNextNodeId(SCOPE)));
@@ -93,10 +96,22 @@
mRules.add(new Rule<LogicalOperator, LogicalPlan>(rulePlan,
new ImplicitSplitInserter(plan), "ImplicitSplitInserter"));
+
+ // this one is ordered to be before other optimizations since later
+ // optimizations may move the LOFilter that it looks for just after a
+ // LOLoad
+ rulePlan = new RulePlan();
+ RuleOperator loLoad = new RuleOperator(LOLoad.class,
+ new OperatorKey(SCOPE, nodeIdGen.getNextNodeId(SCOPE)));
+ rulePlan.add(loLoad);
+ mRules.add(new Rule<LogicalOperator, LogicalPlan>(rulePlan,
+ new PartitionFilterOptimizer(plan),
"PartitionFilterOptimizer"));
+
+
// Add type casting to plans where the schema has been declared (by
// user, data, or data catalog).
rulePlan = new RulePlan();
- RuleOperator loLoad = new RuleOperator(LOLoad.class,
+ loLoad = new RuleOperator(LOLoad.class,
new OperatorKey(SCOPE, nodeIdGen.getNextNodeId(SCOPE)));
rulePlan.add(loLoad);
mRules.add(new Rule<LogicalOperator, LogicalPlan>(rulePlan,
Modified:
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/optimizer/PruneColumns.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/optimizer/PruneColumns.java?rev=895804&r1=895803&r2=895804&view=diff
==============================================================================
---
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/optimizer/PruneColumns.java
(original)
+++
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/optimizer/PruneColumns.java
Mon Jan 4 22:29:14 2010
@@ -57,6 +57,7 @@
import org.apache.pig.impl.logicalLayer.LogicalPlan;
import org.apache.pig.impl.logicalLayer.RelationalOperator;
import org.apache.pig.impl.logicalLayer.TopLevelProjectFinder;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.plan.DependencyOrderWalker;
import org.apache.pig.impl.plan.MapKeysInfo;
import org.apache.pig.impl.plan.NodeIdGenerator;
@@ -644,18 +645,19 @@
// Prune fields of LOLoad, and use ColumePruner to prune all the
downstream logical operators
private void pruneLoader(LOLoad load, RequiredFields loaderRequiredFields)
throws FrontendException
{
- RequiredFieldList requiredFieldList = new
RequiredFieldList(load.getAlias());
- requiredFieldList.setAllFieldsRequired(false);
+ RequiredFieldList requiredFieldList = new RequiredFieldList();
if (loaderRequiredFields==null || loaderRequiredFields.needAllFields())
return;
+ Schema loadSchema = load.getSchema();
for (int i=0;i<loaderRequiredFields.size();i++)
{
Pair<Integer, Integer> pair = loaderRequiredFields.getField(i);
MapKeysInfo mapKeysInfo = loaderRequiredFields.getMapKeysInfo(i);
RequiredField requiredField = new RequiredField();
requiredField.setIndex(pair.second);
- requiredField.setType(load.getSchema().getField(pair.second).type);
+ requiredField.setAlias(loadSchema.getField(pair.second).alias);
+ requiredField.setType(loadSchema.getField(pair.second).type);
if (mapKeysInfo!=null && !mapKeysInfo.needAllKeys())
{
List<RequiredField> subFieldList = new
ArrayList<RequiredField>();
Modified:
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/util/UDFContext.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/util/UDFContext.java?rev=895804&r1=895803&r2=895804&view=diff
==============================================================================
---
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/util/UDFContext.java
(original)
+++
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/util/UDFContext.java
Mon Jan 4 22:29:14 2010
@@ -30,9 +30,10 @@
private Configuration jconf = null;
private HashMap<Integer, Properties> udfConfs;
-
+ private Properties clientSysProps;
+ private static final String CLIENT_SYS_PROPS = "pig.client.sys.props";
+ private static final String UDF_CONTEXT = "pig.udf.context";
private static UDFContext self = null;
-
private UDFContext() {
udfConfs = new HashMap<Integer, Properties>();
}
@@ -44,6 +45,20 @@
return self;
}
+ // internal pig use only - should NOT be called from user code
+ public void setClientSystemProps() {
+ clientSysProps = System.getProperties();
+ }
+
+ /**
+ * Get the System Properties (Read only) as on the client machine from
where Pig
+ * was launched. This will include command line properties passed at launch
+ * time
+ * @return client side System Properties including command line properties
+ */
+ public Properties getClientSystemProps() {
+ return clientSysProps;
+ }
/**
* Adds the JobConf to this singleton. Will be
* called on the backend by the Map and Reduce
@@ -145,7 +160,8 @@
* @throws IOException if underlying serialization throws it
*/
public void serialize(Configuration conf) throws IOException {
- conf.set("pig.UDFContext", ObjectSerializer.serialize(udfConfs));
+ conf.set(UDF_CONTEXT, ObjectSerializer.serialize(udfConfs));
+ conf.set(CLIENT_SYS_PROPS, ObjectSerializer.serialize(clientSysProps));
}
/**
@@ -156,7 +172,9 @@
*/
@SuppressWarnings("unchecked")
public void deserialize() throws IOException {
- udfConfs = (HashMap<Integer,
Properties>)ObjectSerializer.deserialize(jconf.get("pig.UDFContext"));
+ udfConfs = (HashMap<Integer,
Properties>)ObjectSerializer.deserialize(jconf.get(UDF_CONTEXT));
+ clientSysProps = (Properties)ObjectSerializer.deserialize(
+ jconf.get(CLIENT_SYS_PROPS));
}
@SuppressWarnings("unchecked")