Author: gates
Date: Thu Oct 22 16:23:20 2009
New Revision: 828773

URL: http://svn.apache.org/viewvc?rev=828773&view=rev
Log:
PIG-1025: Add ability to set job priority from Pig Latin script.

Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/src/org/apache/pig/PigServer.java
    
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
    hadoop/pig/trunk/src/org/apache/pig/impl/PigContext.java
    hadoop/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestGrunt.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=828773&r1=828772&r2=828773&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Thu Oct 22 16:23:20 2009
@@ -26,6 +26,9 @@
 
 IMPROVEMENTS
 
+PIG-1025: Add ability to set job priority from Pig Latin script (kevinweil via
+gates)
+
 PIG-1028: FINDBUGS: DM_NUMBER_CTOR: Method invokes inefficient Number
 constructor; use static valueOf instead (olgan)
 

Modified: hadoop/pig/trunk/src/org/apache/pig/PigServer.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/PigServer.java?rev=828773&r1=828772&r2=828773&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/PigServer.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/PigServer.java Thu Oct 22 16:23:20 2009
@@ -439,6 +439,10 @@
         currDAG.setJobName(name);
     }
     
+    public void setJobPriority(String priority){
+        currDAG.setJobPriority(priority);
+    }
+
     /**
      * Forces execution of query (and all queries from which it reads), in 
order to materialize
      * result
@@ -903,6 +907,8 @@
 
         private String jobName;
         
+        private String jobPriority;
+
         private boolean batchMode;
 
         private int processedStores;
@@ -934,6 +940,10 @@
         
         List<ExecJob> execute() throws ExecException, FrontendException {
             pigContext.getProperties().setProperty(PigContext.JOB_NAME, 
jobName);
+            if (jobPriority != null) {
+              pigContext.getProperties().setProperty(PigContext.JOB_PRIORITY, 
jobPriority);
+            }
+            
             List<ExecJob> jobs = PigServer.this.execute(null);
             processedStores = storeOpTable.keySet().size();
             return jobs;
@@ -947,6 +957,10 @@
             jobName = PigContext.JOB_NAME_PREFIX+":"+name;
         }
 
+        public void setJobPriority(String priority){
+            jobPriority = priority;
+        }
+
         LogicalPlan getPlan(String alias) throws IOException {
             LogicalPlan plan = lp;
                 

Modified: 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=828773&r1=828772&r2=828773&view=diff
==============================================================================
--- 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
 (original)
+++ 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
 Thu Oct 22 16:23:20 2009
@@ -36,6 +36,7 @@
 import org.apache.hadoop.io.WritableComparator;
 import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobPriority;
 import org.apache.hadoop.mapred.OutputFormat;
 import org.apache.hadoop.mapred.jobcontrol.Job;
 import org.apache.hadoop.mapred.jobcontrol.JobControl;
@@ -334,6 +335,25 @@
             if (pigContext.getProperties().getProperty(PigContext.JOB_NAME) != 
null)
                 
jobConf.setJobName(pigContext.getProperties().getProperty(PigContext.JOB_NAME));
     
+            if 
(pigContext.getProperties().getProperty(PigContext.JOB_PRIORITY) != null) {
+                // If the job priority was set, attempt to get the 
corresponding enum value
+                // and set the hadoop job priority.
+                String jobPriority = 
pigContext.getProperties().getProperty(PigContext.JOB_PRIORITY).toUpperCase();
+                try {
+                  // Allow arbitrary case; the Hadoop job priorities are all 
upper case.
+                  jobConf.setJobPriority(JobPriority.valueOf(jobPriority));
+                } catch (IllegalArgumentException e) {
+                  StringBuffer sb = new StringBuffer("The job priority must be 
one of [");
+                  JobPriority[] priorities = JobPriority.values();
+                  for (int i = 0; i < priorities.length; ++i) {
+                    if (i > 0)  sb.append(", ");
+                    sb.append(priorities[i]);
+                  }
+                  sb.append("].  You specified [" + jobPriority + "]");
+                  throw new JobCreationException(sb.toString());
+                }
+            }
+
             // Setup the DistributedCache for this job
             setupDistributedCache(pigContext, jobConf, 
pigContext.getProperties(), 
                                   "pig.streaming.ship.files", true);

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/PigContext.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/PigContext.java?rev=828773&r1=828772&r2=828773&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/PigContext.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/PigContext.java Thu Oct 22 
16:23:20 2009
@@ -72,6 +72,7 @@
     
     public static final String JOB_NAME = "jobName";
     public static final String JOB_NAME_PREFIX= "PigLatin";
+    public static final String JOB_PRIORITY = "jobPriority";
     
     /* NOTE: we only serialize some of the stuff 
      * 

Modified: hadoop/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java?rev=828773&r1=828772&r2=828773&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java Thu Oct 22 
16:23:20 2009
@@ -430,6 +430,10 @@
         {
             mPigServer.setJobName(value);
         }
+        else if (key.equals("job.priority"))
+        {
+            mPigServer.setJobPriority(value);
+        }
         else if (key.equals("stream.skippath")) {
             // Validate
             File file = new File(value);

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestGrunt.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestGrunt.java?rev=828773&r1=828772&r2=828773&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestGrunt.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestGrunt.java Thu Oct 22 
16:23:20 2009
@@ -745,4 +745,18 @@
             fail();
         }
     }
+   
+    public void testSetPriority() throws Throwable {
+        PigServer server = new PigServer(ExecType.MAPREDUCE, 
cluster.getProperties());
+        PigContext context = server.getPigContext();
+
+        String strCmd = "set job.priority high\n";
+
+        ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes());
+        InputStreamReader reader = new InputStreamReader(cmd);
+
+        Grunt grunt = new Grunt(new BufferedReader(reader), context);
+
+        grunt.exec();
+    }
 }


Reply via email to