Author: daijy
Date: Fri Dec 18 04:04:26 2009
New Revision: 892125

URL: http://svn.apache.org/viewvc?rev=892125&view=rev
Log:
PIG-948: [Usability] Relating pig script with MR jobs

Modified:
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=892125&r1=892124&r2=892125&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java Fri Dec 18 04:04:26 2009
@@ -89,6 +89,7 @@
         super.reset();
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public PigStats launchPig(PhysicalPlan php,
                               String grpName,
@@ -99,7 +100,7 @@
                                                     JobCreationException,
                                                     Exception {
         long sleepTime = 500;
-        long jobidDelayTime = 1000;
+        int MAXRETRY_JOBID = 20;
         aggregateWarning = 
"true".equalsIgnoreCase(pc.getProperties().getProperty("aggregate.warning"));
         MROperPlan mrp = compile(php, pc);
         PigStats stats = new PigStats();
@@ -135,35 +136,39 @@
             jcThread.setUncaughtExceptionHandler(jctExceptionHandler);
             jcThread.start();
 
-            Thread.sleep(jobidDelayTime);
-            String jobTrackerAdd;
-            String port;
-            
-            try{
-                for (Job job : waitingJobs){
-                    JobConf jConf = job.getJobConf();
+            for (Job job : waitingJobs){
+                int retry = 0;
+                while (job.getState()==Job.WAITING || 
job.getState()==Job.READY && retry<MAXRETRY_JOBID) {
+                    Thread.sleep(sleepTime);
+                    retry++;
+                }
+                JobConf jConf = job.getJobConf();
+                if (job.getAssignedJobID()!=null)
+                {
+                    log.info("Submitting job: "+job.getAssignedJobID()+" to 
execution engine.");
+                }
+                // Try to get job tracker, if we cannot get it, then most probably we are running in local mode.
+                // If it is the case, we don't print out job tracker location and hint to kill the job,
+                // because it is meaningless for local mode.
+                try {
+                    String jobTrackerAdd;
+                    String port;
                     port = jConf.get("mapred.job.tracker.http.address");
+                    
                     port = port.substring(port.indexOf(":"));
                     jobTrackerAdd = 
jConf.get(HExecutionEngine.JOB_TRACKER_LOCATION);
                     jobTrackerAdd = 
jobTrackerAdd.substring(0,jobTrackerAdd.indexOf(":"));
-                    if (job.getAssignedJobID()!=null)
-                    {
-                        log.info("Submitting job: "+job.getAssignedJobID()+" 
to execution engine.");
-                        log.info("More information at: http://"+ 
jobTrackerAdd+port+
-                                
"/jobdetails.jsp?jobid="+job.getAssignedJobID());
-                        log.info("To kill this job, use: kill 
"+job.getAssignedJobID());
-                    }
-                    else
-                        log.info("Cannot get jobid for this job");
+                    log.info("More information at: http://"+ 
jobTrackerAdd+port+
+                            "/jobdetails.jsp?jobid="+job.getAssignedJobID());
+                    log.info("To kill this job, use: kill 
"+job.getAssignedJobID());
+                }
+                catch(Exception e){
+                    /* This is extra information Pig is providing to user.
+                       If exception occurs here, job may still complete successfully.
+                       So, pig shouldn't die or even give confusing message to the user.
+                    */
                 }
             }
-            catch(Exception e){
-                /* This is extra information Pig is providing to user.
-                   If exception occurs here, job may still complete successfully.
-                   So, pig shouldn't die or even give confusing message to the user.
-                   So we just log information and move on. */
-                log.info("Cannot get jobid for this job");
-            }
             
             while(!jc.allFinished()){
                 try {


Reply via email to