Author: vinodkv
Date: Mon Oct 24 08:41:48 2011
New Revision: 1188043

URL: http://svn.apache.org/viewvc?rev=1188043&view=rev
Log:
MAPREDUCE-2708. Designed and implemented MR Application Master recovery to
make MR AMs resume their progress after restart. Contributed by Sharad Agarwal.
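At its core, this change gates AM recovery on three conditions: recovery is enabled in the configuration, the job's OutputCommitter supports recovering task output, and this is not the first AM attempt. The MRAppMaster hunk further down is the authoritative version; the following is a condensed, illustrative sketch only (field declarations and service wiring belong to the enclosing class and are elided here):

    // Condensed from the MRAppMaster.init() changes in this commit.
    boolean recoveryEnabled = conf.getBoolean(
        MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true); // note: default is now true
    boolean recoverySupportedByCommitter = committer.isRecoverySupported();
    if (recoveryEnabled && recoverySupportedByCommitter
        && appAttemptID.getAttemptId() > 1) {
      // Second or later AM attempt: replay the previous attempt's job
      // history and recover completed tasks on a best-effort basis.
      recoveryServ = new RecoveryService(appAttemptID, clock, committer);
    } else {
      // First attempt, recovery disabled, or committer cannot recover output.
      dispatcher = new AsyncDispatcher();
    }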
Modified:
    hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/EventWriter.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/NullOutputFormat.java

Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1188043&r1=1188042&r2=1188043&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Mon Oct 24 08:41:48 2011
@@ -115,6 +115,9 @@ Release 0.23.0 - Unreleased
 
     MAPREDUCE-3104. Implemented Application-acls. (vinodkv)
 
+    MAPREDUCE-2708. Designed and implemented MR Application Master recovery to
+    make MR AMs resume their progress after restart. (Sharad Agarwal via vinodkv)
+
   IMPROVEMENTS
 
     MAPREDUCE-2187. Reporter sends progress during sort/merge. (Anupam Seth via

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM.java?rev=1188043&r1=1188042&r2=1188043&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM.java Mon Oct 24 08:41:48 2011
@@ -29,11 +29,9 @@ import org.apache.hadoop.mapred.TaskLog.
 import org.apache.hadoop.mapreduce.ID;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.v2.util.MRApps;
-import org.apache.hadoop.yarn.util.Apps;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.util.Apps;
 
 public class MapReduceChildJVM {
 
@@ -131,6 +129,8 @@ public class MapReduceChildJVM {
       MRJobConfig.STDERR_LOGFILE_ENV,
       getTaskLogFile(TaskLog.LogName.STDERR)
       );
+    environment.put(MRJobConfig.APPLICATION_ATTEMPT_ID_ENV,
+        conf.get(MRJobConfig.APPLICATION_ATTEMPT_ID).toString());
   }
 
   private static String getChildJavaOpts(JobConf jobConf, boolean isMapTask) {

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java?rev=1188043&r1=1188042&r2=1188043&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java Mon Oct 24 08:41:48 2011
@@ -239,6 +239,14 @@ class YarnChild {
       Token<JobTokenIdentifier> jt) throws IOException {
     final JobConf job = new JobConf(MRJobConfig.JOB_CONF_FILE);
     job.setCredentials(credentials);
+
+    String appAttemptIdEnv = System
+        .getenv(MRJobConfig.APPLICATION_ATTEMPT_ID_ENV);
+    LOG.debug("APPLICATION_ATTEMPT_ID: " + appAttemptIdEnv);
+    // Set it in conf, so as to be able to be used by the OutputCommitter.
+    job.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, Integer
+        .parseInt(appAttemptIdEnv));
+
     // set tcp nodelay
     job.setBoolean("ipc.client.tcpnodelay", true);
     job.setClass(MRConfig.TASK_LOCAL_OUTPUT_CLASS,
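The two hunks above plumb the AM attempt id into each task: MapReduceChildJVM exports it into the container environment, and YarnChild copies it from the environment into the task's JobConf. Task-side code, an OutputCommitter in particular, can then read it back. A minimal, hypothetical helper (the method name and the default of 0 are invented for this example, not part of this commit):

    // Hypothetical helper, not part of this commit: discover which AM
    // attempt launched the current task.
    static int getAppAttemptId(Configuration conf) {
      // The AM and YarnChild set this key; 0 is this example's fallback.
      return conf.getInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 0);
    }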
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java?rev=1188043&r1=1188042&r2=1188043&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java Mon Oct 24 08:41:48 2011
@@ -36,18 +36,25 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileOutputCommitter;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.LocalContainerLauncher;
 import org.apache.hadoop.mapred.TaskAttemptListenerImpl;
 import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
 import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.jobhistory.AMStartedEvent;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo;
-import org.apache.hadoop.mapreduce.jobhistory.AMStartedEvent;
 import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
 import org.apache.hadoop.mapreduce.v2.app.client.MRClientService;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
@@ -76,12 +83,14 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.speculate.SpeculatorEvent;
 import org.apache.hadoop.mapreduce.v2.app.taskclean.TaskCleaner;
 import org.apache.hadoop.mapreduce.v2.app.taskclean.TaskCleanerImpl;
+import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.yarn.Clock;
 import org.apache.hadoop.yarn.SystemClock;
 import org.apache.hadoop.yarn.YarnException;
@@ -93,6 +102,8 @@ import org.apache.hadoop.yarn.conf.YarnC
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.service.AbstractService;
 import org.apache.hadoop.yarn.service.CompositeService;
 import org.apache.hadoop.yarn.service.Service;
@@ -121,6 +132,9 @@ public class MRAppMaster extends Composi
 
   private static final Log LOG = LogFactory.getLog(MRAppMaster.class);
 
+  private final RecordFactory recordFactory =
+      RecordFactoryProvider.getRecordFactory(null);
+
   private Clock clock;
   private final long startTime;
   private final long appSubmitTime;
@@ -143,6 +157,9 @@ public class MRAppMaster extends Composi
   private TaskAttemptListener taskAttemptListener;
   private JobTokenSecretManager jobTokenSecretManager =
       new JobTokenSecretManager();
+  private JobId jobId;
+  private boolean newApiCommitter;
+  private OutputCommitter committer;
   private JobEventDispatcher jobEventDispatcher;
 
   private boolean inRecovery = false;
@@ -182,15 +199,39 @@ public class MRAppMaster extends Composi
     // for an app later
     appName = conf.get(MRJobConfig.JOB_NAME, "<missing app name>");
 
-    if (conf.getBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, false)
-        && appAttemptID.getAttemptId() > 1) {
-      LOG.info("Recovery is enabled. Will try to recover from previous life.");
-      recoveryServ = new RecoveryService(appAttemptID, clock);
+    conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, appAttemptID.getAttemptId());
+
+    newApiCommitter = false;
+    jobId = MRBuilderUtils.newJobId(appAttemptID.getApplicationId(),
+        appAttemptID.getApplicationId().getId());
+    int numReduceTasks = conf.getInt(MRJobConfig.NUM_REDUCES, 0);
+    if ((numReduceTasks > 0 &&
+        conf.getBoolean("mapred.reducer.new-api", false)) ||
+        (numReduceTasks == 0 &&
+            conf.getBoolean("mapred.mapper.new-api", false))) {
+      newApiCommitter = true;
+      LOG.info("Using mapred newApiCommitter.");
+    }
+
+    committer = createOutputCommitter(conf);
+    boolean recoveryEnabled = conf.getBoolean(
+        MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
+    boolean recoverySupportedByCommitter = committer.isRecoverySupported();
+    if (recoveryEnabled && recoverySupportedByCommitter
+        && appAttemptID.getAttemptId() > 1) {
+      LOG.info("Recovery is enabled. "
+          + "Will try to recover from previous life on best effort basis.");
+      recoveryServ = new RecoveryService(appAttemptID, clock,
+          committer);
       addIfService(recoveryServ);
       dispatcher = recoveryServ.getDispatcher();
       clock = recoveryServ.getClock();
       inRecovery = true;
     } else {
+      LOG.info("Not starting RecoveryService: recoveryEnabled: "
+          + recoveryEnabled + " recoverySupportedByCommitter: "
+          + recoverySupportedByCommitter + " ApplicationAttemptID: "
+          + appAttemptID.getAttemptId());
       dispatcher = new AsyncDispatcher();
       addIfService(dispatcher);
     }
@@ -253,7 +294,36 @@ public class MRAppMaster extends Composi
 
     super.init(conf);
   } // end of init()
-
+
+  private OutputCommitter createOutputCommitter(Configuration conf) {
+    OutputCommitter committer = null;
+
+    LOG.info("OutputCommitter set in config "
+        + conf.get("mapred.output.committer.class"));
+
+    if (newApiCommitter) {
+      org.apache.hadoop.mapreduce.v2.api.records.TaskId taskID = MRBuilderUtils
+          .newTaskId(jobId, 0, TaskType.MAP);
+      org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID = MRBuilderUtils
+          .newTaskAttemptId(taskID, 0);
+      TaskAttemptContext taskContext = new TaskAttemptContextImpl(conf,
+          TypeConverter.fromYarn(attemptID));
+      OutputFormat outputFormat;
+      try {
+        outputFormat = ReflectionUtils.newInstance(taskContext
            .getOutputFormatClass(), conf);
+        committer = outputFormat.getOutputCommitter(taskContext);
+      } catch (Exception e) {
+        throw new YarnException(e);
+      }
+    } else {
+      committer = ReflectionUtils.newInstance(conf.getClass(
+          "mapred.output.committer.class", FileOutputCommitter.class,
+          org.apache.hadoop.mapred.OutputCommitter.class), conf);
+    }
+    LOG.info("OutputCommitter is " + committer.getClass().getName());
+    return committer;
+  }
+
   protected boolean keepJobFiles(JobConf conf) {
     return (conf.getKeepTaskFilesPattern() != null || conf
        .getKeepFailedTaskFiles());
@@ -348,10 +418,10 @@ public class MRAppMaster extends Composi
 
   protected Job createJob(Configuration conf) {
 
     // create single job
-    Job newJob = new JobImpl(appAttemptID, conf, dispatcher.getEventHandler(),
-        taskAttemptListener, jobTokenSecretManager, fsTokens, clock,
-        completedTasksFromPreviousRun, metrics, currentUser.getUserName(),
-        appSubmitTime, amInfos);
+    Job newJob = new JobImpl(jobId, appAttemptID, conf, dispatcher
+        .getEventHandler(), taskAttemptListener, jobTokenSecretManager,
+        fsTokens, clock, completedTasksFromPreviousRun, metrics, committer,
+        newApiCommitter, currentUser.getUserName(), appSubmitTime, amInfos);
     ((RunningAppContext) context).jobs.put(newJob.getID(), newJob);
 
     dispatcher.register(JobFinishEvent.Type.class,
@@ -471,6 +541,22 @@ public class MRAppMaster extends Composi
     return appAttemptID.getApplicationId();
   }
 
+  public ApplicationAttemptId getAttemptID() {
+    return appAttemptID;
+  }
+
+  public JobId getJobId() {
+    return jobId;
+  }
+
+  public OutputCommitter getCommitter() {
+    return committer;
+  }
+
+  public boolean isNewApiCommitter() {
+    return newApiCommitter;
+  }
+
   public int getStartCount() {
     return appAttemptID.getAttemptId();
   }
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java?rev=1188043&r1=1188042&r2=1188043&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java Mon Oct 24 08:41:48 2011
@@ -39,15 +39,12 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.FileOutputCommitter;
 import org.apache.hadoop.mapred.JobACLsManager;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.JobACL;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.jobhistory.JobFinishedEvent;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
@@ -64,7 +61,6 @@ import org.apache.hadoop.mapreduce.secur
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
 import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader;
 import org.apache.hadoop.mapreduce.task.JobContextImpl;
-import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
 import org.apache.hadoop.mapreduce.v2.api.records.Counter;
 import org.apache.hadoop.mapreduce.v2.api.records.CounterGroup;
 import org.apache.hadoop.mapreduce.v2.api.records.Counters;
@@ -98,14 +94,11 @@ import org.apache.hadoop.security.Creden
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.Clock;
 import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
 import org.apache.hadoop.yarn.state.MultipleArcTransition;
@@ -126,15 +119,13 @@ public class JobImpl implements org.apac
 
   // Maximum no. of fetch-failure notifications after which map task is failed
   private static final int MAX_FETCH_FAILURES_NOTIFICATIONS = 3;
-
-  private final RecordFactory recordFactory =
-      RecordFactoryProvider.getRecordFactory(null);
 
   //final fields
   private final ApplicationAttemptId applicationAttemptId;
   private final Clock clock;
   private final JobACLsManager aclsManager;
   private final String username;
+  private final OutputCommitter committer;
   private final Map<JobACL, AccessControlList> jobACLs;
   private final Set<TaskId> completedTasksFromPreviousRun;
   private final List<AMInfo> amInfos;
@@ -142,6 +133,7 @@ public class JobImpl implements org.apac
   private final Lock writeLock;
   private final JobId jobId;
   private final String jobName;
+  private final boolean newApiCommitter;
   private final org.apache.hadoop.mapreduce.JobID oldJobId;
   private final TaskAttemptListener taskAttemptListener;
   private final Object tasksSyncHandle = new Object();
@@ -167,7 +159,6 @@ public class JobImpl implements org.apac
   private Path remoteJobSubmitDir;
   public Path remoteJobConfFile;
   private JobContext jobContext;
-  private OutputCommitter committer;
   private int allowedMapFailuresPercent = 0;
   private int allowedReduceFailuresPercent = 0;
   private List<TaskAttemptCompletionEvent> taskAttemptCompletionEvents;
@@ -367,14 +358,16 @@ public class JobImpl implements org.apac
   private Token<JobTokenIdentifier> jobToken;
   private JobTokenSecretManager jobTokenSecretManager;
 
-  public JobImpl(ApplicationAttemptId applicationAttemptId, Configuration conf,
-      EventHandler eventHandler, TaskAttemptListener taskAttemptListener,
+  public JobImpl(JobId jobId, ApplicationAttemptId applicationAttemptId,
+      Configuration conf, EventHandler eventHandler,
+      TaskAttemptListener taskAttemptListener,
       JobTokenSecretManager jobTokenSecretManager,
-      Credentials fsTokenCredentials, Clock clock,
+      Credentials fsTokenCredentials, Clock clock,
       Set<TaskId> completedTasksFromPreviousRun, MRAppMetrics metrics,
-      String userName, long appSubmitTime, List<AMInfo> amInfos) {
+      OutputCommitter committer, boolean newApiCommitter, String userName,
+      long appSubmitTime, List<AMInfo> amInfos) {
     this.applicationAttemptId = applicationAttemptId;
-    this.jobId = recordFactory.newRecordInstance(JobId.class);
+    this.jobId = jobId;
     this.jobName = conf.get(JobContext.JOB_NAME, "<missing job name>");
     this.conf = conf;
     this.metrics = metrics;
@@ -383,15 +376,9 @@ public class JobImpl implements org.apac
     this.amInfos = amInfos;
     this.userName = userName;
     this.appSubmitTime = appSubmitTime;
-    ApplicationId applicationId = applicationAttemptId.getApplicationId();
-    jobId.setAppId(applicationId);
-    jobId.setId(applicationId.getId());
-    oldJobId = TypeConverter.fromYarn(jobId);
-    LOG.info("Job created" +
-        " appId=" + applicationId +
-        " jobId=" + jobId +
-        " oldJobId=" + oldJobId);
-
+    this.oldJobId = TypeConverter.fromYarn(jobId);
+    this.newApiCommitter = newApiCommitter;
+
     this.taskAttemptListener = taskAttemptListener;
     this.eventHandler = eventHandler;
     ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
@@ -400,6 +387,7 @@ public class JobImpl implements org.apac
 
     this.fsTokens = fsTokenCredentials;
     this.jobTokenSecretManager = jobTokenSecretManager;
+    this.committer = committer;
 
     this.aclsManager = new JobACLsManager(conf);
     this.username = System.getProperty("user.name");
@@ -854,47 +842,13 @@ public class JobImpl implements org.apac
 
       checkTaskLimits();
 
-
-      boolean newApiCommitter = false;
-      if ((job.numReduceTasks > 0 &&
-          job.conf.getBoolean("mapred.reducer.new-api", false)) ||
-          (job.numReduceTasks == 0 &&
-              job.conf.getBoolean("mapred.mapper.new-api", false))) {
-        newApiCommitter = true;
-        LOG.info("Using mapred newApiCommitter.");
-      }
-
-      LOG.info("OutputCommitter set in config " + job.conf.get(
-          "mapred.output.committer.class"));
-
-      if (newApiCommitter) {
+      if (job.newApiCommitter) {
         job.jobContext = new JobContextImpl(job.conf, job.oldJobId);
-        org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID
-            = RecordFactoryProvider.getRecordFactory(null)
-                .newRecordInstance(
-                    org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId.class);
-        attemptID.setTaskId(RecordFactoryProvider.getRecordFactory(null)
-            .newRecordInstance(TaskId.class));
-        attemptID.getTaskId().setJobId(job.jobId);
-        attemptID.getTaskId().setTaskType(TaskType.MAP);
-        TaskAttemptContext taskContext = new TaskAttemptContextImpl(job.conf,
-            TypeConverter.fromYarn(attemptID));
-        try {
-          OutputFormat outputFormat = ReflectionUtils.newInstance(
-              taskContext.getOutputFormatClass(), job.conf);
-          job.committer = outputFormat.getOutputCommitter(taskContext);
-        } catch(Exception e) {
-          throw new IOException("Failed to assign outputcommitter", e);
-        }
       } else {
         job.jobContext = new org.apache.hadoop.mapred.JobContextImpl(
             new JobConf(job.conf), job.oldJobId);
-        job.committer = ReflectionUtils.newInstance(
-            job.conf.getClass("mapred.output.committer.class", FileOutputCommitter.class,
-                org.apache.hadoop.mapred.OutputCommitter.class), job.conf);
       }
-      LOG.info("OutputCommitter is " + job.committer.getClass().getName());
 
       long inputLength = 0;
       for (int i = 0; i < job.numMapTasks; ++i) {

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java?rev=1188043&r1=1188042&r2=1188043&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java Mon Oct 24 08:41:48 2011
@@ -32,17 +32,23 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
-import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
 import org.apache.hadoop.mapreduce.v2.api.records.Phase;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobDiagnosticsUpdateEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerLaunchedEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
@@ -53,6 +59,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
+import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerRemoteLaunchEvent;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
@@ -84,9 +91,6 @@ import org.apache.hadoop.yarn.service.Se
 
 //TODO:
 //task cleanup for all non completed tasks
-//change job output committer to have
-// - atomic job output promotion
-// - recover output of completed tasks
 
 public class RecoveryService extends CompositeService implements Recovery {
 
@@ -95,6 +99,7 @@ public class RecoveryService extends Com
   private static final Log LOG = LogFactory.getLog(RecoveryService.class);
 
   private final ApplicationAttemptId applicationAttemptId;
+  private final OutputCommitter committer;
   private final Dispatcher dispatcher;
   private final ControlledClock clock;
@@ -108,9 +113,10 @@ public class RecoveryService extends Com
   private volatile boolean recoveryMode = false;
 
   public RecoveryService(ApplicationAttemptId applicationAttemptId,
-      Clock clock) {
+      Clock clock, OutputCommitter committer) {
     super("RecoveringDispatcher");
     this.applicationAttemptId = applicationAttemptId;
+    this.committer = committer;
     this.dispatcher = new RecoveryDispatcher();
     this.clock = new ControlledClock(clock);
     addService((Service) dispatcher);
@@ -122,17 +128,17 @@ public class RecoveryService extends Com
     // parse the history file
     try {
       parse();
-      if (completedTasks.size() > 0) {
-        recoveryMode = true;
-        LOG.info("SETTING THE RECOVERY MODE TO TRUE. NO OF COMPLETED TASKS " +
-            "TO RECOVER " + completedTasks.size());
-        LOG.info("Job launch time " + jobInfo.getLaunchTime());
-        clock.setTime(jobInfo.getLaunchTime());
-      }
-    } catch (IOException e) {
+    } catch (Exception e) {
       LOG.warn(e);
       LOG.warn("Could not parse the old history file. Aborting recovery. "
-          + "Starting afresh.");
+          + "Starting afresh.", e);
+    }
+    if (completedTasks.size() > 0) {
+      recoveryMode = true;
+      LOG.info("SETTING THE RECOVERY MODE TO TRUE. NO OF COMPLETED TASKS "
+          + "TO RECOVER " + completedTasks.size());
+      LOG.info("Job launch time " + jobInfo.getLaunchTime());
+      clock.setTime(jobInfo.getLaunchTime());
     }
   }
@@ -315,6 +321,20 @@ public class RecoveryService extends Com
       TaskAttemptState state = TaskAttemptState.valueOf(attInfo.getTaskStatus());
       switch (state) {
       case SUCCEEDED:
+        //recover the task output
+        TaskAttemptContext taskContext = new TaskAttemptContextImpl(getConfig(),
+            attInfo.getAttemptId());
+        try {
+          committer.recoverTask(taskContext);
+        } catch (IOException e) {
+          actualHandler.handle(new JobDiagnosticsUpdateEvent(
+              aId.getTaskId().getJobId(), "Error in recovering task output "
+                  + e.getMessage()));
+          actualHandler.handle(new JobEvent(aId.getTaskId().getJobId(),
+              JobEventType.INTERNAL_ERROR));
+        }
+        LOG.info("Recovered output from task attempt " + attInfo.getAttemptId());
+
         // send the done event
         LOG.info("Sending done event to " + aId);
         actualHandler.handle(new TaskAttemptEvent(aId,
@@ -334,6 +354,16 @@ public class RecoveryService extends Com
         return;
       }
 
+      else if (event.getType() ==
+          ContainerLauncher.EventType.CONTAINER_REMOTE_CLEANUP) {
+        TaskAttemptId aId = ((ContainerLauncherEvent) event)
+            .getTaskAttemptID();
+        actualHandler.handle(
+            new TaskAttemptEvent(aId,
+                TaskAttemptEventType.TA_CONTAINER_CLEANED));
+        return;
+      }
+
       // delegate to the actual handler
       actualHandler.handle(event);
     }
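The RecoveryService hunks above are where recovered state becomes real output: while replaying the previous attempt's history, every attempt recorded as SUCCEEDED has its output re-attached through the committer before the done event is re-sent. Condensed from the diff (the error handling, which routes failures to JobEventType.INTERNAL_ERROR, is elided here):

    // Condensed from the RecoveryService changes above.
    TaskAttemptContext taskContext = new TaskAttemptContextImpl(getConfig(),
        attInfo.getAttemptId());
    committer.recoverTask(taskContext); // re-establish the previous attempt's output
    actualHandler.handle(new TaskAttemptEvent(aId,
        TaskAttemptEventType.TA_DONE)); // replay the done event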
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java?rev=1188043&r1=1188042&r2=1188043&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java Mon Oct 24 08:41:48 2011
@@ -32,6 +32,7 @@ import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.WrappedJvmID;
 import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
 import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
@@ -264,9 +265,11 @@ public class MRApp extends MRAppMaster {
     } catch (IOException e) {
       throw new YarnException(e);
     }
-    Job newJob = new TestJob(conf, getAppID(), getDispatcher().getEventHandler(),
-        getTaskAttemptListener(), getContext().getClock(),
-        currentUser.getUserName());
+    Job newJob = new TestJob(getJobId(), getAttemptID(), conf,
+        getDispatcher().getEventHandler(),
+        getTaskAttemptListener(), getContext().getClock(),
+        getCommitter(), isNewApiCommitter(),
+        currentUser.getUserName());
     ((AppContext) getContext()).getAllJobs().put(newJob.getID(), newJob);
 
     getDispatcher().register(JobFinishEvent.Type.class,
@@ -413,13 +416,15 @@ public class MRApp extends MRAppMaster {
       return localStateMachine;
     }
 
-    public TestJob(Configuration conf, ApplicationId applicationId,
-        EventHandler eventHandler, TaskAttemptListener taskAttemptListener,
-        Clock clock, String user) {
-      super(getApplicationAttemptId(applicationId, getStartCount()), conf,
-          eventHandler, taskAttemptListener, new JobTokenSecretManager(),
-          new Credentials(), clock, getCompletedTaskFromPreviousRun(), metrics,
-          user, System.currentTimeMillis(), getAllAMInfos());
+    public TestJob(JobId jobId, ApplicationAttemptId applicationAttemptId,
+        Configuration conf, EventHandler eventHandler,
+        TaskAttemptListener taskAttemptListener, Clock clock,
+        OutputCommitter committer, boolean newApiCommitter, String user) {
+      super(jobId, getApplicationAttemptId(applicationId, getStartCount()),
+          conf, eventHandler, taskAttemptListener,
+          new JobTokenSecretManager(), new Credentials(), clock,
+          getCompletedTaskFromPreviousRun(), metrics, committer,
+          newApiCommitter, user, System.currentTimeMillis(), getAllAMInfos());
       // This "this leak" is okay because the retained pointer is in an
       // instance variable.

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java?rev=1188043&r1=1188042&r2=1188043&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java Mon Oct 24 08:41:48 2011
@@ -342,10 +342,10 @@ public class TestRMContainerAllocator {
 
     public FakeJob(ApplicationAttemptId appAttemptID, Configuration conf,
         int numMaps, int numReduces) {
-      super(appAttemptID, conf, null, null, null, null, null, null, null, null,
-          System.currentTimeMillis(), null);
-      this.jobId = MRBuilderUtils
-          .newJobId(appAttemptID.getApplicationId(), 0);
+      super(MRBuilderUtils.newJobId(appAttemptID.getApplicationId(), 0),
+          appAttemptID, conf, null, null, null, null, null, null, null, null,
+          true, null, System.currentTimeMillis(), null);
+      this.jobId = getID();
       this.numMaps = numMaps;
       this.numReduces = numReduces;
     }

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java?rev=1188043&r1=1188042&r2=1188043&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java Mon Oct 24 08:41:48 2011
@@ -18,6 +18,9 @@
 
 package org.apache.hadoop.mapreduce.v2.app;
 
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
 import java.util.Iterator;
 
 import junit.framework.Assert;
@@ -25,10 +28,21 @@ import junit.framework.Assert;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
@@ -37,20 +51,34 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
+import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.junit.Test;
 
 public class TestRecovery {
 
   private static final Log LOG = LogFactory.getLog(TestRecovery.class);
+  private static Path outputDir = new Path(new File("target",
+      TestRecovery.class.getName()).getAbsolutePath() +
+      Path.SEPARATOR + "out");
+  private static String partFile = "part-r-00000";
+  private Text key1 = new Text("key1");
+  private Text key2 = new Text("key2");
+  private Text val1 = new Text("val1");
+  private Text val2 = new Text("val2");
+
   @Test
   public void testCrashed() throws Exception {
+    int runCount = 0;
     long am1StartTimeEst = System.currentTimeMillis();
     MRApp app = new MRAppWithHistory(2, 1, false, this.getClass().getName(), true, ++runCount);
     Configuration conf = new Configuration();
+    conf.setBoolean("mapred.mapper.new-api", true);
+    conf.setBoolean("mapred.reducer.new-api", true);
     conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+    conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
     Job job = app.submit(conf);
     app.waitForState(job, JobState.RUNNING);
     long jobStartTime = job.getReport().getStartTime();
@@ -135,6 +163,9 @@ public class TestRecovery {
     app = new MRAppWithHistory(2, 1, false, this.getClass().getName(), false, ++runCount);
     conf = new Configuration();
     conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
+    conf.setBoolean("mapred.mapper.new-api", true);
+    conf.setBoolean("mapred.reducer.new-api", true);
+    conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
     conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
     job = app.submit(conf);
     app.waitForState(job, JobState.RUNNING);
@@ -201,7 +232,165 @@ public class TestRecovery {
     // TODO Add verification of additional data from jobHistory - whatever was
     // available in the failed attempt should be available here
   }
+
+  @Test
+  public void testOutputRecovery() throws Exception {
+    int runCount = 0;
+    MRApp app = new MRAppWithHistory(1, 2, false, this.getClass().getName(),
+        true, ++runCount);
+    Configuration conf = new Configuration();
+    conf.setBoolean("mapred.mapper.new-api", true);
+    conf.setBoolean("mapred.reducer.new-api", true);
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+    conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
+    Job job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+    Assert.assertEquals("No of tasks not correct",
+        3, job.getTasks().size());
+    Iterator<Task> it = job.getTasks().values().iterator();
+    Task mapTask1 = it.next();
+    Task reduceTask1 = it.next();
+
+    // all maps must be running
+    app.waitForState(mapTask1, TaskState.RUNNING);
+
+    TaskAttempt task1Attempt1 = mapTask1.getAttempts().values().iterator()
+        .next();
+
+    //before sending the TA_DONE, event make sure attempt has come to
+    //RUNNING state
+    app.waitForState(task1Attempt1, TaskAttemptState.RUNNING);
+
+    //send the done signal to the map
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(
+            task1Attempt1.getID(),
+            TaskAttemptEventType.TA_DONE));
+
+    //wait for map task to complete
+    app.waitForState(mapTask1, TaskState.SUCCEEDED);
+
+    app.waitForState(reduceTask1, TaskState.RUNNING);
+    TaskAttempt reduce1Attempt1 = reduceTask1.getAttempts().values().iterator().next();
+
+    // write output corresponding to reduce1
+    writeOutput(reduce1Attempt1, conf);
+
+    //send the done signal to the 1st reduce
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(
+            reduce1Attempt1.getID(),
+            TaskAttemptEventType.TA_DONE));
+
+    //wait for first reduce task to complete
+    app.waitForState(reduceTask1, TaskState.SUCCEEDED);
+
+    //stop the app before the job completes.
+    app.stop();
+
+    //rerun
+    //in rerun the map will be recovered from previous run
+    app = new MRAppWithHistory(1, 2, false, this.getClass().getName(), false, ++runCount);
+    conf = new Configuration();
+    conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
+    conf.setBoolean("mapred.mapper.new-api", true);
+    conf.setBoolean("mapred.reducer.new-api", true);
+    conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+    job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+    Assert.assertEquals("No of tasks not correct",
+        3, job.getTasks().size());
+    it = job.getTasks().values().iterator();
+    mapTask1 = it.next();
+    reduceTask1 = it.next();
+    Task reduceTask2 = it.next();
+
+    // map will be recovered, no need to send done
+    app.waitForState(mapTask1, TaskState.SUCCEEDED);
+
+    // first reduce will be recovered, no need to send done
+    app.waitForState(reduceTask1, TaskState.SUCCEEDED);
+
+    app.waitForState(reduceTask2, TaskState.RUNNING);
+
+    TaskAttempt reduce2Attempt = reduceTask2.getAttempts().values()
+        .iterator().next();
+    //before sending the TA_DONE, event make sure attempt has come to
+    //RUNNING state
+    app.waitForState(reduce2Attempt, TaskAttemptState.RUNNING);
+
+    //send the done signal to the 2nd reduce task
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(
+            reduce2Attempt.getID(),
+            TaskAttemptEventType.TA_DONE));
+
+    //wait to get it completed
+    app.waitForState(reduceTask2, TaskState.SUCCEEDED);
+
+    app.waitForState(job, JobState.SUCCEEDED);
+    app.verifyCompleted();
+    validateOutput();
+  }
+
+  private void writeOutput(TaskAttempt attempt, Configuration conf)
+      throws Exception {
+    TaskAttemptContext tContext = new TaskAttemptContextImpl(conf,
+        TypeConverter.fromYarn(attempt.getID()));
+
+    TextOutputFormat<?, ?> theOutputFormat = new TextOutputFormat();
+    RecordWriter theRecordWriter = theOutputFormat
+        .getRecordWriter(tContext);
+
+    NullWritable nullWritable = NullWritable.get();
+    try {
+      theRecordWriter.write(key1, val1);
+      theRecordWriter.write(null, nullWritable);
+      theRecordWriter.write(null, val1);
+      theRecordWriter.write(nullWritable, val2);
+      theRecordWriter.write(key2, nullWritable);
+      theRecordWriter.write(key1, null);
+      theRecordWriter.write(null, null);
+      theRecordWriter.write(key2, val2);
+    } finally {
+      theRecordWriter.close(tContext);
+    }
+
+    OutputFormat outputFormat = ReflectionUtils.newInstance(
+        tContext.getOutputFormatClass(), conf);
+    OutputCommitter committer = outputFormat.getOutputCommitter(tContext);
+    committer.commitTask(tContext);
+  }
+
+  private void validateOutput() throws IOException {
+    File expectedFile = new File(new Path(outputDir, partFile).toString());
+    StringBuffer expectedOutput = new StringBuffer();
+    expectedOutput.append(key1).append('\t').append(val1).append("\n");
+    expectedOutput.append(val1).append("\n");
+    expectedOutput.append(val2).append("\n");
+    expectedOutput.append(key2).append("\n");
+    expectedOutput.append(key1).append("\n");
+    expectedOutput.append(key2).append('\t').append(val2).append("\n");
+    String output = slurp(expectedFile);
+    Assert.assertEquals(output, expectedOutput.toString());
+  }
+
+  public static String slurp(File f) throws IOException {
+    int len = (int) f.length();
+    byte[] buf = new byte[len];
+    FileInputStream in = new FileInputStream(f);
+    String contents = null;
+    try {
+      in.read(buf, 0, len);
+      contents = new String(buf, "UTF-8");
+    } finally {
+      in.close();
+    }
+    return contents;
+  }
+
   class MRAppWithHistory extends MRApp {
     public MRAppWithHistory(int maps, int reduces, boolean autoComplete,
         String testName, boolean cleanOnStart, int startCount) {

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java?rev=1188043&r1=1188042&r2=1188043&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java Mon Oct 24 08:41:48 2011
@@ -449,6 +449,8 @@ public interface MRJobConfig {
   public static final String STDOUT_LOGFILE_ENV = "STDOUT_LOGFILE_ENV";
   public static final String STDERR_LOGFILE_ENV = "STDERR_LOGFILE_ENV";
 
+  public static final String APPLICATION_ATTEMPT_ID_ENV = "APPLICATION_ATTEMPT_ID_ENV";
+
   // This should be the directory where splits file gets localized on the node
   // running ApplicationMaster.
   public static final String JOB_SUBMIT_DIR = "jobSubmitDir";

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/EventWriter.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/EventWriter.java?rev=1188043&r1=1188042&r2=1188043&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/EventWriter.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/EventWriter.java Mon Oct 24 08:41:48 2011
@@ -25,7 +25,6 @@ import org.apache.avro.Schema;
 import org.apache.avro.io.DatumWriter;
 import org.apache.avro.io.Encoder;
 import org.apache.avro.io.EncoderFactory;
-import org.apache.avro.io.JsonEncoder;
 import org.apache.avro.specific.SpecificDatumWriter;
 import org.apache.avro.util.Utf8;
 import org.apache.commons.logging.Log;
@@ -72,6 +71,7 @@ class EventWriter {
   void flush() throws IOException {
     encoder.flush();
     out.flush();
+    out.hflush();
   }
 
   void close() throws IOException {

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/NullOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/NullOutputFormat.java?rev=1188043&r1=1188042&r2=1188043&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/NullOutputFormat.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/NullOutputFormat.java Mon Oct 24 08:41:48 2011
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.mapreduce.lib.output;
 
+import java.io.IOException;
+
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.mapreduce.JobContext;
@@ -56,6 +58,17 @@ public class NullOutputFormat<K, V> exte
       }
       public void setupJob(JobContext jobContext) { }
       public void setupTask(TaskAttemptContext taskContext) { }
+
+      @Override
+      public boolean isRecoverySupported() {
+        return true;
+      }
+
+      @Override
+      public void recoverTask(TaskAttemptContext taskContext)
+          throws IOException {
+        // Nothing to do for recovering the task.
+      }
     };
   }
 }
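NullOutputFormat's committer above is the trivial case of the recovery contract this commit relies on: isRecoverySupported() opts in, and recoverTask() re-establishes a completed task's output under the new AM attempt. For a committer with real output, a hypothetical sketch of the same contract (the class name and the _attempt-<n> directory scheme are invented for illustration and are not how any shipped committer works):

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.mapreduce.MRJobConfig;
    import org.apache.hadoop.mapreduce.OutputCommitter;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;

    // Hypothetical committer illustrating the recovery contract only.
    public class ExampleRecoverableCommitter extends OutputCommitter {
      private final Path outputDir;

      public ExampleRecoverableCommitter(Path outputDir) {
        this.outputDir = outputDir;
      }

      @Override
      public boolean isRecoverySupported() {
        return true; // allows the AM to start RecoveryService on restart
      }

      @Override
      public void recoverTask(TaskAttemptContext context) throws IOException {
        Configuration conf = context.getConfiguration();
        // The previous AM attempt's id; the AM and YarnChild set this key
        // (see the MRAppMaster and YarnChild hunks earlier in this commit).
        int prevAttempt = conf.getInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1) - 1;
        // Invented layout: promote output committed under the old attempt's
        // per-task directory into the current attempt's directory.
        String task = context.getTaskAttemptID().getTaskID().toString();
        Path prev = new Path(outputDir, "_attempt-" + prevAttempt + "-" + task);
        FileSystem fs = outputDir.getFileSystem(conf);
        if (fs.exists(prev)) {
          fs.rename(prev,
              new Path(outputDir, "_attempt-" + (prevAttempt + 1) + "-" + task));
        }
      }

      @Override public void setupJob(JobContext context) { }
      @Override public void setupTask(TaskAttemptContext context) { }
      @Override public boolean needsTaskCommit(TaskAttemptContext context) { return false; }
      @Override public void commitTask(TaskAttemptContext context) { }
      @Override public void abortTask(TaskAttemptContext context) { }
    }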