Author: thejas
Date: Mon Sep 20 19:31:54 2010
New Revision: 999082

URL: http://svn.apache.org/viewvc?rev=999082&view=rev
Log:
PIG-1617: 'group all' should always use one reducer

Added:
    
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/GroupByConstParallelSetter.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestGroupConstParallel.java
Modified:
    hadoop/pig/trunk/CHANGES.txt
    
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java
    
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOCogroup.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/optimizer/Rule.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestFRJoin2.java
    hadoop/pig/trunk/test/org/apache/pig/test/Util.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=999082&r1=999081&r2=999082&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Mon Sep 20 19:31:54 2010
@@ -39,6 +39,7 @@ INCOMPATIBLE CHANGES
 PIG-1249: Safe-guards against misconfigured Pig scripts without PARALLEL 
keyword (zjffdu vi olgan)
 
 IMPROVEMENTS
+PIG-1617: 'group all' should always use one reducer (thejas)
 
 PIG-1589: add test cases for mapreduce operator which use distributed cache 
(thejas)
 

Modified: 
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java?rev=999082&r1=999081&r2=999082&view=diff
==============================================================================
--- 
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java
 (original)
+++ 
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java
 Mon Sep 20 19:31:54 2010
@@ -26,21 +26,21 @@ import org.apache.pig.newplan.OperatorPl
 import org.apache.pig.newplan.logical.rules.AddForEach;
 import org.apache.pig.newplan.logical.rules.ColumnMapKeyPrune;
 import org.apache.pig.newplan.logical.rules.FilterAboveForeach;
+import org.apache.pig.newplan.logical.rules.GroupByConstParallelSetter;
 import org.apache.pig.newplan.logical.rules.ImplicitSplitInserter;
+import org.apache.pig.newplan.logical.rules.LimitOptimizer;
 import org.apache.pig.newplan.logical.rules.LoadTypeCastInserter;
+import org.apache.pig.newplan.logical.rules.LogicalExpressionSimplifier;
 import org.apache.pig.newplan.logical.rules.MergeFilter;
 import org.apache.pig.newplan.logical.rules.MergeForEach;
+import org.apache.pig.newplan.logical.rules.PartitionFilterOptimizer;
+import org.apache.pig.newplan.logical.rules.PushDownForEachFlatten;
 import org.apache.pig.newplan.logical.rules.PushUpFilter;
 import org.apache.pig.newplan.logical.rules.SplitFilter;
 import org.apache.pig.newplan.logical.rules.StreamTypeCastInserter;
-import org.apache.pig.newplan.logical.rules.LogicalExpressionSimplifier;
 import org.apache.pig.newplan.optimizer.PlanOptimizer;
 import org.apache.pig.newplan.optimizer.Rule;
 
-import org.apache.pig.newplan.logical.rules.LimitOptimizer;
-import org.apache.pig.newplan.logical.rules.PartitionFilterOptimizer;
-import org.apache.pig.newplan.logical.rules.PushDownForEachFlatten;
-
 public class LogicalPlanOptimizer extends PlanOptimizer {
     private Set<String> mRulesOff = null;
     
@@ -162,6 +162,13 @@ public class LogicalPlanOptimizer extend
         if (!s.isEmpty())
             ls.add(s);
         
+        //set parallism to 1 for cogroup/group-by on constant
+        s = new HashSet<Rule>();
+        r = new GroupByConstParallelSetter("GroupByConstParallelSetter");
+        checkAndAddRule(s, r);
+        if(!s.isEmpty())
+            ls.add(s);
+        
         return ls;
     }
         

Modified: 
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOCogroup.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOCogroup.java?rev=999082&r1=999081&r2=999082&view=diff
==============================================================================
--- 
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOCogroup.java 
(original)
+++ 
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOCogroup.java 
Mon Sep 20 19:31:54 2010
@@ -65,6 +65,14 @@ public class LOCogroup extends LogicalRe
     private Map<Integer,Long> generatedInputUids = new HashMap<Integer,Long>();
     
     final static String GROUP_COL_NAME = "group";
