Author: rding
Date: Mon Sep 27 16:11:29 2010
New Revision: 1001797

URL: http://svn.apache.org/viewvc?rev=1001797&view=rev
Log:
PIG-1642: Order by doesn't use estimation to determine the parallelism
Modified: hadoop/pig/branches/branch-0.8/CHANGES.txt hadoop/pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java hadoop/pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SampleOptimizer.java hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/TestJobSubmission.java hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/Util.java Modified: hadoop/pig/branches/branch-0.8/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/CHANGES.txt?rev=1001797&r1=1001796&r2=1001797&view=diff ============================================================================== --- hadoop/pig/branches/branch-0.8/CHANGES.txt (original) +++ hadoop/pig/branches/branch-0.8/CHANGES.txt Mon Sep 27 16:11:29 2010 @@ -198,6 +198,8 @@ PIG-1309: Map-side Cogroup (ashutoshc) BUG FIXES +PIG-1642: Order by doesn't use estimation to determine the parallelism (rding) + PIG-1644: New logical plan: Plan.connect with position is misused in some places (daijy) Modified: hadoop/pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=1001797&r1=1001796&r2=1001797&view=diff ============================================================================== --- hadoop/pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original) +++ hadoop/pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Mon Sep 27 16:11:29 2010 @@ -121,7 +121,7 @@ public class JobControlCompiler{ Configuration conf; PigContext pigContext; - private final Log log = LogFactory.getLog(getClass()); + private static final Log log = LogFactory.getLog(JobControlCompiler.class); public static final 
String LOG_DIR = "_logs"; @@ -682,10 +682,10 @@ public class JobControlCompiler{ * @param lds * @throws IOException */ - private void estimateNumberOfReducers(Configuration conf, List<POLoad> lds) throws IOException { + static int estimateNumberOfReducers(Configuration conf, List<POLoad> lds) throws IOException { long bytesPerReducer = conf.getLong("pig.exec.reducers.bytes.per.reducer", (long) (1000 * 1000 * 1000)); int maxReducers = conf.getInt("pig.exec.reducers.max", 999); - long totalInputFileSize = getTotalInputFileSize(lds); + long totalInputFileSize = getTotalInputFileSize(conf, lds); log.info("BytesPerReducer=" + bytesPerReducer + " maxReducers=" + maxReducers + " totalInputFileSize=" + totalInputFileSize); @@ -695,10 +695,11 @@ public class JobControlCompiler{ reducers = Math.min(maxReducers, reducers); conf.setInt("mapred.reduce.tasks", reducers); - log.info("Neither PARALLEL nor default parallelism is set for this job. Setting number of reducers to " + reducers); + log.info("Neither PARALLEL nor default parallelism is set for this job. 
Setting number of reducers to " + reducers); + return reducers; } - private long getTotalInputFileSize(List<POLoad> lds) throws IOException { + private static long getTotalInputFileSize(Configuration conf, List<POLoad> lds) throws IOException { List<String> inputs = new ArrayList<String>(); if(lds!=null && lds.size()>0){ for (POLoad ld : lds) { @@ -722,7 +723,7 @@ public class JobControlCompiler{ return size; } - private long getPathLength(FileSystem fs,FileStatus status) throws IOException{ + private static long getPathLength(FileSystem fs,FileStatus status) throws IOException{ if (!status.isDir()){ return status.getLen(); }else{ Modified: hadoop/pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SampleOptimizer.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SampleOptimizer.java?rev=1001797&r1=1001796&r2=1001797&view=diff ============================================================================== --- hadoop/pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SampleOptimizer.java (original) +++ hadoop/pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SampleOptimizer.java Mon Sep 27 16:11:29 2010 @@ -18,22 +18,28 @@ package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer; +import java.io.IOException; import java.util.ArrayList; import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.pig.FuncSpec; +import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROpPlanVisitor; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; +import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad; import org.apache.pig.impl.io.FileSpec; import org.apache.pig.impl.plan.DepthFirstWalker; import org.apache.pig.impl.plan.PlanException; +import org.apache.pig.impl.plan.PlanWalker; import org.apache.pig.impl.plan.VisitorException; import org.apache.pig.impl.util.Utils; import org.apache.pig.impl.PigContext; @@ -47,7 +53,7 @@ import org.apache.pig.impl.PigContext; */ public class SampleOptimizer extends MROpPlanVisitor { - private Log log = LogFactory.getLog(getClass()); + private static final Log log = LogFactory.getLog(SampleOptimizer.class); private PigContext pigContext; public SampleOptimizer(MROperPlan plan, PigContext pigContext) { @@ -113,6 +119,39 @@ public class SampleOptimizer extends MRO return; } + // The MR job should have one successor. 
+ List<MapReduceOper> succs = mPlan.getSuccessors(mr); + if (succs.size() != 1) { + log.debug("Job has more than one successor."); + return; + } + MapReduceOper succ = succs.get(0); + + // set/estimate the parallelism + if (succ.requestedParallelism == 1) { + List<PhysicalOperator> loads = pred.mapPlan.getRoots(); + List<POLoad> lds = new ArrayList<POLoad>(); + for (PhysicalOperator ld : loads) { + lds.add((POLoad)ld); + } + Configuration conf = ConfigurationUtil.toConfiguration(pigContext.getProperties()); + int rp = 1; + try { + rp = JobControlCompiler.estimateNumberOfReducers(conf, lds); + } catch (IOException e) { + log.warn("Failed to estimate number of reducers", e); + } + + if (rp > 1) { + ParallelConstantVisitor visitor = new ParallelConstantVisitor(mr.reducePlan, rp); + visitor.visit(); + if (visitor.isReplaced()) { + succ.requestedParallelism = rp; + log.info(" Setting number of reducers for order by to " + rp); + } + } + } + if (pred.mapPlan == null || pred.mapPlan.size() != 2) { log.debug("Predecessor has more than just load+store in the map"); return; @@ -130,16 +169,8 @@ public class SampleOptimizer extends MRO } POLoad predLoad = (POLoad)r; - // The MR job should have one successor. - List<MapReduceOper> succs = mPlan.getSuccessors(mr); - if (succs.size() != 1) { - log.debug("Job has more than one successor."); - return; - } - MapReduceOper succ = succs.get(0); - // Find the load the correlates with the file the sampler is loading, and - // check that it is using the twmp file storage format. + // check that it is using the temp file storage format. if (succ.mapPlan == null) { // Huh? 
log.debug("Successor has no map plan."); return; @@ -240,4 +271,35 @@ public class SampleOptimizer extends MRO } } } + + private static class ParallelConstantVisitor extends PhyPlanVisitor { + + private int rp; + + private boolean replaced = false; + + public ParallelConstantVisitor(PhysicalPlan plan, int rp) { + super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>( + plan)); + this.rp = rp; + } + + public void visitConstant(ConstantExpression cnst) throws VisitorException { + if (cnst.getRequestedParallelism() == -1) { + Object obj = cnst.getValue(); + if (obj instanceof Integer) { + if (replaced) { + // sample job should have only one ConstantExpression + throw new VisitorException("Invalid reduce plan: more " + + "than one ConstantExpression found in sampling job"); + } + cnst.setValue(rp); + cnst.setRequestedParallelism(rp); + replaced = true; + } + } + } + + boolean isReplaced() { return replaced; } + } } Modified: hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/TestJobSubmission.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/TestJobSubmission.java?rev=1001797&r1=1001796&r2=1001797&view=diff ============================================================================== --- hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/TestJobSubmission.java (original) +++ hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/TestJobSubmission.java Mon Sep 27 16:11:29 2010 @@ -607,6 +607,72 @@ public class TestJobSubmission extends j assertEquals(job.getJobConf().getLong("mapred.reduce.tasks",10), 1); } + @Test + public void testReducerNumEstimationForOrderBy() throws Exception{ + // use the estimation + pc.getProperties().setProperty("pig.exec.reducers.bytes.per.reducer", "100"); + pc.getProperties().setProperty("pig.exec.reducers.max", "10"); + + LogicalPlanTester planTester = new LogicalPlanTester(pc) ; + Util.copyFromLocalToCluster(cluster, "test/org/apache/pig/test/data/passwd", "/passwd"); + 
planTester.buildPlan("a = load '/passwd';"); + LogicalPlan lp = planTester.buildPlan("b = order a by $0;"); + PhysicalPlan pp = Util.buildPhysicalPlan(lp, pc); + POStore store = GenPhyOp.dummyPigStorageOp(); + pp.addAsLeaf(store); + + MROperPlan mrPlan = Util.buildMRPlanWithOptimizer(pp, pc); + assertEquals(2, mrPlan.size()); + + MapReduceOper sort = mrPlan.getLeaves().get(0); + long reducer=Math.min((long)Math.ceil(new File("test/org/apache/pig/test/data/passwd").length()/100.0), 10); + assertEquals(reducer, sort.getRequestedParallelism()); + + // use the PARALLEL key word, it will override the estimated reducer number + planTester = new LogicalPlanTester(pc) ; + planTester.buildPlan("a = load '/passwd';"); + lp = planTester.buildPlan("b = order a by $0 PARALLEL 2;"); + pp = Util.buildPhysicalPlan(lp, pc); + store = GenPhyOp.dummyPigStorageOp(); + pp.addAsLeaf(store); + + mrPlan = Util.buildMRPlanWithOptimizer(pp, pc); + assertEquals(2, mrPlan.size()); + + sort = mrPlan.getLeaves().get(0); + assertEquals(2, sort.getRequestedParallelism()); + + // the estimation won't take effect when it apply to non-dfs or the files doesn't exist, such as hbase + planTester = new LogicalPlanTester(pc) ; + planTester.buildPlan("a = load 'hbase://passwd' using org.apache.pig.backend.hadoop.hbase.HBaseStorage('c:f1 c:f2');"); + lp = planTester.buildPlan("b = order a by $0 ;"); + pp = Util.buildPhysicalPlan(lp, pc); + store = GenPhyOp.dummyPigStorageOp(); + pp.addAsLeaf(store); + + mrPlan = Util.buildMRPlanWithOptimizer(pp, pc); + assertEquals(2, mrPlan.size()); + + sort = mrPlan.getLeaves().get(0); + + assertEquals(1, sort.getRequestedParallelism()); + + // test order by with three jobs (after optimization) + planTester = new LogicalPlanTester(pc) ; + planTester.buildPlan("a = load '/passwd';"); + planTester.buildPlan("b = foreach a generate $0, $1, $2;"); + lp = planTester.buildPlan("c = order b by $0;"); + pp = Util.buildPhysicalPlan(lp, pc); + store = GenPhyOp.dummyPigStorageOp(); 
+ pp.addAsLeaf(store); + + mrPlan = Util.buildMRPlanWithOptimizer(pp, pc); + assertEquals(3, mrPlan.size()); + + sort = mrPlan.getLeaves().get(0); + assertEquals(reducer, sort.getRequestedParallelism()); + } + private void submit() throws Exception{ assertEquals(true, FileLocalizer.fileExists(hadoopLdFile, pc)); MapReduceLauncher mrl = new MapReduceLauncher(); Modified: hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/Util.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/Util.java?rev=1001797&r1=1001796&r2=1001797&view=diff ============================================================================== --- hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/Util.java (original) +++ hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/Util.java Mon Sep 27 16:11:29 2010 @@ -52,13 +52,16 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.pig.ExecType; +import org.apache.pig.PigException; import org.apache.pig.PigServer; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.LogToPhyTranslationVisitor; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; +import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil; import org.apache.pig.data.BagFactory; import org.apache.pig.data.DataBag; import org.apache.pig.data.DataByteArray; @@ -73,6 +76,7 @@ import org.apache.pig.impl.logicalLayer. 
import org.apache.pig.impl.logicalLayer.parser.QueryParser; import org.apache.pig.impl.logicalLayer.schema.Schema; import org.apache.pig.impl.plan.VisitorException; +import org.apache.pig.impl.util.LogUtils; import org.apache.pig.newplan.logical.LogicalPlanMigrationVistor; import org.apache.pig.newplan.logical.optimizer.LogicalPlanPrinter; import org.apache.pig.newplan.logical.optimizer.SchemaResetter; @@ -606,6 +610,20 @@ public class Util { return comp.getMRPlan(); } + public static MROperPlan buildMRPlanWithOptimizer(PhysicalPlan pp, PigContext pc) throws Exception { + MapRedUtil.checkLeafIsStore(pp, pc); + + MapReduceLauncher launcher = new MapReduceLauncher(); + + java.lang.reflect.Method compile = launcher.getClass() + .getDeclaredMethod("compile", + new Class[] { PhysicalPlan.class, PigContext.class }); + + compile.setAccessible(true); + + return (MROperPlan) compile.invoke(launcher, new Object[] { pp, pc }); + } + public static void registerMultiLineQuery(PigServer pigServer, String query) throws IOException { File f = File.createTempFile("tmp", ""); PrintWriter pw = new PrintWriter(f);