Author: daijy
Date: Tue Jan 5 03:52:50 2010
New Revision: 895881
URL: http://svn.apache.org/viewvc?rev=895881&view=rev
Log:
PIG-1172: PushDownForeachFlatten shall not push ForEach below Join if the
flattened fields are used in Join
Modified:
hadoop/pig/branches/branch-0.6/CHANGES.txt
hadoop/pig/branches/branch-0.6/src/org/apache/pig/impl/logicalLayer/LOForEach.java
hadoop/pig/branches/branch-0.6/src/org/apache/pig/impl/logicalLayer/optimizer/PruneColumns.java
hadoop/pig/branches/branch-0.6/src/org/apache/pig/impl/logicalLayer/optimizer/PushDownForeachFlatten.java
hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/TestPushDownForeachFlatten.java
Modified: hadoop/pig/branches/branch-0.6/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/CHANGES.txt?rev=895881&r1=895880&r2=895881&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.6/CHANGES.txt (original)
+++ hadoop/pig/branches/branch-0.6/CHANGES.txt Tue Jan 5 03:52:50 2010
@@ -255,6 +255,9 @@
PIG-761: ERROR 2086 on simple JOIN (daijy)
+PIG-1172: PushDownForeachFlatten shall not push ForEach below Join if the
+flattened fields are used in Join (daijy)
+
Release 0.5.0
INCOMPATIBLE CHANGES
Modified:
hadoop/pig/branches/branch-0.6/src/org/apache/pig/impl/logicalLayer/LOForEach.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/src/org/apache/pig/impl/logicalLayer/LOForEach.java?rev=895881&r1=895880&r2=895881&view=diff
==============================================================================
---
hadoop/pig/branches/branch-0.6/src/org/apache/pig/impl/logicalLayer/LOForEach.java
(original)
+++
hadoop/pig/branches/branch-0.6/src/org/apache/pig/impl/logicalLayer/LOForEach.java
Tue Jan 5 03:52:50 2010
@@ -795,11 +795,8 @@
return new Pair<Boolean, List<Integer>>(hasFlatten, flattenedColumns);
}
- public LogicalPlan getRelevantPlan(int output, int column)
+ public LogicalPlan getRelevantPlan(int column)
{
- if (output!=0)
- return null;
-
if (column<0)
return null;
@@ -814,6 +811,22 @@
return mSchemaPlanMapping.get(column);
}
+ /**
+ * Reports whether the inner foreach plan associated with
+ * <code>column</code> is flattened. The plan is looked up with
+ * getRelevantPlan(column), which reads mSchemaPlanMapping. The plan's
+ * position in mForEachPlans then selects the matching entry of mFlatten.
+ * <p>
+ * NOTE(review): getRelevantInputs passes its own <code>column</code>
+ * argument to getRelevantPlan. That suggests the lookup is keyed by this
+ * operator's output schema column. However, this method's name and its
+ * caller in PushDownForeachFlatten pass an input column index. Confirm
+ * the two index spaces agree; otherwise the flatten flag of an unrelated
+ * plan may be returned.
+ *
+ * @param column column index handed unchanged to getRelevantPlan
+ * @return the mFlatten entry for the plan found for <code>column</code>
+ * @throws FrontendException with error code 2195 (PigException.BUG) when
+ * no plan is found for <code>column</code>, or when the plan found
+ * is not contained in mForEachPlans
+ */
+ public boolean isInputFlattened(int column) throws FrontendException {
+ LogicalPlan plan = getRelevantPlan(column);
+ // getRelevantPlan can return null (e.g. for a negative column); that is
+ // reported as an internal bug rather than answered with "not flattened".
+ if (plan==null) {
+ int errCode = 2195;
+ throw new FrontendException("Fail to get foreach plan for input
column "+column,
+ errCode, PigException.BUG);
+ }
+ // The plan's index in mForEachPlans is reused to read mFlatten, so the
+ // two lists are expected to be index-aligned.
+ int index = mForEachPlans.indexOf(plan);
+ if (index==-1) {
+ int errCode = 2195;
+ throw new FrontendException("Fail to get foreach plan for input
column "+column,
+ errCode, PigException.BUG);
+ }
+ return mFlatten.get(index);
+ }
+
@Override
public List<RequiredFields> getRelevantInputs(int output, int column)
throws FrontendException {
if (!mIsSchemaComputed)
@@ -835,7 +848,7 @@
return null;
}
- LogicalPlan plan = getRelevantPlan(output, column);
+ LogicalPlan plan = getRelevantPlan(column);
TopLevelProjectFinder projectFinder = new TopLevelProjectFinder(
plan);
@@ -946,7 +959,7 @@
int index = planToRemove.get(planToRemove.size()-1);
if (mUserDefinedSchema!=null) {
for (int i=mUserDefinedSchema.size()-1;i>=0;i--) {
- if (getRelevantPlan(0, i)==mForEachPlans.get(index))
+ if (getRelevantPlan(i)==mForEachPlans.get(index))
mUserDefinedSchema.remove(i);
}
}
Modified:
hadoop/pig/branches/branch-0.6/src/org/apache/pig/impl/logicalLayer/optimizer/PruneColumns.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/src/org/apache/pig/impl/logicalLayer/optimizer/PruneColumns.java?rev=895881&r1=895880&r2=895881&view=diff
==============================================================================
---
hadoop/pig/branches/branch-0.6/src/org/apache/pig/impl/logicalLayer/optimizer/PruneColumns.java
(original)
+++
hadoop/pig/branches/branch-0.6/src/org/apache/pig/impl/logicalLayer/optimizer/PruneColumns.java
Tue Jan 5 03:52:50 2010
@@ -292,7 +292,7 @@
else if (rlo instanceof LOForEach)
{
// Relay map keys from output to input
- LogicalPlan forEachPlan =
((LOForEach)rlo).getRelevantPlan(requiredOutputField.first,
requiredOutputField.second);
+ LogicalPlan forEachPlan =
((LOForEach)rlo).getRelevantPlan(requiredOutputField.second);
if (relevantFields.getFields()!=null &&
relevantFields.getFields().size()!=0)
{
int index =
((LOForEach)rlo).getForEachPlans().indexOf(forEachPlan);
Modified:
hadoop/pig/branches/branch-0.6/src/org/apache/pig/impl/logicalLayer/optimizer/PushDownForeachFlatten.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/src/org/apache/pig/impl/logicalLayer/optimizer/PushDownForeachFlatten.java?rev=895881&r1=895880&r2=895881&view=diff
==============================================================================
---
hadoop/pig/branches/branch-0.6/src/org/apache/pig/impl/logicalLayer/optimizer/PushDownForeachFlatten.java
(original)
+++
hadoop/pig/branches/branch-0.6/src/org/apache/pig/impl/logicalLayer/optimizer/PushDownForeachFlatten.java
Tue Jan 5 03:52:50 2010
@@ -19,6 +19,7 @@
package org.apache.pig.impl.logicalLayer.optimizer;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -41,6 +42,7 @@
import org.apache.pig.impl.plan.ProjectionMap;
import org.apache.pig.impl.plan.RequiredFields;
import org.apache.pig.impl.plan.OperatorPlan.IndexHelper;
+import org.apache.pig.impl.plan.ProjectionMap.Column;
import org.apache.pig.impl.plan.optimizer.OptimizerException;
import org.apache.pig.PigException;
import org.apache.pig.impl.util.MultiMap;
@@ -283,6 +285,23 @@
}
}
+ // Check if flattened fields is required by LOJoin, if so,
don't optimize
+ if (successor instanceof LOJoin) {
+ List<RequiredFields> requiredFieldsList =
((LOJoin)successor).getRequiredFields();
+ RequiredFields requiredFields =
requiredFieldsList.get(foreachPosition.intValue());
+
+ MultiMap<Integer, Column> foreachMappedFields =
foreachProjectionMap.getMappedFields();
+
+ for (Pair<Integer, Integer> pair :
requiredFields.getFields()) {
+ Collection<Column> columns =
foreachMappedFields.get(pair.second);
+ for (Column column : columns) {
+ Pair<Integer, Integer> foreachInputColumn =
column.getInputColumn();
+ if
(foreach.isInputFlattened(foreachInputColumn.second))
+ return false;
+ }
+ }
+ }
+
mInsertBetween = true;
return true;
}
Modified:
hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/TestPushDownForeachFlatten.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/TestPushDownForeachFlatten.java?rev=895881&r1=895880&r2=895881&view=diff
==============================================================================
---
hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/TestPushDownForeachFlatten.java
(original)
+++
hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/TestPushDownForeachFlatten.java
Tue Jan 5 03:52:50 2010
@@ -977,5 +977,27 @@
}
+ // See PIG-1172
+ // B flattens the bag column of A, and the join key a1 is one of the
+ // fields produced by that flatten. PushDownForeachFlatten must therefore
+ // refuse to push the ForEach below the Join: check() returns false and
+ // neither the swap nor the insert-between rewrite is requested.
+ @Test
+ public void testForeachJoinRequiredField() throws Exception {
+ planTester.buildPlan("A = load 'myfile' as (bg:bag{t:tuple(a0,a1)});");
+ planTester.buildPlan("B = FOREACH A generate flatten($0);");
+ planTester.buildPlan("C = load '3.txt' AS (c0, c1);");
+ planTester.buildPlan("D = JOIN B by a1, C by c1;");
+ LogicalPlan lp = planTester.buildPlan("E = limit D 10;");
+
+ // Prepare projection maps and schemas before running the rule's check.
+ planTester.setPlan(lp);
+ planTester.setProjectionMap(lp);
+ planTester.rebuildSchema(lp);
+
+ PushDownForeachFlatten pushDownForeach = new
PushDownForeachFlatten(lp);
+
+ // NOTE(review): assumes getRoots().get(0) is the load of A, whose
+ // successor is ForEach B; root ordering is not shown here.
+ LOLoad loada = (LOLoad) lp.getRoots().get(0);
+
+ assertTrue(!pushDownForeach.check(lp.getSuccessors(loada)));
+ assertTrue(pushDownForeach.getSwap() == false);
+ assertTrue(pushDownForeach.getInsertBetween() == false);
+ }
+
}