Author: gates
Date: Thu Oct 22 16:23:20 2009
New Revision: 828773
URL: http://svn.apache.org/viewvc?rev=828773&view=rev
Log:
PIG-1025: Add ability to set job priority from Pig Latin script.
Modified:
hadoop/pig/trunk/CHANGES.txt
hadoop/pig/trunk/src/org/apache/pig/PigServer.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
hadoop/pig/trunk/src/org/apache/pig/impl/PigContext.java
hadoop/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java
hadoop/pig/trunk/test/org/apache/pig/test/TestGrunt.java
Modified: hadoop/pig/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=828773&r1=828772&r2=828773&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Thu Oct 22 16:23:20 2009
@@ -26,6 +26,9 @@
IMPROVEMENTS
+PIG-1025: Add ability to set job priority from Pig Latin script (kevinweil via
+gates)
+
PIG-1028: FINDBUGS: DM_NUMBER_CTOR: Method invokes inefficient Number
constructor; use static valueOf instead (olgan)
Modified: hadoop/pig/trunk/src/org/apache/pig/PigServer.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/PigServer.java?rev=828773&r1=828772&r2=828773&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/PigServer.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/PigServer.java Thu Oct 22 16:23:20 2009
@@ -439,6 +439,10 @@
currDAG.setJobName(name);
}
+ public void setJobPriority(String priority){
+ currDAG.setJobPriority(priority);
+ }
+
/**
* Forces execution of query (and all queries from which it reads), in
order to materialize
* result
@@ -903,6 +907,8 @@
private String jobName;
+ private String jobPriority;
+
private boolean batchMode;
private int processedStores;
@@ -934,6 +940,10 @@
List<ExecJob> execute() throws ExecException, FrontendException {
pigContext.getProperties().setProperty(PigContext.JOB_NAME,
jobName);
+ if (jobPriority != null) {
+ pigContext.getProperties().setProperty(PigContext.JOB_PRIORITY,
jobPriority);
+ }
+
List<ExecJob> jobs = PigServer.this.execute(null);
processedStores = storeOpTable.keySet().size();
return jobs;
@@ -947,6 +957,10 @@
jobName = PigContext.JOB_NAME_PREFIX+":"+name;
}
+ public void setJobPriority(String priority){
+ jobPriority = priority;
+ }
+
LogicalPlan getPlan(String alias) throws IOException {
LogicalPlan plan = lp;
Modified:
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=828773&r1=828772&r2=828773&view=diff
==============================================================================
---
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
(original)
+++
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
Thu Oct 22 16:23:20 2009
@@ -36,6 +36,7 @@
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobPriority;
import org.apache.hadoop.mapred.OutputFormat;
import org.apache.hadoop.mapred.jobcontrol.Job;
import org.apache.hadoop.mapred.jobcontrol.JobControl;
@@ -334,6 +335,25 @@
if (pigContext.getProperties().getProperty(PigContext.JOB_NAME) !=
null)
jobConf.setJobName(pigContext.getProperties().getProperty(PigContext.JOB_NAME));
+ if
(pigContext.getProperties().getProperty(PigContext.JOB_PRIORITY) != null) {
+ // If the job priority was set, attempt to get the
corresponding enum value
+ // and set the hadoop job priority.
+ String jobPriority =
pigContext.getProperties().getProperty(PigContext.JOB_PRIORITY).toUpperCase();
+ try {
+ // Allow arbitrary case; the Hadoop job priorities are all
upper case.
+ jobConf.setJobPriority(JobPriority.valueOf(jobPriority));
+ } catch (IllegalArgumentException e) {
+ StringBuffer sb = new StringBuffer("The job priority must be
one of [");
+ JobPriority[] priorities = JobPriority.values();
+ for (int i = 0; i < priorities.length; ++i) {
+ if (i > 0) sb.append(", ");
+ sb.append(priorities[i]);
+ }
+ sb.append("]. You specified [" + jobPriority + "]");
+ throw new JobCreationException(sb.toString());
+ }
+ }
+
// Setup the DistributedCache for this job
setupDistributedCache(pigContext, jobConf,
pigContext.getProperties(),
"pig.streaming.ship.files", true);
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/PigContext.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/PigContext.java?rev=828773&r1=828772&r2=828773&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/PigContext.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/PigContext.java Thu Oct 22
16:23:20 2009
@@ -72,6 +72,7 @@
public static final String JOB_NAME = "jobName";
public static final String JOB_NAME_PREFIX= "PigLatin";
+ public static final String JOB_PRIORITY = "jobPriority";
/* NOTE: we only serialize some of the stuff
*
Modified: hadoop/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java?rev=828773&r1=828772&r2=828773&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java Thu Oct 22
16:23:20 2009
@@ -430,6 +430,10 @@
{
mPigServer.setJobName(value);
}
+ else if (key.equals("job.priority"))
+ {
+ mPigServer.setJobPriority(value);
+ }
else if (key.equals("stream.skippath")) {
// Validate
File file = new File(value);
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestGrunt.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestGrunt.java?rev=828773&r1=828772&r2=828773&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestGrunt.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestGrunt.java Thu Oct 22
16:23:20 2009
@@ -745,4 +745,18 @@
fail();
}
}
+
+ public void testSetPriority() throws Throwable {
+ PigServer server = new PigServer(ExecType.MAPREDUCE,
cluster.getProperties());
+ PigContext context = server.getPigContext();
+
+ String strCmd = "set job.priority high\n";
+
+ ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes());
+ InputStreamReader reader = new InputStreamReader(cmd);
+
+ Grunt grunt = new Grunt(new BufferedReader(reader), context);
+
+ grunt.exec();
+ }
}