Author: gates
Date: Tue Dec  1 18:31:25 2009
New Revision: 885858

URL: http://svn.apache.org/viewvc?rev=885858&view=rev
Log:
PIG-1022:  optimizer pushes filter before the foreach that generates column 
used by filter.

Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PushUpFilter.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestPushUpFilter.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=885858&r1=885857&r2=885858&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Tue Dec  1 18:31:25 2009
@@ -41,6 +41,9 @@
 
 BUG FIXES
 
+PIG-1022:  optimizer pushes filter before the foreach that generates column
+used by filter (daijy via gates)
+
 PIG-1107: PigLineRecordReader bails out on an empty line for compressed data
 (ankit.modi via olgan)
 

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PushUpFilter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PushUpFilter.java?rev=885858&r1=885857&r2=885858&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PushUpFilter.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PushUpFilter.java Tue Dec  1 18:31:25 2009
@@ -36,6 +36,7 @@
 import org.apache.pig.impl.logicalLayer.LOForEach;
 import org.apache.pig.impl.logicalLayer.LOLimit;
 import org.apache.pig.impl.logicalLayer.LOLoad;
+import org.apache.pig.impl.logicalLayer.LOProject;
 import org.apache.pig.impl.logicalLayer.LOSplit;
 import org.apache.pig.impl.logicalLayer.LOStore;
 import org.apache.pig.impl.logicalLayer.LOStream;
@@ -43,10 +44,12 @@
 import org.apache.pig.impl.logicalLayer.LOUnion;
 import org.apache.pig.impl.logicalLayer.LogicalOperator;
 import org.apache.pig.impl.logicalLayer.LogicalPlan;
+import org.apache.pig.impl.logicalLayer.TopLevelProjectFinder;
 import org.apache.pig.impl.logicalLayer.UDFFinder;
 import org.apache.pig.impl.plan.DepthFirstWalker;
 import org.apache.pig.impl.plan.ProjectionMap;
 import org.apache.pig.impl.plan.RequiredFields;
+import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.plan.optimizer.OptimizerException;
 import org.apache.pig.PigException;
 import org.apache.pig.impl.util.MultiMap;
@@ -256,6 +259,18 @@
 
                 Pair<Boolean, Set<Integer>> mappingResult = isRequiredFieldMapped(requiredField, predecessor.getProjectionMap());
                 boolean mapped = mappingResult.first;
+                
+                // Check if it is a direct mapping, that is, project optionally followed by cast, so if project->project, it is not
+                // considered as a mapping
+                for (Pair<Integer, Integer> pair : requiredField.getFields())
+                {
+                    if (!isFieldSimple(loForEach.getForEachPlans().get(pair.second)))
+                    {
+                        mapped = false;
+                        break;
+                    }
+                }
+                
                 if (!mapped) {
                     return false;
                 }
@@ -420,4 +435,44 @@
 
         return new Pair<Boolean, Set<Integer>>(true, grandParentIndexes);
     }
+    
+    /**
+     * Check if the inner plan is simple
+     * 
+     * @param lp
+     *        logical plan to check
+     * @return Whether if the logical plan is a simple project optionally followed by cast
+     */
+    boolean isFieldSimple(LogicalPlan lp) throws OptimizerException
+    {
+        TopLevelProjectFinder projectFinder = new TopLevelProjectFinder(lp);
+            
+        try {
+            projectFinder.visit();
+        } catch (VisitorException ve) {
+            throw new OptimizerException();
+        }
+        if (projectFinder.getProjectSet()!=null && projectFinder.getProjectSet().size()==1)
+        {
+            LOProject project = projectFinder.getProjectSet().iterator().next();
+            if (lp.getPredecessors(project)==null)
+            {
+                LogicalOperator pred = project;
+                while (lp.getSuccessors(pred)!=null)
+                {
+                    if (lp.getSuccessors(pred).size()!=1)
+                        return false;
+                    if (!(lp.getSuccessors(pred).get(0) instanceof LOCast))
+                    {
+                        return false;
+                    }
+                    pred = lp.getSuccessors(pred).get(0);
+                }
+                return true;
+            }
+            return false;
+        }
+        else
+            return true;
+    }
 }

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestPushUpFilter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestPushUpFilter.java?rev=885858&r1=885857&r2=885858&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestPushUpFilter.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestPushUpFilter.java Tue Dec  1 18:31:25 2009
@@ -1050,6 +1050,24 @@
         assertTrue(pushUpFilter.getPushBeforeInput() == -1);
         
     }
+    
+    @Test
+    public void testFilterNestedForEach() throws Exception {
+        planTester.buildPlan("A = load 'myfile' as (name, age, gpa);");
+        planTester.buildPlan("B = group A by name;");
+        planTester.buildPlan("C = foreach B generate A.age as age;");        
+        LogicalPlan lp = planTester.buildPlan("D = filter C by age == 20;");
+        
+        planTester.setPlan(lp);
+        planTester.setProjectionMap(lp);
+        
+        PushUpFilter pushUpFilter = new PushUpFilter(lp);
+        
+        assertTrue(!pushUpFilter.check(lp.getLeaves()));
+        assertTrue(pushUpFilter.getSwap() == false);
+        assertTrue(pushUpFilter.getPushBefore() == false);
+        assertTrue(pushUpFilter.getPushBeforeInput() == -1);
+    }
 
 }
 


Reply via email to