Author: daijy
Date: Tue Jul 28 17:09:38 2009
New Revision: 798610

URL: http://svn.apache.org/viewvc?rev=798610&view=rev
Log:
PIG-895: Default parallel for Pig
Modified: hadoop/pig/trunk/CHANGES.txt hadoop/pig/trunk/src/org/apache/pig/PigServer.java hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java hadoop/pig/trunk/src/org/apache/pig/impl/PigContext.java hadoop/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java hadoop/pig/trunk/test/org/apache/pig/test/TestJobSubmission.java Modified: hadoop/pig/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=798610&r1=798609&r2=798610&view=diff ============================================================================== --- hadoop/pig/trunk/CHANGES.txt (original) +++ hadoop/pig/trunk/CHANGES.txt Tue Jul 28 17:09:38 2009 @@ -26,6 +26,8 @@ IMPROVEMENTS +PIG-895: Default parallel for Pig (daijy) + PIG-820: Change RandomSampleLoader to take a LoadFunc instead of extending BinStorage. Added new Samplable interface for loaders to implement allowing them to be used by RandomSampleLoader (ashutoshc via gates). Modified: hadoop/pig/trunk/src/org/apache/pig/PigServer.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/PigServer.java?rev=798610&r1=798609&r2=798610&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/PigServer.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/PigServer.java Tue Jul 28 17:09:38 2009 @@ -183,6 +183,10 @@ public void debugOff() { pigContext.debug = false; } + + public void setDefaultParallel(int p) { + pigContext.defaultParallel = p; + } /** * Starts batch execution mode. 
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=798610&r1=798609&r2=798610&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Tue Jul 28 17:09:38 2009 @@ -282,6 +282,8 @@ //used as the working directory String user = System.getProperty("user.name"); jobConf.setUser(user != null ? user : "Pigster"); + if (pigContext.defaultParallel > 0) + jobConf.set("mapred.reduce.tasks", ""+pigContext.defaultParallel); try{ //Process the POLoads Modified: hadoop/pig/trunk/src/org/apache/pig/impl/PigContext.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/PigContext.java?rev=798610&r1=798609&r2=798610&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/impl/PigContext.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/impl/PigContext.java Tue Jul 28 17:09:38 2009 @@ -116,6 +116,8 @@ private static ArrayList<String> packageImportList = new ArrayList<String>(); public boolean debug = true; + + public int defaultParallel = -1; // Says, wether we're processing an explain right now. 
Explain // might skip some check in the logical plan validation (file Modified: hadoop/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java?rev=798610&r1=798609&r2=798610&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java Tue Jul 28 17:09:38 2009 @@ -435,6 +435,14 @@ } mPigServer.addPathToSkip(value); } + else if (key.equals("default_parallel")) { + // Validate + try { + mPigServer.setDefaultParallel(Integer.parseInt(value)); + } catch (NumberFormatException e) { + throw new ParseException("Invalid value for default_parallel"); + } + } else { // other key-value pairs can go there Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestJobSubmission.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestJobSubmission.java?rev=798610&r1=798609&r2=798610&view=diff ============================================================================== --- hadoop/pig/trunk/test/org/apache/pig/test/TestJobSubmission.java (original) +++ hadoop/pig/trunk/test/org/apache/pig/test/TestJobSubmission.java Tue Jul 28 17:09:38 2009 @@ -476,6 +476,32 @@ } } + @Test + public void testDefaultParallel() throws Throwable { + pc.defaultParallel = 100; + + LogicalPlanTester planTester = new LogicalPlanTester() ; + planTester.buildPlan("a = load 'input';"); + LogicalPlan lp = planTester.buildPlan("b = group a by $0;"); + PhysicalPlan pp = Util.buildPhysicalPlan(lp, pc); + POStore store = GenPhyOp.dummyPigStorageOp(); + pp.addAsLeaf(store); + MROperPlan mrPlan = Util.buildMRPlan(pp, pc); + + ExecutionEngine exe = pc.getExecutionEngine(); + ConfigurationValidator.validatePigProperties(exe.getConfiguration()); + Configuration conf = 
ConfigurationUtil.toConfiguration(exe.getConfiguration()); + JobControlCompiler jcc = new JobControlCompiler(pc, conf); + + JobControl jobControl = jcc.compile(mrPlan, "Test"); + Job job = jobControl.getWaitingJobs().get(0); + int parallel = job.getJobConf().getNumReduceTasks(); + + assertTrue(parallel==100); + + pc.defaultParallel = -1; + } + private void submit() throws Exception{ assertEquals(true, FileLocalizer.fileExists(hadoopLdFile, pc)); MapReduceLauncher mrl = new MapReduceLauncher();