Author: daijy
Date: Wed Sep 30 19:07:36 2009
New Revision: 820394
URL: http://svn.apache.org/viewvc?rev=820394&view=rev
Log:
PIG-948: [Usability] Relating pig script with MR jobs
Modified:
hadoop/pig/trunk/CHANGES.txt
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
Modified: hadoop/pig/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=820394&r1=820393&r2=820394&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Wed Sep 30 19:07:36 2009
@@ -30,6 +30,8 @@
PIG-956: 10 minute commit tests (olgan)
+PIG-948: [Usability] Relating pig script with MR jobs (ashutoshc via daijy)
+
OPTIMIZATIONS
BUG FIXES
Modified:
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=820394&r1=820393&r2=820394&view=diff
==============================================================================
---
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
(original)
+++
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
Wed Sep 30 19:07:36 2009
@@ -30,6 +30,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.jobcontrol.Job;
@@ -97,6 +98,7 @@
JobCreationException,
Exception {
long sleepTime = 500;
+ long jobidDelayTime = 1000;
aggregateWarning =
"true".equalsIgnoreCase(pc.getProperties().getProperty("aggregate.warning"));
MROperPlan mrp = compile(php, pc);
PigStats stats = new PigStats();
@@ -124,12 +126,46 @@
JobControlThreadExceptionHandler jctExceptionHandler = new
JobControlThreadExceptionHandler();
while((jc = jcc.compile(mrp, grpName)) != null) {
- numMRJobsCurrent = jc.getWaitingJobs().size();
-
+
+ List<Job> waitingJobs = jc.getWaitingJobs();
+ numMRJobsCurrent = waitingJobs.size();
+
Thread jcThread = new Thread(jc);
jcThread.setUncaughtExceptionHandler(jctExceptionHandler);
jcThread.start();
+ Thread.sleep(jobidDelayTime);
+ String jobTrackerAdd;
+ String port;
+
+ try{
+ for (Job job : waitingJobs){
+ JobConf jConf = job.getJobConf();
+ port = jConf.get("mapred.job.tracker.http.address");
+ port = port.substring(port.indexOf(":"));
+ jobTrackerAdd =
jConf.get(HExecutionEngine.JOB_TRACKER_LOCATION);
+ jobTrackerAdd =
jobTrackerAdd.substring(0,jobTrackerAdd.indexOf(":"));
+ if (job.getAssignedJobID()!=null)
+ {
+ log.info("Submitting job: "+job.getAssignedJobID()+"
to execution engine.");
+ log.info("More information at: http://"+
jobTrackerAdd+port+
+
"/jobdetails.jsp?jobid="+job.getAssignedJobID());
+ log.info("To kill this job, use: kill
"+job.getAssignedJobID());
+ }
+ else
+ log.info("Cannot get jobid for this job");
+ }
+ }
+ catch(Exception e){
+
+ /* This is extra information Pig is providing to user.
+ If some exception occurs here because of whatever reasons,
job may still complete successfully.
+ So, pig shouldn't die. So we just log the exception and
move on. */
+
+ log.error("Exception occured while trying to retrieve extra
information about job in MapReduceLauncher."
+ +e.getMessage());
+ }
+
while(!jc.allFinished()){
try {
Thread.sleep(sleepTime);