Author: jlowe Date: Thu Jan 16 23:13:17 2014 New Revision: 1558957 URL: http://svn.apache.org/r1558957 Log: svn merge -c 1558948 FIXES: MAPREDUCE-5672. Provide optional RollingFileAppender for container log4j (syslog). Contributed by Gera Shegalov
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM.java hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt?rev=1558957&r1=1558956&r2=1558957&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt (original) +++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt Thu Jan 16 23:13:17 2014 @@ -129,6 +129,9 @@ Release 2.4.0 - UNRELEASED MAPREDUCE-5724. JobHistoryServer does not start if HDFS is not running. (tucu) + MAPREDUCE-5672. Provide optional RollingFileAppender for container log4j + (syslog) (Gera Shegalov via jlowe) + Release 2.3.0 - UNRELEASED INCOMPATIBLE CHANGES Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM.java?rev=1558957&r1=1558956&r2=1558957&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM.java (original) +++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM.java Thu Jan 16 23:13:17 2014 @@ -149,8 +149,10 @@ public class MapReduceChildJVM { private static void setupLog4jProperties(Task task, Vector<String> vargs, long logSize) { - String logLevel = getChildLogLevel(task.conf, task.isMapTask()); - MRApps.addLog4jSystemProperties(logLevel, logSize, vargs); + String logLevel = getChildLogLevel(task.conf, task.isMapTask()); + int numBackups = task.conf.getInt(MRJobConfig.TASK_LOG_BACKUPS, + MRJobConfig.DEFAULT_TASK_LOG_BACKUPS); + MRApps.addLog4jSystemProperties(logLevel, logSize, numBackups, vargs); } public static List<String> getVMCommand( Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java?rev=1558957&r1=1558956&r2=1558957&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java (original) +++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java Thu Jan 16 23:13:17 2014 @@ -61,6 +61,7 @@ import org.apache.hadoop.yarn.exceptions import org.apache.hadoop.yarn.util.ApplicationClassLoader; import org.apache.hadoop.yarn.util.Apps; import org.apache.hadoop.yarn.util.ConverterUtils; +import org.apache.log4j.RollingFileAppender; /** * Helper class for MR applications @@ -490,15 +491,23 @@ public class MRApps extends Apps { * Add the JVM system properties necessary to configure {@link ContainerLogAppender}. * @param logLevel the desired log level (eg INFO/WARN/DEBUG) * @param logSize See {@link ContainerLogAppender#setTotalLogFileSize(long)} + * @param numBackups See {@link RollingFileAppender#setMaxBackupIndex(int)} * @param vargs the argument list to append to */ public static void addLog4jSystemProperties( - String logLevel, long logSize, List<String> vargs) { + String logLevel, long logSize, int numBackups, List<String> vargs) { vargs.add("-Dlog4j.configuration=container-log4j.properties"); vargs.add("-D" + YarnConfiguration.YARN_APP_CONTAINER_LOG_DIR + "=" + ApplicationConstants.LOG_DIR_EXPANSION_VAR); vargs.add( "-D" + YarnConfiguration.YARN_APP_CONTAINER_LOG_SIZE + "=" + logSize); - vargs.add("-Dhadoop.root.logger=" + logLevel + ",CLA"); + if (logSize > 0L && numBackups > 0) { + // log should be rolled + vargs.add("-D" + YarnConfiguration.YARN_APP_CONTAINER_LOG_BACKUPS + "=" + + numBackups); + vargs.add("-Dhadoop.root.logger=" + logLevel + ",CRLA"); + } else { + vargs.add("-Dhadoop.root.logger=" + logLevel + ",CLA"); + } } } Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java?rev=1558957&r1=1558956&r2=1558957&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java (original) +++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java Thu Jan 16 23:13:17 2014 @@ -407,6 +407,10 @@ public interface MRJobConfig { MR_AM_PREFIX+"log.level"; public static final String DEFAULT_MR_AM_LOG_LEVEL = "INFO"; + public static final String MR_AM_LOG_BACKUPS = + MR_AM_PREFIX + "container.log.backups"; + public static final int DEFAULT_MR_AM_LOG_BACKUPS = 0; // don't roll + /**The number of splits when reporting progress in MR*/ public static final String MR_AM_NUM_PROGRESS_SPLITS = MR_AM_PREFIX+"num-progress-splits"; @@ -681,7 +685,11 @@ public interface MRJobConfig { + "$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/lib/*"; public static final String WORKFLOW_ID = "mapreduce.workflow.id"; - + + public static final String TASK_LOG_BACKUPS = + MR_PREFIX + "task.container.log.backups"; + public static final int DEFAULT_TASK_LOG_BACKUPS = 0; // don't roll + public static final String WORKFLOW_NAME = "mapreduce.workflow.name"; public static final String WORKFLOW_NODE_NAME = Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml?rev=1558957&r1=1558956&r2=1558957&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml (original) +++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml Thu Jan 16 23:13:17 2014 @@ -816,6 +816,31 @@ </property> <property> + <name>yarn.app.mapreduce.task.container.log.backups</name> + <value>0</value> + <description>Number of backup files for task logs when using + ContainerRollingLogAppender (CRLA). See + org.apache.log4j.RollingFileAppender.maxBackupIndex. By default, + ContainerLogAppender (CLA) is used, and container logs are not rolled. CRLA + is enabled for tasks when both mapreduce.task.userlog.limit.kb and + yarn.app.mapreduce.task.container.log.backups are greater than zero. + </description> +</property> + +<property> + <name>yarn.app.mapreduce.am.container.log.backups</name> + <value>0</value> + <description>Number of backup files for the ApplicationMaster logs when using + ContainerRollingLogAppender (CRLA). See + org.apache.log4j.RollingFileAppender.maxBackupIndex. By default, + ContainerLogAppender (CLA) is used, and container logs are not rolled. CRLA + is enabled for the ApplicationMaster when both + mapreduce.task.userlog.limit.kb and + yarn.app.mapreduce.am.container.log.backups are greater than zero. + </description> +</property> + +<property> <name>mapreduce.job.userlog.retain.hours</name> <value>24</value> <description>The maximum time, in hours, for which the user-logs are to be Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java?rev=1558957&r1=1558956&r2=1558957&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java (original) +++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java Thu Jan 16 23:13:17 2014 @@ -392,7 +392,9 @@ public class YARNRunner implements Clien long logSize = TaskLog.getTaskLogLength(new JobConf(conf)); String logLevel = jobConf.get( MRJobConfig.MR_AM_LOG_LEVEL, MRJobConfig.DEFAULT_MR_AM_LOG_LEVEL); - MRApps.addLog4jSystemProperties(logLevel, logSize, vargs); + int numBackups = jobConf.getInt(MRJobConfig.MR_AM_LOG_BACKUPS, + MRJobConfig.DEFAULT_MR_AM_LOG_BACKUPS); + MRApps.addLog4jSystemProperties(logLevel, logSize, numBackups, vargs); // Check for Java Lib Path usage in MAP and REDUCE configs warnForJavaLibPath(conf.get(MRJobConfig.MAP_JAVA_OPTS,""), "map", Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java?rev=1558957&r1=1558956&r2=1558957&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java (original) +++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java Thu Jan 16 23:13:17 2014 @@ -23,10 +23,12 @@ import java.io.File; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.FileOutputStream; +import java.io.InputStreamReader; import java.io.IOException; import java.io.StringReader; import java.net.URI; import java.security.PrivilegedExceptionAction; +import java.util.EnumSet; import java.util.HashMap; import java.util.Map; import java.util.jar.JarOutputStream; @@ -53,6 +55,8 @@ import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.TaskLog; import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.JobCounter; @@ -65,17 +69,22 @@ import org.apache.hadoop.mapreduce.TaskC import org.apache.hadoop.mapreduce.TaskID; import org.apache.hadoop.mapreduce.TaskReport; import org.apache.hadoop.mapreduce.TaskType; +import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; +import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.util.JarFinder; import org.apache.hadoop.util.Shell; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.log4j.Level; import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; @@ -84,6 +93,9 @@ import org.junit.Test; public class TestMRJobs { private static final Log LOG = LogFactory.getLog(TestMRJobs.class); + private static final EnumSet<RMAppState> TERMINAL_RM_APP_STATES = + EnumSet.of(RMAppState.FINISHED, RMAppState.FAILED, RMAppState.KILLED); + private static final int NUM_NODE_MGRS = 3; protected static MiniMRYarnCluster mrCluster; protected static MiniDFSCluster dfsCluster; @@ -122,7 +134,8 @@ public class TestMRJobs { } if (mrCluster == null) { - mrCluster = new MiniMRYarnCluster(TestMRJobs.class.getName(), 3); + mrCluster = new MiniMRYarnCluster(TestMRJobs.class.getName(), + NUM_NODE_MGRS); Configuration conf = new Configuration(); conf.set("fs.defaultFS", remoteFs.getUri().toString()); // use HDFS conf.set(MRJobConfig.MR_AM_STAGING_DIR, "/apps_staging_dir"); @@ -416,6 +429,115 @@ public class TestMRJobs { // TODO later: add explicit "isUber()" checks of some sort } + @Test(timeout = 120000) + public void testContainerRollingLog() throws IOException, + InterruptedException, ClassNotFoundException { + if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) { + LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR + + " not found. Not running test."); + return; + } + + final SleepJob sleepJob = new SleepJob(); + final JobConf sleepConf = new JobConf(mrCluster.getConfig()); + sleepConf.set(MRJobConfig.MAP_LOG_LEVEL, Level.ALL.toString()); + sleepConf.set(MRJobConfig.MR_AM_LOG_LEVEL, Level.ALL.toString()); + sleepConf.setLong(MRJobConfig.TASK_USERLOG_LIMIT, 1); + sleepConf.setInt(MRJobConfig.TASK_LOG_BACKUPS, 3); + sleepConf.setInt(MRJobConfig.MR_AM_LOG_BACKUPS, 7); + sleepJob.setConf(sleepConf); + + final Job job = sleepJob.createJob(1, 0, 1L, 100, 0L, 0); + job.setJarByClass(SleepJob.class); + job.addFileToClassPath(APP_JAR); // The AppMaster jar itself. + job.waitForCompletion(true); + final JobId jobId = TypeConverter.toYarn(job.getJobID()); + final ApplicationId appID = jobId.getAppId(); + int pollElapsed = 0; + while (true) { + Thread.sleep(1000); + pollElapsed += 1000; + if (TERMINAL_RM_APP_STATES.contains( + mrCluster.getResourceManager().getRMContext().getRMApps().get(appID) + .getState())) { + break; + } + if (pollElapsed >= 60000) { + LOG.warn("application did not reach terminal state within 60 seconds"); + break; + } + } + Assert.assertEquals(RMAppState.FINISHED, mrCluster.getResourceManager() + .getRMContext().getRMApps().get(appID).getState()); + + // Job finished, verify logs + // + + final String appIdStr = appID.toString(); + final String appIdSuffix = appIdStr.substring("application_".length(), + appIdStr.length()); + final String containerGlob = "container_" + appIdSuffix + "_*_*"; + final String syslogGlob = appIdStr + + Path.SEPARATOR + containerGlob + + Path.SEPARATOR + TaskLog.LogName.SYSLOG; + int numAppMasters = 0; + int numMapTasks = 0; + + for (int i = 0; i < NUM_NODE_MGRS; i++) { + final Configuration nmConf = mrCluster.getNodeManager(i).getConfig(); + for (String logDir : + nmConf.getTrimmedStrings(YarnConfiguration.NM_LOG_DIRS)) { + final Path absSyslogGlob = + new Path(logDir + Path.SEPARATOR + syslogGlob); + LOG.info("Checking for glob: " + absSyslogGlob); + final FileStatus[] syslogs = localFs.globStatus(absSyslogGlob); + for (FileStatus slog : syslogs) { + // check all syslogs for the container + // + final FileStatus[] sysSiblings = localFs.globStatus(new Path( + slog.getPath().getParent(), TaskLog.LogName.SYSLOG + "*")); + boolean foundAppMaster = false; + floop: + for (FileStatus f : sysSiblings) { + final BufferedReader reader = new BufferedReader( + new InputStreamReader(localFs.open(f.getPath()))); + String line; + try { + while ((line = reader.readLine()) != null) { + if (line.contains(MRJobConfig.APPLICATION_MASTER_CLASS)) { + foundAppMaster = true; + break floop; + } + } + } finally { + reader.close(); + } + } + + if (foundAppMaster) { + numAppMasters++; + } else { + numMapTasks++; + } + + Assert.assertSame("Number of sylog* files", + foundAppMaster + ? sleepConf.getInt(MRJobConfig.MR_AM_LOG_BACKUPS, 0) + 1 + : sleepConf.getInt(MRJobConfig.TASK_LOG_BACKUPS, 0) + 1, + sysSiblings.length); + } + } + } + // Make sure we checked non-empty set + // + Assert.assertEquals("No AppMaster log found!", 1, numAppMasters); + if (sleepConf.getBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false)) { + Assert.assertEquals("MapTask log with uber found!", 0, numMapTasks); + } else { + Assert.assertEquals("No MapTask log found!", 1, numMapTasks); + } + } + public static class DistributedCacheChecker extends Mapper<LongWritable, Text, NullWritable, NullWritable> {