Author: daijy
Date: Tue Jul 28 17:09:38 2009
New Revision: 798610
URL: http://svn.apache.org/viewvc?rev=798610&view=rev
Log:
PIG-895: Default parallel for Pig
Modified:
hadoop/pig/trunk/CHANGES.txt
hadoop/pig/trunk/src/org/apache/pig/PigServer.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
hadoop/pig/trunk/src/org/apache/pig/impl/PigContext.java
hadoop/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java
hadoop/pig/trunk/test/org/apache/pig/test/TestJobSubmission.java
Modified: hadoop/pig/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=798610&r1=798609&r2=798610&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Tue Jul 28 17:09:38 2009
@@ -26,6 +26,8 @@
IMPROVEMENTS
+PIG-895: Default parallel for Pig (daijy)
+
PIG-820: Change RandomSampleLoader to take a LoadFunc instead of extending
BinStorage. Added new Samplable interface for loaders to
implement
allowing them to be used by RandomSampleLoader (ashutoshc via
gates).
Modified: hadoop/pig/trunk/src/org/apache/pig/PigServer.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/PigServer.java?rev=798610&r1=798609&r2=798610&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/PigServer.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/PigServer.java Tue Jul 28 17:09:38 2009
@@ -183,6 +183,10 @@
public void debugOff() {
pigContext.debug = false;
}
+
+    /**
+     * Sets the default reducer parallelism stored in this server's PigContext.
+     * When the value is greater than zero, JobControlCompiler writes it into
+     * "mapred.reduce.tasks" for each job it compiles; a value of zero or less
+     * (the PigContext default is -1) leaves that setting untouched.
+     *
+     * @param p default number of reduce tasks; values <= 0 mean "no default"
+     */
+    public void setDefaultParallel(int p) {
+        this.pigContext.defaultParallel = p;
+    }
/**
* Starts batch execution mode.
Modified:
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=798610&r1=798609&r2=798610&view=diff
==============================================================================
---
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
(original)
+++
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
Tue Jul 28 17:09:38 2009
@@ -282,6 +282,8 @@
//used as the working directory
String user = System.getProperty("user.name");
jobConf.setUser(user != null ? user : "Pigster");
+ if (pigContext.defaultParallel > 0)
+ jobConf.set("mapred.reduce.tasks", ""+pigContext.defaultParallel);
try{
//Process the POLoads
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/PigContext.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/PigContext.java?rev=798610&r1=798609&r2=798610&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/PigContext.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/PigContext.java Tue Jul 28
17:09:38 2009
@@ -116,6 +116,8 @@
private static ArrayList<String> packageImportList = new
ArrayList<String>();
public boolean debug = true;
+
+ public int defaultParallel = -1;
// Says, wether we're processing an explain right now. Explain
// might skip some check in the logical plan validation (file
Modified: hadoop/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java?rev=798610&r1=798609&r2=798610&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java Tue Jul 28
17:09:38 2009
@@ -435,6 +435,14 @@
}
mPigServer.addPathToSkip(value);
}
+ else if (key.equals("default_parallel")) {
+ // Validate
+ try {
+ mPigServer.setDefaultParallel(Integer.parseInt(value));
+ } catch (NumberFormatException e) {
+ throw new ParseException("Invalid value for default_parallel");
+ }
+ }
else
{
// other key-value pairs can go there
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestJobSubmission.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestJobSubmission.java?rev=798610&r1=798609&r2=798610&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestJobSubmission.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestJobSubmission.java Tue Jul 28
17:09:38 2009
@@ -476,6 +476,32 @@
}
}
+    /**
+     * Verifies that PigContext.defaultParallel becomes the reducer count of a
+     * compiled job whose script ("group a by $0") has no explicit PARALLEL.
+     */
+    @Test
+    public void testDefaultParallel() throws Throwable {
+        // pc is shared with other tests: remember its setting and always restore
+        // it, so a failure here cannot leak parallel=100 into later tests.
+        int savedParallel = pc.defaultParallel;
+        pc.defaultParallel = 100;
+        try {
+            LogicalPlanTester planTester = new LogicalPlanTester();
+            planTester.buildPlan("a = load 'input';");
+            LogicalPlan lp = planTester.buildPlan("b = group a by $0;");
+            PhysicalPlan pp = Util.buildPhysicalPlan(lp, pc);
+            POStore store = GenPhyOp.dummyPigStorageOp();
+            pp.addAsLeaf(store);
+            MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
+
+            ExecutionEngine exe = pc.getExecutionEngine();
+            ConfigurationValidator.validatePigProperties(exe.getConfiguration());
+            Configuration conf = ConfigurationUtil.toConfiguration(exe.getConfiguration());
+            JobControlCompiler jcc = new JobControlCompiler(pc, conf);
+
+            JobControl jobControl = jcc.compile(mrPlan, "Test");
+            Job job = jobControl.getWaitingJobs().get(0);
+            int parallel = job.getJobConf().getNumReduceTasks();
+
+            // assertEquals reports the actual value on failure, unlike assertTrue(x==y).
+            assertEquals(100, parallel);
+        } finally {
+            pc.defaultParallel = savedParallel;
+        }
+    }
+
private void submit() throws Exception{
assertEquals(true, FileLocalizer.fileExists(hadoopLdFile, pc));
MapReduceLauncher mrl = new MapReduceLauncher();