Author: daijy
Date: Fri Dec 18 04:02:33 2009
New Revision: 892124

URL: http://svn.apache.org/viewvc?rev=892124&view=rev
Log:
PIG-1144: set default_parallelism construct does not set the number of reducers correctly

Modified: hadoop/pig/trunk/CHANGES.txt hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java hadoop/pig/trunk/test/org/apache/pig/test/TestJobSubmission.java Modified: hadoop/pig/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=892124&r1=892123&r2=892124&view=diff ============================================================================== --- hadoop/pig/trunk/CHANGES.txt (original) +++ hadoop/pig/trunk/CHANGES.txt Fri Dec 18 04:02:33 2009 @@ -291,6 +291,9 @@ PIG-1155: Need to make sure existing loaders work "as is" (daijy) +PIG-1144: set default_parallelism construct does not set the number of +reducers correctly (daijy) + Release 0.5.0 INCOMPATIBLE CHANGES Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=892124&r1=892123&r2=892124&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Fri Dec 18 04:02:33 2009 @@ -31,6 +31,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.mapred.JobConf; +import org.apache.pig.ExecType; import org.apache.pig.FuncSpec; import org.apache.pig.IndexableLoadFunc; import org.apache.pig.LoadFunc; @@ -2046,10 +2047,13 @@ int val = rp; if(val<=0){ ExecutionEngine eng = pigContext.getExecutionEngine(); - if(eng instanceof HExecutionEngine){ + if(pigContext.getExecType() != ExecType.LOCAL){ try { - val = ((JobConf)((HExecutionEngine)eng).getJobClient().getConf()).getNumReduceTasks(); if(val<=0) + val = pigContext.defaultParallel; + if (val<=0) + val = 
((JobConf)((HExecutionEngine)eng).getJobClient().getConf()).getNumReduceTasks(); + if (val<=0) val = 1; } catch (Exception e) { int errCode = 6015; Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestJobSubmission.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestJobSubmission.java?rev=892124&r1=892123&r2=892124&view=diff ============================================================================== --- hadoop/pig/trunk/test/org/apache/pig/test/TestJobSubmission.java (original) +++ hadoop/pig/trunk/test/org/apache/pig/test/TestJobSubmission.java Fri Dec 18 04:02:33 2009 @@ -501,6 +501,69 @@ pc.defaultParallel = -1; } + + @Test + public void testDefaultParallelInSort() throws Throwable { + pc.defaultParallel = 100; + + LogicalPlanTester planTester = new LogicalPlanTester() ; + planTester.buildPlan("a = load 'input';"); + LogicalPlan lp = planTester.buildPlan("b = order a by $0;"); + PhysicalPlan pp = Util.buildPhysicalPlan(lp, pc); + POStore store = GenPhyOp.dummyPigStorageOp(); + pp.addAsLeaf(store); + MROperPlan mrPlan = Util.buildMRPlan(pp, pc); + + ExecutionEngine exe = pc.getExecutionEngine(); + ConfigurationValidator.validatePigProperties(exe.getConfiguration()); + Configuration conf = ConfigurationUtil.toConfiguration(exe.getConfiguration()); + JobControlCompiler jcc = new JobControlCompiler(pc, conf); + + // Get the sort job + JobControl jobControl = jcc.compile(mrPlan, "Test"); + jcc.updateMROpPlan(new ArrayList<Job>()); + jobControl = jcc.compile(mrPlan, "Test"); + jcc.updateMROpPlan(new ArrayList<Job>()); + jobControl = jcc.compile(mrPlan, "Test"); + Job job = jobControl.getWaitingJobs().get(0); + int parallel = job.getJobConf().getNumReduceTasks(); + + assertTrue(parallel==100); + + pc.defaultParallel = -1; + } + + @Test + public void testDefaultParallelInSkewJoin() throws Throwable { + pc.defaultParallel = 100; + + LogicalPlanTester planTester = new LogicalPlanTester() ; + planTester.buildPlan("a = load 'input';"); 
+ planTester.buildPlan("b = load 'input';"); + LogicalPlan lp = planTester.buildPlan("c = join a by $0, b by $0 using \"skewed\";"); + PhysicalPlan pp = Util.buildPhysicalPlan(lp, pc); + POStore store = GenPhyOp.dummyPigStorageOp(); + pp.addAsLeaf(store); + MROperPlan mrPlan = Util.buildMRPlan(pp, pc); + + ExecutionEngine exe = pc.getExecutionEngine(); + ConfigurationValidator.validatePigProperties(exe.getConfiguration()); + Configuration conf = ConfigurationUtil.toConfiguration(exe.getConfiguration()); + JobControlCompiler jcc = new JobControlCompiler(pc, conf); + + // Get the skew join job + JobControl jobControl = jcc.compile(mrPlan, "Test"); + jcc.updateMROpPlan(new ArrayList<Job>()); + jobControl = jcc.compile(mrPlan, "Test"); + jcc.updateMROpPlan(new ArrayList<Job>()); + jobControl = jcc.compile(mrPlan, "Test"); + Job job = jobControl.getWaitingJobs().get(0); + int parallel = job.getJobConf().getNumReduceTasks(); + + assertTrue(parallel==100); + + pc.defaultParallel = -1; + } private void submit() throws Exception{ assertEquals(true, FileLocalizer.fileExists(hadoopLdFile, pc));