Author: daijy
Date: Fri Dec 18 04:07:27 2009
New Revision: 892126

URL: http://svn.apache.org/viewvc?rev=892126&view=rev
Log:
PIG-1144: set default_parallelism construct does not set the number of reducers correctly

Modified:
    hadoop/pig/branches/branch-0.6/CHANGES.txt
    hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
    hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/TestJobSubmission.java

Modified: hadoop/pig/branches/branch-0.6/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/CHANGES.txt?rev=892126&r1=892125&r2=892126&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.6/CHANGES.txt (original)
+++ hadoop/pig/branches/branch-0.6/CHANGES.txt Fri Dec 18 04:07:27 2009
@@ -241,6 +241,9 @@
 
 PIG-1155: Need to make sure existing loaders work "as is" (daijy)
 
+PIG-1144: set default_parallelism construct does not set the number of
+reducers correctly (daijy)
+
 Release 0.5.0
 
 INCOMPATIBLE CHANGES

Modified: hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=892126&r1=892125&r2=892126&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Fri Dec 18 04:07:27 2009
@@ -31,6 +31,7 @@
 
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.pig.ExecType;
 import org.apache.pig.FuncSpec;
 import org.apache.pig.IndexableLoadFunc;
 import org.apache.pig.LoadFunc;
@@ -2046,10 +2047,13 @@
         int val = rp;
         if(val<=0){
             ExecutionEngine eng = pigContext.getExecutionEngine();
-            if(eng instanceof HExecutionEngine){
+            if(pigContext.getExecType() != ExecType.LOCAL){
                 try {
-                    val = ((JobConf)((HExecutionEngine)eng).getJobClient().getConf()).getNumReduceTasks();
                     if(val<=0)
+                        val = pigContext.defaultParallel;
+                    if (val<=0)
+                        val = ((JobConf)((HExecutionEngine)eng).getJobClient().getConf()).getNumReduceTasks();
+                    if (val<=0)
                         val = 1;
                 } catch (Exception e) {
                     int errCode = 6015;

Modified: hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/TestJobSubmission.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/TestJobSubmission.java?rev=892126&r1=892125&r2=892126&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/TestJobSubmission.java (original)
+++ hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/TestJobSubmission.java Fri Dec 18 04:07:27 2009
@@ -501,6 +501,69 @@
         
         pc.defaultParallel = -1;        
     }
+    
+    @Test
+    public void testDefaultParallelInSort() throws Throwable {
+        pc.defaultParallel = 100;
+        
+        LogicalPlanTester planTester = new LogicalPlanTester() ;
+        planTester.buildPlan("a = load 'input';");
+        LogicalPlan lp = planTester.buildPlan("b = order a by $0;");
+        PhysicalPlan pp = Util.buildPhysicalPlan(lp, pc);
+        POStore store = GenPhyOp.dummyPigStorageOp();
+        pp.addAsLeaf(store);
+        MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
+
+        ExecutionEngine exe = pc.getExecutionEngine();
+        ConfigurationValidator.validatePigProperties(exe.getConfiguration());
+        Configuration conf = ConfigurationUtil.toConfiguration(exe.getConfiguration());
+        JobControlCompiler jcc = new JobControlCompiler(pc, conf);
+        
+        // Get the sort job
+        JobControl jobControl = jcc.compile(mrPlan, "Test");
+        jcc.updateMROpPlan(new ArrayList<Job>());
+        jobControl = jcc.compile(mrPlan, "Test");
+        jcc.updateMROpPlan(new ArrayList<Job>());
+        jobControl = jcc.compile(mrPlan, "Test");
+        Job job = jobControl.getWaitingJobs().get(0);
+        int parallel = job.getJobConf().getNumReduceTasks();
+
+        assertTrue(parallel==100);
+        
+        pc.defaultParallel = -1;        
+    }
+    
+    @Test
+    public void testDefaultParallelInSkewJoin() throws Throwable {
+        pc.defaultParallel = 100;
+        
+        LogicalPlanTester planTester = new LogicalPlanTester() ;
+        planTester.buildPlan("a = load 'input';");
+        planTester.buildPlan("b = load 'input';");
+        LogicalPlan lp = planTester.buildPlan("c = join a by $0, b by $0 using \"skewed\";");
+        PhysicalPlan pp = Util.buildPhysicalPlan(lp, pc);
+        POStore store = GenPhyOp.dummyPigStorageOp();
+        pp.addAsLeaf(store);
+        MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
+
+        ExecutionEngine exe = pc.getExecutionEngine();
+        ConfigurationValidator.validatePigProperties(exe.getConfiguration());
+        Configuration conf = ConfigurationUtil.toConfiguration(exe.getConfiguration());
+        JobControlCompiler jcc = new JobControlCompiler(pc, conf);
+        
+        // Get the skew join job
+        JobControl jobControl = jcc.compile(mrPlan, "Test");
+        jcc.updateMROpPlan(new ArrayList<Job>());
+        jobControl = jcc.compile(mrPlan, "Test");
+        jcc.updateMROpPlan(new ArrayList<Job>());
+        jobControl = jcc.compile(mrPlan, "Test");
+        Job job = jobControl.getWaitingJobs().get(0);
+        int parallel = job.getJobConf().getNumReduceTasks();
+
+        assertTrue(parallel==100);
+        
+        pc.defaultParallel = -1;        
+    }
 
     private void submit() throws Exception{
         assertEquals(true, FileLocalizer.fileExists(hadoopLdFile, pc));


Reply via email to