Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/webapp/TimelineWebServices.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/webapp/TimelineWebServices.java?rev=1617532&r1=1617531&r2=1617532&view=diff ============================================================================== --- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/webapp/TimelineWebServices.java (original) +++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/webapp/TimelineWebServices.java Tue Aug 12 17:02:07 2014 @@ -18,14 +18,10 @@ package org.apache.hadoop.yarn.server.timeline.webapp; -import static org.apache.hadoop.yarn.util.StringHelper.CSV_JOINER; - -import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.EnumSet; import java.util.HashSet; -import java.util.Iterator; import java.util.List; import java.util.Set; import java.util.SortedSet; @@ -58,14 +54,11 @@ import org.apache.hadoop.yarn.api.record import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents; import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; -import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.timeline.EntityIdentifier; import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper; import org.apache.hadoop.yarn.server.timeline.NameValuePair; +import org.apache.hadoop.yarn.server.timeline.TimelineDataManager; import org.apache.hadoop.yarn.server.timeline.TimelineReader.Field; -import org.apache.hadoop.yarn.server.timeline.TimelineStore; -import org.apache.hadoop.yarn.server.timeline.security.TimelineACLsManager; -import org.apache.hadoop.yarn.util.timeline.TimelineUtils; import org.apache.hadoop.yarn.webapp.BadRequestException; import org.apache.hadoop.yarn.webapp.ForbiddenException; import org.apache.hadoop.yarn.webapp.NotFoundException; @@ -80,14 +73,11 @@ public class TimelineWebServices { private static final Log LOG = LogFactory.getLog(TimelineWebServices.class); - private TimelineStore store; - private TimelineACLsManager timelineACLsManager; + private TimelineDataManager timelineDataManager; @Inject - public TimelineWebServices(TimelineStore store, - TimelineACLsManager timelineACLsManager) { - this.store = store; - this.timelineACLsManager = timelineACLsManager; + public TimelineWebServices(TimelineDataManager timelineDataManager) { + this.timelineDataManager = timelineDataManager; } @XmlRootElement(name = "about") @@ -148,61 +138,28 @@ public class TimelineWebServices { @QueryParam("limit") String limit, @QueryParam("fields") String fields) { init(res); - TimelineEntities entities = null; try { - EnumSet<Field> fieldEnums = parseFieldsStr(fields, ","); - boolean modified = extendFields(fieldEnums); - UserGroupInformation callerUGI = getUser(req); - entities = store.getEntities( + return timelineDataManager.getEntities( parseStr(entityType), - parseLongStr(limit), + 
parsePairStr(primaryFilter, ":"), + parsePairsStr(secondaryFilter, ",", ":"), parseLongStr(windowStart), parseLongStr(windowEnd), parseStr(fromId), parseLongStr(fromTs), - parsePairStr(primaryFilter, ":"), - parsePairsStr(secondaryFilter, ",", ":"), - fieldEnums); - if (entities != null) { - Iterator<TimelineEntity> entitiesItr = - entities.getEntities().iterator(); - while (entitiesItr.hasNext()) { - TimelineEntity entity = entitiesItr.next(); - try { - // check ACLs - if (!timelineACLsManager.checkAccess(callerUGI, entity)) { - entitiesItr.remove(); - } else { - // clean up system data - if (modified) { - entity.setPrimaryFilters(null); - } else { - cleanupOwnerInfo(entity); - } - } - } catch (YarnException e) { - LOG.error("Error when verifying access for user " + callerUGI - + " on the events of the timeline entity " - + new EntityIdentifier(entity.getEntityId(), - entity.getEntityType()), e); - entitiesItr.remove(); - } - } - } + parseLongStr(limit), + parseFieldsStr(fields, ","), + getUser(req)); } catch (NumberFormatException e) { throw new BadRequestException( "windowStart, windowEnd or limit is not a numeric value."); } catch (IllegalArgumentException e) { throw new BadRequestException("requested invalid field."); - } catch (IOException e) { + } catch (Exception e) { LOG.error("Error getting entities", e); throw new WebApplicationException(e, Response.Status.INTERNAL_SERVER_ERROR); } - if (entities == null) { - return new TimelineEntities(); - } - return entities; } /** @@ -220,33 +177,15 @@ public class TimelineWebServices { init(res); TimelineEntity entity = null; try { - EnumSet<Field> fieldEnums = parseFieldsStr(fields, ","); - boolean modified = extendFields(fieldEnums); - entity = - store.getEntity(parseStr(entityId), parseStr(entityType), - fieldEnums); - if (entity != null) { - // check ACLs - UserGroupInformation callerUGI = getUser(req); - if (!timelineACLsManager.checkAccess(callerUGI, entity)) { - entity = null; - } else { - // clean up the system data - if (modified) { - entity.setPrimaryFilters(null); - } else { - cleanupOwnerInfo(entity); - } - } - } + entity = timelineDataManager.getEntity( + parseStr(entityType), + parseStr(entityId), + parseFieldsStr(fields, ","), + getUser(req)); } catch (IllegalArgumentException e) { throw new BadRequestException( "requested invalid field."); - } catch (IOException e) { - LOG.error("Error getting entity", e); - throw new WebApplicationException(e, - Response.Status.INTERNAL_SERVER_ERROR); - } catch (YarnException e) { + } catch (Exception e) { LOG.error("Error getting entity", e); throw new WebApplicationException(e, Response.Status.INTERNAL_SERVER_ERROR); @@ -275,51 +214,23 @@ public class TimelineWebServices { @QueryParam("windowEnd") String windowEnd, @QueryParam("limit") String limit) { init(res); - TimelineEvents events = null; try { - UserGroupInformation callerUGI = getUser(req); - events = store.getEntityTimelines( + return timelineDataManager.getEvents( parseStr(entityType), parseArrayStr(entityId, ","), - parseLongStr(limit), + parseArrayStr(eventType, ","), parseLongStr(windowStart), parseLongStr(windowEnd), - parseArrayStr(eventType, ",")); - if (events != null) { - Iterator<TimelineEvents.EventsOfOneEntity> eventsItr = - events.getAllEvents().iterator(); - while (eventsItr.hasNext()) { - TimelineEvents.EventsOfOneEntity eventsOfOneEntity = eventsItr.next(); - try { - TimelineEntity entity = store.getEntity( - eventsOfOneEntity.getEntityId(), - eventsOfOneEntity.getEntityType(), - EnumSet.of(Field.PRIMARY_FILTERS)); 
- // check ACLs - if (!timelineACLsManager.checkAccess(callerUGI, entity)) { - eventsItr.remove(); - } - } catch (Exception e) { - LOG.error("Error when verifying access for user " + callerUGI - + " on the events of the timeline entity " - + new EntityIdentifier(eventsOfOneEntity.getEntityId(), - eventsOfOneEntity.getEntityType()), e); - eventsItr.remove(); - } - } - } + parseLongStr(limit), + getUser(req)); } catch (NumberFormatException e) { throw new BadRequestException( "windowStart, windowEnd or limit is not a numeric value."); - } catch (IOException e) { + } catch (Exception e) { LOG.error("Error getting entity timelines", e); throw new WebApplicationException(e, Response.Status.INTERNAL_SERVER_ERROR); } - if (events == null) { - return new TimelineEvents(); - } - return events; } /** @@ -333,9 +244,6 @@ public class TimelineWebServices { @Context HttpServletResponse res, TimelineEntities entities) { init(res); - if (entities == null) { - return new TimelinePutResponse(); - } UserGroupInformation callerUGI = getUser(req); if (callerUGI == null) { String msg = "The owner of the posted timeline entities is not set"; @@ -343,76 +251,8 @@ public class TimelineWebServices { throw new ForbiddenException(msg); } try { - List<EntityIdentifier> entityIDs = new ArrayList<EntityIdentifier>(); - TimelineEntities entitiesToPut = new TimelineEntities(); - List<TimelinePutResponse.TimelinePutError> errors = - new ArrayList<TimelinePutResponse.TimelinePutError>(); - for (TimelineEntity entity : entities.getEntities()) { - EntityIdentifier entityID = - new EntityIdentifier(entity.getEntityId(), entity.getEntityType()); - - // check if there is existing entity - TimelineEntity existingEntity = null; - try { - existingEntity = - store.getEntity(entityID.getId(), entityID.getType(), - EnumSet.of(Field.PRIMARY_FILTERS)); - if (existingEntity != null - && !timelineACLsManager.checkAccess(callerUGI, existingEntity)) { - throw new YarnException("The timeline entity " + entityID - + " was not put by " + callerUGI + " before"); - } - } catch (Exception e) { - // Skip the entity which already exists and was put by others - LOG.warn("Skip the timeline entity: " + entityID + ", because " - + e.getMessage()); - TimelinePutResponse.TimelinePutError error = - new TimelinePutResponse.TimelinePutError(); - error.setEntityId(entityID.getId()); - error.setEntityType(entityID.getType()); - error.setErrorCode( - TimelinePutResponse.TimelinePutError.ACCESS_DENIED); - errors.add(error); - continue; - } - - // inject owner information for the access check if this is the first - // time to post the entity, in case it's the admin who is updating - // the timeline data. 
- try { - if (existingEntity == null) { - injectOwnerInfo(entity, callerUGI.getShortUserName()); - } - } catch (YarnException e) { - // Skip the entity which messes up the primary filter and record the - // error - LOG.warn("Skip the timeline entity: " + entityID + ", because " - + e.getMessage()); - TimelinePutResponse.TimelinePutError error = - new TimelinePutResponse.TimelinePutError(); - error.setEntityId(entityID.getId()); - error.setEntityType(entityID.getType()); - error.setErrorCode( - TimelinePutResponse.TimelinePutError.SYSTEM_FILTER_CONFLICT); - errors.add(error); - continue; - } - - entityIDs.add(entityID); - entitiesToPut.addEntity(entity); - if (LOG.isDebugEnabled()) { - LOG.debug("Storing the entity " + entityID + ", JSON-style content: " - + TimelineUtils.dumpTimelineRecordtoJSON(entity)); - } - } - if (LOG.isDebugEnabled()) { - LOG.debug("Storing entities: " + CSV_JOINER.join(entityIDs)); - } - TimelinePutResponse response = store.put(entitiesToPut); - // add the errors of timeline system filter key conflict - response.addErrors(errors); - return response; - } catch (IOException e) { + return timelineDataManager.postEntities(entities, callerUGI); + } catch (Exception e) { LOG.error("Error putting entities", e); throw new WebApplicationException(e, Response.Status.INTERNAL_SERVER_ERROR); @@ -423,6 +263,15 @@ public class TimelineWebServices { response.setContentType(null); } + private static UserGroupInformation getUser(HttpServletRequest req) { + String remoteUser = req.getRemoteUser(); + UserGroupInformation callerUGI = null; + if (remoteUser != null) { + callerUGI = UserGroupInformation.createRemoteUser(remoteUser); + } + return callerUGI; + } + private static SortedSet<String> parseArrayStr(String str, String delimiter) { if (str == null) { return null; @@ -495,14 +344,6 @@ public class TimelineWebServices { } } - private static boolean extendFields(EnumSet<Field> fieldEnums) { - boolean modified = false; - if (fieldEnums != null && !fieldEnums.contains(Field.PRIMARY_FILTERS)) { - fieldEnums.add(Field.PRIMARY_FILTERS); - modified = true; - } - return modified; - } private static Long parseLongStr(String str) { return str == null ? null : Long.parseLong(str.trim()); } @@ -511,34 +352,4 @@ public class TimelineWebServices { return str == null ? null : str.trim(); } - private static UserGroupInformation getUser(HttpServletRequest req) { - String remoteUser = req.getRemoteUser(); - UserGroupInformation callerUGI = null; - if (remoteUser != null) { - callerUGI = UserGroupInformation.createRemoteUser(remoteUser); - } - return callerUGI; - } - - private static void injectOwnerInfo(TimelineEntity timelineEntity, - String owner) throws YarnException { - if (timelineEntity.getPrimaryFilters() != null && - timelineEntity.getPrimaryFilters().containsKey( - TimelineStore.SystemFilter.ENTITY_OWNER.toString())) { - throw new YarnException( - "User should not use the timeline system filter key: " - + TimelineStore.SystemFilter.ENTITY_OWNER); - } - timelineEntity.addPrimaryFilter( - TimelineStore.SystemFilter.ENTITY_OWNER - .toString(), owner); - } - - private static void cleanupOwnerInfo(TimelineEntity timelineEntity) { - if (timelineEntity.getPrimaryFilters() != null) { - timelineEntity.getPrimaryFilters().remove( - TimelineStore.SystemFilter.ENTITY_OWNER.toString()); - } - } - }
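
The net effect of the TimelineWebServices change above is that the web layer no longer talks to TimelineStore or TimelineACLsManager directly: query-parameter parsing stays in the servlet, while ACL checks and owner-filter cleanup move into the new TimelineDataManager. Below is a minimal sketch of driving TimelineDataManager from in-process code; the argument order is taken from the getEntities() call site in this patch, but the filter parameter types and the thrown exception are assumptions rather than the verified TimelineDataManager signature.

import java.util.EnumSet;

import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
import org.apache.hadoop.yarn.server.timeline.TimelineDataManager;
import org.apache.hadoop.yarn.server.timeline.TimelineReader.Field;

public class TimelineQuerySketch {
  /** Fetch recent entities of one type, letting the data manager apply ACLs. */
  public static TimelineEntities recentEntities(TimelineDataManager tdm,
      String entityType) throws Exception {
    UserGroupInformation caller = UserGroupInformation.getCurrentUser();
    return tdm.getEntities(
        entityType,                       // entity type to query
        null,                             // primary filter (none)
        null,                             // secondary filters (none)
        null, null,                       // windowStart, windowEnd
        null, null,                       // fromId, fromTs
        10L,                              // limit
        EnumSet.of(Field.PRIMARY_FILTERS),// fields to return per entity
        caller);                          // caller UGI used for the ACL check
  }
}

The same delegation shape applies to getEntity(), getEvents() and postEntities(): the servlet only translates HTTP query strings into typed arguments and HTTP error codes, which is why the IOException/YarnException catch blocks collapse into a single catch of Exception above.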
Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryClientService.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryClientService.java?rev=1617532&r1=1617531&r2=1617532&view=diff ============================================================================== --- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryClientService.java (original) +++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryClientService.java Tue Aug 12 17:02:07 2014 @@ -69,7 +69,7 @@ public class TestApplicationHistoryClien historyServer.init(config); historyServer.start(); store = - ((ApplicationHistoryManagerImpl) historyServer.getApplicationHistory()) + ((ApplicationHistoryManagerImpl) historyServer.getApplicationHistoryManager()) .getHistoryStore(); } Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/webapp/TestTimelineWebServices.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/webapp/TestTimelineWebServices.java?rev=1617532&r1=1617531&r2=1617532&view=diff ============================================================================== --- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/webapp/TestTimelineWebServices.java (original) +++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/webapp/TestTimelineWebServices.java Tue Aug 12 17:02:07 2014 @@ -49,6 +49,7 @@ import org.apache.hadoop.yarn.api.record import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.security.AdminACLsManager; import org.apache.hadoop.yarn.server.timeline.TestMemoryTimelineStore; +import org.apache.hadoop.yarn.server.timeline.TimelineDataManager; import org.apache.hadoop.yarn.server.timeline.TimelineStore; import org.apache.hadoop.yarn.server.timeline.security.TimelineACLsManager; import org.apache.hadoop.yarn.server.timeline.security.TimelineAuthenticationFilter; @@ -89,14 +90,15 @@ public class TestTimelineWebServices ext } catch (Exception e) { Assert.fail(); } - bind(TimelineStore.class).toInstance(store); Configuration conf = new YarnConfiguration(); conf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, false); timelineACLsManager = new TimelineACLsManager(conf); conf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true); conf.set(YarnConfiguration.YARN_ADMIN_ACL, "admin"); adminACLsManager = new 
AdminACLsManager(conf); - bind(TimelineACLsManager.class).toInstance(timelineACLsManager); + TimelineDataManager timelineDataManager = + new TimelineDataManager(store, timelineACLsManager); + bind(TimelineDataManager.class).toInstance(timelineDataManager); serve("/*").with(GuiceContainer.class); TimelineAuthenticationFilter taFilter = new TimelineAuthenticationFilter(); FilterConfig filterConfig = mock(FilterConfig.class); Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java?rev=1617532&r1=1617531&r2=1617532&view=diff ============================================================================== --- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java (original) +++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java Tue Aug 12 17:02:07 2014 @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.nodemanager; +import java.io.File; import java.io.IOException; import java.net.InetSocketAddress; import java.util.ArrayList; @@ -29,17 +30,18 @@ import java.util.concurrent.locks.Reentr import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; +import org.apache.commons.io.FileUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.util.Shell.ShellCommandExecutor; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch; import org.apache.hadoop.yarn.server.nodemanager.util.ProcessIdFileReader; import org.apache.hadoop.util.Shell; import org.apache.hadoop.util.StringUtils; @@ -126,9 +128,76 @@ public abstract class ContainerExecutor public abstract void deleteAsUser(String user, Path subDir, Path... basedirs) throws IOException, InterruptedException; + public abstract boolean isContainerProcessAlive(String user, String pid) + throws IOException; + + /** + * Recover an already existing container. This is a blocking call and returns + * only when the container exits. Note that the container must have been + * activated prior to this call. 
+ * @param user the user of the container + * @param containerId The ID of the container to reacquire + * @return The exit code of the pre-existing container + * @throws IOException + */ + public int reacquireContainer(String user, ContainerId containerId) + throws IOException { + Path pidPath = getPidFilePath(containerId); + if (pidPath == null) { + LOG.warn(containerId + " is not active, returning terminated error"); + return ExitCode.TERMINATED.getExitCode(); + } + + String pid = null; + pid = ProcessIdFileReader.getProcessId(pidPath); + if (pid == null) { + throw new IOException("Unable to determine pid for " + containerId); + } + + LOG.info("Reacquiring " + containerId + " with pid " + pid); + try { + while(isContainerProcessAlive(user, pid)) { + Thread.sleep(1000); + } + } catch (InterruptedException e) { + throw new IOException("Interrupted while waiting for process " + pid + + " to exit", e); + } + + // wait for exit code file to appear + String exitCodeFile = ContainerLaunch.getExitCodeFile(pidPath.toString()); + File file = new File(exitCodeFile); + final int sleepMsec = 100; + int msecLeft = 2000; + while (!file.exists() && msecLeft >= 0) { + if (!isContainerActive(containerId)) { + LOG.info(containerId + " was deactivated"); + return ExitCode.TERMINATED.getExitCode(); + } + try { + Thread.sleep(sleepMsec); + } catch (InterruptedException e) { + throw new IOException( + "Interrupted while waiting for exit code from " + containerId, e); + } + msecLeft -= sleepMsec; + } + if (msecLeft < 0) { + throw new IOException("Timeout while waiting for exit code from " + + containerId); + } + + try { + return Integer.parseInt(FileUtils.readFileToString(file).trim()); + } catch (NumberFormatException e) { + throw new IOException("Error parsing exit code from pid " + pid, e); + } + } + public enum ExitCode { FORCE_KILLED(137), - TERMINATED(143); + TERMINATED(143), + LOST(154); private final int code; private ExitCode(int exitCode) { Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java?rev=1617532&r1=1617531&r2=1617532&view=diff ============================================================================== --- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java (original) +++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java Tue Aug 12 17:02:07 2014 @@ -273,25 +273,57 @@ public class DefaultContainerExecutor ex private final class UnixLocalWrapperScriptBuilder extends LocalWrapperScriptBuilder { + private final Path sessionScriptPath; public UnixLocalWrapperScriptBuilder(Path containerWorkDir) { super(containerWorkDir); + this.sessionScriptPath = new Path(containerWorkDir, + Shell.appendScriptExtension("default_container_executor_session")); + } + + @Override + public void writeLocalWrapperScript(Path launchDst, Path pidFile) + throws IOException { + writeSessionScript(launchDst, pidFile); + 
super.writeLocalWrapperScript(launchDst, pidFile); } @Override public void writeLocalWrapperScript(Path launchDst, Path pidFile, PrintStream pout) { - - // We need to do a move as writing to a file is not atomic - // Process reading a file being written to may get garbled data - // hence write pid to tmp file first followed by a mv + String exitCodeFile = ContainerLaunch.getExitCodeFile( + pidFile.toString()); + String tmpFile = exitCodeFile + ".tmp"; pout.println("#!/bin/bash"); - pout.println(); - pout.println("echo $$ > " + pidFile.toString() + ".tmp"); - pout.println("/bin/mv -f " + pidFile.toString() + ".tmp " + pidFile); - String exec = Shell.isSetsidAvailable? "exec setsid" : "exec"; - pout.println(exec + " /bin/bash \"" + - launchDst.toUri().getPath().toString() + "\""); + pout.println("/bin/bash \"" + sessionScriptPath.toString() + "\""); + pout.println("rc=$?"); + pout.println("echo $rc > \"" + tmpFile + "\""); + pout.println("/bin/mv -f \"" + tmpFile + "\" \"" + exitCodeFile + "\""); + pout.println("exit $rc"); + } + + private void writeSessionScript(Path launchDst, Path pidFile) + throws IOException { + DataOutputStream out = null; + PrintStream pout = null; + try { + out = lfs.create(sessionScriptPath, EnumSet.of(CREATE, OVERWRITE)); + pout = new PrintStream(out); + // We need to do a move as writing to a file is not atomic + // Process reading a file being written to may get garbled data + // hence write pid to tmp file first followed by a mv + pout.println("#!/bin/bash"); + pout.println(); + pout.println("echo $$ > " + pidFile.toString() + ".tmp"); + pout.println("/bin/mv -f " + pidFile.toString() + ".tmp " + pidFile); + String exec = Shell.isSetsidAvailable? "exec setsid" : "exec"; + pout.println(exec + " /bin/bash \"" + + launchDst.toUri().getPath().toString() + "\""); + } finally { + IOUtils.cleanup(LOG, pout, out); + } + lfs.setPermission(sessionScriptPath, + ContainerExecutor.TASK_LAUNCH_SCRIPT_PERMISSION); } } @@ -310,6 +342,7 @@ public class DefaultContainerExecutor ex @Override public void writeLocalWrapperScript(Path launchDst, Path pidFile, PrintStream pout) { + // TODO: exit code script for Windows // On Windows, the pid is the container ID, so that it can also serve as // the name of the job object created by winutils for task management. @@ -342,6 +375,12 @@ public class DefaultContainerExecutor ex return true; } + @Override + public boolean isContainerProcessAlive(String user, String pid) + throws IOException { + return containerIsAlive(pid); + } + /** * Returns true if the process with the specified pid is alive. 
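
Taken together, the DefaultContainerExecutor changes above split the launch into a wrapper script plus a session script: the session script still writes the pid file and exec's the container launch script, while the wrapper captures the session script's exit status, writes it to a temporary file, and atomically renames it to the <pidfile>.exitcode file that ContainerExecutor.reacquireContainer() polls for. A hedged sketch of how a node-manager recovery path might use the new API follows; the surrounding class and method are illustrative only — the patch itself defines reacquireContainer(), isContainerProcessAlive() and the exit-code file helpers, not this caller.

import java.io.IOException;

import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;

/** Illustrative only: reattaching to a container process after an NM restart. */
public class ReacquireSketch {
  public static int reattach(ContainerExecutor exec, String user,
      ContainerId containerId) throws IOException {
    // Blocks until the previously launched process exits, then returns the
    // status recovered from the <pidfile>.exitcode file written by the new
    // wrapper script. The container must already be activated so the executor
    // knows its pid file path; otherwise ExitCode.TERMINATED (143) is returned
    // because there is nothing left to reacquire.
    return exec.reacquireContainer(user, containerId);
  }
}

Whether the Windows wrapper gains the same exit-code handling is left open by the "TODO: exit code script for Windows" note above.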
* Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java?rev=1617532&r1=1617531&r2=1617532&view=diff ============================================================================== --- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java (original) +++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java Tue Aug 12 17:02:07 2014 @@ -403,6 +403,13 @@ public class LinuxContainerExecutor exte } } + @Override + public boolean isContainerProcessAlive(String user, String pid) + throws IOException { + // Send a test signal to the process as the user to see if it's alive + return signalContainer(user, pid, Signal.NULL); + } + public void mountCgroups(List<String> cgroupKVs, String hierarchy) throws IOException { List<String> command = new ArrayList<String>( Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java?rev=1617532&r1=1617531&r2=1617532&view=diff ============================================================================== --- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java (original) +++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java Tue Aug 12 17:02:07 2014 @@ -23,11 +23,34 @@ import org.apache.hadoop.yarn.api.record public interface NodeStatusUpdater extends Service { + /** + * Schedule a heartbeat to the ResourceManager outside of the normal, + * periodic heartbeating process. This is typically called when the state + * of containers on the node has changed to notify the RM sooner. 
+ */ void sendOutofBandHeartBeat(); + /** + * Get the ResourceManager identifier received during registration + * @return the ResourceManager ID + */ long getRMIdentifier(); + /** + * Query if a container has recently completed + * @param containerId the container ID + * @return true if the container has recently completed + */ public boolean isContainerRecentlyStopped(ContainerId containerId); + /** + * Add a container to the list of containers that have recently completed + * @param containerId the ID of the completed container + */ + public void addCompletedContainer(ContainerId containerId); + + /** + * Clear the list of recently completed containers + */ public void clearFinishedContainersFromCache(); } Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java?rev=1617532&r1=1617531&r2=1617532&view=diff ============================================================================== --- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java (original) +++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java Tue Aug 12 17:02:07 2014 @@ -364,8 +364,7 @@ public class NodeStatusUpdaterImpl exten // Adding to finished containers cache. Cache will keep it around at // least for #durationToTrackStoppedContainers duration. In the // subsequent call to stop container it will get removed from cache. - updateStoppedContainersInCache(container.getContainerId()); - addCompletedContainer(container); + addCompletedContainer(container.getContainerId()); } } if (LOG.isDebugEnabled()) { @@ -393,8 +392,7 @@ public class NodeStatusUpdaterImpl exten // Adding to finished containers cache. Cache will keep it around at // least for #durationToTrackStoppedContainers duration. In the // subsequent call to stop container it will get removed from cache. 
- updateStoppedContainersInCache(container.getContainerId()); - addCompletedContainer(container); + addCompletedContainer(container.getContainerId()); } } LOG.info("Sending out " + containerStatuses.size() @@ -402,9 +400,15 @@ public class NodeStatusUpdaterImpl exten return containerStatuses; } - private void addCompletedContainer(Container container) { + @Override + public void addCompletedContainer(ContainerId containerId) { synchronized (previousCompletedContainers) { - previousCompletedContainers.add(container.getContainerId()); + previousCompletedContainers.add(containerId); + } + synchronized (recentlyStoppedContainers) { + removeVeryOldStoppedContainersFromCache(); + recentlyStoppedContainers.put(containerId, + System.currentTimeMillis() + durationToTrackStoppedContainers); } } @@ -451,16 +455,6 @@ public class NodeStatusUpdaterImpl exten } } - @Private - @VisibleForTesting - public void updateStoppedContainersInCache(ContainerId containerId) { - synchronized (recentlyStoppedContainers) { - removeVeryOldStoppedContainersFromCache(); - recentlyStoppedContainers.put(containerId, - System.currentTimeMillis() + durationToTrackStoppedContainers); - } - } - @Override public void clearFinishedContainersFromCache() { synchronized (recentlyStoppedContainers) { @@ -476,8 +470,14 @@ public class NodeStatusUpdaterImpl exten Iterator<ContainerId> i = recentlyStoppedContainers.keySet().iterator(); while (i.hasNext()) { - if (recentlyStoppedContainers.get(i.next()) < currentTime) { + ContainerId cid = i.next(); + if (recentlyStoppedContainers.get(cid) < currentTime) { i.remove(); + try { + context.getNMStateStore().removeContainer(cid); + } catch (IOException e) { + LOG.error("Unable to remove container " + cid + " in store", e); + } } else { break; } Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java?rev=1617532&r1=1617531&r2=1617532&view=diff ============================================================================== --- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java (original) +++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java Tue Aug 12 17:02:07 2014 @@ -127,6 +127,8 @@ import org.apache.hadoop.yarn.server.nod import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredApplicationsState; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerState; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus; import org.apache.hadoop.yarn.server.nodemanager.security.authorize.NMPolicyProvider; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import 
org.apache.hadoop.yarn.server.utils.BuilderUtils; @@ -246,6 +248,10 @@ public class ContainerManagerImpl extend recoverApplication(proto); } + for (RecoveredContainerState rcs : stateStore.loadContainersState()) { + recoverContainer(rcs); + } + String diagnostic = "Application marked finished during recovery"; for (ApplicationId appId : appsState.getFinishedApplications()) { dispatcher.getEventHandler().handle( @@ -276,6 +282,60 @@ public class ContainerManagerImpl extend app.handle(new ApplicationInitEvent(appId, acls)); } + @SuppressWarnings("unchecked") + private void recoverContainer(RecoveredContainerState rcs) + throws IOException { + StartContainerRequest req = rcs.getStartRequest(); + ContainerLaunchContext launchContext = req.getContainerLaunchContext(); + ContainerTokenIdentifier token = + BuilderUtils.newContainerTokenIdentifier(req.getContainerToken()); + ContainerId containerId = token.getContainerID(); + ApplicationId appId = + containerId.getApplicationAttemptId().getApplicationId(); + + LOG.info("Recovering " + containerId + " in state " + rcs.getStatus() + + " with exit code " + rcs.getExitCode()); + + if (context.getApplications().containsKey(appId)) { + Credentials credentials = parseCredentials(launchContext); + Container container = new ContainerImpl(getConfig(), dispatcher, + context.getNMStateStore(), req.getContainerLaunchContext(), + credentials, metrics, token, rcs.getStatus(), rcs.getExitCode(), + rcs.getDiagnostics(), rcs.getKilled()); + context.getContainers().put(containerId, container); + dispatcher.getEventHandler().handle( + new ApplicationContainerInitEvent(container)); + } else { + if (rcs.getStatus() != RecoveredContainerStatus.COMPLETED) { + LOG.warn(containerId + " has no corresponding application!"); + } + LOG.info("Adding " + containerId + " to recently stopped containers"); + nodeStatusUpdater.addCompletedContainer(containerId); + } + } + + private void waitForRecoveredContainers() throws InterruptedException { + final int sleepMsec = 100; + int waitIterations = 100; + List<ContainerId> newContainers = new ArrayList<ContainerId>(); + while (--waitIterations >= 0) { + newContainers.clear(); + for (Container container : context.getContainers().values()) { + if (container.getContainerState() == org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState.NEW) { + newContainers.add(container.getContainerId()); + } + } + if (newContainers.isEmpty()) { + break; + } + LOG.info("Waiting for containers: " + newContainers); + Thread.sleep(sleepMsec); + } + if (waitIterations < 0) { + LOG.warn("Timeout waiting for recovered containers"); + } + } + protected LogHandler createLogHandler(Configuration conf, Context context, DeletionService deletionService) { if (conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, @@ -309,6 +369,23 @@ public class ContainerManagerImpl extend // Enqueue user dirs in deletion context Configuration conf = getConfig(); + final InetSocketAddress initialAddress = conf.getSocketAddr( + YarnConfiguration.NM_BIND_HOST, + YarnConfiguration.NM_ADDRESS, + YarnConfiguration.DEFAULT_NM_ADDRESS, + YarnConfiguration.DEFAULT_NM_PORT); + boolean usingEphemeralPort = (initialAddress.getPort() == 0); + if (context.getNMStateStore().canRecover() && usingEphemeralPort) { + throw new IllegalArgumentException("Cannot support recovery with an " + + "ephemeral server port. 
Check the setting of " + + YarnConfiguration.NM_ADDRESS); + } + // If recovering then delay opening the RPC service until the recovery + // of resources and containers have completed, otherwise requests from + // clients during recovery can interfere with the recovery process. + final boolean delayedRpcServerStart = + context.getNMStateStore().canRecover(); + Configuration serverConf = new Configuration(conf); // always enforce it to be token-based. @@ -318,12 +395,6 @@ public class ContainerManagerImpl extend YarnRPC rpc = YarnRPC.create(conf); - InetSocketAddress initialAddress = conf.getSocketAddr( - YarnConfiguration.NM_BIND_HOST, - YarnConfiguration.NM_ADDRESS, - YarnConfiguration.DEFAULT_NM_ADDRESS, - YarnConfiguration.DEFAULT_NM_PORT); - server = rpc.getServer(ContainerManagementProtocol.class, this, initialAddress, serverConf, this.context.getNMTokenSecretManager(), @@ -340,32 +411,61 @@ public class ContainerManagerImpl extend LOG.info("Blocking new container-requests as container manager rpc" + " server is still starting."); this.setBlockNewContainerRequests(true); - server.start(); - - InetSocketAddress connectAddress; + String bindHost = conf.get(YarnConfiguration.NM_BIND_HOST); String nmAddress = conf.getTrimmed(YarnConfiguration.NM_ADDRESS); - if (bindHost == null || bindHost.isEmpty() || - nmAddress == null || nmAddress.isEmpty()) { - connectAddress = NetUtils.getConnectAddress(server); - } else { - //a bind-host case with an address, to support overriding the first hostname - //found when querying for our hostname with the specified address, combine - //the specified address with the actual port listened on by the server - connectAddress = NetUtils.getConnectAddress( - new InetSocketAddress(nmAddress.split(":")[0], - server.getListenerAddress().getPort())); + String hostOverride = null; + if (bindHost != null && !bindHost.isEmpty() + && nmAddress != null && !nmAddress.isEmpty()) { + //a bind-host case with an address, to support overriding the first + //hostname found when querying for our hostname with the specified + //address, combine the specified address with the actual port listened + //on by the server + hostOverride = nmAddress.split(":")[0]; } - NodeId nodeId = NodeId.newInstance( - connectAddress.getAddress().getCanonicalHostName(), - connectAddress.getPort()); + // setup node ID + InetSocketAddress connectAddress; + if (delayedRpcServerStart) { + connectAddress = NetUtils.getConnectAddress(initialAddress); + } else { + server.start(); + connectAddress = NetUtils.getConnectAddress(server); + } + NodeId nodeId = buildNodeId(connectAddress, hostOverride); ((NodeManager.NMContext)context).setNodeId(nodeId); this.context.getNMTokenSecretManager().setNodeId(nodeId); this.context.getContainerTokenSecretManager().setNodeId(nodeId); + + // start remaining services + super.serviceStart(); + + if (delayedRpcServerStart) { + waitForRecoveredContainers(); + server.start(); + + // check that the node ID is as previously advertised + connectAddress = NetUtils.getConnectAddress(server); + NodeId serverNode = buildNodeId(connectAddress, hostOverride); + if (!serverNode.equals(nodeId)) { + throw new IOException("Node mismatch after server started, expected '" + + nodeId + "' but found '" + serverNode + "'"); + } + } + LOG.info("ContainerManager started at " + connectAddress); LOG.info("ContainerManager bound to " + initialAddress); - super.serviceStart(); + } + + private NodeId buildNodeId(InetSocketAddress connectAddress, + String hostOverride) { + if (hostOverride != null) { + 
connectAddress = NetUtils.getConnectAddress( + new InetSocketAddress(hostOverride, connectAddress.getPort())); + } + return NodeId.newInstance( + connectAddress.getAddress().getCanonicalHostName(), + connectAddress.getPort()); } void refreshServiceAcls(Configuration configuration, @@ -704,7 +804,8 @@ public class ContainerManagerImpl extend Credentials credentials = parseCredentials(launchContext); Container container = - new ContainerImpl(getConfig(), this.dispatcher, launchContext, + new ContainerImpl(getConfig(), this.dispatcher, + context.getNMStateStore(), launchContext, credentials, metrics, containerTokenIdentifier); ApplicationId applicationID = containerId.getApplicationAttemptId().getApplicationId(); @@ -733,6 +834,7 @@ public class ContainerManagerImpl extend new ApplicationInitEvent(applicationID, appAcls)); } + this.context.getNMStateStore().storeContainer(containerId, request); dispatcher.getEventHandler().handle( new ApplicationContainerInitEvent(container)); @@ -780,7 +882,7 @@ public class ContainerManagerImpl extend } private Credentials parseCredentials(ContainerLaunchContext launchContext) - throws YarnException { + throws IOException { Credentials credentials = new Credentials(); // //////////// Parse credentials ByteBuffer tokens = launchContext.getTokens(); @@ -789,15 +891,11 @@ public class ContainerManagerImpl extend DataInputByteBuffer buf = new DataInputByteBuffer(); tokens.rewind(); buf.reset(tokens); - try { - credentials.readTokenStorageStream(buf); - if (LOG.isDebugEnabled()) { - for (Token<? extends TokenIdentifier> tk : credentials.getAllTokens()) { - LOG.debug(tk.getService() + " = " + tk.toString()); - } + credentials.readTokenStorageStream(buf); + if (LOG.isDebugEnabled()) { + for (Token<? extends TokenIdentifier> tk : credentials.getAllTokens()) { + LOG.debug(tk.getService() + " = " + tk.toString()); } - } catch (IOException e) { - throw RPCUtil.getRemoteException(e); } } // //////////// End of parsing credentials @@ -830,7 +928,7 @@ public class ContainerManagerImpl extend @SuppressWarnings("unchecked") private void stopContainerInternal(NMTokenIdentifier nmTokenIdentifier, - ContainerId containerID) throws YarnException { + ContainerId containerID) throws YarnException, IOException { String containerIDStr = containerID.toString(); Container container = this.context.getContainers().get(containerID); LOG.info("Stopping container with container Id: " + containerIDStr); @@ -843,6 +941,7 @@ public class ContainerManagerImpl extend + " is not handled by this NodeManager"); } } else { + context.getNMStateStore().storeContainerKilled(containerID); dispatcher.getEventHandler().handle( new ContainerKillEvent(containerID, ContainerExitStatus.KILLED_BY_APPMASTER, Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java?rev=1617532&r1=1617531&r2=1617532&view=diff ============================================================================== --- 
hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java (original) +++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java Tue Aug 12 17:02:07 2014 @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.container; +import java.io.IOException; import java.net.URISyntaxException; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -62,6 +63,8 @@ import org.apache.hadoop.yarn.server.nod import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainerStartMonitoringEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainerStopMonitoringEvent; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.state.InvalidStateTransitonException; import org.apache.hadoop.yarn.state.MultipleArcTransition; @@ -75,6 +78,7 @@ public class ContainerImpl implements Co private final Lock readLock; private final Lock writeLock; private final Dispatcher dispatcher; + private final NMStateStoreService stateStore; private final Credentials credentials; private final NodeManagerMetrics metrics; private final ContainerLaunchContext launchContext; @@ -101,12 +105,19 @@ public class ContainerImpl implements Co private final List<LocalResourceRequest> appRsrcs = new ArrayList<LocalResourceRequest>(); + // whether container has been recovered after a restart + private RecoveredContainerStatus recoveredStatus = + RecoveredContainerStatus.REQUESTED; + // whether container was marked as killed after recovery + private boolean recoveredAsKilled = false; + public ContainerImpl(Configuration conf, Dispatcher dispatcher, - ContainerLaunchContext launchContext, Credentials creds, - NodeManagerMetrics metrics, + NMStateStoreService stateStore, ContainerLaunchContext launchContext, + Credentials creds, NodeManagerMetrics metrics, ContainerTokenIdentifier containerTokenIdentifier) { this.daemonConf = conf; this.dispatcher = dispatcher; + this.stateStore = stateStore; this.launchContext = launchContext; this.containerTokenIdentifier = containerTokenIdentifier; this.containerId = containerTokenIdentifier.getContainerID(); @@ -122,6 +133,21 @@ public class ContainerImpl implements Co stateMachine = stateMachineFactory.make(this); } + // constructor for a recovered container + public ContainerImpl(Configuration conf, Dispatcher dispatcher, + NMStateStoreService stateStore, ContainerLaunchContext launchContext, + Credentials creds, NodeManagerMetrics metrics, + ContainerTokenIdentifier containerTokenIdentifier, + RecoveredContainerStatus recoveredStatus, int exitCode, + String diagnostics, boolean wasKilled) { + this(conf, dispatcher, stateStore, launchContext, creds, metrics, + containerTokenIdentifier); + this.recoveredStatus = recoveredStatus; + this.exitCode = exitCode; + this.recoveredAsKilled = wasKilled; + this.diagnostics.append(diagnostics); + } + private static final ContainerDoneTransition CONTAINER_DONE_TRANSITION = new 
ContainerDoneTransition(); @@ -135,8 +161,10 @@ public class ContainerImpl implements Co new StateMachineFactory<ContainerImpl, ContainerState, ContainerEventType, ContainerEvent>(ContainerState.NEW) // From NEW State .addTransition(ContainerState.NEW, - EnumSet.of(ContainerState.LOCALIZING, ContainerState.LOCALIZED, - ContainerState.LOCALIZATION_FAILED), + EnumSet.of(ContainerState.LOCALIZING, + ContainerState.LOCALIZED, + ContainerState.LOCALIZATION_FAILED, + ContainerState.DONE), ContainerEventType.INIT_CONTAINER, new RequestResourcesTransition()) .addTransition(ContainerState.NEW, ContainerState.NEW, ContainerEventType.UPDATE_DIAGNOSTICS_MSG, @@ -281,7 +309,9 @@ public class ContainerImpl implements Co UPDATE_DIAGNOSTICS_TRANSITION) .addTransition(ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL, ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL, - ContainerEventType.KILL_CONTAINER) + EnumSet.of(ContainerEventType.KILL_CONTAINER, + ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS, + ContainerEventType.CONTAINER_EXITED_WITH_FAILURE)) // From DONE .addTransition(ContainerState.DONE, ContainerState.DONE, @@ -295,7 +325,9 @@ public class ContainerImpl implements Co // we notify container of failed localization if localizer thread (for // that container) fails for some reason .addTransition(ContainerState.DONE, ContainerState.DONE, - ContainerEventType.RESOURCE_FAILED) + EnumSet.of(ContainerEventType.RESOURCE_FAILED, + ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS, + ContainerEventType.CONTAINER_EXITED_WITH_FAILURE)) // create the topology tables .installTopology(); @@ -420,7 +452,7 @@ public class ContainerImpl implements Co } } - @SuppressWarnings({"fallthrough", "unchecked"}) + @SuppressWarnings("fallthrough") private void finished() { ApplicationId applicationId = containerId.getApplicationAttemptId().getApplicationId(); @@ -458,7 +490,11 @@ public class ContainerImpl implements Co } metrics.releaseContainer(this.resource); + sendFinishedEvents(); + } + @SuppressWarnings("unchecked") + private void sendFinishedEvents() { // Inform the application @SuppressWarnings("rawtypes") EventHandler eventHandler = dispatcher.getEventHandler(); @@ -471,6 +507,45 @@ public class ContainerImpl implements Co } @SuppressWarnings("unchecked") // dispatcher not typed + private void sendLaunchEvent() { + ContainersLauncherEventType launcherEvent = + ContainersLauncherEventType.LAUNCH_CONTAINER; + if (recoveredStatus == RecoveredContainerStatus.LAUNCHED) { + // try to recover a container that was previously launched + launcherEvent = ContainersLauncherEventType.RECOVER_CONTAINER; + } + dispatcher.getEventHandler().handle( + new ContainersLauncherEvent(this, launcherEvent)); + } + + // Inform the ContainersMonitor to start monitoring the container's + // resource usage. + @SuppressWarnings("unchecked") // dispatcher not typed + private void sendContainerMonitorStartEvent() { + long pmemBytes = getResource().getMemory() * 1024 * 1024L; + float pmemRatio = daemonConf.getFloat( + YarnConfiguration.NM_VMEM_PMEM_RATIO, + YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO); + long vmemBytes = (long) (pmemRatio * pmemBytes); + + dispatcher.getEventHandler().handle( + new ContainerStartMonitoringEvent(containerId, + vmemBytes, pmemBytes)); + } + + private void addDiagnostics(String... 
diags) {
+    for (String s : diags) {
+      this.diagnostics.append(s);
+    }
+    try {
+      stateStore.storeContainerDiagnostics(containerId, diagnostics);
+    } catch (IOException e) {
+      LOG.warn("Unable to update diagnostics in state store for "
+          + containerId, e);
+    }
+  }
+
   @SuppressWarnings("unchecked") // dispatcher not typed
   public void cleanup() {
     Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrc =
         new HashMap<LocalResourceVisibility,
@@ -518,6 +593,16 @@ public class ContainerImpl implements Co
     @Override
     public ContainerState transition(ContainerImpl container,
         ContainerEvent event) {
+      if (container.recoveredStatus == RecoveredContainerStatus.COMPLETED) {
+        container.sendFinishedEvents();
+        return ContainerState.DONE;
+      } else if (container.recoveredAsKilled &&
+          container.recoveredStatus == RecoveredContainerStatus.REQUESTED) {
+        // container was killed but never launched
+        container.finished();
+        return ContainerState.DONE;
+      }
+
       final ContainerLaunchContext ctxt = container.launchContext;
       container.metrics.initingContainer();
@@ -593,9 +678,7 @@ public class ContainerImpl implements Co
             new ContainerLocalizationRequestEvent(container, req));
         return ContainerState.LOCALIZING;
       } else {
-        container.dispatcher.getEventHandler().handle(
-            new ContainersLauncherEvent(container,
-                ContainersLauncherEventType.LAUNCH_CONTAINER));
+        container.sendLaunchEvent();
         container.metrics.endInitingContainer();
         return ContainerState.LOCALIZED;
       }
@@ -606,7 +689,6 @@ public class ContainerImpl implements Co
    * Transition when one of the requested resources for this container
    * has been successfully localized.
    */
-  @SuppressWarnings("unchecked") // dispatcher not typed
   static class LocalizedTransition implements
       MultipleArcTransition<ContainerImpl,ContainerEvent,ContainerState> {
     @Override
@@ -626,9 +708,8 @@ public class ContainerImpl implements Co
       if (!container.pendingResources.isEmpty()) {
         return ContainerState.LOCALIZING;
       }
-      container.dispatcher.getEventHandler().handle(
-          new ContainersLauncherEvent(container,
-              ContainersLauncherEventType.LAUNCH_CONTAINER));
+
+      container.sendLaunchEvent();
       container.metrics.endInitingContainer();
       return ContainerState.LOCALIZED;
     }
@@ -638,24 +719,22 @@ public class ContainerImpl implements Co
    * Transition from LOCALIZED state to RUNNING state upon receiving
    * a CONTAINER_LAUNCHED event
    */
-  @SuppressWarnings("unchecked") // dispatcher not typed
   static class LaunchTransition extends ContainerTransition {
+    @SuppressWarnings("unchecked")
     @Override
     public void transition(ContainerImpl container, ContainerEvent event) {
-      // Inform the ContainersMonitor to start monitoring the container's
-      // resource usage.
-      long pmemBytes =
-          container.getResource().getMemory() * 1024 * 1024L;
-      float pmemRatio = container.daemonConf.getFloat(
-          YarnConfiguration.NM_VMEM_PMEM_RATIO,
-          YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO);
-      long vmemBytes = (long) (pmemRatio * pmemBytes);
-
-      container.dispatcher.getEventHandler().handle(
-          new ContainerStartMonitoringEvent(container.containerId,
-              vmemBytes, pmemBytes));
+      container.sendContainerMonitorStartEvent();
       container.metrics.runningContainer();
       container.wasLaunched = true;
+
+      if (container.recoveredAsKilled) {
+        LOG.info("Killing " + container.containerId
+            + " due to recovered as killed");
+        container.addDiagnostics("Container recovered as killed.\n");
+        container.dispatcher.getEventHandler().handle(
+            new ContainersLauncherEvent(container,
+                ContainersLauncherEventType.CLEANUP_CONTAINER));
+      }
     }
   }
@@ -707,8 +786,7 @@ public class ContainerImpl implements Co
       ContainerExitEvent exitEvent = (ContainerExitEvent) event;
       container.exitCode = exitEvent.getExitCode();
       if (exitEvent.getDiagnosticInfo() != null) {
-        container.diagnostics.append(exitEvent.getDiagnosticInfo())
-            .append('\n');
+        container.addDiagnostics(exitEvent.getDiagnosticInfo(), "\n");
       }

       // TODO: Add containerWorkDir to the deletion service.
@@ -735,7 +813,7 @@ public class ContainerImpl implements Co
     @Override
     public void transition(ContainerImpl container, ContainerEvent event) {
       super.transition(container, event);
-      container.diagnostics.append("Killed by external signal\n");
+      container.addDiagnostics("Killed by external signal\n");
     }
   }
@@ -750,9 +828,7 @@ public class ContainerImpl implements Co
       ContainerResourceFailedEvent rsrcFailedEvent =
           (ContainerResourceFailedEvent) event;
-      container.diagnostics.append(rsrcFailedEvent.getDiagnosticMessage()
-          + "\n");
-
+      container.addDiagnostics(rsrcFailedEvent.getDiagnosticMessage(), "\n");

       // Inform the localizer to decrement reference counts and cleanup
       // resources.
@@ -775,8 +851,8 @@ public class ContainerImpl implements Co
       container.metrics.endInitingContainer();
       ContainerKillEvent killEvent = (ContainerKillEvent) event;
       container.exitCode = killEvent.getContainerExitStatus();
-      container.diagnostics.append(killEvent.getDiagnostic()).append("\n");
-      container.diagnostics.append("Container is killed before being launched.\n");
+      container.addDiagnostics(killEvent.getDiagnostic(), "\n");
+      container.addDiagnostics("Container is killed before being launched.\n");
     }
   }
@@ -817,7 +893,7 @@ public class ContainerImpl implements Co
           new ContainersLauncherEvent(container,
               ContainersLauncherEventType.CLEANUP_CONTAINER));
       ContainerKillEvent killEvent = (ContainerKillEvent) event;
-      container.diagnostics.append(killEvent.getDiagnostic()).append("\n");
+      container.addDiagnostics(killEvent.getDiagnostic(), "\n");
       container.exitCode = killEvent.getContainerExitStatus();
     }
   }
@@ -836,8 +912,7 @@ public class ContainerImpl implements Co
       }

       if (exitEvent.getDiagnosticInfo() != null) {
-        container.diagnostics.append(exitEvent.getDiagnosticInfo())
-            .append('\n');
+        container.addDiagnostics(exitEvent.getDiagnosticInfo(), "\n");
       }

       // The process/process-grp is killed. Decrement reference counts and
@@ -877,8 +952,8 @@ public class ContainerImpl implements Co
     public void transition(ContainerImpl container, ContainerEvent event) {
       ContainerKillEvent killEvent = (ContainerKillEvent) event;
       container.exitCode = killEvent.getContainerExitStatus();
-      container.diagnostics.append(killEvent.getDiagnostic()).append("\n");
-      container.diagnostics.append("Container is killed before being launched.\n");
+      container.addDiagnostics(killEvent.getDiagnostic(), "\n");
+      container.addDiagnostics("Container is killed before being launched.\n");
       super.transition(container, event);
     }
   }
@@ -892,8 +967,14 @@ public class ContainerImpl implements Co
     public void transition(ContainerImpl container, ContainerEvent event) {
       ContainerDiagnosticsUpdateEvent updateEvent =
           (ContainerDiagnosticsUpdateEvent) event;
-      container.diagnostics.append(updateEvent.getDiagnosticsUpdate())
-          .append("\n");
+      container.addDiagnostics(updateEvent.getDiagnosticsUpdate(), "\n");
+      try {
+        container.stateStore.storeContainerDiagnostics(container.containerId,
+            container.diagnostics);
+      } catch (IOException e) {
+        LOG.warn("Unable to update state store diagnostics for "
+            + container.containerId, e);
+      }
     }
   }
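Note (illustrative, not part of this change): the branch added to the init transition above short-circuits containers recovered from the NM state store. A rough, self-contained sketch of that decision follows, assuming a RecoveredContainerStatus enum along the lines of REQUESTED/LAUNCHED/COMPLETED (only REQUESTED and COMPLETED appear in the diff; the wrapper class and method names below are invented for illustration):

    // Standalone model of the recovery decision made in ContainerImpl's init transition.
    public class RecoveryBranchSketch {
      enum RecoveredContainerStatus { REQUESTED, LAUNCHED, COMPLETED }
      enum ContainerState { LOCALIZING, DONE }

      static ContainerState onInit(RecoveredContainerStatus recoveredStatus,
          boolean recoveredAsKilled) {
        if (recoveredStatus == RecoveredContainerStatus.COMPLETED) {
          // Already finished before the NM restart: just replay the finished events.
          return ContainerState.DONE;
        }
        if (recoveredAsKilled
            && recoveredStatus == RecoveredContainerStatus.REQUESTED) {
          // Killed before it was ever launched: no process or resources to clean up.
          return ContainerState.DONE;
        }
        // Otherwise fall through to the normal localization/launch path.
        return ContainerState.LOCALIZING;
      }

      public static void main(String[] args) {
        System.out.println(onInit(RecoveredContainerStatus.COMPLETED, false)); // DONE
        System.out.println(onInit(RecoveredContainerStatus.REQUESTED, true));  // DONE
        System.out.println(onInit(RecoveredContainerStatus.LAUNCHED, false));  // LOCALIZING
      }
    }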
Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java?rev=1617532&r1=1617531&r2=1617532&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java Tue Aug 12 17:02:07 2014
@@ -87,22 +87,23 @@ public class ContainerLaunch implements
   public static final String FINAL_CONTAINER_TOKENS_FILE = "container_tokens";

   private static final String PID_FILE_NAME_FMT = "%s.pid";
+  private static final String EXIT_CODE_FILE_SUFFIX = ".exitcode";

-  private final Dispatcher dispatcher;
-  private final ContainerExecutor exec;
+  protected final Dispatcher dispatcher;
+  protected final ContainerExecutor exec;
   private final Application app;
-  private final Container container;
+  protected final Container container;
   private final Configuration conf;
   private final Context context;
   private final ContainerManagerImpl containerManager;

-  private volatile AtomicBoolean shouldLaunchContainer = new AtomicBoolean(false);
-  private volatile AtomicBoolean completed = new AtomicBoolean(false);
+  protected AtomicBoolean shouldLaunchContainer = new AtomicBoolean(false);
+  protected AtomicBoolean completed = new AtomicBoolean(false);

   private long sleepDelayBeforeSigKill = 250;
   private long maxKillWaitTime = 2000;

-  private Path pidFilePath = null;
+  protected Path pidFilePath = null;

   private final LocalDirsHandlerService dirsHandler;

@@ -223,14 +224,11 @@ public class ContainerLaunch implements
               + Path.SEPARATOR + containerIdStr,
               LocalDirAllocator.SIZE_UNKNOWN, false);

-      String pidFileSuffix = String.format(ContainerLaunch.PID_FILE_NAME_FMT,
-          containerIdStr);
+      String pidFileSubpath = getPidFileSubpath(appIdStr, containerIdStr);

       // pid file should be in nm private dir so that it is not
       // accessible by users
-      pidFilePath = dirsHandler.getLocalPathForWrite(
-          ResourceLocalizationService.NM_PRIVATE_DIR + Path.SEPARATOR
-          + pidFileSuffix);
+      pidFilePath = dirsHandler.getLocalPathForWrite(pidFileSubpath);

       List<String> localDirs = dirsHandler.getLocalDirs();
       List<String> logDirs = dirsHandler.getLogDirs();
@@ -288,6 +286,7 @@ public class ContainerLaunch implements
       dispatcher.getEventHandler().handle(new ContainerEvent(
             containerID,
             ContainerEventType.CONTAINER_LAUNCHED));
+      context.getNMStateStore().storeContainerLaunched(containerID);

       // Check if the container is signalled to be killed.
       if (!shouldLaunchContainer.compareAndSet(false, true)) {
@@ -310,6 +309,11 @@ public class ContainerLaunch implements
     } finally {
       completed.set(true);
       exec.deactivateContainer(containerID);
+      try {
+        context.getNMStateStore().storeContainerCompleted(containerID, ret);
+      } catch (IOException e) {
+        LOG.error("Unable to set exit code for container " + containerID);
+      }
     }

     if (LOG.isDebugEnabled()) {
@@ -342,6 +346,11 @@ public class ContainerLaunch implements
             ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS));
     return 0;
   }
+
+  protected String getPidFileSubpath(String appIdStr, String containerIdStr) {
+    return getContainerPrivateDir(appIdStr, containerIdStr) + Path.SEPARATOR
+        + String.format(ContainerLaunch.PID_FILE_NAME_FMT, containerIdStr);
+  }

   /**
    * Cleanup the container.
@@ -357,6 +366,13 @@ public class ContainerLaunch implements
     String containerIdStr = ConverterUtils.toString(containerId);
     LOG.info("Cleaning up container " + containerIdStr);

+    try {
+      context.getNMStateStore().storeContainerKilled(containerId);
+    } catch (IOException e) {
+      LOG.error("Unable to mark container " + containerId
+          + " killed in store", e);
+    }
+
     // launch flag will be set to true if process already launched
     boolean alreadyLaunched = !shouldLaunchContainer.compareAndSet(false, true);

     if (!alreadyLaunched) {
@@ -421,6 +437,7 @@ public class ContainerLaunch implements
       if (pidFilePath != null) {
         FileContext lfs = FileContext.getLocalFSFileContext();
         lfs.delete(pidFilePath, false);
+        lfs.delete(pidFilePath.suffix(EXIT_CODE_FILE_SUFFIX), false);
       }
     }
   }
@@ -479,6 +496,10 @@ public class ContainerLaunch implements
         + appIdStr;
   }

+  Context getContext() {
+    return context;
+  }
+
   @VisibleForTesting
   static abstract class ShellScriptBuilder {
     public static ShellScriptBuilder create() {
@@ -787,4 +808,7 @@ public class ContainerLaunch implements
     }
   }

+  public static String getExitCodeFile(String pidFile) {
+    return pidFile + EXIT_CODE_FILE_SUFFIX;
+  }
 }
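Note (illustrative, not part of this change): the new ".exitcode" file is named after the container's pid file so a restarted NodeManager can recover the exit status of a container that finished while the NM was down; ContainerLaunch only exposes the naming convention here via getExitCodeFile(pidFile). A minimal sketch of reading such a file back, assuming it holds the exit code as plain text (the actual recovery path lives in RecoveredContainerLaunch, which is referenced but not included in this diff):

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.nio.file.Files;
    import java.nio.file.Paths;

    public class ExitCodeFileSketch {
      // Mirrors ContainerLaunch.EXIT_CODE_FILE_SUFFIX and getExitCodeFile(pidFile).
      static final String EXIT_CODE_FILE_SUFFIX = ".exitcode";

      static String getExitCodeFile(String pidFile) {
        return pidFile + EXIT_CODE_FILE_SUFFIX;
      }

      // Assumption: the file contains the integer exit code as plain text.
      static int readExitCode(String pidFile) throws IOException {
        String text = new String(
            Files.readAllBytes(Paths.get(getExitCodeFile(pidFile))),
            StandardCharsets.UTF_8).trim();
        return Integer.parseInt(text);
      }

      public static void main(String[] args) throws IOException {
        // e.g. an nm-private pid file such as .../container_..._000001.pid
        System.out.println(readExitCode(args[0]));
      }
    }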
Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java?rev=1617532&r1=1617531&r2=1617532&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java Tue Aug 12 17:02:07 2014
@@ -24,7 +24,6 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.Future;

 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -32,21 +31,16 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.UnsupportedFileSystemException;
 import org.apache.hadoop.service.AbstractService;
-import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
-import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
 import org.apache.hadoop.yarn.server.nodemanager.Context;
 import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerExitEvent;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;

 import com.google.common.annotations.VisibleForTesting;
@@ -107,7 +101,6 @@ public class ContainersLauncher extends
     super.serviceStop();
   }

-  @SuppressWarnings("unchecked")
   @Override
   public void handle(ContainersLauncherEvent event) {
     // TODO: ContainersLauncher launches containers one by one!!
@@ -125,6 +118,14 @@ public class ContainersLauncher extends
         containerLauncher.submit(launch);
         running.put(containerId, launch);
         break;
+      case RECOVER_CONTAINER:
+        app = context.getApplications().get(
+            containerId.getApplicationAttemptId().getApplicationId());
+        launch = new RecoveredContainerLaunch(context, getConfig(), dispatcher,
+            exec, app, event.getContainer(), dirsHandler, containerManager);
+        containerLauncher.submit(launch);
+        running.put(containerId, launch);
+        break;
       case CLEANUP_CONTAINER:
         ContainerLaunch launcher = running.remove(containerId);
         if (launcher == null) {

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java?rev=1617532&r1=1617531&r2=1617532&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java Tue Aug 12 17:02:07 2014
@@ -20,5 +20,6 @@ package org.apache.hadoop.yarn.server.no
 public enum ContainersLauncherEventType {
   LAUNCH_CONTAINER,
+  RECOVER_CONTAINER,
   CLEANUP_CONTAINER, // The process(grp) itself.
 }
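Note (illustrative, not part of this change): ContainersLauncher now submits a RecoveredContainerLaunch for RECOVER_CONTAINER events instead of a fresh ContainerLaunch; a recovered launch is expected to reattach to a container that may already be running rather than start a new process. A simplified, standalone sketch of that dispatch (the real class carries far more state, and the placeholder work below stands in for the actual launch objects):

    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class LauncherDispatchSketch {
      enum EventType { LAUNCH_CONTAINER, RECOVER_CONTAINER, CLEANUP_CONTAINER }

      private final ExecutorService containerLauncher =
          Executors.newCachedThreadPool();
      private final Map<String, Callable<Integer>> running = new HashMap<>();

      void handle(String containerId, EventType type) {
        switch (type) {
        case LAUNCH_CONTAINER:
        case RECOVER_CONTAINER:
          // In the real code these are ContainerLaunch vs. RecoveredContainerLaunch;
          // both are submitted to the same executor and tracked in the running map
          // so a later CLEANUP_CONTAINER event can find them.
          Callable<Integer> launch = () -> 0; // placeholder for the launch work
          running.put(containerId, launch);
          containerLauncher.submit(launch);
          break;
        case CLEANUP_CONTAINER:
          running.remove(containerId);
          break;
        }
      }
    }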
Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregator.java?rev=1617532&r1=1617531&r2=1617532&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregator.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregator.java Tue Aug 12 17:02:07 2014
@@ -25,5 +25,7 @@ public interface AppLogAggregator extend
   void startContainerLogAggregation(ContainerId containerId,
       boolean wasContainerSuccessful);

+  void abortLogAggregation();
+
   void finishLogAggregation();
 }

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java?rev=1617532&r1=1617531&r2=1617532&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java Tue Aug 12 17:02:07 2014
@@ -70,6 +70,7 @@ public class AppLogAggregatorImpl implem
   private final BlockingQueue<ContainerId> pendingContainers;
   private final AtomicBoolean appFinishing = new AtomicBoolean();
   private final AtomicBoolean appAggregationFinished = new AtomicBoolean();
+  private final AtomicBoolean aborted = new AtomicBoolean();
   private final Map<ApplicationAccessType, String> appAcls;

   private LogWriter writer = null;
@@ -150,7 +151,7 @@ public class AppLogAggregatorImpl implem
   private void doAppLogAggregation() {
     ContainerId containerId;

-    while (!this.appFinishing.get()) {
+    while (!this.appFinishing.get() && !this.aborted.get()) {
       synchronized(this) {
         try {
           wait(THREAD_SLEEP_TIME);
@@ -161,6 +162,10 @@ public class AppLogAggregatorImpl implem
       }
     }

+    if (this.aborted.get()) {
+      return;
+    }
+
     // Application is finished. Finish pending-containers
     while ((containerId = this.pendingContainers.poll()) != null) {
       uploadLogsForContainer(containerId);
@@ -255,4 +260,11 @@ public class AppLogAggregatorImpl implem
     this.appFinishing.set(true);
     this.notifyAll();
   }
+
+  @Override
+  public synchronized void abortLogAggregation() {
+    LOG.info("Aborting log aggregation for " + this.applicationId);
+    this.aborted.set(true);
+    this.notifyAll();
+  }
 }

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java?rev=1617532&r1=1617531&r2=1617532&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java Tue Aug 12 17:02:07 2014
@@ -142,9 +142,17 @@ public class LogAggregationService exten
   private void stopAggregators() {
     threadPool.shutdown();
+    // if recovery on restart is supported then leave outstanding aggregations
+    // to the next restart
+    boolean shouldAbort = context.getNMStateStore().canRecover()
+        && !context.getDecommissioned();
     // politely ask to finish
     for (AppLogAggregator aggregator : appLogAggregators.values()) {
-      aggregator.finishLogAggregation();
+      if (shouldAbort) {
+        aggregator.abortLogAggregation();
+      } else {
+        aggregator.finishLogAggregation();
+      }
     }
     while (!threadPool.isTerminated()) { // wait for all threads to finish
       for (ApplicationId appId : appLogAggregators.keySet()) {
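Note (illustrative, not part of this change): taken together, the log aggregation changes let a NodeManager that supports recovery defer outstanding log aggregation to its next start instead of completing it during shutdown, unless the node is being decommissioned. A small standalone sketch of the abort/finish signalling pattern shown above (simplified; the real aggregator also drains pendingContainers and manages the LogWriter):

    import java.util.concurrent.atomic.AtomicBoolean;

    public class LogAggregationShutdownSketch {
      private final AtomicBoolean appFinishing = new AtomicBoolean();
      private final AtomicBoolean aborted = new AtomicBoolean();

      // Mirrors the shouldAbort decision in LogAggregationService.stopAggregators().
      static boolean shouldAbort(boolean canRecover, boolean decommissioned) {
        return canRecover && !decommissioned;
      }

      synchronized void abortLogAggregation() {
        aborted.set(true);
        notifyAll(); // wake the aggregation loop so it exits promptly
      }

      synchronized void finishLogAggregation() {
        appFinishing.set(true);
        notifyAll();
      }

      void doAppLogAggregation() throws InterruptedException {
        while (!appFinishing.get() && !aborted.get()) {
          synchronized (this) {
            wait(1000); // stands in for THREAD_SLEEP_TIME
          }
        }
        if (aborted.get()) {
          return; // leave pending uploads for the restarted NM to handle
        }
        // ... application finished: upload any remaining container logs ...
      }
    }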
