Author: daijy Date: Tue Jan 5 03:52:50 2010 New Revision: 895881 URL: http://svn.apache.org/viewvc?rev=895881&view=rev Log: PIG-1172: PushDownForeachFlatten should not push ForEach below Join if the flattened fields are used in the Join
Modified: hadoop/pig/branches/branch-0.6/CHANGES.txt hadoop/pig/branches/branch-0.6/src/org/apache/pig/impl/logicalLayer/LOForEach.java hadoop/pig/branches/branch-0.6/src/org/apache/pig/impl/logicalLayer/optimizer/PruneColumns.java hadoop/pig/branches/branch-0.6/src/org/apache/pig/impl/logicalLayer/optimizer/PushDownForeachFlatten.java hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/TestPushDownForeachFlatten.java Modified: hadoop/pig/branches/branch-0.6/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/CHANGES.txt?rev=895881&r1=895880&r2=895881&view=diff ============================================================================== --- hadoop/pig/branches/branch-0.6/CHANGES.txt (original) +++ hadoop/pig/branches/branch-0.6/CHANGES.txt Tue Jan 5 03:52:50 2010 @@ -255,6 +255,9 @@ PIG-761: ERROR 2086 on simple JOIN (daijy) +PIG-1172: PushDownForeachFlatten shall not push ForEach below Join if the +flattened fields is used in Join (daijy) + Release 0.5.0 INCOMPATIBLE CHANGES Modified: hadoop/pig/branches/branch-0.6/src/org/apache/pig/impl/logicalLayer/LOForEach.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/src/org/apache/pig/impl/logicalLayer/LOForEach.java?rev=895881&r1=895880&r2=895881&view=diff ============================================================================== --- hadoop/pig/branches/branch-0.6/src/org/apache/pig/impl/logicalLayer/LOForEach.java (original) +++ hadoop/pig/branches/branch-0.6/src/org/apache/pig/impl/logicalLayer/LOForEach.java Tue Jan 5 03:52:50 2010 @@ -795,11 +795,8 @@ return new Pair<Boolean, List<Integer>>(hasFlatten, flattenedColumns); } - public LogicalPlan getRelevantPlan(int output, int column) + public LogicalPlan getRelevantPlan(int column) { - if (output!=0) - return null; - if (column<0) return null; @@ -814,6 +811,22 @@ return mSchemaPlanMapping.get(column); } + public boolean isInputFlattened(int column) throws FrontendException { + LogicalPlan plan = 
getRelevantPlan(column); + if (plan==null) { + int errCode = 2195; + throw new FrontendException("Fail to get foreach plan for input column "+column, + errCode, PigException.BUG); + } + int index = mForEachPlans.indexOf(plan); + if (index==-1) { + int errCode = 2195; + throw new FrontendException("Fail to get foreach plan for input column "+column, + errCode, PigException.BUG); + } + return mFlatten.get(index); + } + @Override public List<RequiredFields> getRelevantInputs(int output, int column) throws FrontendException { if (!mIsSchemaComputed) @@ -835,7 +848,7 @@ return null; } - LogicalPlan plan = getRelevantPlan(output, column); + LogicalPlan plan = getRelevantPlan(column); TopLevelProjectFinder projectFinder = new TopLevelProjectFinder( plan); @@ -946,7 +959,7 @@ int index = planToRemove.get(planToRemove.size()-1); if (mUserDefinedSchema!=null) { for (int i=mUserDefinedSchema.size()-1;i>=0;i--) { - if (getRelevantPlan(0, i)==mForEachPlans.get(index)) + if (getRelevantPlan(i)==mForEachPlans.get(index)) mUserDefinedSchema.remove(i); } } Modified: hadoop/pig/branches/branch-0.6/src/org/apache/pig/impl/logicalLayer/optimizer/PruneColumns.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/src/org/apache/pig/impl/logicalLayer/optimizer/PruneColumns.java?rev=895881&r1=895880&r2=895881&view=diff ============================================================================== --- hadoop/pig/branches/branch-0.6/src/org/apache/pig/impl/logicalLayer/optimizer/PruneColumns.java (original) +++ hadoop/pig/branches/branch-0.6/src/org/apache/pig/impl/logicalLayer/optimizer/PruneColumns.java Tue Jan 5 03:52:50 2010 @@ -292,7 +292,7 @@ else if (rlo instanceof LOForEach) { // Relay map keys from output to input - LogicalPlan forEachPlan = ((LOForEach)rlo).getRelevantPlan(requiredOutputField.first, requiredOutputField.second); + LogicalPlan forEachPlan = ((LOForEach)rlo).getRelevantPlan(requiredOutputField.second); if (relevantFields.getFields()!=null && 
relevantFields.getFields().size()!=0) { int index = ((LOForEach)rlo).getForEachPlans().indexOf(forEachPlan); Modified: hadoop/pig/branches/branch-0.6/src/org/apache/pig/impl/logicalLayer/optimizer/PushDownForeachFlatten.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/src/org/apache/pig/impl/logicalLayer/optimizer/PushDownForeachFlatten.java?rev=895881&r1=895880&r2=895881&view=diff ============================================================================== --- hadoop/pig/branches/branch-0.6/src/org/apache/pig/impl/logicalLayer/optimizer/PushDownForeachFlatten.java (original) +++ hadoop/pig/branches/branch-0.6/src/org/apache/pig/impl/logicalLayer/optimizer/PushDownForeachFlatten.java Tue Jan 5 03:52:50 2010 @@ -19,6 +19,7 @@ package org.apache.pig.impl.logicalLayer.optimizer; import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -41,6 +42,7 @@ import org.apache.pig.impl.plan.ProjectionMap; import org.apache.pig.impl.plan.RequiredFields; import org.apache.pig.impl.plan.OperatorPlan.IndexHelper; +import org.apache.pig.impl.plan.ProjectionMap.Column; import org.apache.pig.impl.plan.optimizer.OptimizerException; import org.apache.pig.PigException; import org.apache.pig.impl.util.MultiMap; @@ -283,6 +285,23 @@ } } + // Check if flattened fields is required by LOJoin, if so, don't optimize + if (successor instanceof LOJoin) { + List<RequiredFields> requiredFieldsList = ((LOJoin)successor).getRequiredFields(); + RequiredFields requiredFields = requiredFieldsList.get(foreachPosition.intValue()); + + MultiMap<Integer, Column> foreachMappedFields = foreachProjectionMap.getMappedFields(); + + for (Pair<Integer, Integer> pair : requiredFields.getFields()) { + Collection<Column> columns = foreachMappedFields.get(pair.second); + for (Column column : columns) { + Pair<Integer, Integer> foreachInputColumn = column.getInputColumn(); + if 
(foreach.isInputFlattened(foreachInputColumn.second)) + return false; + } + } + } + mInsertBetween = true; return true; } Modified: hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/TestPushDownForeachFlatten.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/TestPushDownForeachFlatten.java?rev=895881&r1=895880&r2=895881&view=diff ============================================================================== --- hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/TestPushDownForeachFlatten.java (original) +++ hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/TestPushDownForeachFlatten.java Tue Jan 5 03:52:50 2010 @@ -977,5 +977,27 @@ } + // See PIG-1172 + @Test + public void testForeachJoinRequiredField() throws Exception { + planTester.buildPlan("A = load 'myfile' as (bg:bag{t:tuple(a0,a1)});"); + planTester.buildPlan("B = FOREACH A generate flatten($0);"); + planTester.buildPlan("C = load '3.txt' AS (c0, c1);"); + planTester.buildPlan("D = JOIN B by a1, C by c1;"); + LogicalPlan lp = planTester.buildPlan("E = limit D 10;"); + + planTester.setPlan(lp); + planTester.setProjectionMap(lp); + planTester.rebuildSchema(lp); + + PushDownForeachFlatten pushDownForeach = new PushDownForeachFlatten(lp); + + LOLoad loada = (LOLoad) lp.getRoots().get(0); + + assertTrue(!pushDownForeach.check(lp.getSuccessors(loada))); + assertTrue(pushDownForeach.getSwap() == false); + assertTrue(pushDownForeach.getInsertBetween() == false); + } + }