Author: hashutosh
Date: Fri Feb 19 22:33:00 2010
New Revision: 912018
URL: http://svn.apache.org/viewvc?rev=912018&view=rev
Log:
PIG-1215: Make Hadoop jobId more prominent in the client log
Modified:
hadoop/pig/trunk/CHANGES.txt
hadoop/pig/trunk/src/org/apache/pig/Main.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
hadoop/pig/trunk/src/org/apache/pig/impl/util/LogUtils.java
Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=912018&r1=912017&r2=912018&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Fri Feb 19 22:33:00 2010
@@ -126,6 +126,8 @@
BUG FIXES
+PIG-1215: Make Hadoop jobId more prominent in the client log (ashutoshc)
+
PIG-1216: New load store design does not allow Pig to validate inputs and
outputs up front (ashutoshc via pradeepkth)
Modified: hadoop/pig/trunk/src/org/apache/pig/Main.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/Main.java?rev=912018&r1=912017&r2=912018&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/Main.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/Main.java Fri Feb 19 22:33:00 2010
@@ -257,9 +257,6 @@
         // create the context with the parameter
         PigContext pigContext = new PigContext(execType, properties);
-        // configure logging
-        configureLog4J(properties, pigContext);
-
         if(logFileName == null && !userSpecifiedLog) {
             logFileName =
                 validateLogFile(properties.getProperty("pig.logfile"), null);
         }
@@ -269,6 +266,10 @@
         }
         pigContext.getProperties().setProperty("pig.logfile", (logFileName ==
             null? "": logFileName));
+
+        // configure logging
+        configureLog4J(properties, pigContext);
+
         if(optimizerRules.size() > 0) {
             pigContext.getProperties().setProperty("pig.optimizer.rules",
                 ObjectSerializer.serialize(optimizerRules));
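
Note on the two hunks above: configureLog4J() now runs only after "pig.logfile" has been written into the PigContext properties, presumably so that the file appender configured in the next hunk can resolve the log file path via properties.getProperty("pig.logfile"). A hypothetical invocation that would exercise the new file logging, assuming the pig.logfile.level system property reaches the client JVM (for example through PIG_OPTS) and using Pig's -l logfile option; the level and path are made up:

    export PIG_OPTS="-Dpig.logfile.level=DEBUG"
    pig -l /tmp/pig-client.log script.pig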
@@ -472,26 +473,26 @@
}
}
         if (props.size() == 0) {
-            props.setProperty("log4j.rootLogger", "INFO, PIGCONSOLE");
             props.setProperty("log4j.logger.org.apache.pig", logLevel.toString());
-            props.setProperty("log4j.appender.PIGCONSOLE",
-                              "org.apache.log4j.ConsoleAppender");
-            props.setProperty("log4j.appender.PIGCONSOLE.layout",
-                              "org.apache.log4j.PatternLayout");
-            props.setProperty("log4j.appender.PIGCONSOLE.target", "System.err");
-
-            if (!brief) {
-                // non-brief logging - timestamps
-                props.setProperty(
-                        "log4j.appender.PIGCONSOLE.layout.ConversionPattern",
-                        "%d [%t] %-5p %c - %m%n");
-            } else {
-                // brief logging - no timestamps
-                props.setProperty(
-                        "log4j.appender.PIGCONSOLE.layout.ConversionPattern",
-                        "%m%n");
+            if((logLevelString = System.getProperty("pig.logfile.level")) == null){
+                props.setProperty("log4j.rootLogger", "INFO, PIGCONSOLE");
+            }
+            else{
+                logLevel = Level.toLevel(logLevelString, Level.INFO);
+                props.setProperty("log4j.logger.org.apache.pig", logLevel.toString());
+                props.setProperty("log4j.rootLogger", "INFO, PIGCONSOLE, F");
+                props.setProperty("log4j.appender.F","org.apache.log4j.RollingFileAppender");
+                props.setProperty("log4j.appender.F.File",properties.getProperty("pig.logfile"));
+                props.setProperty("log4j.appender.F.layout","org.apache.log4j.PatternLayout");
+                props.setProperty("log4j.appender.F.layout.ConversionPattern", brief ? "%m%n" : "%d [%t] %-5p %c - %m%n");
             }
+
+            props.setProperty("log4j.appender.PIGCONSOLE","org.apache.log4j.ConsoleAppender");
+            props.setProperty("log4j.appender.PIGCONSOLE.target", "System.err");
+            props.setProperty("log4j.appender.PIGCONSOLE.layout","org.apache.log4j.PatternLayout");
+            props.setProperty("log4j.appender.PIGCONSOLE.layout.ConversionPattern", brief ? "%m%n" : "%d [%t] %-5p %c - %m%n");
         }
+
         PropertyConfigurator.configure(props);
         logLevel = Logger.getLogger("org.apache.pig").getLevel();
         Properties backendProps = pigContext.getLog4jProperties();
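
For reference, a sketch of the configuration this branch assembles when the pig.logfile.level system property is set. The keys and class names are taken verbatim from the setProperty() calls above; the DEBUG level and the /tmp/pig-client.log path are invented examples (the real path comes from the pig.logfile property), and the non-brief conversion pattern is shown:

    log4j.rootLogger=INFO, PIGCONSOLE, F
    log4j.logger.org.apache.pig=DEBUG
    log4j.appender.F=org.apache.log4j.RollingFileAppender
    log4j.appender.F.File=/tmp/pig-client.log
    log4j.appender.F.layout=org.apache.log4j.PatternLayout
    log4j.appender.F.layout.ConversionPattern=%d [%t] %-5p %c - %m%n
    log4j.appender.PIGCONSOLE=org.apache.log4j.ConsoleAppender
    log4j.appender.PIGCONSOLE.target=System.err
    log4j.appender.PIGCONSOLE.layout=org.apache.log4j.PatternLayout
    log4j.appender.PIGCONSOLE.layout.ConversionPattern=%d [%t] %-5p %c - %m%n

(With -brief, both appenders use the %m%n pattern instead.)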
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=912018&r1=912017&r2=912018&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java Fri Feb 19 22:33:00 2010
@@ -20,6 +20,7 @@
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PrintStream;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.LinkedList;
@@ -101,7 +102,6 @@
JobCreationException,
Exception {
long sleepTime = 500;
- int MAXRETRY_JOBID = 20;
         aggregateWarning = "true".equalsIgnoreCase(pc.getProperties().getProperty("aggregate.warning"));
MROperPlan mrp = compile(php, pc);
PigStats stats = new PigStats();
@@ -130,58 +130,66 @@
         while((jc = jcc.compile(mrp, grpName)) != null) {
-            List<Job> waitingJobs = jc.getWaitingJobs();
+            // Initially, all jobs are in wait state.
+            List<Job> jobsWithoutIds = jc.getWaitingJobs();
+            log.info(jobsWithoutIds.size() +" map-reduce job(s) waiting for submission.");
+
+            String jobTrackerAdd;
+            String port;
+            String jobTrackerLoc;
+            JobConf jobConf = jobsWithoutIds.get(0).getJobConf();
+            try {
+                port = jobConf.get("mapred.job.tracker.http.address");
+                jobTrackerAdd = jobConf.get(HExecutionEngine.JOB_TRACKER_LOCATION);
+                jobTrackerLoc = jobTrackerAdd.substring(0,jobTrackerAdd.indexOf(":")) + port.substring(port.indexOf(":"));
+            }
+            catch(Exception e){
+                // Could not get the job tracker location, most probably we are running in local mode.
+                // If it is the case, we don't print out job tracker location,
+                // because it is meaningless for local mode.
+                jobTrackerLoc = null;
+                log.debug("Failed to get job tracker location.");
+            }
+
             completeFailedJobsInThisRun.clear();
             Thread jcThread = new Thread(jc);
             jcThread.setUncaughtExceptionHandler(jctExceptionHandler);
+
+            //All the setup done, now lets launch the jobs.
             jcThread.start();
-
-            for (Job job : waitingJobs){
-                int retry = 0;
-                while (job.getState()==Job.WAITING || job.getState()==Job.READY && retry<MAXRETRY_JOBID) {
-                    Thread.sleep(sleepTime);
-                    retry++;
-                }
-                JobConf jConf = job.getJobConf();
-                if (job.getAssignedJobID()!=null)
-                {
-                    log.info("Submitting job: "+job.getAssignedJobID()+" to execution engine.");
-                }
-                // Try to get job tracker, if we cannot get it, then most probably we are running in local mode.
-                // If it is the case, we don't print out job tracker location and hint to kill the job,
-                // because it is meaningless for local mode.
-                try {
-                    String jobTrackerAdd;
-                    String port;
-                    port = jConf.get("mapred.job.tracker.http.address");
-
-                    port = port.substring(port.indexOf(":"));
-                    jobTrackerAdd = jConf.get(HExecutionEngine.JOB_TRACKER_LOCATION);
-                    jobTrackerAdd = jobTrackerAdd.substring(0,jobTrackerAdd.indexOf(":"));
-                    log.info("More information at: http://"+ jobTrackerAdd+port+
-                            "/jobdetails.jsp?jobid="+job.getAssignedJobID());
-                    log.info("To kill this job, use: kill "+job.getAssignedJobID());
-                }
-                catch(Exception e){
-                    /* This is extra information Pig is providing to user.
-                       If exception occurs here, job may still complete successfully.
-                       So, pig shouldn't die or even give confusing message to the user.
-                    */
-                }
-            }
+            // Now wait, till we are finished.
             while(!jc.allFinished()){
-                try {
-                    Thread.sleep(sleepTime);
-                } catch (InterruptedException e) {}
-                double prog = (numMRJobsCompl+calculateProgress(jc, jobClient))/totalMRJobs;
-                if(prog>=(lastProg+0.01)){
-                    int perCom = (int)(prog * 100);
-                    if(perCom!=100)
-                        log.info( perCom + "% complete");
-                }
-                lastProg = prog;
+
+                try { Thread.sleep(sleepTime); }
+                catch (InterruptedException e) {}
+
+                List<Job> jobsAssignedIdInThisRun = new ArrayList<Job>();
+
+                for(Job job : jobsWithoutIds){
+                    if (job.getAssignedJobID() != null){
+
+                        jobsAssignedIdInThisRun.add(job);
+                        log.info("HadoopJobId: "+job.getAssignedJobID());
+                        if(jobTrackerLoc != null){
+                            log.info("More information at: http://"+ jobTrackerLoc+
+                                    "/jobdetails.jsp?jobid="+job.getAssignedJobID());
+                        }
+                    }
+                    else{
+                        // This job is not assigned an id yet.
+                    }
+                }
+                jobsWithoutIds.removeAll(jobsAssignedIdInThisRun);
+
+                double prog = (numMRJobsCompl+calculateProgress(jc, jobClient))/totalMRJobs;
+                if(prog>=(lastProg+0.01)){
+                    int perCom = (int)(prog * 100);
+                    if(perCom!=100)
+                        log.info( perCom + "% complete");
+                }
+                lastProg = prog;
             }
             //check for the jobControlException first
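
Net effect on the client log: each Hadoop job id is now printed as soon as JobControl assigns it, from inside the progress loop, instead of after a bounded pre-submission retry loop (hence MAXRETRY_JOBID is gone). A rough sketch of the resulting console output, using the brief (%m%n) pattern; the job id and job tracker address are invented:

    2 map-reduce job(s) waiting for submission.
    HadoopJobId: job_201002192233_0001
    More information at: http://jobtracker.example.com:50030/jobdetails.jsp?jobid=job_201002192233_0001
    50% complete

Note that the old "To kill this job, use: kill <jobid>" hint is not carried over from the removed block.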
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/util/LogUtils.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/util/LogUtils.java?rev=912018&r1=912017&r2=912018&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/util/LogUtils.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/util/LogUtils.java Fri Feb 19 22:33:00 2010
@@ -24,12 +24,10 @@
import java.io.PrintStream;
import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.pig.PigException;
import org.apache.pig.PigWarning;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PigLogger;
-import org.apache.pig.impl.PigContext;
import org.apache.pig.tools.pigscript.parser.ParseException;
public class LogUtils {