+    
+    /**
+     * Constructor for use in defining rule patterns
+     * @param plan
+     */
+    public LOCogroup(LogicalPlan plan) {
+        super("LOCogroup", plan);     
+    }
         
     public LOCogroup(OperatorPlan plan, 
MultiMap<Integer,LogicalExpressionPlan> 
     expressionPlans, boolean[] isInner ) {

Added: 
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/GroupByConstParallelSetter.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/GroupByConstParallelSetter.java?rev=999082&view=auto
==============================================================================
--- 
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/GroupByConstParallelSetter.java
 (added)
+++ 
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/GroupByConstParallelSetter.java
 Mon Sep 20 19:31:54 2010
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.newplan.logical.rules;
+
+import java.util.Iterator;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.util.MultiMap;
+import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.OperatorPlan;
+import org.apache.pig.newplan.OperatorSubPlan;
+import org.apache.pig.newplan.logical.expression.ConstantExpression;
+import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.newplan.logical.relational.LOCogroup;
+import org.apache.pig.newplan.logical.relational.LogicalPlan;
+import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
+import org.apache.pig.newplan.optimizer.Rule;
+import org.apache.pig.newplan.optimizer.Transformer;
+
+/**
+ * Rule: If a LOCogroup is 'group all', set the parallelism to 1,
+ * or in general - if the group-by expression is just a constant
+ * then set parallelism to 1
+ * LogicalExpressionSimplifier could be used to convert an expression 
+ * with constants into a single ConstantExpression 
+ */
+
+public class GroupByConstParallelSetter extends Rule {
+
+    public GroupByConstParallelSetter(String n){
+        super(n, false);
+    }
+
+    @Override
+    public Transformer getNewTransformer() {
+        return new GroupAllParallelSetterTransformer();
+    }
+    private final static Log log = 
LogFactory.getLog(GroupByConstParallelSetter.class);
+
+    public static class GroupAllParallelSetterTransformer extends Transformer {
+        private OperatorPlan plan;
+
+        @Override
+        public boolean check(OperatorPlan matched) throws FrontendException {
+            LOCogroup group = (LOCogroup)matched.getSources().get(0);
+            MultiMap<Integer, LogicalExpressionPlan> explPlans =
+                group.getExpressionPlans();
+            //check if the expression plan consist of just a ConstantExpression
+            for(LogicalExpressionPlan ep : explPlans.values()){
+                Iterator<Operator> op_iter = ep.getOperators();
+                if(op_iter.hasNext()){
+                    //return false if the ExpressionOperator is not
+                    // a ConstantExpression
+                    if(! (op_iter.next() instanceof ConstantExpression)){
+                        return false;
+                    }
+                }
+                // if there is more than one ExpressionOperator, return false
+                if(op_iter.hasNext()){
+                    return false;
+                }
+            }
+            return true;
+            
+        }
+
+        @Override
+        public void transform(OperatorPlan plan) throws FrontendException {
+            this.plan = ((OperatorSubPlan) plan).getBasePlan();
+            Iterator<Operator> iter = plan.getOperators();
+            while (iter.hasNext()) {
+                Operator op = iter.next();
+                if (op instanceof LOCogroup) {
+                    LOCogroup group = (LOCogroup)op;
+                    if(group.getRequestedParallelisam() > 1){
+                        log.warn("Resetting parallism to 1 for the 
group/cogroup " +
+                                group.getAlias() +
+                        " as the group by expressions returns a constant");
+                    }
+                    ((LOCogroup) op).setRequestedParallelism(1);
+                }
+            }
+        }
+        
+ 
+        @Override
+        public OperatorPlan reportChanges() {
+            return plan;
+        }
+    }
+
+    @Override
+    protected OperatorPlan buildPattern() {
+        LogicalPlan plan = new LogicalPlan();
+        LogicalRelationalOperator op = new LOCogroup(plan);
+        plan.add(op);
+        return plan;
+    }
+}

Modified: hadoop/pig/trunk/src/org/apache/pig/newplan/optimizer/Rule.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/optimizer/Rule.java?rev=999082&r1=999081&r2=999082&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/optimizer/Rule.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/optimizer/Rule.java Mon Sep 20 
19:31:54 2010
@@ -50,6 +50,7 @@ public abstract class Rule {
     /**
      * Create this rule by using the default pattern that this rule provided
      * @param n Name of this rule
+     * @param mandatory if it is set to false, this rule can be disabled by 
user
      */
     public Rule(String n, boolean mandatory) {
         name = n;    

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestFRJoin2.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestFRJoin2.java?rev=999082&r1=999081&r2=999082&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestFRJoin2.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestFRJoin2.java Mon Sep 20 
19:31:54 2010
@@ -86,7 +86,9 @@ public class TestFRJoin2 {
                 .getProperties());
         
         pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as 
(x:int,y:int);");
-        pigServer.registerQuery("B = group A all parallel 5;");
+        
+        // using $0*0, instead of group-all because group-all sets parallelism 
to 1 
+        pigServer.registerQuery("B = group A by $0*0 parallel 5;"); 
         pigServer.registerQuery("C = foreach B generate COUNT(A) as count, 
MAX(A.y) as max;");
         
         DataBag dbfrj = BagFactory.getInstance().newDefaultBag(), dbshj = 
BagFactory.getInstance().newDefaultBag();

Added: hadoop/pig/trunk/test/org/apache/pig/test/TestGroupConstParallel.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestGroupConstParallel.java?rev=999082&view=auto
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestGroupConstParallel.java 
(added)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestGroupConstParallel.java Mon 
Sep 20 19:31:54 2010
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.jobcontrol.Job;
+import org.apache.hadoop.mapred.jobcontrol.JobControl;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import org.apache.pig.backend.hadoop.executionengine.HExecutionEngine;
+import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
+import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.logicalLayer.LogicalPlan;
+import org.apache.pig.impl.util.ConfigurationValidator;
+import org.apache.pig.newplan.OperatorPlan;
+import org.apache.pig.newplan.logical.optimizer.LogicalPlanOptimizer;
+import org.apache.pig.newplan.logical.rules.GroupByConstParallelSetter;
+import org.apache.pig.newplan.optimizer.Rule;
+import org.apache.pig.test.utils.GenPhyOp;
+import org.apache.pig.test.utils.LogicalPlanTester;
+import org.apache.pig.tools.pigstats.JobStats;
+import org.apache.pig.tools.pigstats.PigStats;
+import org.apache.pig.tools.pigstats.PigStats.JobGraph;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestGroupConstParallel {
+
+    private static final String INPUT_FILE = "TestGroupConstParallelInp";
+    private static MiniCluster cluster = MiniCluster.buildCluster();
+
+    
+    @BeforeClass
+    public static void oneTimeSetup() throws Exception{
+        String[] input = {
+                "two",
+                "one",
+                "two",
+        };
+        Util.createInputFile(cluster, INPUT_FILE, input);
+
+    }
+
+    @AfterClass
+    public static void tearDownAfterClass() throws Exception {
+        cluster.shutDown();
+    }
+
+    /**
+     * Test parallelism for group all
+     * @throws Exception
+     */
+    @Test
+    public void testGroupAllWithParallel() throws Exception {
+        PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster
+                .getProperties());
+        
+        
+        pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as 
(x:chararray);");
+        pigServer.registerQuery("B = group A all parallel 5;");
+        {
+            Iterator<Tuple> iter = pigServer.openIterator("B");
+            List<Tuple> expectedRes = 
+                Util.getTuplesFromConstantTupleStrings(
+                        new String[] {
+                                "('all',{('one'),('two'),('two')})"
+                        });
+            Util.checkQueryOutputsAfterSort(iter, expectedRes);
+            
+            JobGraph jGraph = PigStats.get().getJobGraph();
+            assertEquals(1, jGraph.size());
+            // find added map-only concatenate job 
+            JobStats js = (JobStats)jGraph.getSources().get(0);
+            assertEquals(1, js.getNumberMaps());   
+            assertEquals(1, js.getNumberReduces()); 
+        }
+
+    }
+    
+    
+    /**
+     * Test parallelism for group by constant
+     * @throws Throwable
+     */
+    @Test
+    public void testGroupConstWithParallel() throws Throwable {
+        PigContext pc = new PigContext(ExecType.MAPREDUCE, 
cluster.getProperties());
+        pc.defaultParallel = 100;
+        pc.connect();
+        
+        LogicalPlanTester planTester = new LogicalPlanTester() ;
+        planTester.buildPlan("a = load 'input';");
+        LogicalPlan lp = planTester.buildPlan("b = group a by 1;");
+        
+        PhysicalPlan pp = Util.getNewOptimizedPhysicalPlan(lp, pc);
+        POStore store = GenPhyOp.dummyPigStorageOp();
+        pp.addAsLeaf(store);
+        MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
+
+        HExecutionEngine exe = pc.getExecutionEngine();
+        ConfigurationValidator.validatePigProperties(exe.getConfiguration());
+        Configuration conf = 
ConfigurationUtil.toConfiguration(exe.getConfiguration());
+        JobControlCompiler jcc = new JobControlCompiler(pc, conf);
+        
+        JobControl jobControl = jcc.compile(mrPlan, "Test");
+        Job job = jobControl.getWaitingJobs().get(0);
+        int parallel = job.getJobConf().getNumReduceTasks();
+
+        assertEquals("parallism", 1, parallel);
+    }
+    
+    /**
+     *  Test parallelism for group by column
+     * @throws Throwable
+     */
+    @Test
+    public void testGroupNonConstWithParallel() throws Throwable {
+        PigContext pc = new PigContext(ExecType.MAPREDUCE, 
cluster.getProperties());
+        pc.defaultParallel = 100;
+        pc.connect();
+        
+        LogicalPlanTester planTester = new LogicalPlanTester() ;
+        planTester.buildPlan("a = load 'input';");
+        LogicalPlan lp = planTester.buildPlan("b = group a by $0;");
+        
+        PhysicalPlan pp = Util.getNewOptimizedPhysicalPlan(lp, pc);
+        POStore store = GenPhyOp.dummyPigStorageOp();
+        pp.addAsLeaf(store);
+        MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
+
+        HExecutionEngine exe = pc.getExecutionEngine();
+        ConfigurationValidator.validatePigProperties(exe.getConfiguration());
+        Configuration conf = 
ConfigurationUtil.toConfiguration(exe.getConfiguration());
+        JobControlCompiler jcc = new JobControlCompiler(pc, conf);
+        
+        JobControl jobControl = jcc.compile(mrPlan, "Test");
+        Job job = jobControl.getWaitingJobs().get(0);
+        int parallel = job.getJobConf().getNumReduceTasks();
+        
+        assertEquals("parallism", 100, parallel);
+    }
+
+    public class MyPlanOptimizer extends LogicalPlanOptimizer {
+
+        protected MyPlanOptimizer(OperatorPlan p,  int iterations) {
+            super(p, iterations, null);                 
+        }
+        
+        protected List<Set<Rule>> buildRuleSets() {            
+            List<Set<Rule>> ls = new ArrayList<Set<Rule>>();
+            
+            Rule r = new 
GroupByConstParallelSetter("GroupByConstParallelSetter");
+            Set<Rule> s = new HashSet<Rule>();
+            s.add(r);            
+            ls.add(s);
+            
+            return ls;
+        }
+    }    
+
+}

Modified: hadoop/pig/trunk/test/org/apache/pig/test/Util.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/Util.java?rev=999082&r1=999081&r2=999082&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/Util.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/Util.java Mon Sep 20 19:31:54 2010
@@ -67,11 +67,15 @@ import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.logicalLayer.LogicalPlan;
 import org.apache.pig.impl.logicalLayer.parser.ParseException;
 import org.apache.pig.impl.logicalLayer.parser.QueryParser;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.newplan.logical.LogicalPlanMigrationVistor;
 import org.apache.pig.newplan.logical.optimizer.LogicalPlanPrinter;
+import org.apache.pig.newplan.logical.optimizer.SchemaResetter;
 import org.apache.pig.tools.grunt.GruntParser;
 
 import com.google.common.base.Function;
@@ -539,6 +543,62 @@ public class Util {
        visitor.visit();
        return visitor.getPhysicalPlan();
     }
+
+    /**
+     * migrate old logical plan to new logical plan
+     * @param lp
+     * @return new logical plan
+     * @throws FrontendException
+     */
+    public static org.apache.pig.newplan.logical.relational.LogicalPlan 
migrateToNewLP(LogicalPlan lp)
+    throws FrontendException{
+        LogicalPlanMigrationVistor visitor = new 
LogicalPlanMigrationVistor(lp);        
+        visitor.visit();
+        org.apache.pig.newplan.logical.relational.LogicalPlan newPlan = 
visitor.getNewLogicalPlan();
+        
+        SchemaResetter schemaResetter = new SchemaResetter(newPlan);
+        schemaResetter.visit();
+        return newPlan;
+    }
+    
+    /**
+     * Run default set of optimizer rules on new logical plan
+     * @param lp
+     * @return optimized logical plan
+     * @throws FrontendException
+     */
+    public static  org.apache.pig.newplan.logical.relational.LogicalPlan 
optimizeNewLP( 
+            org.apache.pig.newplan.logical.relational.LogicalPlan lp)
+    throws FrontendException{
+        // run optimizer
+        org.apache.pig.newplan.logical.optimizer.LogicalPlanOptimizer 
optimizer = 
+            new 
org.apache.pig.newplan.logical.optimizer.LogicalPlanOptimizer(lp, 100, null);
+        optimizer.optimize();        
+        return lp;
+    }
+    
+    /**
+     * migrate old LP(logical plan) to new LP, optimize it, and build physical 
+     * plan
+     * @param lp
+     * @param pc PigContext
+     * @return physical plan
+     * @throws Exception
+     */
+    public static PhysicalPlan buildPhysicalPlanFromNewLP(
+            org.apache.pig.newplan.logical.relational.LogicalPlan lp, 
PigContext pc)
+    throws Exception {
+         org.apache.pig.newplan.logical.relational.LogToPhyTranslationVisitor 
visitor =
+             new 
org.apache.pig.newplan.logical.relational.LogToPhyTranslationVisitor(lp);
+        visitor.setPigContext(pc);
+        visitor.visit();
+        return visitor.getPhysicalPlan();
+    }
+    
+    public static PhysicalPlan getNewOptimizedPhysicalPlan(LogicalPlan lp, 
PigContext pc)
+    throws FrontendException, Exception{
+        return buildPhysicalPlanFromNewLP(optimizeNewLP(migrateToNewLP(lp)), 
pc);
+    }
     
     public static MROperPlan buildMRPlan(PhysicalPlan pp, PigContext pc) 
throws Exception{
         MRCompiler comp = new MRCompiler(pp, pc);


Reply via email to