Author: hashutosh
Date: Fri Feb 19 22:33:00 2010
New Revision: 912018

URL: http://svn.apache.org/viewvc?rev=912018&view=rev
Log:
PIG-1215: Make Hadoop jobId more prominent in the client log

Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/src/org/apache/pig/Main.java
    
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
    hadoop/pig/trunk/src/org/apache/pig/impl/util/LogUtils.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=912018&r1=912017&r2=912018&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Fri Feb 19 22:33:00 2010
@@ -126,6 +126,8 @@
 
 BUG FIXES
 
+PIG-1215: Make Hadoop jobId more prominent in the client log (ashutoshc)
+
 PIG-1216: New load store design does not allow Pig to validate inputs and
 outputs up front (ashutoshc via pradeepkth)
 

Modified: hadoop/pig/trunk/src/org/apache/pig/Main.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/Main.java?rev=912018&r1=912017&r2=912018&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/Main.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/Main.java Fri Feb 19 22:33:00 2010
@@ -257,9 +257,6 @@
         // create the context with the parameter
         PigContext pigContext = new PigContext(execType, properties);
 
-        // configure logging
-        configureLog4J(properties, pigContext);
-        
         if(logFileName == null && !userSpecifiedLog) {
            logFileName = 
validateLogFile(properties.getProperty("pig.logfile"), null);
        }
