Author: thejas
Date: Mon Sep 20 19:29:44 2010
New Revision: 999079

URL: http://svn.apache.org/viewvc?rev=999079&view=rev
Log:
PIG-1617: 'group all' should always use one reducer

Modified:
    hadoop/pig/branches/branch-0.8/CHANGES.txt
    
hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java
    
hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/relational/LOCogroup.java
    
hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/optimizer/Rule.java
    hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/TestFRJoin2.java
    hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/Util.java

Modified: hadoop/pig/branches/branch-0.8/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/CHANGES.txt?rev=999079&r1=999078&r2=999079&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.8/CHANGES.txt (original)
+++ hadoop/pig/branches/branch-0.8/CHANGES.txt Mon Sep 20 19:29:44 2010
@@ -25,6 +25,7 @@ INCOMPATIBLE CHANGES
 PIG-1249: Safe-guards against misconfigured Pig scripts without PARALLEL 
keyword (zjffdu vi olgan)
 
 IMPROVEMENTS
+PIG-1617: 'group all' should always use one reducer (thejas)
 
 PIG-1589: add test cases for mapreduce operator which use distributed cache 
(thejas)
 

Modified: 
hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java?rev=999079&r1=999078&r2=999079&view=diff
==============================================================================
--- 
hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java
 (original)
+++ 
hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java
 Mon Sep 20 19:29:44 2010
@@ -26,21 +26,21 @@ import org.apache.pig.newplan.OperatorPl
 import org.apache.pig.newplan.logical.rules.AddForEach;
 import org.apache.pig.newplan.logical.rules.ColumnMapKeyPrune;
 import org.apache.pig.newplan.logical.rules.FilterAboveForeach;
+import org.apache.pig.newplan.logical.rules.GroupByConstParallelSetter;
 import org.apache.pig.newplan.logical.rules.ImplicitSplitInserter;
+import org.apache.pig.newplan.logical.rules.LimitOptimizer;
 import org.apache.pig.newplan.logical.rules.LoadTypeCastInserter;
+import org.apache.pig.newplan.logical.rules.LogicalExpressionSimplifier;
 import org.apache.pig.newplan.logical.rules.MergeFilter;
 import org.apache.pig.newplan.logical.rules.MergeForEach;
+import org.apache.pig.newplan.logical.rules.PartitionFilterOptimizer;
+import org.apache.pig.newplan.logical.rules.PushDownForEachFlatten;
 import org.apache.pig.newplan.logical.rules.PushUpFilter;
 import org.apache.pig.newplan.logical.rules.SplitFilter;
 import org.apache.pig.newplan.logical.rules.StreamTypeCastInserter;
-import org.apache.pig.newplan.logical.rules.LogicalExpressionSimplifier;
 import org.apache.pig.newplan.optimizer.PlanOptimizer;
 import org.apache.pig.newplan.optimizer.Rule;
 
-import org.apache.pig.newplan.logical.rules.LimitOptimizer;
-import org.apache.pig.newplan.logical.rules.PartitionFilterOptimizer;
-import org.apache.pig.newplan.logical.rules.PushDownForEachFlatten;
-
 public class LogicalPlanOptimizer extends PlanOptimizer {
     private Set<String> mRulesOff = null;
     
@@ -162,6 +162,13 @@ public class LogicalPlanOptimizer extend
         if (!s.isEmpty())
             ls.add(s);
         
+        // set parallelism to 1 for cogroup/group-by on a constant
+        s = new HashSet<Rule>();
+        r = new GroupByConstParallelSetter("GroupByConstParallelSetter");
+        checkAndAddRule(s, r);
+        if(!s.isEmpty())
+            ls.add(s);
+        
         return ls;
     }
         

Modified: 
hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/relational/LOCogroup.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/relational/LOCogroup.java?rev=999079&r1=999078&r2=999079&view=diff
==============================================================================
--- 
hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/relational/LOCogroup.java
 (original)
+++ 
hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/relational/LOCogroup.java
 Mon Sep 20 19:29:44 2010
@@ -65,6 +65,14 @@ public class LOCogroup extends LogicalRe
     private Map<Integer,Long> generatedInputUids = new HashMap<Integer,Long>();
     
     final static String GROUP_COL_NAME = "group";
