Author: daijy
Date: Fri Apr 16 17:14:49 2010
New Revision: 935000

URL: http://svn.apache.org/viewvc?rev=935000&view=rev
Log:
PIG-1374: PushDownForeachFlatten shall not push ForEach below Join if the 
flattened fields is used in the next statement

Modified:
    hadoop/pig/trunk/CHANGES.txt
    
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PushDownForeachFlatten.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestPushDownForeachFlatten.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=935000&r1=934999&r2=935000&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Fri Apr 16 17:14:49 2010
@@ -225,6 +225,8 @@ OPTIMIZATIONS
 
 BUG FIXES
 
+PIG-1374: PushDownForeachFlatten shall not push ForEach below Join if the 
flattened fields is used in the next statement (daijy)
+
 PIG-1336: Optimize POStore serialized into JobConf (daijy)
 
 PIG-1335: UDFFinder should find LoadFunc used by POCast (daijy)

Modified: 
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PushDownForeachFlatten.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PushDownForeachFlatten.java?rev=935000&r1=934999&r2=935000&view=diff
==============================================================================
--- 
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PushDownForeachFlatten.java
 (original)
+++ 
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PushDownForeachFlatten.java
 Fri Apr 16 17:14:49 2010
@@ -35,6 +35,7 @@ import org.apache.pig.impl.logicalLayer.
 import org.apache.pig.impl.logicalLayer.LOSort;
 import org.apache.pig.impl.logicalLayer.LogicalOperator;
 import org.apache.pig.impl.logicalLayer.LogicalPlan;
+import org.apache.pig.impl.logicalLayer.RelationalOperator;
 import org.apache.pig.impl.logicalLayer.UDFFinder;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.plan.DepthFirstWalker;
@@ -181,6 +182,23 @@ public class PushDownForeachFlatten exte
             IndexHelper<LogicalOperator> indexHelper = new 
IndexHelper<LogicalOperator>(peers);
             Integer foreachPosition = indexHelper.getIndex(foreach);
             
+            // Check if flattened fields is required by successor, if so, 
don't optimize
+            List<RequiredFields> requiredFieldsList = 
((RelationalOperator)successor).getRequiredFields();
+            RequiredFields requiredFields = 
requiredFieldsList.get(foreachPosition.intValue());
+            
+            MultiMap<Integer, Column> foreachMappedFields = 
foreachProjectionMap.getMappedFields();
+            
+            if (requiredFields.getFields()!=null) {
+                for (Pair<Integer, Integer> pair : requiredFields.getFields()) 
{
+                    Collection<Column> columns = 
foreachMappedFields.get(pair.second);
+                    for (Column column : columns) {
+                        Pair<Integer, Integer> foreachInputColumn = 
column.getInputColumn();
+                        if 
(foreach.isInputFlattened(foreachInputColumn.second))
+                            return false;
+                    }
+                }
+            }
+            
             // the foreach with flatten can be swapped with an order by
             // as the order by will have lesser number of records to sort
             // also the sort does not alter the records that are processed
@@ -285,23 +303,6 @@ public class PushDownForeachFlatten exte
                     }
                 }
                 
-                // Check if flattened fields is required by LOJoin, if so, 
don't optimize
-                if (successor instanceof LOJoin) {
-                    List<RequiredFields> requiredFieldsList = 
((LOJoin)successor).getRequiredFields();
-                    RequiredFields requiredFields = 
requiredFieldsList.get(foreachPosition.intValue());
-                    
-                    MultiMap<Integer, Column> foreachMappedFields = 
foreachProjectionMap.getMappedFields();
-                    
-                    for (Pair<Integer, Integer> pair : 
requiredFields.getFields()) {
-                        Collection<Column> columns = 
foreachMappedFields.get(pair.second);
-                        for (Column column : columns) {
-                            Pair<Integer, Integer> foreachInputColumn = 
column.getInputColumn();
-                            if 
(foreach.isInputFlattened(foreachInputColumn.second))
-                                return false;
-                        }
-                    }
-                }
-                
                 mInsertBetween = true;
                 return true;
             }

Modified: 
hadoop/pig/trunk/test/org/apache/pig/test/TestPushDownForeachFlatten.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestPushDownForeachFlatten.java?rev=935000&r1=934999&r2=935000&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestPushDownForeachFlatten.java 
(original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestPushDownForeachFlatten.java 
Fri Apr 16 17:14:49 2010
@@ -1002,6 +1002,25 @@ public class TestPushDownForeachFlatten 
         assertTrue(pushDownForeach.getSwap() == false);
         assertTrue(pushDownForeach.getInsertBetween() == false);
     }
+    
+    // See PIG-1374
+    @Test
+    public void testForeachRequiredField() throws Exception {
+        planTester.buildPlan("A = load 'myfile' as 
(b{t(a0:chararray,a1:int)});");
+        planTester.buildPlan("B = foreach A generate flatten($0);");
+        LogicalPlan lp = planTester.buildPlan("C = order B by $1 desc;");
+        
+        planTester.setPlan(lp);
+        planTester.setProjectionMap(lp);
+        planTester.rebuildSchema(lp);
+        
+        PushDownForeachFlatten pushDownForeach = new 
PushDownForeachFlatten(lp);
 
+        LOLoad loada = (LOLoad) lp.getRoots().get(0);
+        
+        assertTrue(!pushDownForeach.check(lp.getSuccessors(loada)));
+        assertTrue(pushDownForeach.getSwap() == false);
+        assertTrue(pushDownForeach.getInsertBetween() == false);
+    }
 }
 


Reply via email to