Author: sms
Date: Mon May 18 21:39:09 2009
New Revision: 776106
URL: http://svn.apache.org/viewvc?rev=776106&view=rev
Log:
PIG-697: Proposed improvements to pig's optimizer
Added:
hadoop/pig/trunk/src/org/apache/pig/impl/plan/ProjectionMap.java
hadoop/pig/trunk/test/org/apache/pig/test/TestProjectionMap.java
Modified:
hadoop/pig/trunk/CHANGES.txt
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOCogroup.java
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOCross.java
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LODistinct.java
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOFRJoin.java
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOFilter.java
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOForEach.java
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOLoad.java
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOSort.java
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOStore.java
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOUnion.java
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LogicalOperator.java
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LogicalPlan.java
Modified: hadoop/pig/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=776106&r1=776105&r2=776106&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Mon May 18 21:39:09 2009
@@ -28,7 +28,7 @@
so that hadoop runs maps and creates output for the next
job (gates).
-PIG-693: Proposed improvements to pig's optimizer (sms)
+PIG-697: Proposed improvements to pig's optimizer (sms)
PIG-700: To automate the pig patch test process (gkesavan via sms)
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOCogroup.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOCogroup.java?rev=776106&r1=776105&r2=776106&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOCogroup.java
(original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOCogroup.java Mon
May 18 21:39:09 2009
@@ -29,6 +29,7 @@
import org.apache.pig.PigException;
import org.apache.pig.data.DataType;
import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.ProjectionMap;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.logicalLayer.parser.ParseException;
import org.apache.pig.impl.logicalLayer.schema.Schema;
@@ -37,6 +38,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.pig.impl.util.MultiMap;
+import org.apache.pig.impl.util.Pair;
public class LOCogroup extends LogicalOperator {
private static final long serialVersionUID = 2L;
@@ -511,5 +513,103 @@
return cogroupClone;
}
+
+ @Override
+ public ProjectionMap getProjectionMap() {
+ Schema outputSchema;
+
+ try {
+ outputSchema = getSchema();
+ } catch (FrontendException fee) {
+ return null;
+ }
+
+ if(outputSchema == null) {
+ return null;
+ }
+
+ List<LogicalOperator> predecessors =
(ArrayList<LogicalOperator>)mPlan.getPredecessors(this);
+ if(predecessors == null) {
+ return null;
+ }
+
+ //the column with the alias 'group' can be mapped in several ways
+ //1. group A by $0;
+ //Here the mapping is 0 -> (0, 0)
+ //2. group A by ($0, $1);
+ //Here there is no direct mapping and 'group' is an added column
+ //3. cogroup A by $0, B by $0;
+ //Here the mapping is 0 -> ((0, 0), (1, 0))
+ //4. cogroup A by ($0, $1), B by ($0, $1);
+ //Here there is no direct mapping and 'group' is an added column
+ //For anything other than a simple project 'group' is an added column
+
+ MultiMap<LogicalOperator, LogicalPlan> groupByPlans =
getGroupByPlans();
+
+ boolean groupByAdded = false;
+ MultiMap<Integer, Pair<Integer, Integer>> mapFields = new
MultiMap<Integer, Pair<Integer, Integer>>();
+ List<Pair<Integer, Integer>> removedFields = new
ArrayList<Pair<Integer, Integer>>();
+
+ for(int inputNum = 0; (inputNum < predecessors.size()) &&
(!groupByAdded); ++inputNum) {
+ LogicalOperator predecessor = predecessors.get(inputNum);
+
+
+ List<LogicalPlan> predecessorPlans = (ArrayList<LogicalPlan>)
groupByPlans.get(predecessor);
+
+ int inputColumn = -1;
+ for(LogicalPlan predecessorPlan: predecessorPlans) {
+ List<LogicalOperator> leaves = predecessorPlan.getLeaves();
+ if(leaves == null || leaves.size() > 1) {
+ groupByAdded = true;
+ break;
+ }
+
+ if(leaves.get(0) instanceof LOProject) {
+ //find out if this project is a chain of projects
+ if(LogicalPlan.chainOfProjects(predecessorPlan)) {
+ LOProject rootProject =
(LOProject)predecessorPlan.getRoots().get(0);
+ inputColumn = rootProject.getCol();
+ mapFields.put(0, new Pair<Integer, Integer>(inputNum,
inputColumn));
+ }
+ } else {
+ groupByAdded = true;
+ }
+ }
+
+ Schema inputSchema;
+ try {
+ inputSchema = predecessor.getSchema();
+ } catch (FrontendException fee) {
+ return null;
+ }
+
+ if(inputSchema != null) {
+ for(int column = 0; column < inputSchema.size(); ++column) {
+ if(!groupByAdded && inputColumn != column) {
+ removedFields.add(new Pair<Integer, Integer>(inputNum,
column));
+ }
+ }
+ }
+
+ }
+
+ List<Integer> addedFields = new ArrayList<Integer>();
+
+ if(groupByAdded) {
+ addedFields.add(0); //for the column 'group'
+ mapFields = null; //since 'group' is an added column there is no
mapping
+ }
+
+ //the columns 1 through n - 1 are generated by cogroup
+ for(int i = 0; i < groupByPlans.keySet().size(); ++i) {
+ addedFields.add(i+ 1);
+ }
+
+ if(removedFields.size() == 0) {
+ removedFields = null;
+ }
+
+ return new ProjectionMap(mapFields, removedFields, addedFields);
+ }
}
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOCross.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOCross.java?rev=776106&r1=776105&r2=776106&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOCross.java
(original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOCross.java Mon May
18 21:39:09 2009
@@ -32,7 +32,10 @@
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.PlanVisitor;
+import org.apache.pig.impl.plan.ProjectionMap;
import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.MultiMap;
+import org.apache.pig.impl.util.Pair;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -183,5 +186,73 @@
public byte getType() {
return DataType.BAG ;
}
+
+ @Override
+ public ProjectionMap getProjectionMap() {
+ Schema outputSchema;
+
+ try {
+ outputSchema = getSchema();
+ } catch (FrontendException fee) {
+ return null;
+ }
+
+ if(outputSchema == null) {
+ return null;
+ }
+
+ List<LogicalOperator> predecessors =
(ArrayList<LogicalOperator>)mPlan.getPredecessors(this);
+ if(predecessors == null) {
+ return null;
+ }
+
+ MultiMap<Integer, Pair<Integer, Integer>> mapFields = new
MultiMap<Integer, Pair<Integer, Integer>>();
+ List<Integer> addedFields = new ArrayList<Integer>();
+ boolean[] unknownSchema = new boolean[predecessors.size()];
+ boolean anyUnknownInputSchema = false;
+ int outputColumnNum = 0;
+
+ for(int inputNum = 0; inputNum < predecessors.size(); ++inputNum) {
+ LogicalOperator predecessor = predecessors.get(inputNum);
+ Schema inputSchema = null;
+
+ try {
+ inputSchema = predecessor.getSchema();
+ } catch (FrontendException fee) {
+ return null;
+ }
+
+ if(inputSchema == null) {
+ unknownSchema[inputNum] = true;
+ outputColumnNum++;
+ addedFields.add(inputNum);
+ anyUnknownInputSchema = true;
+ } else {
+ unknownSchema[inputNum] = false;
+ for(int inputColumn = 0; inputColumn < inputSchema.size();
++inputColumn) {
+ mapFields.put(outputColumnNum++, new Pair<Integer,
Integer>(inputNum, inputColumn));
+ }
+ }
+ }
+
+ //TODO
+ /*
+ * For now, if there is any input that has an unknown schema
+ * flag it and return a null ProjectionMap.
+ * In the future, when unknown schemas are handled
+ * mark inputs that have unknown schemas as output columns
+ * that have been added.
+ */
+
+ if(anyUnknownInputSchema) {
+ return null;
+ }
+
+ if(addedFields.size() == 0) {
+ addedFields = null;
+ }
+
+ return new ProjectionMap(mapFields, null, addedFields);
+ }
}
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LODistinct.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LODistinct.java?rev=776106&r1=776105&r2=776106&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LODistinct.java
(original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LODistinct.java Mon
May 18 21:39:09 2009
@@ -26,6 +26,7 @@
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.PlanVisitor;
+import org.apache.pig.impl.plan.ProjectionMap;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.data.DataType;
import org.apache.commons.logging.Log;
@@ -116,5 +117,44 @@
LODistinct distinctClone = (LODistinct)super.clone();
return distinctClone;
}
+
+ @Override
+ public ProjectionMap getProjectionMap() {
+ Schema outputSchema;
+ try {
+ outputSchema = getSchema();
+ } catch (FrontendException fee) {
+ return null;
+ }
+
+ if(outputSchema == null) {
+ return null;
+ }
+
+ Schema inputSchema = null;
+
+ List<LogicalOperator> predecessors =
(ArrayList<LogicalOperator>)mPlan.getPredecessors(this);
+ if(predecessors != null) {
+ try {
+ inputSchema = predecessors.get(0).getSchema();
+ } catch (FrontendException fee) {
+ return null;
+ }
+ } else {
+ return null;
+ }
+
+ if(inputSchema == null) {
+ return null;
+ }
+
+ if(Schema.equals(inputSchema, outputSchema, false, true)) {
+ //there is a one is to one mapping between input and output schemas
+ return new ProjectionMap(false);
+ } else {
+ //problem - input and output schemas for a distinct have to match!
+ return null;
+ }
+ }
}
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOFRJoin.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOFRJoin.java?rev=776106&r1=776105&r2=776106&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOFRJoin.java
(original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOFRJoin.java Mon May
18 21:39:09 2009
@@ -31,8 +31,10 @@
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.ProjectionMap;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.util.MultiMap;
+import org.apache.pig.impl.util.Pair;
/**
* This is the logical operator for the Fragment Replicate Join
@@ -268,6 +270,73 @@
return new Schema(fsList) ;
}
-
+
+ @Override
+ public ProjectionMap getProjectionMap() {
+ Schema outputSchema;
+
+ try {
+ outputSchema = getSchema();
+ } catch (FrontendException fee) {
+ return null;
+ }
+
+ if(outputSchema == null) {
+ return null;
+ }
+
+ List<LogicalOperator> predecessors =
(ArrayList<LogicalOperator>)mPlan.getPredecessors(this);
+ if(predecessors == null) {
+ return null;
+ }
+
+ MultiMap<Integer, Pair<Integer, Integer>> mapFields = new
MultiMap<Integer, Pair<Integer, Integer>>();
+ List<Integer> addedFields = new ArrayList<Integer>();
+ boolean[] unknownSchema = new boolean[predecessors.size()];
+ boolean anyUnknownInputSchema = false;
+ int outputColumnNum = 0;
+
+ for(int inputNum = 0; inputNum < predecessors.size(); ++inputNum) {
+ LogicalOperator predecessor = predecessors.get(inputNum);
+ Schema inputSchema = null;
+
+ try {
+ inputSchema = predecessor.getSchema();
+ } catch (FrontendException fee) {
+ return null;
+ }
+
+ if(inputSchema == null) {
+ unknownSchema[inputNum] = true;
+ outputColumnNum++;
+ addedFields.add(inputNum);
+ anyUnknownInputSchema = true;
+ } else {
+ unknownSchema[inputNum] = false;
+ for(int inputColumn = 0; inputColumn < inputSchema.size();
++inputColumn) {
+ mapFields.put(outputColumnNum++, new Pair<Integer,
Integer>(inputNum, inputColumn));
+ }
+ }
+ }
+
+ //TODO
+ /*
+ * For now, if there is any input that has an unknown schema
+ * flag it and return a null ProjectionMap.
+ * In the future, when unknown schemas are handled
+ * mark inputs that have unknown schemas as output columns
+ * that have been added.
+ */
+
+ if(anyUnknownInputSchema) {
+ return null;
+ }
+
+ if(addedFields.size() == 0) {
+ addedFields = null;
+ }
+
+ return new ProjectionMap(mapFields, null, addedFields);
+ }
}
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOFilter.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOFilter.java?rev=776106&r1=776105&r2=776106&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOFilter.java
(original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOFilter.java Mon May
18 21:39:09 2009
@@ -23,6 +23,7 @@
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.PlanVisitor;
+import org.apache.pig.impl.plan.ProjectionMap;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.data.DataType;
import org.apache.pig.impl.logicalLayer.optimizer.SchemaRemover;
@@ -124,5 +125,44 @@
filterClone.mComparisonPlan = lpCloner.getClonedPlan();
return filterClone;
}
+
+ @Override
+ public ProjectionMap getProjectionMap() {
+ Schema outputSchema;
+ try {
+ outputSchema = getSchema();
+ } catch (FrontendException fee) {
+ return null;
+ }
+
+ if(outputSchema == null) {
+ return null;
+ }
+
+ Schema inputSchema = null;
+
+ List<LogicalOperator> predecessors =
(ArrayList<LogicalOperator>)mPlan.getPredecessors(this);
+ if(predecessors != null) {
+ try {
+ inputSchema = predecessors.get(0).getSchema();
+ } catch (FrontendException fee) {
+ return null;
+ }
+ } else {
+ return null;
+ }
+
+ if(inputSchema == null) {
+ return null;
+ }
+
+ if(Schema.equals(inputSchema, outputSchema, false, true)) {
+ //there is a one is to one mapping between input and output schemas
+ return new ProjectionMap(false);
+ } else {
+ //problem - input and output schemas for a filter have to match!
+ return null;
+ }
+ }
}
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOForEach.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOForEach.java?rev=776106&r1=776105&r2=776106&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOForEach.java
(original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOForEach.java Mon
May 18 21:39:09 2009
@@ -18,6 +18,7 @@
package org.apache.pig.impl.logicalLayer;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.HashMap;
@@ -30,7 +31,10 @@
import org.apache.pig.impl.logicalLayer.optimizer.SchemaRemover;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.PlanVisitor;
+import org.apache.pig.impl.plan.ProjectionMap;
import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.MultiMap;
+import org.apache.pig.impl.util.Pair;
import org.apache.pig.impl.logicalLayer.parser.QueryParser ;
import org.apache.pig.impl.logicalLayer.parser.ParseException;
import org.apache.pig.data.DataType;
@@ -174,7 +178,7 @@
//In the above script, the generate a1, will translate to
//project(a1) -> project(*) and will not be translated to
a sequence of projects
//As a result the project(*) will remain but the return
type is a bag
- //project*) with a data type set to tuple indicataes a
project(*) from an input
+ //project(*) with a data type set to tuple indicates a
project(*) from an input
//that has no schema
if( (((LOProject)op).isStar() ) &&
(((LOProject)op).getType() == DataType.TUPLE) ) {
mSchema = null;
@@ -447,4 +451,197 @@
return forEachClone;
}
+ @Override
+ public ProjectionMap getProjectionMap() {
+ Schema outputSchema;
+
+ try {
+ outputSchema = getSchema();
+ } catch (FrontendException fee) {
+ return null;
+ }
+
+ if(outputSchema == null) {
+ return null;
+ }
+
+ List<LogicalOperator> predecessors =
(ArrayList<LogicalOperator>)mPlan.getPredecessors(this);
+ if(predecessors == null) {
+ return null;
+ }
+
+ LogicalOperator predecessor = predecessors.get(0);
+
+ Schema inputSchema;
+
+ try {
+ inputSchema = predecessor.getSchema();
+ } catch (FrontendException fee) {
+ return null;
+ }
+
+ List<LogicalPlan> foreachPlans = getForEachPlans();
+ List<Boolean> flattenList = getFlatten();
+
+ MultiMap<Integer, Pair<Integer, Integer>> mapFields = new
MultiMap<Integer, Pair<Integer, Integer>>();
+ List<Integer> addedFields = new ArrayList<Integer>();
+ int outputColumn = 0;
+
+ for(int i = 0; i < foreachPlans.size(); ++i) {
+ LogicalPlan foreachPlan = foreachPlans.get(i);
+ List<LogicalOperator> leaves = foreachPlan.getLeaves();
+ if(leaves == null || leaves.size() > 1) {
+ return null;
+ }
+
+ int inputColumn = -1;
+ boolean mapped = false;
+
+ if(leaves.get(0) instanceof LOProject) {
+ //find out if this project is a chain of projects
+ if(LogicalPlan.chainOfProjects(foreachPlan)) {
+ LOProject rootProject =
(LOProject)foreachPlan.getRoots().get(0);
+ inputColumn = rootProject.getCol();
+ if(inputSchema != null) {
+ mapped = true;
+ }
+ }
+ }
+
+ Schema.FieldSchema leafFS;
+ try {
+ leafFS = ((ExpressionOperator)leaves.get(0)).getFieldSchema();
+ } catch (FrontendException fee) {
+ return null;
+ }
+
+ if(leafFS == null) {
+ return null;
+ }
+
+ if(flattenList.get(i)) {
+ Schema innerSchema = leafFS.schema;
+
+ if(innerSchema != null) {
+ if(innerSchema.isTwoLevelAccessRequired()) {
+ // this is the case where the schema is that of
+ // a bag which has just one tuple fieldschema which
+ // in turn has a list of fieldschemas. The schema
+ // after flattening would consist of the fieldSchemas
+ // present in the tuple
+
+ // check that indeed we only have one field schema
+ // which is that of a tuple
+ if(innerSchema.getFields().size() != 1) {
+ return null;
+ }
+ Schema.FieldSchema tupleFS;
+ try {
+ tupleFS = innerSchema.getField(0);
+ } catch (FrontendException fee) {
+ return null;
+ }
+
+ if(tupleFS.type != DataType.TUPLE) {
+ return null;
+ }
+ innerSchema = tupleFS.schema;
+ }
+
+ //innerSchema could be modified and hence the second check
+ if(innerSchema != null) {
+ for(int j = 0; j < innerSchema.size(); ++j) {
+ if(mapped) {
+ //map each flattened column to the original
column
+ mapFields.put(outputColumn++, new
Pair<Integer, Integer>(0, inputColumn));
+ } else {
+ addedFields.add(outputColumn++);
+ }
+ }
+ } else {
+ //innerSchema is null; check for schema type
+ if(DataType.isSchemaType(leafFS.type)) {
+ //flattening a null schema results in a bytearray
+ if(mapped) {
+ //map each flattened column to the original
column
+ mapFields.put(outputColumn++, new
Pair<Integer, Integer>(0, inputColumn));
+ } else {
+ addedFields.add(outputColumn++);
+ }
+ } else {
+ mapFields.put(outputColumn++, new Pair<Integer,
Integer>(0, inputColumn));
+ }
+ }
+ } else {
+ //innerSchema is null; check for schema type
+ if(DataType.isSchemaType(leafFS.type)) {
+ //flattening a null schema results in a bytearray
+ if(mapped) {
+ //map each flattened column to the original column
+ mapFields.put(outputColumn++, new Pair<Integer,
Integer>(0, inputColumn));
+ } else {
+ addedFields.add(outputColumn++);
+ }
+ } else {
+ mapFields.put(outputColumn++, new Pair<Integer,
Integer>(0, inputColumn));
+ }
+ }
+ } else {
+ //not a flattened column
+ if(mapped) {
+ mapFields.put(outputColumn++, new Pair<Integer,
Integer>(0, inputColumn));
+ } else {
+ addedFields.add(outputColumn++);
+ }
+ }
+ }
+
+ List<Pair<Integer, Integer>> removedFields = new
ArrayList<Pair<Integer, Integer>>();
+
+ if(inputSchema == null) {
+ //if input schema is null then there are no mappedFields and
removedFields
+ mapFields = null;
+ removedFields = null;
+ } else {
+
+ //if the size of the map is zero then set it to null
+ if(mapFields.size() == 0) {
+ mapFields = null;
+ }
+
+ if(addedFields.size() == 0) {
+ addedFields = null;
+ }
+
+ //input schema is not null. Need to compute the removedFields
+ //compute the set difference between the input schema and mapped
fields
+
+ Set<Integer> removedSet = new HashSet<Integer>();
+ for(int i = 0; i < inputSchema.size(); ++i) {
+ removedSet.add(i);
+ }
+
+ if(mapFields != null) {
+ Set<Integer> mappedSet = new HashSet<Integer>();
+ for(Integer key: mapFields.keySet()) {
+ List<Pair<Integer, Integer>> values =
(ArrayList<Pair<Integer, Integer>>)mapFields.get(key);
+ for(Pair<Integer, Integer> value: values) {
+ mappedSet.add(value.second);
+ }
+ }
+ removedSet.removeAll(mappedSet);
+ }
+
+ if(removedSet.size() == 0) {
+ removedFields = null;
+ } else {
+ for(Integer i: removedSet) {
+ removedFields.add(new Pair<Integer, Integer>(0, i));
+ }
+ }
+ }
+
+ return new ProjectionMap(mapFields, removedFields, addedFields);
+ }
+
}
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOLoad.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOLoad.java?rev=776106&r1=776105&r2=776106&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOLoad.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOLoad.java Mon May
18 21:39:09 2009
@@ -19,6 +19,10 @@
import java.io.IOException;
import java.net.URL;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import org.apache.pig.ExecType;
import org.apache.pig.LoadFunc;
@@ -28,7 +32,10 @@
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.ProjectionMap;
import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.MultiMap;
+import org.apache.pig.impl.util.Pair;
import org.apache.pig.impl.util.WrappedIOException;
import org.apache.pig.impl.logicalLayer.parser.ParseException;
import org.apache.pig.impl.logicalLayer.schema.Schema;
@@ -130,9 +137,6 @@
if (!mIsSchemaComputed) {
// get the schema of the load function
try {
- //DEBUG
- //System.out.println("Schema file: " + mSchema);
-
if (mEnforcedSchema != null) {
mSchema = mEnforcedSchema ;
return mSchema ;
@@ -220,5 +224,53 @@
public Schema getDeterminedSchema() {
return mDeterminedSchema;
}
+
+ @Override
+ public ProjectionMap getProjectionMap() {
+ Schema outputSchema;
+
+ try {
+ outputSchema = getSchema();
+ } catch (FrontendException fee) {
+ return null;
+ }
+
+ if(outputSchema == null) {
+ return null;
+ }
+
+ Schema inputSchema = null;
+
+ List<LogicalOperator> predecessors =
(ArrayList<LogicalOperator>)mPlan.getPredecessors(this);
+ if(predecessors != null) {
+ try {
+ inputSchema = predecessors.get(0).getSchema();
+ } catch (FrontendException fee) {
+ return null;
+ }
+ } else {
+ try {
+ inputSchema = mLoadFunc.determineSchema(mSchemaFile,
mExecType, mStorage);
+ } catch (IOException ioe) {
+ return null;
+ }
+ }
+
+ if(inputSchema == null) {
+ return null;
+ }
+
+ if(Schema.equals(inputSchema, outputSchema, false, true)) {
+ //there is a one is to one mapping between input and output schemas
+ return new ProjectionMap(false);
+ } else {
+ MultiMap<Integer, Pair<Integer, Integer>> mapFields = new
MultiMap<Integer, Pair<Integer, Integer>>();
+ //compute the mapping assuming its a prefix projection
+ for(int i = 0; i < inputSchema.size(); ++i) {
+ mapFields.put(i, new Pair<Integer, Integer>(0, i));
+ }
+ return new ProjectionMap(mapFields, null, null);
+ }
+ }
}
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOSort.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOSort.java?rev=776106&r1=776105&r2=776106&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOSort.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOSort.java Mon May
18 21:39:09 2009
@@ -27,6 +27,7 @@
import org.apache.pig.PigException;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.ProjectionMap;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.plan.PlanVisitor;
import org.apache.pig.data.DataType;
@@ -202,5 +203,44 @@
}
return clone;
}
+
+ @Override
+ public ProjectionMap getProjectionMap() {
+ Schema outputSchema;
+ try {
+ outputSchema = getSchema();
+ } catch (FrontendException fee) {
+ return null;
+ }
+
+ if(outputSchema == null) {
+ return null;
+ }
+
+ Schema inputSchema = null;
+
+ List<LogicalOperator> predecessors =
(ArrayList<LogicalOperator>)mPlan.getPredecessors(this);
+ if(predecessors != null) {
+ try {
+ inputSchema = predecessors.get(0).getSchema();
+ } catch (FrontendException fee) {
+ return null;
+ }
+ } else {
+ return null;
+ }
+
+ if(inputSchema == null) {
+ return null;
+ }
+
+ if(Schema.equals(inputSchema, outputSchema, false, true)) {
+ //there is a one is to one mapping between input and output schemas
+ return new ProjectionMap(false);
+ } else {
+ //problem - input and output schemas for a sort have to match!
+ return null;
+ }
+ }
}
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOStore.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOStore.java?rev=776106&r1=776105&r2=776106&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOStore.java
(original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOStore.java Mon May
18 21:39:09 2009
@@ -20,6 +20,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -27,9 +28,11 @@
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.ProjectionMap;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.plan.PlanVisitor;
+import org.apache.pig.impl.util.Pair;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -126,4 +129,41 @@
public FileSpec getInputSpec() {
return mInputSpec;
}
+
+ @Override
+ public ProjectionMap getProjectionMap() {
+ Schema outputSchema;
+ try {
+ outputSchema = getSchema();
+ } catch (FrontendException fee) {
+ return null;
+ }
+ if(outputSchema == null) {
+ return null;
+ }
+
+ Schema inputSchema = null;
+
+ List<LogicalOperator> predecessors =
(ArrayList<LogicalOperator>)mPlan.getPredecessors(this);
+ if(predecessors != null) {
+ try {
+ inputSchema = predecessors.get(0).getSchema();
+ } catch (FrontendException fee) {
+ return null;
+ }
+ }
+
+ if(inputSchema == null) {
+ return null;
+ }
+
+ if(Schema.equals(inputSchema, outputSchema, false, true)) {
+ //there is a one is to one mapping between input and output schemas
+ return new ProjectionMap(false);
+ } else {
+ //problem - input and output schemas for a store have to match!
+ return null;
+ }
+ }
+
}
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOUnion.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOUnion.java?rev=776106&r1=776105&r2=776106&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOUnion.java
(original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOUnion.java Mon May
18 21:39:09 2009
@@ -18,15 +18,20 @@
package org.apache.pig.impl.logicalLayer;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
import java.util.Collection;
import java.util.Iterator;
+import java.util.Set;
import org.apache.pig.PigException;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.PlanVisitor;
+import org.apache.pig.impl.plan.ProjectionMap;
import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.MultiMap;
+import org.apache.pig.impl.util.Pair;
import org.apache.pig.data.DataType;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -69,7 +74,8 @@
if(null != mSchema) {
mSchema = mSchema.merge(op.getSchema(), false);
} else {
- mSchema = op.getSchema();
+ mSchema = null;
+ break;
}
}
if(null != mSchema) {
@@ -78,7 +84,7 @@
while(iter.hasNext()) {
op = iter.next();
Schema opSchema = op.getSchema();
- if(null != s) {
+ if(null != opSchema) {
for(Schema.FieldSchema opFs:
opSchema.getFields()) {
fs.setParent(opFs.canonicalName, op);
}
@@ -127,5 +133,49 @@
LOUnion unionClone = (LOUnion)super.clone();
return unionClone;
}
+
+ @Override
+ public ProjectionMap getProjectionMap() {
+ Schema outputSchema;
+
+ try {
+ outputSchema = getSchema();
+ } catch (FrontendException fee) {
+ return null;
+ }
+
+ if(outputSchema == null) {
+ return null;
+ }
+
+ List<LogicalOperator> predecessors =
(ArrayList<LogicalOperator>)mPlan.getPredecessors(this);
+ if(predecessors == null) {
+ return null;
+ }
+
+ MultiMap<Integer, Pair<Integer, Integer>> mapFields = new
MultiMap<Integer, Pair<Integer, Integer>>();
+
+ for(int inputNum = 0; inputNum < predecessors.size(); ++inputNum) {
+ LogicalOperator predecessor = predecessors.get(inputNum);
+ Schema inputSchema = null;
+
+ try {
+ inputSchema = predecessor.getSchema();
+ } catch (FrontendException fee) {
+ return null;
+ }
+
+ if(inputSchema == null) {
+ return null;
+ } else {
+ for(int inputColumn = 0; inputColumn < inputSchema.size();
++inputColumn) {
+ mapFields.put(inputColumn, new Pair<Integer,
Integer>(inputNum, inputColumn));
+ //removedFields.add(new Pair<Integer, Integer>(inputNum,
inputColumn));
+ }
+ }
+ }
+
+ return new ProjectionMap(mapFields, null, null);
+ }
}
Modified:
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LogicalOperator.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LogicalOperator.java?rev=776106&r1=776105&r2=776106&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LogicalOperator.java
(original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LogicalOperator.java
Mon May 18 21:39:09 2009
@@ -20,6 +20,7 @@
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.io.IOException;
import org.apache.pig.data.DataType;
@@ -27,7 +28,9 @@
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.plan.Operator;
import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.ProjectionMap;
import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.Pair;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -69,12 +72,6 @@
*/
protected LogicalPlan mPlan;
- /**
- * A boolean variable to remember if input has to be flattened Used only in
- * the context of generate
- */
- //private boolean mIsFlatten = false;
-
private static Log log = LogFactory.getLog(LogicalOperator.class);
/**
@@ -128,11 +125,8 @@
// It's fine, it just means we don't have a schema yet.
}
if (mSchema == null) {
- log.debug("Operator schema is null; Setting it to new schema");
mSchema = schema;
} else {
- log.debug("Reconciling schema");
- log.debug("mSchema: " + mSchema + " schema: " + schema);
mSchema.reconcile(schema);
}
}
@@ -284,6 +278,29 @@
if(mSchema != null)
loClone.mSchema = this.mSchema.clone();
return loClone;
+ }
+
+
+ /**
+ * Produce a map describing how this operator modifies its projection.
+ * @return ProjectionMap null indicates it does not know how the projection
+ * changes, for example a join of two inputs where one input does not have
+ * a schema.
+ */
+ public ProjectionMap getProjectionMap() {
+ return null;
+ };
+
    /**
     * Get a list of fields that this operator requires. This is not
     * necessarily equivalent to the list of fields the operator projects.
     * For example, a filter will project anything passed to it, but requires
     * only the fields explicitly referenced in its filter expression.
     *
     * @return list of fields, numbered from 0; each pair is presumably
     *         (input number, input column), matching the convention used by
     *         ProjectionMap — confirm against overriding subclasses. This
     *         base implementation returns null (NOTE(review): looks like
     *         null means "unknown"; subclasses are expected to override).
     */
    public List<Pair<Integer, Integer>> getRequiredFields()
    {
        return null;
    }
}
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LogicalPlan.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LogicalPlan.java?rev=776106&r1=776105&r2=776106&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LogicalPlan.java
(original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LogicalPlan.java Mon
May 18 21:39:09 2009
@@ -150,4 +150,52 @@
return clone;
}
+ /**
+ * A utility method to check if a plan contains a chain of projection
+ * operators
+ *
+ * @param plan
+ * input plan
+ * @return true if there is a chain of projection operators; false
otherwise
+ */
+ public static boolean chainOfProjects(LogicalPlan plan) {
+
+ if (plan == null) {
+ return false;
+ }
+
+ List<LogicalOperator> leaves = plan.getLeaves();
+
+ if (leaves == null) {
+ return false;
+ }
+
+ if (leaves.size() > 1) {
+ return false;
+ }
+
+ LogicalOperator node = leaves.get(0);
+
+ while (true) {
+ if ((node == null) || !(node instanceof LOProject)) {
+ //not a projection operator
+ return false;
+ }
+
+ List<LogicalOperator> predecessors = plan.getPredecessors(node);
+
+ if (predecessors == null) {
+ //we have reached the root
+ return true;
+ }
+
+ if (predecessors.size() > 1) {
+ //a project cannot have multiple inputs
+ return false;
+ }
+
+ node = predecessors.get(0);
+ }
+ }
+
}
Added: hadoop/pig/trunk/src/org/apache/pig/impl/plan/ProjectionMap.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/plan/ProjectionMap.java?rev=776106&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/plan/ProjectionMap.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/plan/ProjectionMap.java Mon May 18
21:39:09 2009
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.impl.plan;
+
+import java.io.Serializable;
+import java.lang.StringBuilder;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.util.MultiMap;
+import org.apache.pig.impl.util.Pair;
+
+/**
+ * A struct detailing how a projection is altered by an operator.
+ */
+public class ProjectionMap {
+ /**
+ * Quick way for an operator to note that its input and output are the
same.
+ */
+ private boolean mChanges = true;
+
+ /**
+ * Map of field changes, with keys being the output fields of the operator
+ * and values being the input fields. Fields are numbered from 0. So for a
+ * foreach operator derived from 'B = foreach A generate $0, $2, $3,
+ * udf($1)' would produce a mapping of 0->(0, 0), 2->(0, 1), 3->(0, 2)
+ */
+ private MultiMap<Integer, Pair<Integer, Integer>> mMappedFields;
+
+ /**
+ * List of fields removed from the input. This includes fields that were
+ * transformed, and thus are no longer the same fields. Using the example
+ * foreach given under mappedFields, this list would contain '(0,1)'.
+ */
+ private List<Pair<Integer, Integer>> mRemovedFields;
+
+ /**
+ * List of fields in the output of this operator that were created by this
+ * operator. Using the example foreach given under mappedFields, this list
+ * would contain '3'.
+ */
+ private List<Integer> mAddedFields;
+
+ /**
+ *
+ * @param changes
+ * to indicate if this projection map changes its input or not
+ */
+ public ProjectionMap(boolean changes) {
+ this(null, null, null, changes);
+ }
+
+ /**
+ *
+ * @param mapFields
+ * the mapping of input column to output column
+ * @param removedFields
+ * the list of input columns that are removed
+ * @param addedFields
+ * the list of columns that are added to the output
+ */
+ public ProjectionMap(MultiMap<Integer, Pair<Integer, Integer>> mapFields,
+ List<Pair<Integer, Integer>> removedFields,
+ List<Integer> addedFields) {
+ this(mapFields, removedFields, addedFields, true);
+ }
+
+ /**
+ *
+ * @param mapFields
+ * the mapping of input column to output column
+ * @param removedFields
+ * the list of input columns that are removed
+ * @param addedFields
+ * the list of columns that are added to the output
+ * @param changes
+ * to indicate if this projection map changes its input or not
+ */
+ private ProjectionMap(MultiMap<Integer, Pair<Integer, Integer>> mapFields,
+ List<Pair<Integer, Integer>> removedFields,
+ List<Integer> addedFields, boolean changes) {
+ mMappedFields = mapFields;
+ mAddedFields = addedFields;
+ mRemovedFields = removedFields;
+ mChanges = changes;
+ }
+
+ /**
+ *
+ * @return the mapping of input column to output column
+ */
+ public MultiMap<Integer, Pair<Integer, Integer>> getMappedFileds() {
+ return mMappedFields;
+ }
+
+ /**
+ *
+ * @param fields
+ * the mapping of input column to output column
+ */
+ public void setMappedFileds(MultiMap<Integer, Pair<Integer, Integer>>
fields) {
+ mMappedFields = fields;
+ }
+
+ /**
+ *
+ * @return the list of input columns that are removed
+ */
+ public List<Pair<Integer, Integer>> getRemovedFileds() {
+ return mRemovedFields;
+ }
+
+ /**
+ *
+ * @param fields
+ * the list of input columns that are removed
+ */
+ public void setRemovedFileds(List<Pair<Integer, Integer>> fields) {
+ mRemovedFields = fields;
+ }
+
+ /**
+ *
+ * @return the list of columns that are added to the output
+ */
+ public List<Integer> getAddedFileds() {
+ return mAddedFields;
+ }
+
+ /**
+ *
+ * @param fields
+ * the list of columns that are added to the output
+ */
+ public void setAddedFileds(List<Integer> fields) {
+ mAddedFields = fields;
+ }
+
+ /**
+ *
+ * @return if this projection map changes its input or not
+ */
+ public boolean changes() {
+ return getChanges();
+ }
+
+
+ /**
+ *
+ * @return if this projection map changes its input or not
+ */
+ public boolean getChanges() {
+ return mChanges;
+ }
+
+ /**
+ *
+ * @param changes
+ * if this projection map changes its input or not
+ */
+ public void setChanges(boolean changes) {
+ mChanges = changes;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("changes: " + mChanges);
+ sb.append(" mapped fields: " + mMappedFields);
+ sb.append(" added fields: " + mAddedFields);
+ sb.append(" removed fields: " + mRemovedFields);
+ return sb.toString();
+ }
+}
\ No newline at end of file