Author: jing9 Date: Thu Jan 23 19:22:43 2014 New Revision: 1560794 URL: http://svn.apache.org/r1560794 Log: Merging r1559902 through r1560793 from trunk.
Modified: hadoop/common/branches/HDFS-5698/hadoop-mapreduce-project/ (props changed) hadoop/common/branches/HDFS-5698/hadoop-mapreduce-project/CHANGES.txt (contents, props changed) hadoop/common/branches/HDFS-5698/hadoop-mapreduce-project/conf/ (props changed) hadoop/common/branches/HDFS-5698/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java hadoop/common/branches/HDFS-5698/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java hadoop/common/branches/HDFS-5698/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLog.java hadoop/common/branches/HDFS-5698/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLogAppender.java hadoop/common/branches/HDFS-5698/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml (props changed) Propchange: hadoop/common/branches/HDFS-5698/hadoop-mapreduce-project/ ------------------------------------------------------------------------------ Merged /hadoop/common/trunk/hadoop-mapreduce-project:r1559902-1560793 Modified: hadoop/common/branches/HDFS-5698/hadoop-mapreduce-project/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5698/hadoop-mapreduce-project/CHANGES.txt?rev=1560794&r1=1560793&r2=1560794&view=diff ============================================================================== --- hadoop/common/branches/HDFS-5698/hadoop-mapreduce-project/CHANGES.txt (original) +++ hadoop/common/branches/HDFS-5698/hadoop-mapreduce-project/CHANGES.txt Thu Jan 23 19:22:43 2014 @@ -285,6 +285,12 @@ Release 2.4.0 - UNRELEASED MAPREDUCE-5729. mapred job -list throws NPE (kasha) + MAPREDUCE-5693. Restore MRv1 behavior for log flush (Gera Shegalov via + jlowe) + + MAPREDUCE-5723. MR AM container log can be truncated or empty. + (Mohammad Kamrul Islam via kasha) + Release 2.3.0 - UNRELEASED INCOMPATIBLE CHANGES Propchange: hadoop/common/branches/HDFS-5698/hadoop-mapreduce-project/CHANGES.txt ------------------------------------------------------------------------------ Merged /hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt:r1559902-1560793 Propchange: hadoop/common/branches/HDFS-5698/hadoop-mapreduce-project/conf/ ------------------------------------------------------------------------------ Merged /hadoop/common/trunk/hadoop-mapreduce-project/conf:r1559902-1560793 Modified: hadoop/common/branches/HDFS-5698/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5698/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java?rev=1560794&r1=1560793&r2=1560794&view=diff ============================================================================== --- hadoop/common/branches/HDFS-5698/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java (original) +++ hadoop/common/branches/HDFS-5698/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java Thu Jan 23 19:22:43 2014 @@ -27,6 +27,7 @@ import java.net.URI; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.ScheduledExecutorService; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -61,7 +62,6 @@ import org.apache.hadoop.yarn.api.Applic import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.util.ConverterUtils; -import org.apache.log4j.LogManager; /** * The main() for MapReduce task processes. @@ -123,6 +123,7 @@ class YarnChild { LOG.debug("PID: " + System.getenv().get("JVM_PID")); Task task = null; UserGroupInformation childUGI = null; + ScheduledExecutorService logSyncer = null; try { int idleLoopCount = 0; @@ -161,6 +162,8 @@ class YarnChild { // set job classloader if configured before invoking the task MRApps.setJobClassLoader(job); + logSyncer = TaskLog.createLogSyncer(); + // Create a final reference to the task for the doAs block final Task taskFinal = task; childUGI.doAs(new PrivilegedExceptionAction<Object>() { @@ -214,10 +217,7 @@ class YarnChild { } finally { RPC.stopProxy(umbilical); DefaultMetricsSystem.shutdown(); - // Shutting down log4j of the child-vm... - // This assumes that on return from Task.run() - // there is no more logging done. - LogManager.shutdown(); + TaskLog.syncLogsShutdown(logSyncer); } } Modified: hadoop/common/branches/HDFS-5698/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5698/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java?rev=1560794&r1=1560793&r2=1560794&view=diff ============================================================================== --- hadoop/common/branches/HDFS-5698/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java (original) +++ hadoop/common/branches/HDFS-5698/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java Thu Jan 23 19:22:43 2014 @@ -31,6 +31,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.io.IOUtils; @@ -45,6 +46,7 @@ import org.apache.hadoop.mapred.FileOutp import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.LocalContainerLauncher; import org.apache.hadoop.mapred.TaskAttemptListenerImpl; +import org.apache.hadoop.mapred.TaskLog; import org.apache.hadoop.mapred.TaskUmbilicalProtocol; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.OutputCommitter; @@ -119,6 +121,7 @@ import org.apache.hadoop.service.Abstrac import org.apache.hadoop.service.CompositeService; import org.apache.hadoop.service.Service; import org.apache.hadoop.service.ServiceOperations; +import org.apache.hadoop.util.ExitUtil; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.ShutdownHookManager; import org.apache.hadoop.util.StringInterner; @@ -212,6 +215,7 @@ public class MRAppMaster extends Composi boolean errorHappenedShutDown = false; private String shutDownMessage = null; JobStateInternal forcedState = null; + private final ScheduledExecutorService logSyncer; private long recoveredJobStartTime = 0; @@ -240,6 +244,7 @@ public class MRAppMaster extends Composi this.nmHttpPort = nmHttpPort; this.metrics = MRAppMetrics.create(); this.maxAppAttempts = maxAppAttempts; + logSyncer = TaskLog.createLogSyncer(); LOG.info("Created MRAppMaster for application " + applicationAttemptId); } @@ -1078,6 +1083,12 @@ public class MRAppMaster extends Composi // All components have started, start the job. startJobs(); } + + @Override + public void stop() { + super.stop(); + TaskLog.syncLogsShutdown(logSyncer); + } private void processRecovery() { if (appAttemptID.getAttemptId() == 1) { @@ -1395,9 +1406,7 @@ public class MRAppMaster extends Composi initAndStartAppMaster(appMaster, conf, jobUserName); } catch (Throwable t) { LOG.fatal("Error starting MRAppMaster", t); - System.exit(1); - } finally { - LogManager.shutdown(); + ExitUtil.terminate(1, t); } } @@ -1473,4 +1482,11 @@ public class MRAppMaster extends Composi } }); } + + @Override + protected void serviceStop() throws Exception { + super.serviceStop(); + LogManager.shutdown(); + } + } Modified: hadoop/common/branches/HDFS-5698/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLog.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5698/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLog.java?rev=1560794&r1=1560793&r2=1560794&view=diff ============================================================================== --- hadoop/common/branches/HDFS-5698/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLog.java (original) +++ hadoop/common/branches/HDFS-5698/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLog.java Thu Jan 23 19:22:43 2014 @@ -23,12 +23,17 @@ import java.io.BufferedReader; import java.io.DataOutputStream; import java.io.File; import java.io.FileInputStream; +import java.io.Flushable; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.util.ArrayList; import java.util.Enumeration; import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -44,6 +49,8 @@ import org.apache.hadoop.io.SecureIOUtil import org.apache.hadoop.mapreduce.JobID; import org.apache.hadoop.mapreduce.util.ProcessTree; import org.apache.hadoop.util.Shell; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.util.ShutdownHookManager; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.log4j.Appender; import org.apache.log4j.LogManager; @@ -262,7 +269,86 @@ public class TaskLog { } writeToIndexFile(logLocation, isCleanup); } - + + public static synchronized void syncLogsShutdown( + ScheduledExecutorService scheduler) + { + // flush standard streams + // + System.out.flush(); + System.err.flush(); + + if (scheduler != null) { + scheduler.shutdownNow(); + } + + // flush & close all appenders + LogManager.shutdown(); + } + + @SuppressWarnings("unchecked") + public static synchronized void syncLogs() { + // flush standard streams + // + System.out.flush(); + System.err.flush(); + + // flush flushable appenders + // + final Logger rootLogger = Logger.getRootLogger(); + flushAppenders(rootLogger); + final Enumeration<Logger> allLoggers = rootLogger.getLoggerRepository(). + getCurrentLoggers(); + while (allLoggers.hasMoreElements()) { + final Logger l = allLoggers.nextElement(); + flushAppenders(l); + } + } + + @SuppressWarnings("unchecked") + private static void flushAppenders(Logger l) { + final Enumeration<Appender> allAppenders = l.getAllAppenders(); + while (allAppenders.hasMoreElements()) { + final Appender a = allAppenders.nextElement(); + if (a instanceof Flushable) { + try { + ((Flushable) a).flush(); + } catch (IOException ioe) { + System.err.println(a + ": Failed to flush!" + + StringUtils.stringifyException(ioe)); + } + } + } + } + + public static ScheduledExecutorService createLogSyncer() { + final ScheduledExecutorService scheduler = + Executors.newSingleThreadScheduledExecutor( + new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + final Thread t = Executors.defaultThreadFactory().newThread(r); + t.setDaemon(true); + t.setName("Thread for syncLogs"); + return t; + } + }); + ShutdownHookManager.get().addShutdownHook(new Runnable() { + @Override + public void run() { + TaskLog.syncLogsShutdown(scheduler); + } + }, 50); + scheduler.scheduleWithFixedDelay( + new Runnable() { + @Override + public void run() { + TaskLog.syncLogs(); + } + }, 0L, 5L, TimeUnit.SECONDS); + return scheduler; + } + /** * The filter for userlogs. */ Modified: hadoop/common/branches/HDFS-5698/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLogAppender.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5698/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLogAppender.java?rev=1560794&r1=1560793&r2=1560794&view=diff ============================================================================== --- hadoop/common/branches/HDFS-5698/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLogAppender.java (original) +++ hadoop/common/branches/HDFS-5698/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLogAppender.java Thu Jan 23 19:22:43 2014 @@ -18,6 +18,7 @@ package org.apache.hadoop.mapred; +import java.io.Flushable; import java.util.LinkedList; import java.util.Queue; @@ -31,7 +32,7 @@ import org.apache.log4j.spi.LoggingEvent * */ @InterfaceStability.Unstable -public class TaskLogAppender extends FileAppender { +public class TaskLogAppender extends FileAppender implements Flushable { private String taskId; //taskId should be managed as String rather than TaskID object //so that log4j can configure it from the configuration(log4j.properties). private Integer maxEvents; @@ -92,6 +93,7 @@ public class TaskLogAppender extends Fil } } + @Override public void flush() { if (qw != null) { qw.flush(); Propchange: hadoop/common/branches/HDFS-5698/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml ------------------------------------------------------------------------------ Merged /hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml:r1559902-1560793