Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java Tue Aug 19 23:49:39 2014 @@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.api.record import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; @@ -66,4 +67,10 @@ public interface Context { LocalDirsHandlerService getLocalDirsHandler(); ApplicationACLsManager getApplicationACLsManager(); + + NMStateStoreService getNMStateStore(); + + boolean getDecommissioned(); + + void setDecommissioned(boolean isDecommissioned); }
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java Tue Aug 19 23:49:39 2014 @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.nodemanager; +import com.google.common.base.Optional; import static org.apache.hadoop.fs.CreateFlag.CREATE; import static org.apache.hadoop.fs.CreateFlag.OVERWRITE; @@ -212,10 +213,21 @@ public class DefaultContainerExecutor ex && exitCode != ExitCode.TERMINATED.getExitCode()) { LOG.warn("Exception from container-launch with container ID: " + containerId + " and exit code: " + exitCode , e); - logOutput(shExec.getOutput()); - String diagnostics = "Exception from container-launch: " - + e + "\n" - + StringUtils.stringifyException(e) + "\n" + shExec.getOutput(); + + StringBuilder builder = new StringBuilder(); + builder.append("Exception from container-launch.\n"); + builder.append("Container id: " + containerId + "\n"); + builder.append("Exit code: " + exitCode + "\n"); + if (!Optional.fromNullable(e.getMessage()).or("").isEmpty()) { + builder.append("Exception message: " + e.getMessage() + "\n"); + } + builder.append("Stack trace: " + + StringUtils.stringifyException(e) + "\n"); + if (!shExec.getOutput().isEmpty()) { + builder.append("Shell output: " + shExec.getOutput() + "\n"); + } + String diagnostics = builder.toString(); + logOutput(diagnostics); container.handle(new ContainerDiagnosticsUpdateEvent(containerId, diagnostics)); } else { @@ -261,25 +273,57 @@ public class DefaultContainerExecutor ex private final class UnixLocalWrapperScriptBuilder extends LocalWrapperScriptBuilder { + private final Path sessionScriptPath; public UnixLocalWrapperScriptBuilder(Path containerWorkDir) { super(containerWorkDir); + this.sessionScriptPath = new Path(containerWorkDir, + Shell.appendScriptExtension("default_container_executor_session")); + } + + @Override + public void writeLocalWrapperScript(Path launchDst, Path pidFile) + throws IOException { + writeSessionScript(launchDst, pidFile); + super.writeLocalWrapperScript(launchDst, pidFile); } @Override public void writeLocalWrapperScript(Path launchDst, Path pidFile, PrintStream pout) { - - // We need to do a move as writing to a file is not atomic - // Process reading a file being written to may get garbled data - // hence write pid to tmp file first followed by a mv + String exitCodeFile = ContainerLaunch.getExitCodeFile( + pidFile.toString()); + String tmpFile = exitCodeFile + ".tmp"; pout.println("#!/bin/bash"); - pout.println(); - pout.println("echo $$ > " + pidFile.toString() + ".tmp"); - pout.println("/bin/mv -f " + pidFile.toString() + ".tmp " + pidFile); - String exec = Shell.isSetsidAvailable? "exec setsid" : "exec"; - pout.println(exec + " /bin/bash \"" + - launchDst.toUri().getPath().toString() + "\""); + pout.println("/bin/bash \"" + sessionScriptPath.toString() + "\""); + pout.println("rc=$?"); + pout.println("echo $rc > \"" + tmpFile + "\""); + pout.println("/bin/mv -f \"" + tmpFile + "\" \"" + exitCodeFile + "\""); + pout.println("exit $rc"); + } + + private void writeSessionScript(Path launchDst, Path pidFile) + throws IOException { + DataOutputStream out = null; + PrintStream pout = null; + try { + out = lfs.create(sessionScriptPath, EnumSet.of(CREATE, OVERWRITE)); + pout = new PrintStream(out); + // We need to do a move as writing to a file is not atomic + // Process reading a file being written to may get garbled data + // hence write pid to tmp file first followed by a mv + pout.println("#!/bin/bash"); + pout.println(); + pout.println("echo $$ > " + pidFile.toString() + ".tmp"); + pout.println("/bin/mv -f " + pidFile.toString() + ".tmp " + pidFile); + String exec = Shell.isSetsidAvailable? "exec setsid" : "exec"; + pout.println(exec + " /bin/bash \"" + + launchDst.toUri().getPath().toString() + "\""); + } finally { + IOUtils.cleanup(LOG, pout, out); + } + lfs.setPermission(sessionScriptPath, + ContainerExecutor.TASK_LAUNCH_SCRIPT_PERMISSION); } } @@ -298,6 +342,7 @@ public class DefaultContainerExecutor ex @Override public void writeLocalWrapperScript(Path launchDst, Path pidFile, PrintStream pout) { + // TODO: exit code script for Windows // On Windows, the pid is the container ID, so that it can also serve as // the name of the job object created by winutils for task management. @@ -330,6 +375,12 @@ public class DefaultContainerExecutor ex return true; } + @Override + public boolean isContainerProcessAlive(String user, String pid) + throws IOException { + return containerIsAlive(pid); + } + /** * Returns true if the process with the specified pid is alive. * Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java Tue Aug 19 23:49:39 2014 @@ -21,10 +21,13 @@ package org.apache.hadoop.yarn.server.no import static java.util.concurrent.TimeUnit.SECONDS; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadFactory; @@ -40,6 +43,10 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.UnsupportedFileSystemException; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredDeletionServiceState; import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -50,6 +57,8 @@ public class DeletionService extends Abs private final ContainerExecutor exec; private ScheduledThreadPoolExecutor sched; private static final FileContext lfs = getLfs(); + private final NMStateStoreService stateStore; + private AtomicInteger nextTaskId = new AtomicInteger(0); static final FileContext getLfs() { try { @@ -60,14 +69,18 @@ public class DeletionService extends Abs } public DeletionService(ContainerExecutor exec) { + this(exec, new NMNullStateStoreService()); + } + + public DeletionService(ContainerExecutor exec, + NMStateStoreService stateStore) { super(DeletionService.class.getName()); this.exec = exec; this.debugDelay = 0; + this.stateStore = stateStore; } /** - * - /** * Delete the path(s) as this user. * @param user The user to delete as, or the JVM user if null * @param subDir the sub directory name @@ -76,19 +89,20 @@ public class DeletionService extends Abs public void delete(String user, Path subDir, Path... baseDirs) { // TODO if parent owned by NM, rename within parent inline if (debugDelay != -1) { - if (baseDirs == null || baseDirs.length == 0) { - sched.schedule(new FileDeletionTask(this, user, subDir, null), - debugDelay, TimeUnit.SECONDS); - } else { - sched.schedule( - new FileDeletionTask(this, user, subDir, Arrays.asList(baseDirs)), - debugDelay, TimeUnit.SECONDS); + List<Path> baseDirList = null; + if (baseDirs != null && baseDirs.length != 0) { + baseDirList = Arrays.asList(baseDirs); } + FileDeletionTask task = + new FileDeletionTask(this, user, subDir, baseDirList); + recordDeletionTaskInStateStore(task); + sched.schedule(task, debugDelay, TimeUnit.SECONDS); } } public void scheduleFileDeletionTask(FileDeletionTask fileDeletionTask) { if (debugDelay != -1) { + recordDeletionTaskInStateStore(fileDeletionTask); sched.schedule(fileDeletionTask, debugDelay, TimeUnit.SECONDS); } } @@ -109,6 +123,9 @@ public class DeletionService extends Abs } sched.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); sched.setKeepAliveTime(60L, SECONDS); + if (stateStore.canRecover()) { + recover(stateStore.loadDeletionServiceState()); + } super.serviceInit(conf); } @@ -139,6 +156,8 @@ public class DeletionService extends Abs } public static class FileDeletionTask implements Runnable { + public static final int INVALID_TASK_ID = -1; + private int taskId; private final String user; private final Path subDir; private final List<Path> baseDirs; @@ -152,6 +171,12 @@ public class DeletionService extends Abs private FileDeletionTask(DeletionService delService, String user, Path subDir, List<Path> baseDirs) { + this(INVALID_TASK_ID, delService, user, subDir, baseDirs); + } + + private FileDeletionTask(int taskId, DeletionService delService, + String user, Path subDir, List<Path> baseDirs) { + this.taskId = taskId; this.delService = delService; this.user = user; this.subDir = subDir; @@ -198,6 +223,12 @@ public class DeletionService extends Abs return this.success; } + public synchronized FileDeletionTask[] getSuccessorTasks() { + FileDeletionTask[] successors = + new FileDeletionTask[successorTaskSet.size()]; + return successorTaskSet.toArray(successors); + } + @Override public void run() { if (LOG.isDebugEnabled()) { @@ -286,6 +317,12 @@ public class DeletionService extends Abs * dependent tasks of it has failed marking its success = false. */ private synchronized void fileDeletionTaskFinished() { + try { + delService.stateStore.removeDeletionTask(taskId); + } catch (IOException e) { + LOG.error("Unable to remove deletion task " + taskId + + " from state store", e); + } Iterator<FileDeletionTask> successorTaskI = this.successorTaskSet.iterator(); while (successorTaskI.hasNext()) { @@ -318,4 +355,129 @@ public class DeletionService extends Abs Path[] baseDirs) { return new FileDeletionTask(this, user, subDir, Arrays.asList(baseDirs)); } + + private void recover(RecoveredDeletionServiceState state) + throws IOException { + List<DeletionServiceDeleteTaskProto> taskProtos = state.getTasks(); + Map<Integer, DeletionTaskRecoveryInfo> idToInfoMap = + new HashMap<Integer, DeletionTaskRecoveryInfo>(taskProtos.size()); + Set<Integer> successorTasks = new HashSet<Integer>(); + for (DeletionServiceDeleteTaskProto proto : taskProtos) { + DeletionTaskRecoveryInfo info = parseTaskProto(proto); + idToInfoMap.put(info.task.taskId, info); + nextTaskId.set(Math.max(nextTaskId.get(), info.task.taskId)); + successorTasks.addAll(info.successorTaskIds); + } + + // restore the task dependencies and schedule the deletion tasks that + // have no predecessors + final long now = System.currentTimeMillis(); + for (DeletionTaskRecoveryInfo info : idToInfoMap.values()) { + for (Integer successorId : info.successorTaskIds){ + DeletionTaskRecoveryInfo successor = idToInfoMap.get(successorId); + if (successor != null) { + info.task.addFileDeletionTaskDependency(successor.task); + } else { + LOG.error("Unable to locate dependency task for deletion task " + + info.task.taskId + " at " + info.task.getSubDir()); + } + } + if (!successorTasks.contains(info.task.taskId)) { + long msecTilDeletion = info.deletionTimestamp - now; + sched.schedule(info.task, msecTilDeletion, TimeUnit.MILLISECONDS); + } + } + } + + private DeletionTaskRecoveryInfo parseTaskProto( + DeletionServiceDeleteTaskProto proto) throws IOException { + int taskId = proto.getId(); + String user = proto.hasUser() ? proto.getUser() : null; + Path subdir = null; + List<Path> basePaths = null; + if (proto.hasSubdir()) { + subdir = new Path(proto.getSubdir()); + } + List<String> basedirs = proto.getBasedirsList(); + if (basedirs != null && basedirs.size() > 0) { + basePaths = new ArrayList<Path>(basedirs.size()); + for (String basedir : basedirs) { + basePaths.add(new Path(basedir)); + } + } + + FileDeletionTask task = new FileDeletionTask(taskId, this, user, + subdir, basePaths); + return new DeletionTaskRecoveryInfo(task, + proto.getSuccessorIdsList(), + proto.getDeletionTime()); + } + + private int generateTaskId() { + // get the next ID but avoid an invalid ID + int taskId = nextTaskId.incrementAndGet(); + while (taskId == FileDeletionTask.INVALID_TASK_ID) { + taskId = nextTaskId.incrementAndGet(); + } + return taskId; + } + + private void recordDeletionTaskInStateStore(FileDeletionTask task) { + if (!stateStore.canRecover()) { + // optimize the case where we aren't really recording + return; + } + if (task.taskId != FileDeletionTask.INVALID_TASK_ID) { + return; // task already recorded + } + + task.taskId = generateTaskId(); + + FileDeletionTask[] successors = task.getSuccessorTasks(); + + // store successors first to ensure task IDs have been generated for them + for (FileDeletionTask successor : successors) { + recordDeletionTaskInStateStore(successor); + } + + DeletionServiceDeleteTaskProto.Builder builder = + DeletionServiceDeleteTaskProto.newBuilder(); + builder.setId(task.taskId); + if (task.getUser() != null) { + builder.setUser(task.getUser()); + } + if (task.getSubDir() != null) { + builder.setSubdir(task.getSubDir().toString()); + } + builder.setDeletionTime(System.currentTimeMillis() + + TimeUnit.MILLISECONDS.convert(debugDelay, TimeUnit.SECONDS)); + if (task.getBaseDirs() != null) { + for (Path dir : task.getBaseDirs()) { + builder.addBasedirs(dir.toString()); + } + } + for (FileDeletionTask successor : successors) { + builder.addSuccessorIds(successor.taskId); + } + + try { + stateStore.storeDeletionTask(task.taskId, builder.build()); + } catch (IOException e) { + LOG.error("Unable to store deletion task " + task.taskId + " for " + + task.getSubDir(), e); + } + } + + private static class DeletionTaskRecoveryInfo { + FileDeletionTask task; + List<Integer> successorTaskIds; + long deletionTimestamp; + + public DeletionTaskRecoveryInfo(FileDeletionTask task, + List<Integer> successorTaskIds, long deletionTimestamp) { + this.task = task; + this.successorTaskIds = successorTaskIds; + this.deletionTimestamp = deletionTimestamp; + } + } } \ No newline at end of file Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java Tue Aug 19 23:49:39 2014 @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.nodemanager; +import com.google.common.base.Optional; import java.io.File; import java.io.IOException; import java.net.InetSocketAddress; @@ -296,9 +297,21 @@ public class LinuxContainerExecutor exte && exitCode != ExitCode.TERMINATED.getExitCode()) { LOG.warn("Exception from container-launch with container ID: " + containerId + " and exit code: " + exitCode , e); - logOutput(shExec.getOutput()); - String diagnostics = "Exception from container-launch: \n" - + StringUtils.stringifyException(e) + "\n" + shExec.getOutput(); + + StringBuilder builder = new StringBuilder(); + builder.append("Exception from container-launch.\n"); + builder.append("Container id: " + containerId + "\n"); + builder.append("Exit code: " + exitCode + "\n"); + if (!Optional.fromNullable(e.getMessage()).or("").isEmpty()) { + builder.append("Exception message: " + e.getMessage() + "\n"); + } + builder.append("Stack trace: " + + StringUtils.stringifyException(e) + "\n"); + if (!shExec.getOutput().isEmpty()) { + builder.append("Shell output: " + shExec.getOutput() + "\n"); + } + String diagnostics = builder.toString(); + logOutput(diagnostics); container.handle(new ContainerDiagnosticsUpdateEvent(containerId, diagnostics)); } else { @@ -390,6 +403,13 @@ public class LinuxContainerExecutor exte } } + @Override + public boolean isContainerProcessAlive(String user, String pid) + throws IOException { + // Send a test signal to the process as the user to see if it's alive + return signalContainer(user, pid, Signal.NULL); + } + public void mountCgroups(List<String> cgroupKVs, String hierarchy) throws IOException { List<String> command = new ArrayList<String>( Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java Tue Aug 19 23:49:39 2014 @@ -53,6 +53,9 @@ import org.apache.hadoop.yarn.server.nod import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMLeveldbStateStoreService; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM; import org.apache.hadoop.yarn.server.nodemanager.webapp.WebServer; @@ -78,9 +81,11 @@ public class NodeManager extends Composi private ContainerManagerImpl containerManager; private NodeStatusUpdater nodeStatusUpdater; private static CompositeServiceShutdownHook nodeManagerShutdownHook; + private NMStateStoreService nmStore = null; private AtomicBoolean isStopping = new AtomicBoolean(false); - + private boolean rmWorkPreservingRestartEnabled; + public NodeManager() { super(NodeManager.class.getName()); } @@ -110,14 +115,15 @@ public class NodeManager extends Composi } protected DeletionService createDeletionService(ContainerExecutor exec) { - return new DeletionService(exec); + return new DeletionService(exec, nmStore); } protected NMContext createNMContext( NMContainerTokenSecretManager containerTokenSecretManager, - NMTokenSecretManagerInNM nmTokenSecretManager) { + NMTokenSecretManagerInNM nmTokenSecretManager, + NMStateStoreService stateStore) { return new NMContext(containerTokenSecretManager, nmTokenSecretManager, - dirsHandler, aclsManager); + dirsHandler, aclsManager, stateStore); } protected void doSecureLogin() throws IOException { @@ -125,11 +131,8 @@ public class NodeManager extends Composi YarnConfiguration.NM_PRINCIPAL); } - @Override - protected void serviceInit(Configuration conf) throws Exception { - - conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true); - + private void initAndStartRecoveryStore(Configuration conf) + throws IOException { boolean recoveryEnabled = conf.getBoolean( YarnConfiguration.NM_RECOVERY_ENABLED, YarnConfiguration.DEFAULT_NM_RECOVERY_ENABLED); @@ -142,13 +145,57 @@ public class NodeManager extends Composi } Path recoveryRoot = new Path(recoveryDirName); recoveryFs.mkdirs(recoveryRoot, new FsPermission((short)0700)); + nmStore = new NMLeveldbStateStoreService(); + } else { + nmStore = new NMNullStateStoreService(); + } + nmStore.init(conf); + nmStore.start(); + } + + private void stopRecoveryStore() throws IOException { + nmStore.stop(); + if (context.getDecommissioned() && nmStore.canRecover()) { + LOG.info("Removing state store due to decommission"); + Configuration conf = getConfig(); + Path recoveryRoot = new Path( + conf.get(YarnConfiguration.NM_RECOVERY_DIR)); + LOG.info("Removing state store at " + recoveryRoot + + " due to decommission"); + FileSystem recoveryFs = FileSystem.getLocal(conf); + if (!recoveryFs.delete(recoveryRoot, true)) { + LOG.warn("Unable to delete " + recoveryRoot); + } + } + } + + private void recoverTokens(NMTokenSecretManagerInNM nmTokenSecretManager, + NMContainerTokenSecretManager containerTokenSecretManager) + throws IOException { + if (nmStore.canRecover()) { + nmTokenSecretManager.recover(); + containerTokenSecretManager.recover(); } + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + + conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true); + + rmWorkPreservingRestartEnabled = conf.getBoolean(YarnConfiguration + .RM_WORK_PRESERVING_RECOVERY_ENABLED, + YarnConfiguration.DEFAULT_RM_WORK_PRESERVING_RECOVERY_ENABLED); + + initAndStartRecoveryStore(conf); NMContainerTokenSecretManager containerTokenSecretManager = - new NMContainerTokenSecretManager(conf); + new NMContainerTokenSecretManager(conf, nmStore); NMTokenSecretManagerInNM nmTokenSecretManager = - new NMTokenSecretManagerInNM(); + new NMTokenSecretManagerInNM(nmStore); + + recoverTokens(nmTokenSecretManager, containerTokenSecretManager); this.aclsManager = new ApplicationACLsManager(conf); @@ -171,7 +218,7 @@ public class NodeManager extends Composi dirsHandler = nodeHealthChecker.getDiskHandler(); this.context = createNMContext(containerTokenSecretManager, - nmTokenSecretManager); + nmTokenSecretManager, nmStore); nodeStatusUpdater = createNodeStatusUpdater(context, dispatcher, nodeHealthChecker); @@ -220,6 +267,7 @@ public class NodeManager extends Composi return; } super.serviceStop(); + stopRecoveryStore(); DefaultMetricsSystem.shutdown(); } @@ -244,8 +292,12 @@ public class NodeManager extends Composi try { LOG.info("Notifying ContainerManager to block new container-requests"); containerManager.setBlockNewContainerRequests(true); - LOG.info("Cleaning up running containers on resync"); - containerManager.cleanupContainersOnNMResync(); + if (!rmWorkPreservingRestartEnabled) { + LOG.info("Cleaning up running containers on resync"); + containerManager.cleanupContainersOnNMResync(); + } else { + LOG.info("Preserving containers on resync"); + } ((NodeStatusUpdaterImpl) nodeStatusUpdater) .rebootNodeStatusUpdaterAndRegisterWithRM(); } catch (YarnRuntimeException e) { @@ -272,10 +324,13 @@ public class NodeManager extends Composi private WebServer webServer; private final NodeHealthStatus nodeHealthStatus = RecordFactoryProvider .getRecordFactory(null).newRecordInstance(NodeHealthStatus.class); - + private final NMStateStoreService stateStore; + private boolean isDecommissioned = false; + public NMContext(NMContainerTokenSecretManager containerTokenSecretManager, NMTokenSecretManagerInNM nmTokenSecretManager, - LocalDirsHandlerService dirsHandler, ApplicationACLsManager aclsManager) { + LocalDirsHandlerService dirsHandler, ApplicationACLsManager aclsManager, + NMStateStoreService stateStore) { this.containerTokenSecretManager = containerTokenSecretManager; this.nmTokenSecretManager = nmTokenSecretManager; this.dirsHandler = dirsHandler; @@ -283,6 +338,7 @@ public class NodeManager extends Composi this.nodeHealthStatus.setIsNodeHealthy(true); this.nodeHealthStatus.setHealthReport("Healthy"); this.nodeHealthStatus.setLastHealthReportTime(System.currentTimeMillis()); + this.stateStore = stateStore; } /** @@ -349,6 +405,21 @@ public class NodeManager extends Composi public ApplicationACLsManager getApplicationACLsManager() { return aclsManager; } + + @Override + public NMStateStoreService getNMStateStore() { + return stateStore; + } + + @Override + public boolean getDecommissioned() { + return isDecommissioned; + } + + @Override + public void setDecommissioned(boolean isDecommissioned) { + this.isDecommissioned = isDecommissioned; + } } Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java Tue Aug 19 23:49:39 2014 @@ -23,11 +23,34 @@ import org.apache.hadoop.yarn.api.record public interface NodeStatusUpdater extends Service { + /** + * Schedule a heartbeat to the ResourceManager outside of the normal, + * periodic heartbeating process. This is typically called when the state + * of containers on the node has changed to notify the RM sooner. + */ void sendOutofBandHeartBeat(); + /** + * Get the ResourceManager identifier received during registration + * @return the ResourceManager ID + */ long getRMIdentifier(); + /** + * Query if a container has recently completed + * @param containerId the container ID + * @return true if the container has recently completed + */ public boolean isContainerRecentlyStopped(ContainerId containerId); + /** + * Add a container to the list of containers that have recently completed + * @param containerId the ID of the completed container + */ + public void addCompletedContainer(ContainerId containerId); + + /** + * Clear the list of recently completed containers + */ public void clearFinishedContainersFromCache(); } Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java Tue Aug 19 23:49:39 2014 @@ -53,6 +53,7 @@ import org.apache.hadoop.yarn.exceptions import org.apache.hadoop.yarn.server.api.ResourceManagerConstants; import org.apache.hadoop.yarn.server.api.ResourceTracker; import org.apache.hadoop.yarn.server.api.ServerRMProxy; +import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; @@ -246,13 +247,12 @@ public class NodeStatusUpdaterImpl exten @VisibleForTesting protected void registerWithRM() throws YarnException, IOException { - List<ContainerStatus> containerStatuses = getContainerStatuses(); + List<NMContainerStatus> containerReports = getNMContainerStatuses(); RegisterNodeManagerRequest request = RegisterNodeManagerRequest.newInstance(nodeId, httpPort, totalResource, - nodeManagerVersionId, containerStatuses); - if (containerStatuses != null) { - LOG.info("Registering with RM using finished containers :" - + containerStatuses); + nodeManagerVersionId, containerReports, getRunningApplications()); + if (containerReports != null) { + LOG.info("Registering with RM using containers :" + containerReports); } RegisterNodeManagerResponse regNMResponse = resourceTracker.registerNodeManager(request); @@ -364,8 +364,7 @@ public class NodeStatusUpdaterImpl exten // Adding to finished containers cache. Cache will keep it around at // least for #durationToTrackStoppedContainers duration. In the // subsequent call to stop container it will get removed from cache. - updateStoppedContainersInCache(container.getContainerId()); - addCompletedContainer(container); + addCompletedContainer(container.getContainerId()); } } if (LOG.isDebugEnabled()) { @@ -374,10 +373,42 @@ public class NodeStatusUpdaterImpl exten } return containerStatuses; } + + private List<ApplicationId> getRunningApplications() { + List<ApplicationId> runningApplications = new ArrayList<ApplicationId>(); + runningApplications.addAll(this.context.getApplications().keySet()); + return runningApplications; + } + + // These NMContainerStatus are sent on NM registration and used by YARN only. + private List<NMContainerStatus> getNMContainerStatuses() { + List<NMContainerStatus> containerStatuses = + new ArrayList<NMContainerStatus>(); + for (Container container : this.context.getContainers().values()) { + NMContainerStatus status = + container.getNMContainerStatus(); + containerStatuses.add(status); + if (status.getContainerState().equals(ContainerState.COMPLETE)) { + // Adding to finished containers cache. Cache will keep it around at + // least for #durationToTrackStoppedContainers duration. In the + // subsequent call to stop container it will get removed from cache. + addCompletedContainer(container.getContainerId()); + } + } + LOG.info("Sending out " + containerStatuses.size() + + " NM container statuses: " + containerStatuses); + return containerStatuses; + } - private void addCompletedContainer(Container container) { + @Override + public void addCompletedContainer(ContainerId containerId) { synchronized (previousCompletedContainers) { - previousCompletedContainers.add(container.getContainerId()); + previousCompletedContainers.add(containerId); + } + synchronized (recentlyStoppedContainers) { + removeVeryOldStoppedContainersFromCache(); + recentlyStoppedContainers.put(containerId, + System.currentTimeMillis() + durationToTrackStoppedContainers); } } @@ -424,16 +455,6 @@ public class NodeStatusUpdaterImpl exten } } - @Private - @VisibleForTesting - public void updateStoppedContainersInCache(ContainerId containerId) { - synchronized (recentlyStoppedContainers) { - removeVeryOldStoppedContainersFromCache(); - recentlyStoppedContainers.put(containerId, - System.currentTimeMillis() + durationToTrackStoppedContainers); - } - } - @Override public void clearFinishedContainersFromCache() { synchronized (recentlyStoppedContainers) { @@ -449,8 +470,14 @@ public class NodeStatusUpdaterImpl exten Iterator<ContainerId> i = recentlyStoppedContainers.keySet().iterator(); while (i.hasNext()) { - if (recentlyStoppedContainers.get(i.next()) < currentTime) { + ContainerId cid = i.next(); + if (recentlyStoppedContainers.get(cid) < currentTime) { i.remove(); + try { + context.getNMStateStore().removeContainer(cid); + } catch (IOException e) { + LOG.error("Unable to remove container " + cid + " in store", e); + } } else { break; } @@ -493,6 +520,7 @@ public class NodeStatusUpdaterImpl exten + " hence shutting down."); LOG.warn("Message from ResourceManager: " + response.getDiagnosticsMessage()); + context.setDecommissioned(true); dispatcher.getEventHandler().handle( new NodeManagerEvent(NodeManagerEventType.SHUTDOWN)); break; Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ResourceView.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ResourceView.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ResourceView.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ResourceView.java Tue Aug 19 23:49:39 2014 @@ -27,4 +27,6 @@ public interface ResourceView { long getPmemAllocatedForContainers(); boolean isPmemCheckEnabled(); + + long getVCoresAllocatedForContainers(); } Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java Tue Aug 19 23:49:39 2014 @@ -20,8 +20,10 @@ package org.apache.hadoop.yarn.server.no import static org.apache.hadoop.service.Service.STATE.STARTED; +import java.io.DataInputStream; import java.io.IOException; import java.net.InetSocketAddress; +import java.net.URISyntaxException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; @@ -41,6 +43,7 @@ import org.apache.hadoop.classification. import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.io.DataInputByteBuffer; +import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.ipc.Server; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.Credentials; @@ -62,13 +65,17 @@ import org.apache.hadoop.yarn.api.protoc import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse; import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest; import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse; +import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.SerializedException; +import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.EventHandler; @@ -79,6 +86,8 @@ import org.apache.hadoop.yarn.exceptions import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.ipc.RPCUtil; import org.apache.hadoop.yarn.ipc.YarnRPC; +import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationACLMapProto; +import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.security.NMTokenIdentifier; import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent; @@ -116,11 +125,16 @@ import org.apache.hadoop.yarn.server.nod import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredApplicationsState; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerState; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus; import org.apache.hadoop.yarn.server.nodemanager.security.authorize.NMPolicyProvider; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import com.google.common.annotations.VisibleForTesting; +import com.google.protobuf.ByteString; public class ContainerManagerImpl extends CompositeService implements ServiceStateChangeListener, ContainerManagementProtocol, @@ -218,6 +232,108 @@ public class ContainerManagerImpl extend SHUTDOWN_CLEANUP_SLOP_MS; super.serviceInit(conf); + recover(); + } + + @SuppressWarnings("unchecked") + private void recover() throws IOException, URISyntaxException { + NMStateStoreService stateStore = context.getNMStateStore(); + if (stateStore.canRecover()) { + rsrcLocalizationSrvc.recoverLocalizedResources( + stateStore.loadLocalizationState()); + + RecoveredApplicationsState appsState = stateStore.loadApplicationsState(); + for (ContainerManagerApplicationProto proto : + appsState.getApplications()) { + recoverApplication(proto); + } + + for (RecoveredContainerState rcs : stateStore.loadContainersState()) { + recoverContainer(rcs); + } + + String diagnostic = "Application marked finished during recovery"; + for (ApplicationId appId : appsState.getFinishedApplications()) { + dispatcher.getEventHandler().handle( + new ApplicationFinishEvent(appId, diagnostic)); + } + } + } + + private void recoverApplication(ContainerManagerApplicationProto p) + throws IOException { + ApplicationId appId = new ApplicationIdPBImpl(p.getId()); + Credentials creds = new Credentials(); + creds.readTokenStorageStream( + new DataInputStream(p.getCredentials().newInput())); + + List<ApplicationACLMapProto> aclProtoList = p.getAclsList(); + Map<ApplicationAccessType, String> acls = + new HashMap<ApplicationAccessType, String>(aclProtoList.size()); + for (ApplicationACLMapProto aclProto : aclProtoList) { + acls.put(ProtoUtils.convertFromProtoFormat(aclProto.getAccessType()), + aclProto.getAcl()); + } + + LOG.info("Recovering application " + appId); + ApplicationImpl app = new ApplicationImpl(dispatcher, p.getUser(), appId, + creds, context); + context.getApplications().put(appId, app); + app.handle(new ApplicationInitEvent(appId, acls)); + } + + @SuppressWarnings("unchecked") + private void recoverContainer(RecoveredContainerState rcs) + throws IOException { + StartContainerRequest req = rcs.getStartRequest(); + ContainerLaunchContext launchContext = req.getContainerLaunchContext(); + ContainerTokenIdentifier token = + BuilderUtils.newContainerTokenIdentifier(req.getContainerToken()); + ContainerId containerId = token.getContainerID(); + ApplicationId appId = + containerId.getApplicationAttemptId().getApplicationId(); + + LOG.info("Recovering " + containerId + " in state " + rcs.getStatus() + + " with exit code " + rcs.getExitCode()); + + if (context.getApplications().containsKey(appId)) { + Credentials credentials = parseCredentials(launchContext); + Container container = new ContainerImpl(getConfig(), dispatcher, + context.getNMStateStore(), req.getContainerLaunchContext(), + credentials, metrics, token, rcs.getStatus(), rcs.getExitCode(), + rcs.getDiagnostics(), rcs.getKilled()); + context.getContainers().put(containerId, container); + dispatcher.getEventHandler().handle( + new ApplicationContainerInitEvent(container)); + } else { + if (rcs.getStatus() != RecoveredContainerStatus.COMPLETED) { + LOG.warn(containerId + " has no corresponding application!"); + } + LOG.info("Adding " + containerId + " to recently stopped containers"); + nodeStatusUpdater.addCompletedContainer(containerId); + } + } + + private void waitForRecoveredContainers() throws InterruptedException { + final int sleepMsec = 100; + int waitIterations = 100; + List<ContainerId> newContainers = new ArrayList<ContainerId>(); + while (--waitIterations >= 0) { + newContainers.clear(); + for (Container container : context.getContainers().values()) { + if (container.getContainerState() == org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState.NEW) { + newContainers.add(container.getContainerId()); + } + } + if (newContainers.isEmpty()) { + break; + } + LOG.info("Waiting for containers: " + newContainers); + Thread.sleep(sleepMsec); + } + if (waitIterations < 0) { + LOG.warn("Timeout waiting for recovered containers"); + } } protected LogHandler createLogHandler(Configuration conf, Context context, @@ -239,7 +355,7 @@ public class ContainerManagerImpl extend protected ResourceLocalizationService createResourceLocalizationService( ContainerExecutor exec, DeletionService deletionContext) { return new ResourceLocalizationService(this.dispatcher, exec, - deletionContext, dirsHandler); + deletionContext, dirsHandler, context.getNMStateStore()); } protected ContainersLauncher createContainersLauncher(Context context, @@ -253,6 +369,23 @@ public class ContainerManagerImpl extend // Enqueue user dirs in deletion context Configuration conf = getConfig(); + final InetSocketAddress initialAddress = conf.getSocketAddr( + YarnConfiguration.NM_BIND_HOST, + YarnConfiguration.NM_ADDRESS, + YarnConfiguration.DEFAULT_NM_ADDRESS, + YarnConfiguration.DEFAULT_NM_PORT); + boolean usingEphemeralPort = (initialAddress.getPort() == 0); + if (context.getNMStateStore().canRecover() && usingEphemeralPort) { + throw new IllegalArgumentException("Cannot support recovery with an " + + "ephemeral server port. Check the setting of " + + YarnConfiguration.NM_ADDRESS); + } + // If recovering then delay opening the RPC service until the recovery + // of resources and containers have completed, otherwise requests from + // clients during recovery can interfere with the recovery process. + final boolean delayedRpcServerStart = + context.getNMStateStore().canRecover(); + Configuration serverConf = new Configuration(conf); // always enforce it to be token-based. @@ -262,11 +395,6 @@ public class ContainerManagerImpl extend YarnRPC rpc = YarnRPC.create(conf); - InetSocketAddress initialAddress = conf.getSocketAddr( - YarnConfiguration.NM_ADDRESS, - YarnConfiguration.DEFAULT_NM_ADDRESS, - YarnConfiguration.DEFAULT_NM_PORT); - server = rpc.getServer(ContainerManagementProtocol.class, this, initialAddress, serverConf, this.context.getNMTokenSecretManager(), @@ -283,16 +411,61 @@ public class ContainerManagerImpl extend LOG.info("Blocking new container-requests as container manager rpc" + " server is still starting."); this.setBlockNewContainerRequests(true); - server.start(); - InetSocketAddress connectAddress = NetUtils.getConnectAddress(server); - NodeId nodeId = NodeId.newInstance( - connectAddress.getAddress().getCanonicalHostName(), - connectAddress.getPort()); + + String bindHost = conf.get(YarnConfiguration.NM_BIND_HOST); + String nmAddress = conf.getTrimmed(YarnConfiguration.NM_ADDRESS); + String hostOverride = null; + if (bindHost != null && !bindHost.isEmpty() + && nmAddress != null && !nmAddress.isEmpty()) { + //a bind-host case with an address, to support overriding the first + //hostname found when querying for our hostname with the specified + //address, combine the specified address with the actual port listened + //on by the server + hostOverride = nmAddress.split(":")[0]; + } + + // setup node ID + InetSocketAddress connectAddress; + if (delayedRpcServerStart) { + connectAddress = NetUtils.getConnectAddress(initialAddress); + } else { + server.start(); + connectAddress = NetUtils.getConnectAddress(server); + } + NodeId nodeId = buildNodeId(connectAddress, hostOverride); ((NodeManager.NMContext)context).setNodeId(nodeId); this.context.getNMTokenSecretManager().setNodeId(nodeId); this.context.getContainerTokenSecretManager().setNodeId(nodeId); - LOG.info("ContainerManager started at " + connectAddress); + + // start remaining services super.serviceStart(); + + if (delayedRpcServerStart) { + waitForRecoveredContainers(); + server.start(); + + // check that the node ID is as previously advertised + connectAddress = NetUtils.getConnectAddress(server); + NodeId serverNode = buildNodeId(connectAddress, hostOverride); + if (!serverNode.equals(nodeId)) { + throw new IOException("Node mismatch after server started, expected '" + + nodeId + "' but found '" + serverNode + "'"); + } + } + + LOG.info("ContainerManager started at " + connectAddress); + LOG.info("ContainerManager bound to " + initialAddress); + } + + private NodeId buildNodeId(InetSocketAddress connectAddress, + String hostOverride) { + if (hostOverride != null) { + connectAddress = NetUtils.getConnectAddress( + new InetSocketAddress(hostOverride, connectAddress.getPort())); + } + return NodeId.newInstance( + connectAddress.getAddress().getCanonicalHostName(), + connectAddress.getPort()); } void refreshServiceAcls(Configuration configuration, @@ -329,6 +502,12 @@ public class ContainerManagerImpl extend } LOG.info("Applications still running : " + applications.keySet()); + if (this.context.getNMStateStore().canRecover() + && !this.context.getDecommissioned()) { + // do not cleanup apps as they can be recovered on restart + return; + } + List<ApplicationId> appIds = new ArrayList<ApplicationId>(applications.keySet()); this.handle( @@ -463,8 +642,8 @@ public class ContainerManagerImpl extend boolean unauthorized = false; StringBuilder messageBuilder = new StringBuilder("Unauthorized request to start container. "); - if (!nmTokenIdentifier.getApplicationAttemptId().equals( - containerId.getApplicationAttemptId())) { + if (!nmTokenIdentifier.getApplicationAttemptId().getApplicationId().equals( + containerId.getApplicationAttemptId().getApplicationId())) { unauthorized = true; messageBuilder.append("\nNMToken for application attempt : ") .append(nmTokenIdentifier.getApplicationAttemptId()) @@ -485,6 +664,8 @@ public class ContainerManagerImpl extend messageBuilder.append("\nThis token is expired. current time is ") .append(System.currentTimeMillis()).append(" found ") .append(containerTokenIdentifier.getExpiryTimeStamp()); + messageBuilder.append("\nNote: System times on machines may be out of sync.") + .append(" Check system time and time zones."); } if (unauthorized) { String msg = messageBuilder.toString(); @@ -536,6 +717,41 @@ public class ContainerManagerImpl extend succeededContainers, failedContainers); } + private ContainerManagerApplicationProto buildAppProto(ApplicationId appId, + String user, Credentials credentials, + Map<ApplicationAccessType, String> appAcls) { + + ContainerManagerApplicationProto.Builder builder = + ContainerManagerApplicationProto.newBuilder(); + builder.setId(((ApplicationIdPBImpl) appId).getProto()); + builder.setUser(user); + + builder.clearCredentials(); + if (credentials != null) { + DataOutputBuffer dob = new DataOutputBuffer(); + try { + credentials.writeTokenStorageToStream(dob); + builder.setCredentials(ByteString.copyFrom(dob.getData())); + } catch (IOException e) { + // should not occur + LOG.error("Cannot serialize credentials", e); + } + } + + builder.clearAcls(); + if (appAcls != null) { + for (Map.Entry<ApplicationAccessType, String> acl : appAcls.entrySet()) { + ApplicationACLMapProto p = ApplicationACLMapProto.newBuilder() + .setAccessType(ProtoUtils.convertToProtoFormat(acl.getKey())) + .setAcl(acl.getValue()) + .build(); + builder.addAcls(p); + } + } + + return builder.build(); + } + @SuppressWarnings("unchecked") private void startContainerInternal(NMTokenIdentifier nmTokenIdentifier, ContainerTokenIdentifier containerTokenIdentifier, @@ -588,7 +804,8 @@ public class ContainerManagerImpl extend Credentials credentials = parseCredentials(launchContext); Container container = - new ContainerImpl(getConfig(), this.dispatcher, launchContext, + new ContainerImpl(getConfig(), this.dispatcher, + context.getNMStateStore(), launchContext, credentials, metrics, containerTokenIdentifier); ApplicationId applicationID = containerId.getApplicationAttemptId().getApplicationId(); @@ -609,12 +826,15 @@ public class ContainerManagerImpl extend if (null == context.getApplications().putIfAbsent(applicationID, application)) { LOG.info("Creating a new application reference for app " + applicationID); - + Map<ApplicationAccessType, String> appAcls = + container.getLaunchContext().getApplicationACLs(); + context.getNMStateStore().storeApplication(applicationID, + buildAppProto(applicationID, user, credentials, appAcls)); dispatcher.getEventHandler().handle( - new ApplicationInitEvent(applicationID, container.getLaunchContext() - .getApplicationACLs())); + new ApplicationInitEvent(applicationID, appAcls)); } + this.context.getNMStateStore().storeContainer(containerId, request); dispatcher.getEventHandler().handle( new ApplicationContainerInitEvent(container)); @@ -662,7 +882,7 @@ public class ContainerManagerImpl extend } private Credentials parseCredentials(ContainerLaunchContext launchContext) - throws YarnException { + throws IOException { Credentials credentials = new Credentials(); // //////////// Parse credentials ByteBuffer tokens = launchContext.getTokens(); @@ -671,15 +891,11 @@ public class ContainerManagerImpl extend DataInputByteBuffer buf = new DataInputByteBuffer(); tokens.rewind(); buf.reset(tokens); - try { - credentials.readTokenStorageStream(buf); - if (LOG.isDebugEnabled()) { - for (Token<? extends TokenIdentifier> tk : credentials.getAllTokens()) { - LOG.debug(tk.getService() + " = " + tk.toString()); - } + credentials.readTokenStorageStream(buf); + if (LOG.isDebugEnabled()) { + for (Token<? extends TokenIdentifier> tk : credentials.getAllTokens()) { + LOG.debug(tk.getService() + " = " + tk.toString()); } - } catch (IOException e) { - throw RPCUtil.getRemoteException(e); } } // //////////// End of parsing credentials @@ -712,7 +928,7 @@ public class ContainerManagerImpl extend @SuppressWarnings("unchecked") private void stopContainerInternal(NMTokenIdentifier nmTokenIdentifier, - ContainerId containerID) throws YarnException { + ContainerId containerID) throws YarnException, IOException { String containerIDStr = containerID.toString(); Container container = this.context.getContainers().get(containerID); LOG.info("Stopping container with container Id: " + containerIDStr); @@ -725,9 +941,11 @@ public class ContainerManagerImpl extend + " is not handled by this NodeManager"); } } else { + context.getNMStateStore().storeContainerKilled(containerID); dispatcher.getEventHandler().handle( new ContainerKillEvent(containerID, - "Container killed by the ApplicationMaster.")); + ContainerExitStatus.KILLED_BY_APPMASTER, + "Container killed by the ApplicationMaster.")); NMAuditLogger.logSuccess(container.getUser(), AuditConstants.STOP_CONTAINER, "ContainerManageImpl", containerID @@ -797,26 +1015,24 @@ public class ContainerManagerImpl extend * belongs to the same application attempt (NMToken) which was used. (Note:- * This will prevent user in knowing another application's containers). */ - - if ((!identifier.getApplicationAttemptId().equals( - containerId.getApplicationAttemptId())) - || (container != null && !identifier.getApplicationAttemptId().equals( - container.getContainerId().getApplicationAttemptId()))) { + ApplicationId nmTokenAppId = + identifier.getApplicationAttemptId().getApplicationId(); + if ((!nmTokenAppId.equals(containerId.getApplicationAttemptId().getApplicationId())) + || (container != null && !nmTokenAppId.equals(container + .getContainerId().getApplicationAttemptId().getApplicationId()))) { if (stopRequest) { LOG.warn(identifier.getApplicationAttemptId() + " attempted to stop non-application container : " - + container.getContainerId().toString()); + + container.getContainerId()); NMAuditLogger.logFailure("UnknownUser", AuditConstants.STOP_CONTAINER, "ContainerManagerImpl", "Trying to stop unknown container!", - identifier.getApplicationAttemptId().getApplicationId(), - container.getContainerId()); + nmTokenAppId, container.getContainerId()); } else { LOG.warn(identifier.getApplicationAttemptId() + " attempted to get status for non-application container : " - + container.getContainerId().toString()); + + container.getContainerId()); } } - } class ContainerEventDispatcher implements EventHandler<ContainerEvent> { @@ -864,6 +1080,11 @@ public class ContainerManagerImpl extend } else if (appsFinishedEvent.getReason() == CMgrCompletedAppsEvent.Reason.BY_RESOURCEMANAGER) { diagnostic = "Application killed by ResourceManager"; } + try { + this.context.getNMStateStore().storeFinishedApplication(appID); + } catch (IOException e) { + LOG.error("Unable to update application state in store", e); + } this.dispatcher.getEventHandler().handle( new ApplicationFinishEvent(appID, diagnostic)); @@ -876,6 +1097,7 @@ public class ContainerManagerImpl extend .getContainersToCleanup()) { this.dispatcher.getEventHandler().handle( new ContainerKillEvent(container, + ContainerExitStatus.KILLED_BY_RESOURCEMANAGER, "Container Killed by ResourceManager")); } break; Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java Tue Aug 19 23:49:39 2014 @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.application; +import java.io.IOException; import java.util.EnumSet; import java.util.HashMap; import java.util.Map; @@ -30,6 +31,7 @@ import org.apache.commons.logging.LogFac import org.apache.hadoop.security.Credentials; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.logaggregation.ContainerLogsRetentionPolicy; @@ -375,6 +377,7 @@ public class ApplicationImpl implements for (ContainerId containerID : app.containers.keySet()) { app.dispatcher.getEventHandler().handle( new ContainerKillEvent(containerID, + ContainerExitStatus.KILLED_AFTER_APP_COMPLETION, "Container killed on application-finish event: " + appEvent.getDiagnostic())); } return ApplicationState.FINISHING_CONTAINERS_WAIT; @@ -426,6 +429,11 @@ public class ApplicationImpl implements ApplicationId appId = event.getApplicationID(); app.context.getApplications().remove(appId); app.aclsManager.removeApplication(appId); + try { + app.context.getNMStateStore().removeApplication(appId); + } catch (IOException e) { + LOG.error("Unable to remove application from state store", e); + } } } Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java Tue Aug 19 23:49:39 2014 @@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.api.record import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; +import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; public interface Container extends EventHandler<ContainerEvent> { @@ -39,7 +40,7 @@ public interface Container extends Event ContainerTokenIdentifier getContainerTokenIdentifier(); String getUser(); - + ContainerState getContainerState(); ContainerLaunchContext getLaunchContext(); @@ -50,6 +51,8 @@ public interface Container extends Event ContainerStatus cloneAndGetContainerStatus(); + NMContainerStatus getNMContainerStatus(); + String toString(); } Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java Tue Aug 19 23:49:39 2014 @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.container; +import java.io.IOException; import java.net.URISyntaxException; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -47,7 +48,7 @@ import org.apache.hadoop.yarn.conf.YarnC import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; -import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode; +import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger; import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger.AuditConstants; import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEvent; @@ -62,6 +63,8 @@ import org.apache.hadoop.yarn.server.nod import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainerStartMonitoringEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainerStopMonitoringEvent; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.state.InvalidStateTransitonException; import org.apache.hadoop.yarn.state.MultipleArcTransition; @@ -75,6 +78,7 @@ public class ContainerImpl implements Co private final Lock readLock; private final Lock writeLock; private final Dispatcher dispatcher; + private final NMStateStoreService stateStore; private final Credentials credentials; private final NodeManagerMetrics metrics; private final ContainerLaunchContext launchContext; @@ -101,12 +105,19 @@ public class ContainerImpl implements Co private final List<LocalResourceRequest> appRsrcs = new ArrayList<LocalResourceRequest>(); + // whether container has been recovered after a restart + private RecoveredContainerStatus recoveredStatus = + RecoveredContainerStatus.REQUESTED; + // whether container was marked as killed after recovery + private boolean recoveredAsKilled = false; + public ContainerImpl(Configuration conf, Dispatcher dispatcher, - ContainerLaunchContext launchContext, Credentials creds, - NodeManagerMetrics metrics, + NMStateStoreService stateStore, ContainerLaunchContext launchContext, + Credentials creds, NodeManagerMetrics metrics, ContainerTokenIdentifier containerTokenIdentifier) { this.daemonConf = conf; this.dispatcher = dispatcher; + this.stateStore = stateStore; this.launchContext = launchContext; this.containerTokenIdentifier = containerTokenIdentifier; this.containerId = containerTokenIdentifier.getContainerID(); @@ -122,6 +133,21 @@ public class ContainerImpl implements Co stateMachine = stateMachineFactory.make(this); } + // constructor for a recovered container + public ContainerImpl(Configuration conf, Dispatcher dispatcher, + NMStateStoreService stateStore, ContainerLaunchContext launchContext, + Credentials creds, NodeManagerMetrics metrics, + ContainerTokenIdentifier containerTokenIdentifier, + RecoveredContainerStatus recoveredStatus, int exitCode, + String diagnostics, boolean wasKilled) { + this(conf, dispatcher, stateStore, launchContext, creds, metrics, + containerTokenIdentifier); + this.recoveredStatus = recoveredStatus; + this.exitCode = exitCode; + this.recoveredAsKilled = wasKilled; + this.diagnostics.append(diagnostics); + } + private static final ContainerDoneTransition CONTAINER_DONE_TRANSITION = new ContainerDoneTransition(); @@ -135,8 +161,10 @@ public class ContainerImpl implements Co new StateMachineFactory<ContainerImpl, ContainerState, ContainerEventType, ContainerEvent>(ContainerState.NEW) // From NEW State .addTransition(ContainerState.NEW, - EnumSet.of(ContainerState.LOCALIZING, ContainerState.LOCALIZED, - ContainerState.LOCALIZATION_FAILED), + EnumSet.of(ContainerState.LOCALIZING, + ContainerState.LOCALIZED, + ContainerState.LOCALIZATION_FAILED, + ContainerState.DONE), ContainerEventType.INIT_CONTAINER, new RequestResourcesTransition()) .addTransition(ContainerState.NEW, ContainerState.NEW, ContainerEventType.UPDATE_DIAGNOSTICS_MSG, @@ -281,7 +309,9 @@ public class ContainerImpl implements Co UPDATE_DIAGNOSTICS_TRANSITION) .addTransition(ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL, ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL, - ContainerEventType.KILL_CONTAINER) + EnumSet.of(ContainerEventType.KILL_CONTAINER, + ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS, + ContainerEventType.CONTAINER_EXITED_WITH_FAILURE)) // From DONE .addTransition(ContainerState.DONE, ContainerState.DONE, @@ -295,7 +325,9 @@ public class ContainerImpl implements Co // we notify container of failed localization if localizer thread (for // that container) fails for some reason .addTransition(ContainerState.DONE, ContainerState.DONE, - ContainerEventType.RESOURCE_FAILED) + EnumSet.of(ContainerEventType.RESOURCE_FAILED, + ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS, + ContainerEventType.CONTAINER_EXITED_WITH_FAILURE)) // create the topology tables .installTopology(); @@ -388,6 +420,19 @@ public class ContainerImpl implements Co } @Override + public NMContainerStatus getNMContainerStatus() { + this.readLock.lock(); + try { + return NMContainerStatus.newInstance(this.containerId, getCurrentState(), + getResource(), diagnostics.toString(), exitCode, + containerTokenIdentifier.getPriority(), + containerTokenIdentifier.getCreationTime()); + } finally { + this.readLock.unlock(); + } + } + + @Override public ContainerId getContainerId() { return this.containerId; } @@ -407,7 +452,7 @@ public class ContainerImpl implements Co } } - @SuppressWarnings({"fallthrough", "unchecked"}) + @SuppressWarnings("fallthrough") private void finished() { ApplicationId applicationId = containerId.getApplicationAttemptId().getApplicationId(); @@ -445,7 +490,11 @@ public class ContainerImpl implements Co } metrics.releaseContainer(this.resource); + sendFinishedEvents(); + } + @SuppressWarnings("unchecked") + private void sendFinishedEvents() { // Inform the application @SuppressWarnings("rawtypes") EventHandler eventHandler = dispatcher.getEventHandler(); @@ -458,6 +507,45 @@ public class ContainerImpl implements Co } @SuppressWarnings("unchecked") // dispatcher not typed + private void sendLaunchEvent() { + ContainersLauncherEventType launcherEvent = + ContainersLauncherEventType.LAUNCH_CONTAINER; + if (recoveredStatus == RecoveredContainerStatus.LAUNCHED) { + // try to recover a container that was previously launched + launcherEvent = ContainersLauncherEventType.RECOVER_CONTAINER; + } + dispatcher.getEventHandler().handle( + new ContainersLauncherEvent(this, launcherEvent)); + } + + // Inform the ContainersMonitor to start monitoring the container's + // resource usage. + @SuppressWarnings("unchecked") // dispatcher not typed + private void sendContainerMonitorStartEvent() { + long pmemBytes = getResource().getMemory() * 1024 * 1024L; + float pmemRatio = daemonConf.getFloat( + YarnConfiguration.NM_VMEM_PMEM_RATIO, + YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO); + long vmemBytes = (long) (pmemRatio * pmemBytes); + + dispatcher.getEventHandler().handle( + new ContainerStartMonitoringEvent(containerId, + vmemBytes, pmemBytes)); + } + + private void addDiagnostics(String... diags) { + for (String s : diags) { + this.diagnostics.append(s); + } + try { + stateStore.storeContainerDiagnostics(containerId, diagnostics); + } catch (IOException e) { + LOG.warn("Unable to update diagnostics in state store for " + + containerId, e); + } + } + + @SuppressWarnings("unchecked") // dispatcher not typed public void cleanup() { Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrc = new HashMap<LocalResourceVisibility, @@ -505,6 +593,16 @@ public class ContainerImpl implements Co @Override public ContainerState transition(ContainerImpl container, ContainerEvent event) { + if (container.recoveredStatus == RecoveredContainerStatus.COMPLETED) { + container.sendFinishedEvents(); + return ContainerState.DONE; + } else if (container.recoveredAsKilled && + container.recoveredStatus == RecoveredContainerStatus.REQUESTED) { + // container was killed but never launched + container.finished(); + return ContainerState.DONE; + } + final ContainerLaunchContext ctxt = container.launchContext; container.metrics.initingContainer(); @@ -580,9 +678,7 @@ public class ContainerImpl implements Co new ContainerLocalizationRequestEvent(container, req)); return ContainerState.LOCALIZING; } else { - container.dispatcher.getEventHandler().handle( - new ContainersLauncherEvent(container, - ContainersLauncherEventType.LAUNCH_CONTAINER)); + container.sendLaunchEvent(); container.metrics.endInitingContainer(); return ContainerState.LOCALIZED; } @@ -593,7 +689,6 @@ public class ContainerImpl implements Co * Transition when one of the requested resources for this container * has been successfully localized. */ - @SuppressWarnings("unchecked") // dispatcher not typed static class LocalizedTransition implements MultipleArcTransition<ContainerImpl,ContainerEvent,ContainerState> { @Override @@ -613,9 +708,8 @@ public class ContainerImpl implements Co if (!container.pendingResources.isEmpty()) { return ContainerState.LOCALIZING; } - container.dispatcher.getEventHandler().handle( - new ContainersLauncherEvent(container, - ContainersLauncherEventType.LAUNCH_CONTAINER)); + + container.sendLaunchEvent(); container.metrics.endInitingContainer(); return ContainerState.LOCALIZED; } @@ -625,24 +719,22 @@ public class ContainerImpl implements Co * Transition from LOCALIZED state to RUNNING state upon receiving * a CONTAINER_LAUNCHED event */ - @SuppressWarnings("unchecked") // dispatcher not typed static class LaunchTransition extends ContainerTransition { + @SuppressWarnings("unchecked") @Override public void transition(ContainerImpl container, ContainerEvent event) { - // Inform the ContainersMonitor to start monitoring the container's - // resource usage. - long pmemBytes = - container.getResource().getMemory() * 1024 * 1024L; - float pmemRatio = container.daemonConf.getFloat( - YarnConfiguration.NM_VMEM_PMEM_RATIO, - YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO); - long vmemBytes = (long) (pmemRatio * pmemBytes); - - container.dispatcher.getEventHandler().handle( - new ContainerStartMonitoringEvent(container.containerId, - vmemBytes, pmemBytes)); + container.sendContainerMonitorStartEvent(); container.metrics.runningContainer(); container.wasLaunched = true; + + if (container.recoveredAsKilled) { + LOG.info("Killing " + container.containerId + + " due to recovered as killed"); + container.addDiagnostics("Container recovered as killed.\n"); + container.dispatcher.getEventHandler().handle( + new ContainersLauncherEvent(container, + ContainersLauncherEventType.CLEANUP_CONTAINER)); + } } } @@ -694,8 +786,7 @@ public class ContainerImpl implements Co ContainerExitEvent exitEvent = (ContainerExitEvent) event; container.exitCode = exitEvent.getExitCode(); if (exitEvent.getDiagnosticInfo() != null) { - container.diagnostics.append(exitEvent.getDiagnosticInfo()) - .append('\n'); + container.addDiagnostics(exitEvent.getDiagnosticInfo(), "\n"); } // TODO: Add containerWorkDir to the deletion service. @@ -722,7 +813,7 @@ public class ContainerImpl implements Co @Override public void transition(ContainerImpl container, ContainerEvent event) { super.transition(container, event); - container.diagnostics.append("Killed by external signal\n"); + container.addDiagnostics("Killed by external signal\n"); } } @@ -737,9 +828,7 @@ public class ContainerImpl implements Co ContainerResourceFailedEvent rsrcFailedEvent = (ContainerResourceFailedEvent) event; - container.diagnostics.append(rsrcFailedEvent.getDiagnosticMessage() - + "\n"); - + container.addDiagnostics(rsrcFailedEvent.getDiagnosticMessage(), "\n"); // Inform the localizer to decrement reference counts and cleanup // resources. @@ -761,9 +850,9 @@ public class ContainerImpl implements Co container.cleanup(); container.metrics.endInitingContainer(); ContainerKillEvent killEvent = (ContainerKillEvent) event; - container.exitCode = ExitCode.TERMINATED.getExitCode(); - container.diagnostics.append(killEvent.getDiagnostic()).append("\n"); - container.diagnostics.append("Container is killed before being launched.\n"); + container.exitCode = killEvent.getContainerExitStatus(); + container.addDiagnostics(killEvent.getDiagnostic(), "\n"); + container.addDiagnostics("Container is killed before being launched.\n"); } } @@ -804,7 +893,8 @@ public class ContainerImpl implements Co new ContainersLauncherEvent(container, ContainersLauncherEventType.CLEANUP_CONTAINER)); ContainerKillEvent killEvent = (ContainerKillEvent) event; - container.diagnostics.append(killEvent.getDiagnostic()).append("\n"); + container.addDiagnostics(killEvent.getDiagnostic(), "\n"); + container.exitCode = killEvent.getContainerExitStatus(); } } @@ -817,10 +907,12 @@ public class ContainerImpl implements Co @Override public void transition(ContainerImpl container, ContainerEvent event) { ContainerExitEvent exitEvent = (ContainerExitEvent) event; - container.exitCode = exitEvent.getExitCode(); + if (container.hasDefaultExitCode()) { + container.exitCode = exitEvent.getExitCode(); + } + if (exitEvent.getDiagnosticInfo() != null) { - container.diagnostics.append(exitEvent.getDiagnosticInfo()) - .append('\n'); + container.addDiagnostics(exitEvent.getDiagnosticInfo(), "\n"); } // The process/process-grp is killed. Decrement reference counts and @@ -859,9 +951,9 @@ public class ContainerImpl implements Co @Override public void transition(ContainerImpl container, ContainerEvent event) { ContainerKillEvent killEvent = (ContainerKillEvent) event; - container.exitCode = ExitCode.TERMINATED.getExitCode(); - container.diagnostics.append(killEvent.getDiagnostic()).append("\n"); - container.diagnostics.append("Container is killed before being launched.\n"); + container.exitCode = killEvent.getContainerExitStatus(); + container.addDiagnostics(killEvent.getDiagnostic(), "\n"); + container.addDiagnostics("Container is killed before being launched.\n"); super.transition(container, event); } } @@ -875,8 +967,14 @@ public class ContainerImpl implements Co public void transition(ContainerImpl container, ContainerEvent event) { ContainerDiagnosticsUpdateEvent updateEvent = (ContainerDiagnosticsUpdateEvent) event; - container.diagnostics.append(updateEvent.getDiagnosticsUpdate()) - .append("\n"); + container.addDiagnostics(updateEvent.getDiagnosticsUpdate(), "\n"); + try { + container.stateStore.storeContainerDiagnostics(container.containerId, + container.diagnostics); + } catch (IOException e) { + LOG.warn("Unable to update state store diagnostics for " + + container.containerId, e); + } } } @@ -916,4 +1014,8 @@ public class ContainerImpl implements Co this.readLock.unlock(); } } + + private boolean hasDefaultExitCode() { + return (this.exitCode == ContainerExitStatus.INVALID); + } } Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerKillEvent.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerKillEvent.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerKillEvent.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerKillEvent.java Tue Aug 19 23:49:39 2014 @@ -23,13 +23,21 @@ import org.apache.hadoop.yarn.api.record public class ContainerKillEvent extends ContainerEvent { private final String diagnostic; + private final int exitStatus; - public ContainerKillEvent(ContainerId cID, String diagnostic) { + public ContainerKillEvent(ContainerId cID, + int exitStatus, String diagnostic) { super(cID, ContainerEventType.KILL_CONTAINER); + this.exitStatus = exitStatus; this.diagnostic = diagnostic; } public String getDiagnostic() { return this.diagnostic; } + + public int getContainerExitStatus() { + return this.exitStatus; + } + }
