Author: daijy
Date: Fri Dec 18 04:04:26 2009
New Revision: 892125
URL: http://svn.apache.org/viewvc?rev=892125&view=rev
Log:
PIG-948: [Usability] Relating pig script with MR jobs
Modified:
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
Modified:
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=892125&r1=892124&r2=892125&view=diff
==============================================================================
---
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
(original)
+++
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
Fri Dec 18 04:04:26 2009
@@ -89,6 +89,7 @@
super.reset();
}
+ @SuppressWarnings("deprecation")
@Override
public PigStats launchPig(PhysicalPlan php,
String grpName,
@@ -99,7 +100,7 @@
JobCreationException,
Exception {
long sleepTime = 500;
- long jobidDelayTime = 1000;
+ int MAXRETRY_JOBID = 20;
aggregateWarning =
"true".equalsIgnoreCase(pc.getProperties().getProperty("aggregate.warning"));
MROperPlan mrp = compile(php, pc);
PigStats stats = new PigStats();
@@ -135,35 +136,39 @@
jcThread.setUncaughtExceptionHandler(jctExceptionHandler);
jcThread.start();
- Thread.sleep(jobidDelayTime);
- String jobTrackerAdd;
- String port;
-
- try{
- for (Job job : waitingJobs){
- JobConf jConf = job.getJobConf();
+ for (Job job : waitingJobs){
+ int retry = 0;
+ while (job.getState()==Job.WAITING ||
job.getState()==Job.READY && retry<MAXRETRY_JOBID) {
+ Thread.sleep(sleepTime);
+ retry++;
+ }
+ JobConf jConf = job.getJobConf();
+ if (job.getAssignedJobID()!=null)
+ {
+ log.info("Submitting job: "+job.getAssignedJobID()+" to
execution engine.");
+ }
+ // Try to get job tracker, if we cannot get it, then most
probably we are running in local mode.
+ // If it is the case, we don't print out job tracker location
and hint to kill the job,
+ // because it is meaningless for local mode.
+ try {
+ String jobTrackerAdd;
+ String port;
port = jConf.get("mapred.job.tracker.http.address");
+
port = port.substring(port.indexOf(":"));
jobTrackerAdd =
jConf.get(HExecutionEngine.JOB_TRACKER_LOCATION);
jobTrackerAdd =
jobTrackerAdd.substring(0,jobTrackerAdd.indexOf(":"));
- if (job.getAssignedJobID()!=null)
- {
- log.info("Submitting job: "+job.getAssignedJobID()+"
to execution engine.");
- log.info("More information at: http://"+
jobTrackerAdd+port+
-
"/jobdetails.jsp?jobid="+job.getAssignedJobID());
- log.info("To kill this job, use: kill
"+job.getAssignedJobID());
- }
- else
- log.info("Cannot get jobid for this job");
+ log.info("More information at: http://"+
jobTrackerAdd+port+
+ "/jobdetails.jsp?jobid="+job.getAssignedJobID());
+ log.info("To kill this job, use: kill
"+job.getAssignedJobID());
+ }
+ catch(Exception e){
+ /* This is extra information Pig is providing to user.
+ If exception occurs here, job may still complete
successfully.
+ So, pig shouldn't die or even give confusing message to
the user.
+ */
}
}
- catch(Exception e){
- /* This is extra information Pig is providing to user.
- If exception occurs here, job may still complete
successfully.
- So, pig shouldn't die or even give confusing message to the
user.
- So we just log information and move on. */
- log.info("Cannot get jobid for this job");
- }
while(!jc.allFinished()){
try {