+    
+    /**
+     * Constructor for use in defining rule patterns
+     * @param plan
+     */
+    public LOCogroup(LogicalPlan plan) {
+        super("LOCogroup", plan);     
+    }
         
     public LOCogroup(OperatorPlan plan, 
MultiMap<Integer,LogicalExpressionPlan> 
     expressionPlans, boolean[] isInner ) {

Modified: 
hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/optimizer/Rule.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/optimizer/Rule.java?rev=999079&r1=999078&r2=999079&view=diff
==============================================================================
--- 
hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/optimizer/Rule.java 
(original)
+++ 
hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/optimizer/Rule.java 
Mon Sep 20 19:29:44 2010
@@ -50,6 +50,7 @@ public abstract class Rule {
     /**
      * Create this rule by using the default pattern that this rule provided
      * @param n Name of this rule
+     * @param mandatory if it is set to false, this rule can be disabled by the user
      */
     public Rule(String n, boolean mandatory) {
         name = n;    

Modified: 
hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/TestFRJoin2.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/TestFRJoin2.java?rev=999079&r1=999078&r2=999079&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/TestFRJoin2.java 
(original)
+++ hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/TestFRJoin2.java 
Mon Sep 20 19:29:44 2010
@@ -86,7 +86,9 @@ public class TestFRJoin2 {
                 .getProperties());
         
         pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as 
(x:int,y:int);");
-        pigServer.registerQuery("B = group A all parallel 5;");
+        
+        // using $0*0 instead of group-all, because group-all sets parallelism to 1
+        pigServer.registerQuery("B = group A by $0*0 parallel 5;"); 
         pigServer.registerQuery("C = foreach B generate COUNT(A) as count, 
MAX(A.y) as max;");
         
         DataBag dbfrj = BagFactory.getInstance().newDefaultBag(), dbshj = 
BagFactory.getInstance().newDefaultBag();

Modified: hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/Util.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/Util.java?rev=999079&r1=999078&r2=999079&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/Util.java (original)
+++ hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/Util.java Mon Sep 
20 19:29:44 2010
@@ -67,11 +67,15 @@ import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.logicalLayer.LogicalPlan;
 import org.apache.pig.impl.logicalLayer.parser.ParseException;
 import org.apache.pig.impl.logicalLayer.parser.QueryParser;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.newplan.logical.LogicalPlanMigrationVistor;
 import org.apache.pig.newplan.logical.optimizer.LogicalPlanPrinter;
+import org.apache.pig.newplan.logical.optimizer.SchemaResetter;
 import org.apache.pig.tools.grunt.GruntParser;
 
 import com.google.common.base.Function;
@@ -539,6 +543,62 @@ public class Util {
        visitor.visit();
        return visitor.getPhysicalPlan();
     }
+
+    /**
+     * Migrate an old logical plan to a new logical plan, then reset its schemas
+     * @param lp the old logical plan to migrate
+     * @return the new logical plan
+     * @throws FrontendException
+     */
+    public static org.apache.pig.newplan.logical.relational.LogicalPlan 
migrateToNewLP(LogicalPlan lp)
+    throws FrontendException{
+        LogicalPlanMigrationVistor visitor = new 
LogicalPlanMigrationVistor(lp);        
+        visitor.visit();
+        org.apache.pig.newplan.logical.relational.LogicalPlan newPlan = 
visitor.getNewLogicalPlan();
+        
+        SchemaResetter schemaResetter = new SchemaResetter(newPlan);
+        schemaResetter.visit();
+        return newPlan;
+    }
+    
+    /**
+     * Run the default set of optimizer rules on a new logical plan
+     * @param lp the new logical plan to optimize
+     * @return lp itself, after the optimizer has run on it
+     * @throws FrontendException
+     */
+    public static  org.apache.pig.newplan.logical.relational.LogicalPlan 
optimizeNewLP( 
+            org.apache.pig.newplan.logical.relational.LogicalPlan lp)
+    throws FrontendException{
+        // run optimizer
+        org.apache.pig.newplan.logical.optimizer.LogicalPlanOptimizer 
optimizer = 
+            new 
org.apache.pig.newplan.logical.optimizer.LogicalPlanOptimizer(lp, 100, null);
+        optimizer.optimize();        
+        return lp;
+    }
+    
+    /**
+     * Build a physical plan from a new logical plan (LP) using
+     * LogToPhyTranslationVisitor
+     * @param lp the new logical plan to translate
+     * @param pc PigContext set on the translation visitor
+     * @return physical plan
+     * @throws Exception
+     */
+    public static PhysicalPlan buildPhysicalPlanFromNewLP(
+            org.apache.pig.newplan.logical.relational.LogicalPlan lp, 
PigContext pc)
+    throws Exception {
+         org.apache.pig.newplan.logical.relational.LogToPhyTranslationVisitor 
visitor =
+             new 
org.apache.pig.newplan.logical.relational.LogToPhyTranslationVisitor(lp);
+        visitor.setPigContext(pc);
+        visitor.visit();
+        return visitor.getPhysicalPlan();
+    }
+    
+    public static PhysicalPlan getNewOptimizedPhysicalPlan(LogicalPlan lp, 
PigContext pc)
+    throws FrontendException, Exception{
+        return buildPhysicalPlanFromNewLP(optimizeNewLP(migrateToNewLP(lp)), 
pc);
+    }
     
     public static MROperPlan buildMRPlan(PhysicalPlan pp, PigContext pc) 
throws Exception{
         MRCompiler comp = new MRCompiler(pp, pc);


Reply via email to