Author: rding
Date: Mon Sep 27 16:11:29 2010
New Revision: 1001797

URL: http://svn.apache.org/viewvc?rev=1001797&view=rev
Log:
PIG-1642: Order by doesn't use estimation to determine the parallelism

Modified:
    hadoop/pig/branches/branch-0.8/CHANGES.txt
    
hadoop/pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
    
hadoop/pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SampleOptimizer.java
    
hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/TestJobSubmission.java
    hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/Util.java

Modified: hadoop/pig/branches/branch-0.8/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/CHANGES.txt?rev=1001797&r1=1001796&r2=1001797&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.8/CHANGES.txt (original)
+++ hadoop/pig/branches/branch-0.8/CHANGES.txt Mon Sep 27 16:11:29 2010
@@ -198,6 +198,8 @@ PIG-1309: Map-side Cogroup (ashutoshc)
 
 BUG FIXES
 
+PIG-1642: Order by doesn't use estimation to determine the parallelism (rding)
+
 PIG-1644: New logical plan: Plan.connect with position is misused in some
 places (daijy)
 

Modified: 
hadoop/pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=1001797&r1=1001796&r2=1001797&view=diff
==============================================================================
--- 
hadoop/pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
 (original)
+++ 
hadoop/pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
 Mon Sep 27 16:11:29 2010
