Author: daijy
Date: Tue Jan 26 00:30:40 2010
New Revision: 903027
URL: http://svn.apache.org/viewvc?rev=903027&view=rev
Log:
PIG-1184: PruneColumns optimization does not handle the case of foreach flatten
correctly if flattened bag is not used later
Modified:
hadoop/pig/trunk/CHANGES.txt
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOForEach.java
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PruneColumns.java
hadoop/pig/trunk/test/org/apache/pig/test/TestPruneColumn.java
Modified: hadoop/pig/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=903027&r1=903026&r2=903027&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Tue Jan 26 00:30:40 2010
@@ -144,6 +144,9 @@
PIG-1176: Column Pruner issues in union of loader with and without schema
(daijy)
+PIG-1184: PruneColumns optimization does not handle the case of foreach
+flatten correctly if flattened bag is not used later (daijy)
+
Release 0.6.0 - Unreleased
INCOMPATIBLE CHANGES
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOForEach.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOForEach.java?rev=903027&r1=903026&r2=903027&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOForEach.java
(original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOForEach.java Tue
Jan 26 00:30:40 2010
@@ -803,28 +803,23 @@
if (mSchema == null)
return null;
- if (mSchema.size()<=column)
- {
- return null;
- }
-
return mSchemaPlanMapping.get(column);
}
public boolean isInputFlattened(int column) throws FrontendException {
- LogicalPlan plan = getRelevantPlan(column);
- if (plan==null) {
- int errCode = 2195;
- throw new FrontendException("Fail to get foreach plan for input
column "+column,
- errCode, PigException.BUG);
- }
- int index = mForEachPlans.indexOf(plan);
- if (index==-1) {
- int errCode = 2195;
- throw new FrontendException("Fail to get foreach plan for input
column "+column,
- errCode, PigException.BUG);
+ for (int i=0;i<mForEachPlans.size();i++) {
+ LogicalPlan forEachPlan = mForEachPlans.get(i);
+ TopLevelProjectFinder projectFinder = new
TopLevelProjectFinder(forEachPlan);
+ projectFinder.visit();
+ for (LOProject project : projectFinder.getProjectList()) {
+ if (project.getCol()==column) {
+ if (mFlatten.get(i))
+ return true;
+ }
+ }
}
- return mFlatten.get(index);
+
+ return false;
}
@Override
@@ -942,16 +937,16 @@
{
continue;
}
- boolean allPruned = true;
+ boolean anyPruned = false;
for (LOProject loProject : projectFinder.getProjectSet()) {
Pair<Integer, Integer> pair = new Pair<Integer, Integer>(0,
loProject.getCol());
- if (!columns.contains(pair)) {
- allPruned = false;
+ if (columns.contains(pair)) {
+ anyPruned = true;
break;
}
}
- if (allPruned) {
+ if (anyPruned) {
planToRemove.add(i);
}
}
Modified:
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PruneColumns.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PruneColumns.java?rev=903027&r1=903026&r2=903027&view=diff
==============================================================================
---
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PruneColumns.java
(original)
+++
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PruneColumns.java
Tue Jan 26 00:30:40 2010
@@ -53,6 +53,7 @@
import org.apache.pig.impl.logicalLayer.LogicalPlan;
import org.apache.pig.impl.logicalLayer.RelationalOperator;
import org.apache.pig.impl.logicalLayer.TopLevelProjectFinder;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.plan.MapKeysInfo;
import org.apache.pig.impl.plan.NodeIdGenerator;
import org.apache.pig.impl.plan.OperatorKey;
@@ -361,20 +362,40 @@
}
}
}
-
-
+
// Merge with required input fields of this logical operator.
// RequiredInputFields come from two sources, one is mapping from
required output to input,
// the other is from the operator itself. Here we use
getRequiredFields to get the second part,
// and merge with the first part
List<RequiredFields> requiredFieldsListOfLOOp;
- // For LOForEach, requiredFields is not really required fields.
Here required fields means the input
- // fields required by the entire output columns, such as filter
condition in LOFilter, group columns in LOCoGroup.
- // For LOForEach, output columns are generated by the foreach plan
it belongs to, there is nothing globally required.
- // So we need to fix the semantic gap here. If the operator is
LOForEach, requiredFields is null.
+ // For LOForEach, requiredFields are all flattened fields. Even if the
flattened fields get pruned,
+ // it may expand the number of rows in the result. So flattened
fields shall not be pruned.
+ // LOForEach.getRequiredFields does not give the required fields.
RequiredFields means that a field
+ // is required by all the outputs. The pipeline does not work
correctly without that field.
+ // LOForEach.getRequiredFields gives all the input fields referred
to in the LOForEach statement, but those
+ // fields can still be pruned (which means, not required)
+ // Eg:
+ // B = foreach A generate a0, a1, a2+a3;
+ // LOForEach.getRequiredFields gives (a0, a1, a2, a3);
+ // However, a2 and a3 can be pruned if we do not need a2+a3 for
LOForEach.
+ // So here, we do not use LOForEach.getRequiredFields, instead,
any flattened fields are required fields
+ if (rlo instanceof LOForEach) {
+ List<Pair<Integer, Integer>> flattenedInputs = new
ArrayList<Pair<Integer, Integer>>();
+ for (int i=0;i<rlo.getSchema().size();i++) {
+ if (((LOForEach)rlo).isInputFlattened(i)) {
+ flattenedInputs.add(new Pair<Integer, Integer>(0, i));
+ }
+ }
+ if (!flattenedInputs.isEmpty()) {
+ requiredFieldsListOfLOOp = new ArrayList<RequiredFields>();
+ requiredFieldsListOfLOOp.add(new
RequiredFields(flattenedInputs));
+ }
+ else
+ requiredFieldsListOfLOOp = null;
+ }
// For LOCross/LOUnion, actually we do not require any field here
- if (rlo instanceof LOForEach || rlo instanceof LOCross || rlo
instanceof LOUnion)
+ else if (rlo instanceof LOCross || rlo instanceof LOUnion)
requiredFieldsListOfLOOp = null;
else
requiredFieldsListOfLOOp = rlo.getRequiredFields();
@@ -785,5 +806,5 @@
String msg = "Unable to prune plan";
throw new OptimizerException(msg, errCode, PigException.BUG, e);
}
- }
+ }
}
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestPruneColumn.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestPruneColumn.java?rev=903027&r1=903026&r2=903027&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestPruneColumn.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestPruneColumn.java Tue Jan 26
00:30:40 2010
@@ -1767,4 +1767,36 @@
assertTrue(emptyLogFileMessage());
}
+
+ // See PIG-1184
+ @Test
+ public void testForEachFlatten() throws Exception {
+ File inputFile = Util.createInputFile("table_testForEachFlatten", "",
new String[]{"oiue\tM\t{(3),(4)}\t{(toronto),(montreal)}"});
+
+ pigServer.registerQuery("A = load '"+inputFile.toString()+"' as
(a0:chararray, a1:chararray, a2:bag{t:tuple(id:chararray)},
a3:bag{t:tuple(loc:chararray)});");
+ pigServer.registerQuery("B = foreach A generate a0, a1, flatten(a2),
flatten(a3), 10;");
+ pigServer.registerQuery("C = foreach B generate a0, $4;");
+ Iterator<Tuple> iter = pigServer.openIterator("C");
+
+ assertTrue(iter.hasNext());
+ Tuple t = iter.next();
+ assertTrue(t.toString().equals("(oiue,10)"));
+
+ assertTrue(iter.hasNext());
+ t = iter.next();
+ assertTrue(t.toString().equals("(oiue,10)"));
+
+ assertTrue(iter.hasNext());
+ t = iter.next();
+ assertTrue(t.toString().equals("(oiue,10)"));
+
+ assertTrue(iter.hasNext());
+ t = iter.next();
+ assertTrue(t.toString().equals("(oiue,10)"));
+
+ assertFalse(iter.hasNext());
+
+ assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $1",
+ "No map keys pruned for A"}));
+ }
}