Author: hashutosh
Date: Thu Feb 11 20:16:28 2010
New Revision: 909129

URL: http://svn.apache.org/viewvc?rev=909129&view=rev
Log:
PIG-834: incorrect plan when algebraic functions are nested

Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestCombiner.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=909129&r1=909128&r2=909129&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Thu Feb 11 20:16:28 2010
@@ -93,6 +93,8 @@
 
 BUG FIXES
 
+PIG-834: incorrect plan when algebraic functions are nested (ashutoshc)
+
 PIG-1217: Fix argToFuncMapping in Piggybank Top function (dvryaboy via gates)
 
 PIG-1154: Local Mode fails when hadoop config directory is specified in 

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java?rev=909129&r1=909128&r2=909129&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java Thu Feb 11 20:16:28 2010
@@ -76,7 +76,7 @@
  * works for things like generate group, SUM(A.$1 + 1).  But it fails for
  * things like the above.  Certain types of inner plans will never be
  * movable (like filters).  But distinct or order by in the inner plan
- * should be moble.  And, things like:
+ * should be mobile.  And, things like:
  *      C = cogroup A by $0, B by $0;
  *      D = foreach C {
  *          D1 = distinct A;
@@ -427,13 +427,51 @@
             }
             return ExprType.SIMPLE_PROJECT;
         } else if (leaf instanceof POUserFunc) {
-            return ((POUserFunc)leaf).combinable() ? ExprType.ALGEBRAIC :
-                ExprType.NOT_ALGEBRAIC;
+            
+            POUserFunc userFunc = (POUserFunc)leaf;
+            if(!userFunc.combinable() ){
+                return ExprType.NOT_ALGEBRAIC;
+            }
+            // The leaf userFunc may be combinable, but there might be other 
+            // algebraic userFuncs in the predecessors, if there are
+            // we choose not to fire combiner.
+            CheckCombinableUserFunc ccuf = new CheckCombinableUserFunc(pp);
+            ccuf.visit();
+            return ccuf.exprType;
         } else {
             return ExprType.NOT_ALGEBRAIC;
         }
     }
 
+      private static class CheckCombinableUserFunc extends PhyPlanVisitor{
+
+        private ExprType exprType = ExprType.ALGEBRAIC;
+          
+        public CheckCombinableUserFunc(PhysicalPlan plan) {
+            super(plan, new DependencyOrderWalker<PhysicalOperator, PhysicalPlan>(plan));
+        }
+        
+        @Override
+        public void visit() throws VisitorException {
+            super.visit();
+        }
+         
+        @Override
+        public void visitUserFunc(POUserFunc userFunc) throws VisitorException {
+            
+            /* We already know there is one combinable POUserFunc and its a leaf. So,
+             * successor of that userFunc is null. We are interested to find
+             * if there is another combinable userFunc somewhere in plan (that 
+             * is a userFunc with successors and is Combinable).
+             */
+            List<PhysicalOperator> succs = this.mPlan.getSuccessors(userFunc);
+            
+            if(succs != null && !succs.isEmpty() && userFunc.combinable()){
+                this.exprType = ExprType.NOT_ALGEBRAIC;                
+            }
+        }
+      }
+    
     // Returns number of fields that this will project, including the added
     // key field if that is necessary
     private void fixUpForeachs(

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestCombiner.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestCombiner.java?rev=909129&r1=909128&r2=909129&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestCombiner.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestCombiner.java Thu Feb 11 20:16:28 2010
@@ -39,13 +39,40 @@
 import org.apache.pig.builtin.PigStorage;
 import org.apache.pig.data.Tuple;
 
+import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.logicalLayer.LogicalPlan;
+import org.apache.pig.test.utils.LogicalPlanTester;
 
 public class TestCombiner extends TestCase {
 
-    
-
     MiniCluster cluster = MiniCluster.buildCluster();
+
+    @Test
+    public void testSuccessiveUserFuncs1() throws Exception{
+        
+        LogicalPlanTester tester = new LogicalPlanTester();
+        tester.buildPlan( "a = load 'students.txt' as (c1,c2,c3,c4); ");
+        tester.buildPlan("c = group a by c2; ");
+        tester.buildPlan("f = foreach c generate COUNT(org.apache.pig.builtin.Distinct($1.$2)); ");
+        LogicalPlan lp = tester.buildPlan("store f into 'out';");
+        PigContext pc = new PigServer(ExecType.MAPREDUCE, cluster.getProperties()).getPigContext();
+        assertTrue((Util.buildMRPlan(Util.buildPhysicalPlan(lp,pc),pc).getRoots().get(0).combinePlan.isEmpty()));
+    }
+
+    @Test
+    public void testSuccessiveUserFuncs2() throws Exception{
+        
+        LogicalPlanTester tester = new LogicalPlanTester();
+        tester.buildPlan( "a = load 'students.txt' as (c1,c2,c3,c4); ");
+        tester.buildPlan("c = group a by c2; ");
+        String dummyUDF = JiraPig1030.class.getName();
+        tester.buildPlan("f = foreach c generate COUNT("+dummyUDF+"" +
+                       "(org.apache.pig.builtin.Distinct($1.$2),"+dummyUDF+"())); ");
+        LogicalPlan lp = tester.buildPlan("store f into 'out';");
+        PigContext pc = new PigServer(ExecType.MAPREDUCE, cluster.getProperties()).getPigContext();
+        assertTrue((Util.buildMRPlan(Util.buildPhysicalPlan(lp,pc),pc).getRoots().get(0).combinePlan.isEmpty()));
+    }
     
     @Test
     public void testOnCluster() throws Exception {
@@ -92,7 +119,6 @@
                 + PigStorage.class.getName() + "(',');");
     }
     
-    
     @Test
     public void testNoCombinerUse() {
         // To simulate this, we will have two input files
@@ -348,7 +374,7 @@
             return "";
         }
     }
-    
+   
     @Test
     public void testJiraPig1030() {
         // test that combiner is NOT invoked when
@@ -391,4 +417,5 @@
             }
         }
     }
+
 }


Reply via email to