@@ -269,6 +266,10 @@
         }
         
         pigContext.getProperties().setProperty("pig.logfile", (logFileName == 
null? "": logFileName));
+     
+        // configure logging
+        configureLog4J(properties, pigContext);
+        
         
         if(optimizerRules.size() > 0) {
                pigContext.getProperties().setProperty("pig.optimizer.rules", 
ObjectSerializer.serialize(optimizerRules));
@@ -472,26 +473,26 @@
         }
     }
     if (props.size() == 0) {
-        props.setProperty("log4j.rootLogger", "INFO, PIGCONSOLE");
         props.setProperty("log4j.logger.org.apache.pig", logLevel.toString());
-        props.setProperty("log4j.appender.PIGCONSOLE",
-                "org.apache.log4j.ConsoleAppender");
-        props.setProperty("log4j.appender.PIGCONSOLE.layout",
-                "org.apache.log4j.PatternLayout");
-        props.setProperty("log4j.appender.PIGCONSOLE.target", "System.err");
-
-        if (!brief) {
-            // non-brief logging - timestamps
-            props.setProperty(
-                    "log4j.appender.PIGCONSOLE.layout.ConversionPattern",
-                    "%d [%t] %-5p %c - %m%n");
-        } else {
-            // brief logging - no timestamps
-            props.setProperty(
-                    "log4j.appender.PIGCONSOLE.layout.ConversionPattern",
-                    "%m%n");
+        if((logLevelString = System.getProperty("pig.logfile.level")) == null){
+            props.setProperty("log4j.rootLogger", "INFO, PIGCONSOLE");
+        }
+        else{
+            logLevel = Level.toLevel(logLevelString, Level.INFO);
+            props.setProperty("log4j.logger.org.apache.pig", 
logLevel.toString());
+            props.setProperty("log4j.rootLogger", "INFO, PIGCONSOLE, F");
+            
props.setProperty("log4j.appender.F","org.apache.log4j.RollingFileAppender");
+            
props.setProperty("log4j.appender.F.File",properties.getProperty("pig.logfile"));
+            
props.setProperty("log4j.appender.F.layout","org.apache.log4j.PatternLayout");
+            props.setProperty("log4j.appender.F.layout.ConversionPattern", 
brief ? "%m%n" : "%d [%t] %-5p %c - %m%n");
         }
+        
+        
props.setProperty("log4j.appender.PIGCONSOLE","org.apache.log4j.ConsoleAppender");
    
+        props.setProperty("log4j.appender.PIGCONSOLE.target", "System.err");   
     
+        
props.setProperty("log4j.appender.PIGCONSOLE.layout","org.apache.log4j.PatternLayout");
+        
props.setProperty("log4j.appender.PIGCONSOLE.layout.ConversionPattern", brief ? 
"%m%n" : "%d [%t] %-5p %c - %m%n");
     }
+
     PropertyConfigurator.configure(props);
     logLevel = Logger.getLogger("org.apache.pig").getLevel();
     Properties backendProps = pigContext.getLog4jProperties();

Modified: 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=912018&r1=912017&r2=912018&view=diff
==============================================================================
--- 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
 (original)
+++ 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
 Fri Feb 19 22:33:00 2010
@@ -20,6 +20,7 @@
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.PrintStream;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.LinkedList;
@@ -101,7 +102,6 @@
                                                     JobCreationException,
                                                     Exception {
         long sleepTime = 500;
-        int MAXRETRY_JOBID = 20;
         aggregateWarning = 
"true".equalsIgnoreCase(pc.getProperties().getProperty("aggregate.warning"));
         MROperPlan mrp = compile(php, pc);
         PigStats stats = new PigStats();
@@ -130,58 +130,66 @@
 
         while((jc = jcc.compile(mrp, grpName)) != null) {
             
-            List<Job> waitingJobs = jc.getWaitingJobs();
+               // Initially, all jobs are in wait state.
+            List<Job> jobsWithoutIds = jc.getWaitingJobs();
+            log.info(jobsWithoutIds.size() +" map-reduce job(s) waiting for 
submission.");
+            
+            String jobTrackerAdd;
+            String port;
+            String jobTrackerLoc;
+            JobConf jobConf = jobsWithoutIds.get(0).getJobConf();
+            try {
+                port = jobConf.get("mapred.job.tracker.http.address");
+                jobTrackerAdd = 
jobConf.get(HExecutionEngine.JOB_TRACKER_LOCATION);
+                jobTrackerLoc = 
jobTrackerAdd.substring(0,jobTrackerAdd.indexOf(":")) + 
port.substring(port.indexOf(":"));
+            }
+            catch(Exception e){
+                // Could not get the job tracker location, most probably we 
are running in local mode.
+                // If it is the case, we don't print out job tracker location,
+                // because it is meaningless for local mode.
+               jobTrackerLoc = null;
+                log.debug("Failed to get job tracker location.");
+            }
+            
             completeFailedJobsInThisRun.clear();
             
             Thread jcThread = new Thread(jc);
             jcThread.setUncaughtExceptionHandler(jctExceptionHandler);
+            
+            //All the setup done, now lets launch the jobs.
             jcThread.start();
-
-            for (Job job : waitingJobs){
-                int retry = 0;
-                while (job.getState()==Job.WAITING || 
job.getState()==Job.READY && retry<MAXRETRY_JOBID) {
-                    Thread.sleep(sleepTime);
-                    retry++;
-                }
-                JobConf jConf = job.getJobConf();
-                if (job.getAssignedJobID()!=null)
-                {
-                    log.info("Submitting job: "+job.getAssignedJobID()+" to 
execution engine.");
-                }
-                // Try to get job tracker, if we cannot get it, then most 
probably we are running in local mode.
-                // If it is the case, we don't print out job tracker location 
and hint to kill the job,
-                // because it is meaningless for local mode.
-                try {
-                    String jobTrackerAdd;
-                    String port;
-                    port = jConf.get("mapred.job.tracker.http.address");
-                    
-                    port = port.substring(port.indexOf(":"));
-                    jobTrackerAdd = 
jConf.get(HExecutionEngine.JOB_TRACKER_LOCATION);
-                    jobTrackerAdd = 
jobTrackerAdd.substring(0,jobTrackerAdd.indexOf(":"));
-                    log.info("More information at: http://"+ 
jobTrackerAdd+port+
-                            "/jobdetails.jsp?jobid="+job.getAssignedJobID());
-                    log.info("To kill this job, use: kill 
"+job.getAssignedJobID());
-                }
-                catch(Exception e){
-                    /* This is extra information Pig is providing to user.
-                       If exception occurs here, job may still complete 
successfully.
-                       So, pig shouldn't die or even give confusing message to 
the user. 
-                    */
-                }
-            }
             
+            // Now wait, till we are finished.
             while(!jc.allFinished()){
-                try {
-                    Thread.sleep(sleepTime);
-                } catch (InterruptedException e) {}
-                double prog = (numMRJobsCompl+calculateProgress(jc, 
jobClient))/totalMRJobs;
-                if(prog>=(lastProg+0.01)){
-                    int perCom = (int)(prog * 100);
-                    if(perCom!=100)
-                        log.info( perCom + "% complete");
-                }
-                lastProg = prog;
+
+               try { Thread.sleep(sleepTime); } 
+               catch (InterruptedException e) {}
+               
+               List<Job> jobsAssignedIdInThisRun = new ArrayList<Job>();
+
+               for(Job job : jobsWithoutIds){
+                       if (job.getAssignedJobID() != null){
+
+                               jobsAssignedIdInThisRun.add(job);
+                               log.info("HadoopJobId: 
"+job.getAssignedJobID());
+                               if(jobTrackerLoc != null){
+                                       log.info("More information at: 
http://"+ jobTrackerLoc+
+                                                       
"/jobdetails.jsp?jobid="+job.getAssignedJobID());
+                               }  
+                       }
+                       else{
+                               // This job is not assigned an id yet.
+                       }
+               }
+               jobsWithoutIds.removeAll(jobsAssignedIdInThisRun);
+
+               double prog = (numMRJobsCompl+calculateProgress(jc, 
jobClient))/totalMRJobs;
+               if(prog>=(lastProg+0.01)){
+                       int perCom = (int)(prog * 100);
+                       if(perCom!=100)
+                               log.info( perCom + "% complete");
+               }
+               lastProg = prog;
             }
 
             //check for the jobControlException first

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/util/LogUtils.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/util/LogUtils.java?rev=912018&r1=912017&r2=912018&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/util/LogUtils.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/util/LogUtils.java Fri Feb 19 
22:33:00 2010
@@ -24,12 +24,10 @@
 import java.io.PrintStream;
 
 import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.pig.PigException;
 import org.apache.pig.PigWarning;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PigLogger;
-import org.apache.pig.impl.PigContext;
 import org.apache.pig.tools.pigscript.parser.ParseException;
 
 public class LogUtils {


Reply via email to