@@ -121,7 +121,7 @@ public class JobControlCompiler{
     Configuration conf;
     PigContext pigContext;
     
-    private final Log log = LogFactory.getLog(getClass());
+    private static final Log log = LogFactory.getLog(JobControlCompiler.class);
     
     public static final String LOG_DIR = "_logs";
 
@@ -682,10 +682,10 @@ public class JobControlCompiler{
      * @param lds
      * @throws IOException
      */
-    private void estimateNumberOfReducers(Configuration conf, List<POLoad> 
lds) throws IOException {
+    static int estimateNumberOfReducers(Configuration conf, List<POLoad> lds) 
throws IOException {
            long bytesPerReducer = 
conf.getLong("pig.exec.reducers.bytes.per.reducer", (long) (1000 * 1000 * 
1000));
         int maxReducers = conf.getInt("pig.exec.reducers.max", 999);
-        long totalInputFileSize = getTotalInputFileSize(lds);
+        long totalInputFileSize = getTotalInputFileSize(conf, lds);
        
         log.info("BytesPerReducer=" + bytesPerReducer + " maxReducers="
             + maxReducers + " totalInputFileSize=" + totalInputFileSize);
@@ -695,10 +695,11 @@ public class JobControlCompiler{
         reducers = Math.min(maxReducers, reducers);
         conf.setInt("mapred.reduce.tasks", reducers);
 
-       log.info("Neither PARALLEL nor default parallelism is set for this job. 
Setting number of reducers to " + reducers);
+        log.info("Neither PARALLEL nor default parallelism is set for this 
job. Setting number of reducers to " + reducers);
+        return reducers;
     }
 
-    private long getTotalInputFileSize(List<POLoad> lds) throws IOException {
+    private static long getTotalInputFileSize(Configuration conf, List<POLoad> 
lds) throws IOException {
         List<String> inputs = new ArrayList<String>();
         if(lds!=null && lds.size()>0){
             for (POLoad ld : lds) {
@@ -722,7 +723,7 @@ public class JobControlCompiler{
         return size;
    }
    
-    private long getPathLength(FileSystem fs,FileStatus status) throws 
IOException{
+    private static long getPathLength(FileSystem fs,FileStatus status) throws 
IOException{
         if (!status.isDir()){
             return status.getLen();
         }else{

Modified: 
hadoop/pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SampleOptimizer.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SampleOptimizer.java?rev=1001797&r1=1001796&r2=1001797&view=diff
==============================================================================
--- 
hadoop/pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SampleOptimizer.java
 (original)
+++ 
hadoop/pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SampleOptimizer.java
 Mon Sep 27 16:11:29 2010
@@ -18,22 +18,28 @@
 
 package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.pig.FuncSpec;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROpPlanVisitor;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
 import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.plan.DepthFirstWalker;
 import org.apache.pig.impl.plan.PlanException;
+import org.apache.pig.impl.plan.PlanWalker;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.util.Utils;
 import org.apache.pig.impl.PigContext;
@@ -47,7 +53,7 @@ import org.apache.pig.impl.PigContext;
  */
 public class SampleOptimizer extends MROpPlanVisitor {
 
-    private Log log = LogFactory.getLog(getClass());
+    private static final Log log = LogFactory.getLog(SampleOptimizer.class);
     private PigContext pigContext;
 
     public SampleOptimizer(MROperPlan plan, PigContext pigContext) {
@@ -113,6 +119,39 @@ public class SampleOptimizer extends MRO
             return;
         }
 
+        // The MR job should have one successor.
+        List<MapReduceOper> succs = mPlan.getSuccessors(mr);
+        if (succs.size() != 1) {
+            log.debug("Job has more than one successor.");
+            return;
+        }
+        MapReduceOper succ = succs.get(0);
+        
+        // set/estimate the parallelism
+        if (succ.requestedParallelism == 1) {
+            List<PhysicalOperator> loads = pred.mapPlan.getRoots();
+            List<POLoad> lds = new ArrayList<POLoad>();
+            for (PhysicalOperator ld : loads) {
+                lds.add((POLoad)ld);
+            }
+            Configuration conf = 
ConfigurationUtil.toConfiguration(pigContext.getProperties());
+            int rp = 1;
+            try {
+                rp = JobControlCompiler.estimateNumberOfReducers(conf, lds);
+            } catch (IOException e) {
+                log.warn("Failed to estimate number of reducers", e);
+            }
+            
+            if (rp > 1) {
+                ParallelConstantVisitor visitor = new 
ParallelConstantVisitor(mr.reducePlan, rp);
+                visitor.visit();
+                if (visitor.isReplaced()) {
+                    succ.requestedParallelism = rp;
+                    log.info(" Setting number of reducers for order by to " + 
rp);
+                }
+            }
+        }
+        
         if (pred.mapPlan == null || pred.mapPlan.size() != 2) {
             log.debug("Predecessor has more than just load+store in the map");
             return;
@@ -130,16 +169,8 @@ public class SampleOptimizer extends MRO
         }
         POLoad predLoad = (POLoad)r;
 
-        // The MR job should have one successor.
-        List<MapReduceOper> succs = mPlan.getSuccessors(mr);
-        if (succs.size() != 1) {
-            log.debug("Job has more than one successor.");
-            return;
-        }
-        MapReduceOper succ = succs.get(0);
-
         // Find the load the correlates with the file the sampler is loading, 
and
-        // check that it is using the twmp file storage format.
+        // check that it is using the temp file storage format.
         if (succ.mapPlan == null) { // Huh?
             log.debug("Successor has no map plan.");
             return;
@@ -240,4 +271,35 @@ public class SampleOptimizer extends MRO
                }
                }
     }
+    
+    private static class ParallelConstantVisitor extends PhyPlanVisitor {
+
+        private int rp;
+        
+        private boolean replaced = false;
+        
+        public ParallelConstantVisitor(PhysicalPlan plan, int rp) {
+            super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(
+                    plan));
+            this.rp = rp;
+        }
+        
+        public void visitConstant(ConstantExpression cnst) throws 
VisitorException {            
+            if (cnst.getRequestedParallelism() == -1) {
+                Object obj = cnst.getValue();
+                if (obj instanceof Integer) {
+                    if (replaced) {
+                        // sample job should have only one ConstantExpression
+                        throw new VisitorException("Invalid reduce plan: more 
" +
+                                       "than one ConstantExpression found in 
sampling job");
+                    }
+                    cnst.setValue(rp);                    
+                    cnst.setRequestedParallelism(rp);
+                    replaced = true;
+                }
+            }
+        }
+     
+        boolean isReplaced() { return replaced; }
+    }
 }

Modified: 
hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/TestJobSubmission.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/TestJobSubmission.java?rev=1001797&r1=1001796&r2=1001797&view=diff
==============================================================================
--- 
hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/TestJobSubmission.java 
(original)
+++ 
hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/TestJobSubmission.java 
Mon Sep 27 16:11:29 2010
@@ -607,6 +607,72 @@ public class TestJobSubmission extends j
         assertEquals(job.getJobConf().getLong("mapred.reduce.tasks",10), 1);
     }
     
+    @Test
+    public void testReducerNumEstimationForOrderBy() throws Exception{
+       // use the estimation
+        pc.getProperties().setProperty("pig.exec.reducers.bytes.per.reducer", 
"100");
+        pc.getProperties().setProperty("pig.exec.reducers.max", "10");
+        
+        LogicalPlanTester planTester = new LogicalPlanTester(pc) ;
+        Util.copyFromLocalToCluster(cluster, 
"test/org/apache/pig/test/data/passwd", "/passwd");
+        planTester.buildPlan("a = load '/passwd';");
+        LogicalPlan lp = planTester.buildPlan("b = order a by $0;");
+        PhysicalPlan pp = Util.buildPhysicalPlan(lp, pc);
+        POStore store = GenPhyOp.dummyPigStorageOp();
+        pp.addAsLeaf(store);
+
+        MROperPlan mrPlan = Util.buildMRPlanWithOptimizer(pp, pc);
+        assertEquals(2, mrPlan.size());     
+        
+        MapReduceOper sort = mrPlan.getLeaves().get(0);        
+        long reducer=Math.min((long)Math.ceil(new 
File("test/org/apache/pig/test/data/passwd").length()/100.0), 10);
+        assertEquals(reducer, sort.getRequestedParallelism());
+        
+        // use the PARALLEL key word, it will override the estimated reducer 
number
+        planTester = new LogicalPlanTester(pc) ;
+        planTester.buildPlan("a = load '/passwd';");
+        lp = planTester.buildPlan("b = order a by $0 PARALLEL 2;");
+        pp = Util.buildPhysicalPlan(lp, pc);
+        store = GenPhyOp.dummyPigStorageOp();
+        pp.addAsLeaf(store);
+        
+        mrPlan = Util.buildMRPlanWithOptimizer(pp, pc);               
+        assertEquals(2, mrPlan.size());     
+        
+        sort = mrPlan.getLeaves().get(0);        
+        assertEquals(2, sort.getRequestedParallelism());
+        
+        // the estimation won't take effect when it applies to non-dfs or the 
files don't exist, such as hbase
+        planTester = new LogicalPlanTester(pc) ;
+        planTester.buildPlan("a = load 'hbase://passwd' using 
org.apache.pig.backend.hadoop.hbase.HBaseStorage('c:f1 c:f2');");
+        lp = planTester.buildPlan("b = order a by $0 ;");
+        pp = Util.buildPhysicalPlan(lp, pc);
+        store = GenPhyOp.dummyPigStorageOp();
+        pp.addAsLeaf(store);
+ 
+        mrPlan = Util.buildMRPlanWithOptimizer(pp, pc);               
+        assertEquals(2, mrPlan.size());     
+        
+        sort = mrPlan.getLeaves().get(0);
+        
+        assertEquals(1, sort.getRequestedParallelism());
+        
+        // test order by with three jobs (after optimization)
+        planTester = new LogicalPlanTester(pc) ;
+        planTester.buildPlan("a = load '/passwd';");
+        planTester.buildPlan("b = foreach a generate $0, $1, $2;");
+        lp = planTester.buildPlan("c = order b by $0;");
+        pp = Util.buildPhysicalPlan(lp, pc);
+        store = GenPhyOp.dummyPigStorageOp();
+        pp.addAsLeaf(store);
+        
+        mrPlan = Util.buildMRPlanWithOptimizer(pp, pc);
+        assertEquals(3, mrPlan.size());     
+        
+        sort = mrPlan.getLeaves().get(0);       
+        assertEquals(reducer, sort.getRequestedParallelism());
+    }
+    
     private void submit() throws Exception{
         assertEquals(true, FileLocalizer.fileExists(hadoopLdFile, pc));
         MapReduceLauncher mrl = new MapReduceLauncher();

Modified: hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/Util.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/Util.java?rev=1001797&r1=1001796&r2=1001797&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/Util.java (original)
+++ hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/Util.java Mon Sep 
27 16:11:29 2010
@@ -52,13 +52,16 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.pig.ExecType;
+import org.apache.pig.PigException;
 import org.apache.pig.PigServer;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler;
+import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.LogToPhyTranslationVisitor;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
 import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataByteArray;
@@ -73,6 +76,7 @@ import org.apache.pig.impl.logicalLayer.
 import org.apache.pig.impl.logicalLayer.parser.QueryParser;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.LogUtils;
 import org.apache.pig.newplan.logical.LogicalPlanMigrationVistor;
 import org.apache.pig.newplan.logical.optimizer.LogicalPlanPrinter;
 import org.apache.pig.newplan.logical.optimizer.SchemaResetter;
@@ -606,6 +610,20 @@ public class Util {
         return comp.getMRPlan();       
     }
     
+    public static MROperPlan buildMRPlanWithOptimizer(PhysicalPlan pp, 
PigContext pc) throws Exception {
+        MapRedUtil.checkLeafIsStore(pp, pc);
+        
+        MapReduceLauncher launcher = new MapReduceLauncher();
+
+        java.lang.reflect.Method compile = launcher.getClass()
+                .getDeclaredMethod("compile",
+                        new Class[] { PhysicalPlan.class, PigContext.class });
+
+        compile.setAccessible(true);
+
+        return (MROperPlan) compile.invoke(launcher, new Object[] { pp, pc });
+    }
+    
     public static void registerMultiLineQuery(PigServer pigServer, String 
query) throws IOException {
         File f = File.createTempFile("tmp", "");
         PrintWriter pw = new PrintWriter(f);


Reply via email to