Author: daijy
Date: Fri Dec 18 04:07:27 2009
New Revision: 892126
URL: http://svn.apache.org/viewvc?rev=892126&view=rev
Log:
PIG-1144: set default_parallelism construct does not set the number of reducers correctly
Modified:
hadoop/pig/branches/branch-0.6/CHANGES.txt
hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/TestJobSubmission.java
Modified: hadoop/pig/branches/branch-0.6/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/CHANGES.txt?rev=892126&r1=892125&r2=892126&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.6/CHANGES.txt (original)
+++ hadoop/pig/branches/branch-0.6/CHANGES.txt Fri Dec 18 04:07:27 2009
@@ -241,6 +241,9 @@
PIG-1155: Need to make sure existing loaders work "as is" (daijy)
+PIG-1144: set default_parallelism construct does not set the number of
+reducers correctly (daijy)
+
Release 0.5.0
INCOMPATIBLE CHANGES
Modified:
hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=892126&r1=892125&r2=892126&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Fri Dec 18 04:07:27 2009
@@ -31,6 +31,7 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.pig.ExecType;
import org.apache.pig.FuncSpec;
import org.apache.pig.IndexableLoadFunc;
import org.apache.pig.LoadFunc;
@@ -2046,10 +2047,13 @@
int val = rp;
if(val<=0){
ExecutionEngine eng = pigContext.getExecutionEngine();
- if(eng instanceof HExecutionEngine){
+ if(pigContext.getExecType() != ExecType.LOCAL){
try {
- val = ((JobConf)((HExecutionEngine)eng).getJobClient().getConf()).getNumReduceTasks();
if(val<=0)
+ val = pigContext.defaultParallel;
+ if (val<=0)
+ val = ((JobConf)((HExecutionEngine)eng).getJobClient().getConf()).getNumReduceTasks();
+ if (val<=0)
val = 1;
} catch (Exception e) {
int errCode = 6015;
Modified:
hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/TestJobSubmission.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/TestJobSubmission.java?rev=892126&r1=892125&r2=892126&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/TestJobSubmission.java (original)
+++ hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/TestJobSubmission.java Fri Dec 18 04:07:27 2009
@@ -501,6 +501,69 @@
pc.defaultParallel = -1;
}
+
+ @Test
+ public void testDefaultParallelInSort() throws Throwable {
+ pc.defaultParallel = 100;
+
+ LogicalPlanTester planTester = new LogicalPlanTester() ;
+ planTester.buildPlan("a = load 'input';");
+ LogicalPlan lp = planTester.buildPlan("b = order a by $0;");
+ PhysicalPlan pp = Util.buildPhysicalPlan(lp, pc);
+ POStore store = GenPhyOp.dummyPigStorageOp();
+ pp.addAsLeaf(store);
+ MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
+
+ ExecutionEngine exe = pc.getExecutionEngine();
+ ConfigurationValidator.validatePigProperties(exe.getConfiguration());
+ Configuration conf = ConfigurationUtil.toConfiguration(exe.getConfiguration());
+ JobControlCompiler jcc = new JobControlCompiler(pc, conf);
+
+ // Get the sort job
+ JobControl jobControl = jcc.compile(mrPlan, "Test");
+ jcc.updateMROpPlan(new ArrayList<Job>());
+ jobControl = jcc.compile(mrPlan, "Test");
+ jcc.updateMROpPlan(new ArrayList<Job>());
+ jobControl = jcc.compile(mrPlan, "Test");
+ Job job = jobControl.getWaitingJobs().get(0);
+ int parallel = job.getJobConf().getNumReduceTasks();
+
+ assertTrue(parallel==100);
+
+ pc.defaultParallel = -1;
+ }
+
+ @Test
+ public void testDefaultParallelInSkewJoin() throws Throwable {
+ pc.defaultParallel = 100;
+
+ LogicalPlanTester planTester = new LogicalPlanTester() ;
+ planTester.buildPlan("a = load 'input';");
+ planTester.buildPlan("b = load 'input';");
+ LogicalPlan lp = planTester.buildPlan("c = join a by $0, b by $0 using \"skewed\";");
+ PhysicalPlan pp = Util.buildPhysicalPlan(lp, pc);
+ POStore store = GenPhyOp.dummyPigStorageOp();
+ pp.addAsLeaf(store);
+ MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
+
+ ExecutionEngine exe = pc.getExecutionEngine();
+ ConfigurationValidator.validatePigProperties(exe.getConfiguration());
+ Configuration conf = ConfigurationUtil.toConfiguration(exe.getConfiguration());
+ JobControlCompiler jcc = new JobControlCompiler(pc, conf);
+
+ // Get the skew join job
+ JobControl jobControl = jcc.compile(mrPlan, "Test");
+ jcc.updateMROpPlan(new ArrayList<Job>());
+ jobControl = jcc.compile(mrPlan, "Test");
+ jcc.updateMROpPlan(new ArrayList<Job>());
+ jobControl = jcc.compile(mrPlan, "Test");
+ Job job = jobControl.getWaitingJobs().get(0);
+ int parallel = job.getJobConf().getNumReduceTasks();
+
+ assertTrue(parallel==100);
+
+ pc.defaultParallel = -1;
+ }
private void submit() throws Exception{
assertEquals(true, FileLocalizer.fileExists(hadoopLdFile, pc));