Author: daijy
Date: Tue Jan  5 03:23:52 2010
New Revision: 895874

URL: http://svn.apache.org/viewvc?rev=895874&view=rev
Log:
PIG-1172: PushDownForeachFlatten shall not push ForEach below Join if the 
flattened fields is used in Join

Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOForEach.java
    
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PruneColumns.java
    
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PushDownForeachFlatten.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestPushDownForeachFlatten.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=895874&r1=895873&r2=895874&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Tue Jan  5 03:23:52 2010
@@ -323,6 +323,9 @@
 
 PIG-761: ERROR 2086 on simple JOIN (daijy)
 
+PIG-1172: PushDownForeachFlatten shall not push ForEach below Join if the
+flattened fields is used in Join (daijy)
+
 Release 0.5.0
 
 INCOMPATIBLE CHANGES

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOForEach.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOForEach.java?rev=895874&r1=895873&r2=895874&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOForEach.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOForEach.java Tue Jan  5 03:23:52 2010
@@ -795,11 +795,8 @@
         return new Pair<Boolean, List<Integer>>(hasFlatten, flattenedColumns);
     }
     
-    public LogicalPlan getRelevantPlan(int output, int column)
+    public LogicalPlan getRelevantPlan(int column)
     {
-        if (output!=0)
-            return null;
-
         if (column<0)
             return null;
 
@@ -814,6 +811,22 @@
         return mSchemaPlanMapping.get(column);
     }
     
+    public boolean isInputFlattened(int column) throws FrontendException {
+        LogicalPlan plan = getRelevantPlan(column);
+        if (plan==null) {
+            int errCode = 2195;
+            throw new FrontendException("Fail to get foreach plan for input column "+column,
+                    errCode, PigException.BUG);
+        }
+        int index = mForEachPlans.indexOf(plan);
+        if (index==-1) {
+            int errCode = 2195;
+            throw new FrontendException("Fail to get foreach plan for input column "+column,
+                    errCode, PigException.BUG);
+        }
+        return mFlatten.get(index);
+    }
+    
     @Override
     public List<RequiredFields> getRelevantInputs(int output, int column) throws FrontendException {
         if (!mIsSchemaComputed)
@@ -835,7 +848,7 @@
             return null;
         }
         
-        LogicalPlan plan = getRelevantPlan(output, column);
+        LogicalPlan plan = getRelevantPlan(column);
         
         TopLevelProjectFinder projectFinder = new TopLevelProjectFinder(
                 plan);
@@ -946,7 +959,7 @@
             int index = planToRemove.get(planToRemove.size()-1);
             if (mUserDefinedSchema!=null) {
                 for (int i=mUserDefinedSchema.size()-1;i>=0;i--) {
-                    if (getRelevantPlan(0, i)==mForEachPlans.get(index))
+                    if (getRelevantPlan(i)==mForEachPlans.get(index))
                         mUserDefinedSchema.remove(i);
                 }
             }

Modified: 
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PruneColumns.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PruneColumns.java?rev=895874&r1=895873&r2=895874&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PruneColumns.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PruneColumns.java Tue Jan  5 03:23:52 2010
@@ -292,7 +292,7 @@
                     else if (rlo instanceof LOForEach)
                     {
                         // Relay map keys from output to input
-                        LogicalPlan forEachPlan = ((LOForEach)rlo).getRelevantPlan(requiredOutputField.first, requiredOutputField.second);
+                        LogicalPlan forEachPlan = ((LOForEach)rlo).getRelevantPlan(requiredOutputField.second);
                         if (relevantFields.getFields()!=null && relevantFields.getFields().size()!=0)
                         {
                             int index = ((LOForEach)rlo).getForEachPlans().indexOf(forEachPlan);

Modified: 
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PushDownForeachFlatten.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PushDownForeachFlatten.java?rev=895874&r1=895873&r2=895874&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PushDownForeachFlatten.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PushDownForeachFlatten.java Tue Jan  5 03:23:52 2010
@@ -19,6 +19,7 @@
 package org.apache.pig.impl.logicalLayer.optimizer;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -41,6 +42,7 @@
 import org.apache.pig.impl.plan.ProjectionMap;
 import org.apache.pig.impl.plan.RequiredFields;
 import org.apache.pig.impl.plan.OperatorPlan.IndexHelper;
+import org.apache.pig.impl.plan.ProjectionMap.Column;
 import org.apache.pig.impl.plan.optimizer.OptimizerException;
 import org.apache.pig.PigException;
 import org.apache.pig.impl.util.MultiMap;
@@ -283,6 +285,23 @@
                     }
                 }
                 
+                // Check if flattened fields is required by LOJoin, if so, don't optimize
+                if (successor instanceof LOJoin) {
+                    List<RequiredFields> requiredFieldsList = ((LOJoin)successor).getRequiredFields();
+                    RequiredFields requiredFields = requiredFieldsList.get(foreachPosition.intValue());
+                    
+                    MultiMap<Integer, Column> foreachMappedFields = foreachProjectionMap.getMappedFields();
+                    
+                    for (Pair<Integer, Integer> pair : requiredFields.getFields()) {
+                        Collection<Column> columns = foreachMappedFields.get(pair.second);
+                        for (Column column : columns) {
+                            Pair<Integer, Integer> foreachInputColumn = column.getInputColumn();
+                            if (foreach.isInputFlattened(foreachInputColumn.second))
+                                return false;
+                        }
+                    }
+                }
+                
                 mInsertBetween = true;
                 return true;
             }

Modified: 
hadoop/pig/trunk/test/org/apache/pig/test/TestPushDownForeachFlatten.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestPushDownForeachFlatten.java?rev=895874&r1=895873&r2=895874&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestPushDownForeachFlatten.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestPushDownForeachFlatten.java Tue Jan  5 03:23:52 2010
@@ -977,5 +977,27 @@
 
     }
 
+    // See PIG-1172
+    @Test
+    public void testForeachJoinRequiredField() throws Exception {
+        planTester.buildPlan("A = load 'myfile' as (bg:bag{t:tuple(a0,a1)});");
+        planTester.buildPlan("B = FOREACH A generate flatten($0);");
+        planTester.buildPlan("C = load '3.txt' AS (c0, c1);");
+        planTester.buildPlan("D = JOIN B by a1, C by c1;");
+        LogicalPlan lp = planTester.buildPlan("E = limit D 10;");
+        
+        planTester.setPlan(lp);
+        planTester.setProjectionMap(lp);
+        planTester.rebuildSchema(lp);
+        
+        PushDownForeachFlatten pushDownForeach = new PushDownForeachFlatten(lp);
+
+        LOLoad loada = (LOLoad) lp.getRoots().get(0);
+        
+        assertTrue(!pushDownForeach.check(lp.getSuccessors(loada)));
+        assertTrue(pushDownForeach.getSwap() == false);
+        assertTrue(pushDownForeach.getInsertBetween() == false);
+    }
+
 }
 


Reply via email to