Author: daijy
Date: Tue Jul 28 17:09:38 2009
New Revision: 798610

URL: http://svn.apache.org/viewvc?rev=798610&view=rev
Log:
PIG-895: Default parallel for Pig

Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/src/org/apache/pig/PigServer.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
    hadoop/pig/trunk/src/org/apache/pig/impl/PigContext.java
    hadoop/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestJobSubmission.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=798610&r1=798609&r2=798610&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Tue Jul 28 17:09:38 2009
@@ -26,6 +26,8 @@
 
 IMPROVEMENTS
 
+PIG-895: Default parallel for Pig (daijy)
+
 PIG-820: Change RandomSampleLoader to take a LoadFunc instead of extending
                BinStorage.  Added new Samplable interface for loaders to implement
                allowing them to be used by RandomSampleLoader (ashutoshc via gates).

Modified: hadoop/pig/trunk/src/org/apache/pig/PigServer.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/PigServer.java?rev=798610&r1=798609&r2=798610&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/PigServer.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/PigServer.java Tue Jul 28 17:09:38 2009
@@ -183,6 +183,10 @@
     public void debugOff() {
         pigContext.debug = false;
     }
+    
+    public void setDefaultParallel(int p) {
+        pigContext.defaultParallel = p;
+    }
  
     /**
      * Starts batch execution mode.

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=798610&r1=798609&r2=798610&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Tue Jul 28 17:09:38 2009
@@ -282,6 +282,8 @@
         //used as the working directory
         String user = System.getProperty("user.name");
         jobConf.setUser(user != null ? user : "Pigster");
+        if (pigContext.defaultParallel > 0)
+            jobConf.set("mapred.reduce.tasks", ""+pigContext.defaultParallel);
 
         try{        
             //Process the POLoads

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/PigContext.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/PigContext.java?rev=798610&r1=798609&r2=798610&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/PigContext.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/PigContext.java Tue Jul 28 17:09:38 2009
@@ -116,6 +116,8 @@
     private static ArrayList<String> packageImportList = new ArrayList<String>();
 
     public boolean debug = true;
+    
+    public int defaultParallel = -1;
 
     // Says, wether we're processing an explain right now. Explain
     // might skip some check in the logical plan validation (file

Modified: hadoop/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java?rev=798610&r1=798609&r2=798610&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java Tue Jul 28 17:09:38 2009
@@ -435,6 +435,14 @@
             }
             mPigServer.addPathToSkip(value);
         }
+        else if (key.equals("default_parallel")) {
+            // Validate
+            try {
+                mPigServer.setDefaultParallel(Integer.parseInt(value));
+            } catch (NumberFormatException e) {
+                throw new ParseException("Invalid value for default_parallel");
+            }
+        }
         else
         {
             // other key-value pairs can go there

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestJobSubmission.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestJobSubmission.java?rev=798610&r1=798609&r2=798610&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestJobSubmission.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestJobSubmission.java Tue Jul 28 17:09:38 2009
@@ -476,6 +476,32 @@
         }
     }
     
+    @Test
+    public void testDefaultParallel() throws Throwable {
+        pc.defaultParallel = 100;
+        
+        LogicalPlanTester planTester = new LogicalPlanTester() ;
+        planTester.buildPlan("a = load 'input';");
+        LogicalPlan lp = planTester.buildPlan("b = group a by $0;");
+        PhysicalPlan pp = Util.buildPhysicalPlan(lp, pc);
+        POStore store = GenPhyOp.dummyPigStorageOp();
+        pp.addAsLeaf(store);
+        MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
+
+        ExecutionEngine exe = pc.getExecutionEngine();
+        ConfigurationValidator.validatePigProperties(exe.getConfiguration());
+        Configuration conf = ConfigurationUtil.toConfiguration(exe.getConfiguration());
+        JobControlCompiler jcc = new JobControlCompiler(pc, conf);
+        
+        JobControl jobControl = jcc.compile(mrPlan, "Test");
+        Job job = jobControl.getWaitingJobs().get(0);
+        int parallel = job.getJobConf().getNumReduceTasks();
+
+        assertTrue(parallel==100);
+        
+        pc.defaultParallel = -1;        
+    }
+
     private void submit() throws Exception{
         assertEquals(true, FileLocalizer.fileExists(hadoopLdFile, pc));
         MapReduceLauncher mrl = new MapReduceLauncher();


Reply via email to