Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java Tue Aug 19 23:49:39 2014 @@ -87,22 +87,23 @@ public class ContainerLaunch implements public static final String FINAL_CONTAINER_TOKENS_FILE = "container_tokens"; private static final String PID_FILE_NAME_FMT = "%s.pid"; + private static final String EXIT_CODE_FILE_SUFFIX = ".exitcode"; - private final Dispatcher dispatcher; - private final ContainerExecutor exec; + protected final Dispatcher dispatcher; + protected final ContainerExecutor exec; private final Application app; - private final Container container; + protected final Container container; private final Configuration conf; private final Context context; private final ContainerManagerImpl containerManager; - private volatile AtomicBoolean shouldLaunchContainer = new AtomicBoolean(false); - private volatile AtomicBoolean completed = new AtomicBoolean(false); + protected AtomicBoolean shouldLaunchContainer = new AtomicBoolean(false); + protected AtomicBoolean 
completed = new AtomicBoolean(false); private long sleepDelayBeforeSigKill = 250; private long maxKillWaitTime = 2000; - private Path pidFilePath = null; + protected Path pidFilePath = null; private final LocalDirsHandlerService dirsHandler; @@ -223,14 +224,11 @@ public class ContainerLaunch implements + Path.SEPARATOR + containerIdStr, LocalDirAllocator.SIZE_UNKNOWN, false); - String pidFileSuffix = String.format(ContainerLaunch.PID_FILE_NAME_FMT, - containerIdStr); + String pidFileSubpath = getPidFileSubpath(appIdStr, containerIdStr); // pid file should be in nm private dir so that it is not // accessible by users - pidFilePath = dirsHandler.getLocalPathForWrite( - ResourceLocalizationService.NM_PRIVATE_DIR + Path.SEPARATOR - + pidFileSuffix); + pidFilePath = dirsHandler.getLocalPathForWrite(pidFileSubpath); List<String> localDirs = dirsHandler.getLocalDirs(); List<String> logDirs = dirsHandler.getLogDirs(); @@ -288,6 +286,7 @@ public class ContainerLaunch implements dispatcher.getEventHandler().handle(new ContainerEvent( containerID, ContainerEventType.CONTAINER_LAUNCHED)); + context.getNMStateStore().storeContainerLaunched(containerID); // Check if the container is signalled to be killed. if (!shouldLaunchContainer.compareAndSet(false, true)) { @@ -310,6 +309,11 @@ public class ContainerLaunch implements } finally { completed.set(true); exec.deactivateContainer(containerID); + try { + context.getNMStateStore().storeContainerCompleted(containerID, ret); + } catch (IOException e) { + LOG.error("Unable to set exit code for container " + containerID); + } } if (LOG.isDebugEnabled()) { @@ -342,6 +346,11 @@ public class ContainerLaunch implements ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS)); return 0; } + + protected String getPidFileSubpath(String appIdStr, String containerIdStr) { + return getContainerPrivateDir(appIdStr, containerIdStr) + Path.SEPARATOR + + String.format(ContainerLaunch.PID_FILE_NAME_FMT, containerIdStr); + } /** * Cleanup the container. 
@@ -357,6 +366,13 @@ public class ContainerLaunch implements String containerIdStr = ConverterUtils.toString(containerId); LOG.info("Cleaning up container " + containerIdStr); + try { + context.getNMStateStore().storeContainerKilled(containerId); + } catch (IOException e) { + LOG.error("Unable to mark container " + containerId + + " killed in store", e); + } + // launch flag will be set to true if process already launched boolean alreadyLaunched = !shouldLaunchContainer.compareAndSet(false, true); if (!alreadyLaunched) { @@ -421,6 +437,7 @@ public class ContainerLaunch implements if (pidFilePath != null) { FileContext lfs = FileContext.getLocalFSFileContext(); lfs.delete(pidFilePath, false); + lfs.delete(pidFilePath.suffix(EXIT_CODE_FILE_SUFFIX), false); } } } @@ -479,15 +496,24 @@ public class ContainerLaunch implements + appIdStr; } - private static abstract class ShellScriptBuilder { + Context getContext() { + return context; + } + + @VisibleForTesting + static abstract class ShellScriptBuilder { + public static ShellScriptBuilder create() { + return Shell.WINDOWS ? 
new WindowsShellScriptBuilder() : + new UnixShellScriptBuilder(); + } private static final String LINE_SEPARATOR = System.getProperty("line.separator"); private final StringBuilder sb = new StringBuilder(); - public abstract void command(List<String> command); + public abstract void command(List<String> command) throws IOException; - public abstract void env(String key, String value); + public abstract void env(String key, String value) throws IOException; public final void symlink(Path src, Path dst) throws IOException { if (!src.isAbsolute()) { @@ -520,11 +546,19 @@ public class ContainerLaunch implements protected abstract void link(Path src, Path dst) throws IOException; - protected abstract void mkdir(Path path); + protected abstract void mkdir(Path path) throws IOException; } private static final class UnixShellScriptBuilder extends ShellScriptBuilder { + private void errorCheck() { + line("hadoop_shell_errorcode=$?"); + line("if [ $hadoop_shell_errorcode -ne 0 ]"); + line("then"); + line(" exit $hadoop_shell_errorcode"); + line("fi"); + } + public UnixShellScriptBuilder(){ line("#!/bin/bash"); line(); @@ -533,6 +567,7 @@ public class ContainerLaunch implements @Override public void command(List<String> command) { line("exec /bin/bash -c \"", StringUtils.join(" ", command), "\""); + errorCheck(); } @Override @@ -543,31 +578,43 @@ public class ContainerLaunch implements @Override protected void link(Path src, Path dst) throws IOException { line("ln -sf \"", src.toUri().getPath(), "\" \"", dst.toString(), "\""); + errorCheck(); } @Override protected void mkdir(Path path) { line("mkdir -p ", path.toString()); + errorCheck(); } } private static final class WindowsShellScriptBuilder extends ShellScriptBuilder { + private void errorCheck() { + line("@if %errorlevel% neq 0 exit /b %errorlevel%"); + } + + private void lineWithLenCheck(String... 
commands) throws IOException { + Shell.checkWindowsCommandLineLength(commands); + line(commands); + } + public WindowsShellScriptBuilder() { line("@setlocal"); line(); } @Override - public void command(List<String> command) { - line("@call ", StringUtils.join(" ", command)); + public void command(List<String> command) throws IOException { + lineWithLenCheck("@call ", StringUtils.join(" ", command)); + errorCheck(); } @Override - public void env(String key, String value) { - line("@set ", key, "=", value, - "\nif %errorlevel% neq 0 exit /b %errorlevel%"); + public void env(String key, String value) throws IOException { + lineWithLenCheck("@set ", key, "=", value); + errorCheck(); } @Override @@ -578,16 +625,20 @@ public class ContainerLaunch implements // If not on Java7+ on Windows, then copy file instead of symlinking. // See also FileUtil#symLink for full explanation. if (!Shell.isJava7OrAbove() && srcFile.isFile()) { - line(String.format("@copy \"%s\" \"%s\"", srcFileStr, dstFileStr)); + lineWithLenCheck(String.format("@copy \"%s\" \"%s\"", srcFileStr, dstFileStr)); + errorCheck(); } else { - line(String.format("@%s symlink \"%s\" \"%s\"", Shell.WINUTILS, + lineWithLenCheck(String.format("@%s symlink \"%s\" \"%s\"", Shell.WINUTILS, dstFileStr, srcFileStr)); + errorCheck(); } } @Override - protected void mkdir(Path path) { - line("@if not exist ", path.toString(), " mkdir ", path.toString()); + protected void mkdir(Path path) throws IOException { + lineWithLenCheck(String.format("@if not exist \"%s\" mkdir \"%s\"", + path.toString(), path.toString())); + errorCheck(); } } @@ -730,8 +781,7 @@ public class ContainerLaunch implements Map<String,String> environment, Map<Path,List<String>> resources, List<String> command) throws IOException { - ShellScriptBuilder sb = Shell.WINDOWS ? 
new WindowsShellScriptBuilder() : - new UnixShellScriptBuilder(); + ShellScriptBuilder sb = ShellScriptBuilder.create(); if (environment != null) { for (Map.Entry<String,String> env : environment.entrySet()) { sb.env(env.getKey().toString(), env.getValue().toString()); @@ -758,4 +808,7 @@ public class ContainerLaunch implements } } + public static String getExitCodeFile(String pidFile) { + return pidFile + EXIT_CODE_FILE_SUFFIX; + } }
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java Tue Aug 19 23:49:39 2014 @@ -24,7 +24,6 @@ import java.util.HashMap; import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.Future; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -32,21 +31,16 @@ import org.apache.hadoop.conf.Configurat import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.UnsupportedFileSystemException; import org.apache.hadoop.service.AbstractService; -import org.apache.hadoop.util.Shell; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; -import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode; import 
org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerExitEvent; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService; import com.google.common.annotations.VisibleForTesting; @@ -107,7 +101,6 @@ public class ContainersLauncher extends super.serviceStop(); } - @SuppressWarnings("unchecked") @Override public void handle(ContainersLauncherEvent event) { // TODO: ContainersLauncher launches containers one by one!! 
@@ -125,6 +118,14 @@ public class ContainersLauncher extends containerLauncher.submit(launch); running.put(containerId, launch); break; + case RECOVER_CONTAINER: + app = context.getApplications().get( + containerId.getApplicationAttemptId().getApplicationId()); + launch = new RecoveredContainerLaunch(context, getConfig(), dispatcher, + exec, app, event.getContainer(), dirsHandler, containerManager); + containerLauncher.submit(launch); + running.put(containerId, launch); + break; case CLEANUP_CONTAINER: ContainerLaunch launcher = running.remove(containerId); if (launcher == null) { Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java Tue Aug 19 23:49:39 2014 @@ -20,5 +20,6 @@ package org.apache.hadoop.yarn.server.no public enum ContainersLauncherEventType { LAUNCH_CONTAINER, + RECOVER_CONTAINER, CLEANUP_CONTAINER, // The process(grp) itself. 
} Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalCacheDirectoryManager.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalCacheDirectoryManager.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalCacheDirectoryManager.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalCacheDirectoryManager.java Tue Aug 19 23:49:39 2014 @@ -26,6 +26,8 @@ import org.apache.hadoop.conf.Configurat import org.apache.hadoop.fs.Path; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import com.google.common.annotations.VisibleForTesting; + /** * {@link LocalCacheDirectoryManager} is used for managing hierarchical * directories for local cache. It will allow to restrict the number of files in @@ -99,6 +101,57 @@ public class LocalCacheDirectoryManager } } + /** + * Increment the file count for a relative directory within the cache + * + * @param relPath the relative path + */ + public synchronized void incrementFileCountForPath(String relPath) { + relPath = relPath == null ? 
"" : relPath.trim(); + Directory subDir = knownDirectories.get(relPath); + if (subDir == null) { + int dirnum = Directory.getDirectoryNumber(relPath); + totalSubDirectories = Math.max(dirnum, totalSubDirectories); + subDir = new Directory(dirnum); + nonFullDirectories.add(subDir); + knownDirectories.put(subDir.getRelativePath(), subDir); + } + if (subDir.incrementAndGetCount() >= perDirectoryFileLimit) { + nonFullDirectories.remove(subDir); + } + } + + /** + * Given a path to a directory within a local cache tree return the + * root of the cache directory. + * + * @param path the directory within a cache directory + * @return the local cache directory root or null if not found + */ + public static Path getCacheDirectoryRoot(Path path) { + while (path != null) { + String name = path.getName(); + if (name.length() != 1) { + return path; + } + int dirnum = DIRECTORIES_PER_LEVEL; + try { + dirnum = Integer.parseInt(name, DIRECTORIES_PER_LEVEL); + } catch (NumberFormatException e) { + } + if (dirnum >= DIRECTORIES_PER_LEVEL) { + return path; + } + path = path.getParent(); + } + return path; + } + + @VisibleForTesting + synchronized Directory getDirectory(String relPath) { + return knownDirectories.get(relPath); + } + /* * It limits the number of files and sub directories in the directory to the * limit LocalCacheDirectoryManager#perDirectoryFileLimit. 
@@ -108,11 +161,9 @@ public class LocalCacheDirectoryManager private final String relativePath; private int fileCount; - public Directory(int directoryNo) { - fileCount = 0; - if (directoryNo == 0) { - relativePath = ""; - } else { + static String getRelativePath(int directoryNo) { + String relativePath = ""; + if (directoryNo > 0) { String tPath = Integer.toString(directoryNo - 1, DIRECTORIES_PER_LEVEL); StringBuffer sb = new StringBuffer(); if (tPath.length() == 1) { @@ -128,6 +179,27 @@ public class LocalCacheDirectoryManager } relativePath = sb.toString(); } + return relativePath; + } + + static int getDirectoryNumber(String relativePath) { + String numStr = relativePath.replace("/", ""); + if (relativePath.isEmpty()) { + return 0; + } + if (numStr.length() > 1) { + // undo step from getRelativePath() to reuse 0th sub directory + String firstChar = Integer.toString( + Integer.parseInt(numStr.substring(0, 1), + DIRECTORIES_PER_LEVEL) + 1, DIRECTORIES_PER_LEVEL); + numStr = firstChar + numStr.substring(1); + } + return Integer.parseInt(numStr, DIRECTORIES_PER_LEVEL) + 1; + } + + public Directory(int directoryNo) { + fileCount = 0; + relativePath = getRelativePath(directoryNo); } public int incrementAndGetCount() { Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- 
hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java Tue Aug 19 23:49:39 2014 @@ -18,15 +18,12 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer; -import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.fs.Path; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.nodemanager.DeletionService; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEvent; -import com.google.common.annotations.VisibleForTesting; - /** * Component tracking resources all of the same {@link LocalResourceVisibility} * @@ -34,18 +31,11 @@ import com.google.common.annotations.Vis interface LocalResourcesTracker extends EventHandler<ResourceEvent>, Iterable<LocalizedResource> { - // TODO: Not used at all!! 
- boolean contains(LocalResourceRequest resource); - boolean remove(LocalizedResource req, DeletionService delService); Path getPathForLocalization(LocalResourceRequest req, Path localDirPath); String getUser(); - long nextUniqueNumber(); - - @VisibleForTesting - @Private LocalizedResource getLocalizedResource(LocalResourceRequest request); } Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java Tue Aug 19 23:49:39 2014 @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer; import java.io.File; +import java.io.IOException; import java.util.Iterator; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -27,14 +28,21 @@ import java.util.regex.Pattern; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; +import org.apache.hadoop.yarn.api.records.impl.pb.LocalResourcePBImpl; import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto; +import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto; import org.apache.hadoop.yarn.server.nodemanager.DeletionService; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEventType; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRecoveredEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceReleaseEvent; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; import com.google.common.annotations.VisibleForTesting; @@ -53,6 +61,7 @@ class LocalResourcesTrackerImpl implemen .compile(RANDOM_DIR_REGEX); private final String user; + private final ApplicationId appId; private final Dispatcher dispatcher; private final ConcurrentMap<LocalResourceRequest,LocalizedResource> localrsrc; private Configuration conf; @@ -77,17 +86,22 @@ class LocalResourcesTrackerImpl implemen * per APPLICATION, USER and PUBLIC cache. 
*/ private AtomicLong uniqueNumberGenerator = new AtomicLong(9); + private NMStateStoreService stateStore; - public LocalResourcesTrackerImpl(String user, Dispatcher dispatcher, - boolean useLocalCacheDirectoryManager, Configuration conf) { - this(user, dispatcher, + public LocalResourcesTrackerImpl(String user, ApplicationId appId, + Dispatcher dispatcher, boolean useLocalCacheDirectoryManager, + Configuration conf, NMStateStoreService stateStore) { + this(user, appId, dispatcher, new ConcurrentHashMap<LocalResourceRequest, LocalizedResource>(), - useLocalCacheDirectoryManager, conf); + useLocalCacheDirectoryManager, conf, stateStore); } - LocalResourcesTrackerImpl(String user, Dispatcher dispatcher, + LocalResourcesTrackerImpl(String user, ApplicationId appId, + Dispatcher dispatcher, ConcurrentMap<LocalResourceRequest,LocalizedResource> localrsrc, - boolean useLocalCacheDirectoryManager, Configuration conf) { + boolean useLocalCacheDirectoryManager, Configuration conf, + NMStateStoreService stateStore) { + this.appId = appId; this.user = user; this.dispatcher = dispatcher; this.localrsrc = localrsrc; @@ -98,6 +112,7 @@ class LocalResourcesTrackerImpl implemen new ConcurrentHashMap<LocalResourceRequest, Path>(); } this.conf = conf; + this.stateStore = stateStore; } /* @@ -119,8 +134,7 @@ class LocalResourcesTrackerImpl implemen if (rsrc != null && (!isResourcePresent(rsrc))) { LOG.info("Resource " + rsrc.getLocalPath() + " is missing, localizing it again"); - localrsrc.remove(req); - decrementFileCountForLocalCacheDirectory(req, rsrc); + removeResource(req); rsrc = null; } if (null == rsrc) { @@ -141,15 +155,102 @@ class LocalResourcesTrackerImpl implemen } break; case LOCALIZATION_FAILED: - decrementFileCountForLocalCacheDirectory(req, null); /* * If resource localization fails then Localized resource will be * removed from local cache. 
*/ - localrsrc.remove(req); + removeResource(req); + break; + case RECOVERED: + if (rsrc != null) { + LOG.warn("Ignoring attempt to recover existing resource " + rsrc); + return; + } + rsrc = recoverResource(req, (ResourceRecoveredEvent) event); + localrsrc.put(req, rsrc); break; } + rsrc.handle(event); + + if (event.getType() == ResourceEventType.LOCALIZED) { + if (rsrc.getLocalPath() != null) { + try { + stateStore.finishResourceLocalization(user, appId, + buildLocalizedResourceProto(rsrc)); + } catch (IOException ioe) { + LOG.error("Error storing resource state for " + rsrc, ioe); + } + } else { + LOG.warn("Resource " + rsrc + " localized without a location"); + } + } + } + + private LocalizedResource recoverResource(LocalResourceRequest req, + ResourceRecoveredEvent event) { + // unique number for a resource is the directory of the resource + Path localDir = event.getLocalPath().getParent(); + long rsrcId = Long.parseLong(localDir.getName()); + + // update ID generator to avoid conflicts with existing resources + while (true) { + long currentRsrcId = uniqueNumberGenerator.get(); + long nextRsrcId = Math.max(currentRsrcId, rsrcId); + if (uniqueNumberGenerator.compareAndSet(currentRsrcId, nextRsrcId)) { + break; + } + } + + incrementFileCountForLocalCacheDirectory(localDir.getParent()); + + return new LocalizedResource(req, dispatcher); + } + + private LocalizedResourceProto buildLocalizedResourceProto( + LocalizedResource rsrc) { + return LocalizedResourceProto.newBuilder() + .setResource(buildLocalResourceProto(rsrc.getRequest())) + .setLocalPath(rsrc.getLocalPath().toString()) + .setSize(rsrc.getSize()) + .build(); + } + + private LocalResourceProto buildLocalResourceProto(LocalResource lr) { + LocalResourcePBImpl lrpb; + if (!(lr instanceof LocalResourcePBImpl)) { + lr = LocalResource.newInstance(lr.getResource(), lr.getType(), + lr.getVisibility(), lr.getSize(), lr.getTimestamp(), + lr.getPattern()); + } + lrpb = (LocalResourcePBImpl) lr; + return 
lrpb.getProto(); + } + + public void incrementFileCountForLocalCacheDirectory(Path cacheDir) { + if (useLocalCacheDirectoryManager) { + Path cacheRoot = LocalCacheDirectoryManager.getCacheDirectoryRoot( + cacheDir); + if (cacheRoot != null) { + LocalCacheDirectoryManager dir = directoryManagers.get(cacheRoot); + if (dir == null) { + dir = new LocalCacheDirectoryManager(conf); + LocalCacheDirectoryManager otherDir = + directoryManagers.putIfAbsent(cacheRoot, dir); + if (otherDir != null) { + dir = otherDir; + } + } + if (cacheDir.equals(cacheRoot)) { + dir.incrementFileCountForPath(""); + } else { + String dirStr = cacheDir.toUri().getRawPath(); + String rootStr = cacheRoot.toUri().getRawPath(); + dir.incrementFileCountForPath( + dirStr.substring(rootStr.length() + 1)); + } + } + } } /* @@ -217,11 +318,6 @@ class LocalResourcesTrackerImpl implemen } @Override - public boolean contains(LocalResourceRequest resource) { - return localrsrc.containsKey(resource); - } - - @Override public boolean remove(LocalizedResource rem, DeletionService delService) { // current synchronization guaranteed by crude RLS event for cleanup LocalizedResource rsrc = localrsrc.get(rem.getRequest()); @@ -237,16 +333,31 @@ class LocalResourcesTrackerImpl implemen + " with non-zero refcount"); return false; } else { // ResourceState is LOCALIZED or INIT - localrsrc.remove(rem.getRequest()); if (ResourceState.LOCALIZED.equals(rsrc.getState())) { delService.delete(getUser(), getPathToDelete(rsrc.getLocalPath())); } - decrementFileCountForLocalCacheDirectory(rem.getRequest(), rsrc); + removeResource(rem.getRequest()); LOG.info("Removed " + rsrc.getLocalPath() + " from localized cache"); return true; } } + private void removeResource(LocalResourceRequest req) { + LocalizedResource rsrc = localrsrc.remove(req); + decrementFileCountForLocalCacheDirectory(req, rsrc); + if (rsrc != null) { + Path localPath = rsrc.getLocalPath(); + if (localPath != null) { + try { + 
stateStore.removeLocalizedResource(user, appId, localPath); + } catch (IOException e) { + LOG.error("Unable to remove resource " + rsrc + " from state store", + e); + } + } + } + } + /** * Returns the path up to the random directory component. */ @@ -285,6 +396,7 @@ class LocalResourcesTrackerImpl implemen @Override public Path getPathForLocalization(LocalResourceRequest req, Path localDirPath) { + Path rPath = localDirPath; if (useLocalCacheDirectoryManager && localDirPath != null) { if (!directoryManagers.containsKey(localDirPath)) { @@ -293,7 +405,7 @@ class LocalResourcesTrackerImpl implemen } LocalCacheDirectoryManager dir = directoryManagers.get(localDirPath); - Path rPath = localDirPath; + rPath = localDirPath; String hierarchicalPath = dir.getRelativePathForLocalization(); // For most of the scenarios we will get root path only which // is an empty string @@ -301,21 +413,36 @@ class LocalResourcesTrackerImpl implemen rPath = new Path(localDirPath, hierarchicalPath); } inProgressLocalResourcesMap.put(req, rPath); - return rPath; - } else { - return localDirPath; } - } - @Override - public long nextUniqueNumber() { - return uniqueNumberGenerator.incrementAndGet(); + rPath = new Path(rPath, + Long.toString(uniqueNumberGenerator.incrementAndGet())); + Path localPath = new Path(rPath, req.getPath().getName()); + LocalizedResource rsrc = localrsrc.get(req); + rsrc.setLocalPath(localPath); + LocalResource lr = LocalResource.newInstance(req.getResource(), + req.getType(), req.getVisibility(), req.getSize(), + req.getTimestamp()); + try { + stateStore.startResourceLocalization(user, appId, + ((LocalResourcePBImpl) lr).getProto(), localPath); + } catch (IOException e) { + LOG.error("Unable to record localization start for " + rsrc, e); + } + return rPath; } - @VisibleForTesting - @Private @Override public LocalizedResource getLocalizedResource(LocalResourceRequest request) { return localrsrc.get(request); } -} \ No newline at end of file + + @VisibleForTesting + 
LocalCacheDirectoryManager getDirectoryManager(Path localDirPath) { + LocalCacheDirectoryManager mgr = null; + if (useLocalCacheDirectoryManager) { + mgr = directoryManagers.get(localDirPath); + } + return mgr; + } +} Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java Tue Aug 19 23:49:39 2014 @@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.server.nod import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceFailedLocalizationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceLocalizedEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRecoveredEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceReleaseEvent; import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRequestEvent; import org.apache.hadoop.yarn.state.InvalidStateTransitonException; @@ -54,8 +55,8 @@ public class LocalizedResource implement private static final Log LOG = LogFactory.getLog(LocalizedResource.class); - Path localPath; - long size = -1; + volatile Path localPath; + volatile long size = -1; final LocalResourceRequest rsrc; final Dispatcher dispatcher; final StateMachine<ResourceState,ResourceEventType,ResourceEvent> @@ -76,6 +77,8 @@ public class LocalizedResource implement // From INIT (ref == 0, awaiting req) .addTransition(ResourceState.INIT, ResourceState.DOWNLOADING, ResourceEventType.REQUEST, new FetchResourceTransition()) + .addTransition(ResourceState.INIT, ResourceState.LOCALIZED, + ResourceEventType.RECOVERED, new RecoveredTransition()) // From DOWNLOADING (ref > 0, may be localizing) .addTransition(ResourceState.DOWNLOADING, ResourceState.DOWNLOADING, @@ -157,6 +160,10 @@ public class LocalizedResource implement return localPath; } + public void setLocalPath(Path localPath) { + this.localPath = Path.getPathWithoutSchemeAndAuthority(localPath); + } + public long getTimestamp() { return timestamp.get(); } @@ -234,7 +241,8 @@ public class LocalizedResource implement @Override public void transition(LocalizedResource rsrc, ResourceEvent event) { ResourceLocalizedEvent locEvent = (ResourceLocalizedEvent) event; - rsrc.localPath = locEvent.getLocation(); + rsrc.localPath = + Path.getPathWithoutSchemeAndAuthority(locEvent.getLocation()); rsrc.size = locEvent.getSize(); for (ContainerId container : rsrc.ref) { rsrc.dispatcher.getEventHandler().handle( @@ -291,4 +299,13 @@ public class LocalizedResource implement rsrc.release(relEvent.getContainer()); } } + + private static class RecoveredTransition extends ResourceTransition { + @Override + public void transition(LocalizedResource rsrc, ResourceEvent event) { + ResourceRecoveredEvent recoveredEvent = 
(ResourceRecoveredEvent) event; + rsrc.localPath = recoveredEvent.getLocalPath(); + rsrc.size = recoveredEvent.getSize(); + } + } } Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java Tue Aug 19 23:49:39 2014 @@ -74,13 +74,17 @@ import org.apache.hadoop.yarn.api.record import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; +import org.apache.hadoop.yarn.api.records.impl.pb.LocalResourcePBImpl; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.ipc.RPCUtil; import 
org.apache.hadoop.yarn.ipc.YarnRPC; +import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto; +import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.DeletionService; import org.apache.hadoop.yarn.server.nodemanager.DeletionService.FileDeletionTask; @@ -109,10 +113,15 @@ import org.apache.hadoop.yarn.server.nod import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerResourceRequestEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceFailedLocalizationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceLocalizedEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRecoveredEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceReleaseEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRequestEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security.LocalizerTokenIdentifier; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security.LocalizerTokenSecretManager; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.LocalResourceTrackerState; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredLocalizationState; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredUserResources; import org.apache.hadoop.yarn.server.nodemanager.security.authorize.NMPolicyProvider; import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerBuilderUtils; import org.apache.hadoop.yarn.util.ConverterUtils; @@ -142,6 +151,7 @@ public class 
ResourceLocalizationService private RecordFactory recordFactory; private final ScheduledExecutorService cacheCleanup; private LocalizerTokenSecretManager secretManager; + private NMStateStoreService stateStore; private LocalResourcesTracker publicRsrc; @@ -163,7 +173,7 @@ public class ResourceLocalizationService public ResourceLocalizationService(Dispatcher dispatcher, ContainerExecutor exec, DeletionService delService, - LocalDirsHandlerService dirsHandler) { + LocalDirsHandlerService dirsHandler, NMStateStoreService stateStore) { super(ResourceLocalizationService.class.getName()); this.exec = exec; @@ -175,6 +185,7 @@ public class ResourceLocalizationService new ThreadFactoryBuilder() .setNameFormat("ResourceLocalizationService Cache Cleanup") .build()); + this.stateStore = stateStore; } FileContext getLocalFileContext(Configuration conf) { @@ -203,15 +214,17 @@ public class ResourceLocalizationService @Override public void serviceInit(Configuration conf) throws Exception { this.validateConf(conf); - this.publicRsrc = - new LocalResourcesTrackerImpl(null, dispatcher, true, conf); + this.publicRsrc = new LocalResourcesTrackerImpl(null, null, dispatcher, + true, conf, stateStore); this.recordFactory = RecordFactoryProvider.getRecordFactory(conf); try { FileContext lfs = getLocalFileContext(conf); lfs.setUMask(new FsPermission((short)FsPermission.DEFAULT_UMASK)); - cleanUpLocalDir(lfs,delService); + if (!stateStore.canRecover()) { + cleanUpLocalDir(lfs,delService); + } List<String> localDirs = dirsHandler.getLocalDirs(); for (String localDir : localDirs) { @@ -239,6 +252,7 @@ public class ResourceLocalizationService cacheCleanupPeriod = conf.getLong(YarnConfiguration.NM_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS, YarnConfiguration.DEFAULT_NM_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS); localizationServerAddress = conf.getSocketAddr( + YarnConfiguration.NM_BIND_HOST, YarnConfiguration.NM_LOCALIZER_ADDRESS, YarnConfiguration.DEFAULT_NM_LOCALIZER_ADDRESS, 
YarnConfiguration.DEFAULT_NM_LOCALIZER_PORT); @@ -249,6 +263,74 @@ public class ResourceLocalizationService super.serviceInit(conf); } + //Recover localized resources after an NM restart + public void recoverLocalizedResources(RecoveredLocalizationState state) + throws URISyntaxException { + LocalResourceTrackerState trackerState = state.getPublicTrackerState(); + recoverTrackerResources(publicRsrc, trackerState); + + for (Map.Entry<String, RecoveredUserResources> userEntry : + state.getUserResources().entrySet()) { + String user = userEntry.getKey(); + RecoveredUserResources userResources = userEntry.getValue(); + trackerState = userResources.getPrivateTrackerState(); + if (!trackerState.isEmpty()) { + LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user, + null, dispatcher, true, super.getConfig(), stateStore); + LocalResourcesTracker oldTracker = privateRsrc.putIfAbsent(user, + tracker); + if (oldTracker != null) { + tracker = oldTracker; + } + recoverTrackerResources(tracker, trackerState); + } + + for (Map.Entry<ApplicationId, LocalResourceTrackerState> appEntry : + userResources.getAppTrackerStates().entrySet()) { + trackerState = appEntry.getValue(); + if (!trackerState.isEmpty()) { + ApplicationId appId = appEntry.getKey(); + String appIdStr = ConverterUtils.toString(appId); + LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user, + appId, dispatcher, false, super.getConfig(), stateStore); + LocalResourcesTracker oldTracker = appRsrc.putIfAbsent(appIdStr, + tracker); + if (oldTracker != null) { + tracker = oldTracker; + } + recoverTrackerResources(tracker, trackerState); + } + } + } + } + + private void recoverTrackerResources(LocalResourcesTracker tracker, + LocalResourceTrackerState state) throws URISyntaxException { + for (LocalizedResourceProto proto : state.getLocalizedResources()) { + LocalResource rsrc = new LocalResourcePBImpl(proto.getResource()); + LocalResourceRequest req = new LocalResourceRequest(rsrc); + 
LOG.info("Recovering localized resource " + req + " at " + + proto.getLocalPath()); + tracker.handle(new ResourceRecoveredEvent(req, + new Path(proto.getLocalPath()), proto.getSize())); + } + + for (Map.Entry<LocalResourceProto, Path> entry : + state.getInProgressResources().entrySet()) { + LocalResource rsrc = new LocalResourcePBImpl(entry.getKey()); + LocalResourceRequest req = new LocalResourceRequest(rsrc); + Path localPath = entry.getValue(); + tracker.handle(new ResourceRecoveredEvent(req, localPath, 0)); + + // delete any in-progress localizations, containers will request again + LOG.info("Deleting in-progress localization for " + req + " at " + + localPath); + tracker.remove(tracker.getLocalizedResource(req), delService); + } + + // TODO: remove untracked directories in local filesystem + } + @Override public LocalizerHeartbeatResponse heartbeat(LocalizerStatus status) { return localizerTracker.processHeartbeat(status); @@ -261,7 +343,9 @@ public class ResourceLocalizationService server = createServer(); server.start(); localizationServerAddress = - getConfig().updateConnectAddr(YarnConfiguration.NM_LOCALIZER_ADDRESS, + getConfig().updateConnectAddr(YarnConfiguration.NM_BIND_HOST, + YarnConfiguration.NM_LOCALIZER_ADDRESS, + YarnConfiguration.DEFAULT_NM_LOCALIZER_ADDRESS, server.getListenerAddress()); LOG.info("Localizer started on port " + server.getPort()); super.serviceStart(); @@ -337,17 +421,10 @@ public class ResourceLocalizationService // 0) Create application tracking structs String userName = app.getUser(); privateRsrc.putIfAbsent(userName, new LocalResourcesTrackerImpl(userName, - dispatcher, true, super.getConfig())); - if (null != appRsrc.putIfAbsent( - ConverterUtils.toString(app.getAppId()), - new LocalResourcesTrackerImpl(app.getUser(), dispatcher, false, super - .getConfig()))) { - LOG.warn("Initializing application " + app + " already present"); - assert false; // TODO: FIXME assert doesn't help - // ^ The condition is benign. 
Tests should fail and it - // should appear in logs, but it's an internal error - // that should have no effect on applications - } + null, dispatcher, true, super.getConfig(), stateStore)); + String appIdStr = ConverterUtils.toString(app.getAppId()); + appRsrc.putIfAbsent(appIdStr, new LocalResourcesTrackerImpl(app.getUser(), + app.getAppId(), dispatcher, false, super.getConfig(), stateStore)); // 1) Signal container init // // This is handled by the ApplicationImpl state machine and allows @@ -446,18 +523,28 @@ public class ResourceLocalizationService @SuppressWarnings({"unchecked"}) private void handleDestroyApplicationResources(Application application) { - String userName; - String appIDStr; + String userName = application.getUser(); + ApplicationId appId = application.getAppId(); + String appIDStr = application.toString(); LocalResourcesTracker appLocalRsrcsTracker = - appRsrc.remove(ConverterUtils.toString(application.getAppId())); - if (null == appLocalRsrcsTracker) { + appRsrc.remove(ConverterUtils.toString(appId)); + if (appLocalRsrcsTracker != null) { + for (LocalizedResource rsrc : appLocalRsrcsTracker ) { + Path localPath = rsrc.getLocalPath(); + if (localPath != null) { + try { + stateStore.removeLocalizedResource(userName, appId, localPath); + } catch (IOException e) { + LOG.error("Unable to remove resource " + rsrc + " for " + appIDStr + + " from state store", e); + } + } + } + } else { LOG.warn("Removing uninitialized application " + application); } - // TODO: What to do with appLocalRsrcsTracker? 
// Delete the application directories - userName = application.getUser(); - appIDStr = application.toString(); for (String localDir : dirsHandler.getLocalDirs()) { // Delete the user-owned app-dir @@ -668,19 +755,15 @@ public class ResourceLocalizationService if (rsrc.getState().equals(ResourceState.DOWNLOADING)) { LocalResource resource = request.getResource().getRequest(); try { - Path publicDirDestPath = + Path publicRootPath = dirsHandler.getLocalPathForWrite("." + Path.SEPARATOR + ContainerLocalizer.FILECACHE, ContainerLocalizer.getEstimatedSize(resource), true); - Path hierarchicalPath = - publicRsrc.getPathForLocalization(key, publicDirDestPath); - if (!hierarchicalPath.equals(publicDirDestPath)) { - publicDirDestPath = hierarchicalPath; + Path publicDirDestPath = + publicRsrc.getPathForLocalization(key, publicRootPath); + if (!publicDirDestPath.getParent().equals(publicRootPath)) { DiskChecker.checkDir(new File(publicDirDestPath.toUri().getPath())); } - publicDirDestPath = - new Path(publicDirDestPath, Long.toString(publicRsrc - .nextUniqueNumber())); // explicitly synchronize pending here to avoid future task // completing and being dequeued before pending updated synchronized (pending) { @@ -968,9 +1051,8 @@ public class ResourceLocalizationService Path dirPath = dirsHandler.getLocalPathForWrite(cacheDirectory, ContainerLocalizer.getEstimatedSize(rsrc), false); - dirPath = tracker.getPathForLocalization(new LocalResourceRequest(rsrc), - dirPath); - return new Path (dirPath, Long.toString(tracker.nextUniqueNumber())); + return tracker.getPathForLocalization(new LocalResourceRequest(rsrc), + dirPath); } @Override Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ResourceEventType.java URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ResourceEventType.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ResourceEventType.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ResourceEventType.java Tue Aug 19 23:49:39 2014 @@ -31,5 +31,7 @@ public enum ResourceEventType { /** See {@link ResourceReleaseEvent} */ RELEASE, /** See {@link ResourceFailedLocalizationEvent} */ - LOCALIZATION_FAILED + LOCALIZATION_FAILED, + /** See {@link ResourceRecoveredEvent} */ + RECOVERED } Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregator.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregator.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregator.java 
(original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregator.java Tue Aug 19 23:49:39 2014 @@ -25,5 +25,7 @@ public interface AppLogAggregator extend void startContainerLogAggregation(ContainerId containerId, boolean wasContainerSuccessful); + void abortLogAggregation(); + void finishLogAggregation(); } Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java Tue Aug 19 23:49:39 2014 @@ -70,6 +70,7 @@ public class AppLogAggregatorImpl implem private final BlockingQueue<ContainerId> pendingContainers; private final AtomicBoolean appFinishing = new AtomicBoolean(); private final AtomicBoolean appAggregationFinished = new AtomicBoolean(); + private final AtomicBoolean aborted = new AtomicBoolean(); private final Map<ApplicationAccessType, String> appAcls; 
private LogWriter writer = null; @@ -150,7 +151,7 @@ public class AppLogAggregatorImpl implem private void doAppLogAggregation() { ContainerId containerId; - while (!this.appFinishing.get()) { + while (!this.appFinishing.get() && !this.aborted.get()) { synchronized(this) { try { wait(THREAD_SLEEP_TIME); @@ -161,6 +162,10 @@ public class AppLogAggregatorImpl implem } } + if (this.aborted.get()) { + return; + } + // Application is finished. Finish pending-containers while ((containerId = this.pendingContainers.poll()) != null) { uploadLogsForContainer(containerId); @@ -255,4 +260,11 @@ public class AppLogAggregatorImpl implem this.appFinishing.set(true); this.notifyAll(); } + + @Override + public synchronized void abortLogAggregation() { + LOG.info("Aborting log aggregation for " + this.applicationId); + this.aborted.set(true); + this.notifyAll(); + } } Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java Tue Aug 19 
23:49:39 2014 @@ -142,9 +142,17 @@ public class LogAggregationService exten private void stopAggregators() { threadPool.shutdown(); + // if recovery on restart is supported then leave outstanding aggregations + // to the next restart + boolean shouldAbort = context.getNMStateStore().canRecover() + && !context.getDecommissioned(); // politely ask to finish for (AppLogAggregator aggregator : appLogAggregators.values()) { - aggregator.finishLogAggregation(); + if (shouldAbort) { + aggregator.abortLogAggregation(); + } else { + aggregator.finishLogAggregation(); + } } while (!threadPool.isTerminated()) { // wait for all threads to finish for (ApplicationId appId : appLogAggregators.keySet()) { Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java Tue Aug 19 23:49:39 2014 @@ -30,6 +30,7 @@ import org.apache.commons.logging.LogFac import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.service.AbstractService; import 
org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix; +import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; @@ -69,6 +70,8 @@ public class ContainersMonitorImpl exten private boolean pmemCheckEnabled; private boolean vmemCheckEnabled; + private long maxVCoresAllottedForContainers; + private static final long UNKNOWN_MEMORY_LIMIT = -1L; public ContainersMonitorImpl(ContainerExecutor exec, @@ -107,10 +110,16 @@ public class ContainersMonitorImpl exten YarnConfiguration.NM_PMEM_MB, YarnConfiguration.DEFAULT_NM_PMEM_MB) * 1024 * 1024l; + long configuredVCoresForContainers = conf.getLong( + YarnConfiguration.NM_VCORES, + YarnConfiguration.DEFAULT_NM_VCORES); + + // Setting these irrespective of whether checks are enabled. Required in // the UI. // ///////// Physical memory configuration ////// this.maxPmemAllottedForContainers = configuredPMemForContainers; + this.maxVCoresAllottedForContainers = configuredVCoresForContainers; // ///////// Virtual memory configuration ////// float vmemRatio = conf.getFloat(YarnConfiguration.NM_VMEM_PMEM_RATIO, @@ -403,6 +412,7 @@ public class ContainersMonitorImpl exten boolean isMemoryOverLimit = false; String msg = ""; + int containerExitStatus = ContainerExitStatus.INVALID; if (isVmemCheckEnabled() && isProcessTreeOverLimit(containerId.toString(), currentVmemUsage, curMemUsageOfAgedProcesses, vmemLimit)) { @@ -414,6 +424,7 @@ public class ContainersMonitorImpl exten currentPmemUsage, pmemLimit, pId, containerId, pTree); isMemoryOverLimit = true; + containerExitStatus = ContainerExitStatus.KILLED_EXCEEDED_VMEM; } else if (isPmemCheckEnabled() && isProcessTreeOverLimit(containerId.toString(), currentPmemUsage, curRssMemUsageOfAgedProcesses, @@ -426,6 +437,7 @@ public class ContainersMonitorImpl exten currentPmemUsage, pmemLimit, pId, containerId, pTree); 
isMemoryOverLimit = true; + containerExitStatus = ContainerExitStatus.KILLED_EXCEEDED_PMEM; } if (isMemoryOverLimit) { @@ -440,7 +452,8 @@ public class ContainersMonitorImpl exten } // kill the container eventDispatcher.getEventHandler().handle( - new ContainerKillEvent(containerId, msg)); + new ContainerKillEvent(containerId, + containerExitStatus, msg)); it.remove(); LOG.info("Removed ProcessTree with root " + pId); } else { @@ -513,6 +526,11 @@ public class ContainersMonitorImpl exten return this.maxPmemAllottedForContainers; } + @Override + public long getVCoresAllocatedForContainers() { + return this.maxVCoresAllottedForContainers; + } + /** * Is the total virtual memory check enabled? * Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/metrics/NodeManagerMetrics.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/metrics/NodeManagerMetrics.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/metrics/NodeManagerMetrics.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/metrics/NodeManagerMetrics.java Tue Aug 19 23:49:39 2014 @@ -40,6 +40,9 @@ public class NodeManagerMetrics { @Metric("Current # of allocated containers") MutableGaugeInt allocatedContainers; @Metric MutableGaugeInt availableGB; + @Metric("Current allocated Virtual Cores") + MutableGaugeInt allocatedVCores; + @Metric 
MutableGaugeInt availableVCores; public static NodeManagerMetrics create() { return create(DefaultMetricsSystem.instance()); @@ -88,16 +91,21 @@ public class NodeManagerMetrics { allocatedContainers.incr(); allocatedGB.incr(res.getMemory() / 1024); availableGB.decr(res.getMemory() / 1024); + allocatedVCores.incr(res.getVirtualCores()); + availableVCores.decr(res.getVirtualCores()); } public void releaseContainer(Resource res) { allocatedContainers.decr(); allocatedGB.decr(res.getMemory() / 1024); availableGB.incr(res.getMemory() / 1024); + allocatedVCores.decr(res.getVirtualCores()); + availableVCores.incr(res.getVirtualCores()); } public void addResource(Resource res) { availableGB.incr(res.getMemory() / 1024); + availableVCores.incr(res.getVirtualCores()); } public int getRunningContainers() { Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMContainerTokenSecretManager.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMContainerTokenSecretManager.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMContainerTokenSecretManager.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMContainerTokenSecretManager.java Tue Aug 19 23:49:39 2014 @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.nodemanager.security; +import java.io.IOException; 
import java.util.ArrayList; import java.util.Iterator; import java.util.List; @@ -33,6 +34,9 @@ import org.apache.hadoop.yarn.api.record import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.server.api.records.MasterKey; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerTokensState; import org.apache.hadoop.yarn.server.security.BaseContainerTokenSecretManager; import org.apache.hadoop.yarn.server.security.MasterKeyData; @@ -49,14 +53,74 @@ public class NMContainerTokenSecretManag private MasterKeyData previousMasterKey; private final TreeMap<Long, List<ContainerId>> recentlyStartedContainerTracker; - + private final NMStateStoreService stateStore; private String nodeHostAddr; public NMContainerTokenSecretManager(Configuration conf) { + this(conf, new NMNullStateStoreService()); + } + + public NMContainerTokenSecretManager(Configuration conf, + NMStateStoreService stateStore) { super(conf); recentlyStartedContainerTracker = new TreeMap<Long, List<ContainerId>>(); + this.stateStore = stateStore; + } + + public synchronized void recover() + throws IOException { + RecoveredContainerTokensState state = + stateStore.loadContainerTokensState(); + MasterKey key = state.getCurrentMasterKey(); + if (key != null) { + super.currentMasterKey = + new MasterKeyData(key, createSecretKey(key.getBytes().array())); + } + + key = state.getPreviousMasterKey(); + if (key != null) { + previousMasterKey = + new MasterKeyData(key, createSecretKey(key.getBytes().array())); + } + + // restore the serial number from the current master key + if (super.currentMasterKey != null) { + super.serialNo = super.currentMasterKey.getMasterKey().getKeyId() + 1; + } + + for (Entry<ContainerId, Long> entry : 
state.getActiveTokens().entrySet()) { + ContainerId containerId = entry.getKey(); + Long expTime = entry.getValue(); + List<ContainerId> containerList = + recentlyStartedContainerTracker.get(expTime); + if (containerList == null) { + containerList = new ArrayList<ContainerId>(); + recentlyStartedContainerTracker.put(expTime, containerList); + } + if (!containerList.contains(containerId)) { + containerList.add(containerId); + } + } + } + + private void updateCurrentMasterKey(MasterKeyData key) { + super.currentMasterKey = key; + try { + stateStore.storeContainerTokenCurrentMasterKey(key.getMasterKey()); + } catch (IOException e) { + LOG.error("Unable to update current master key in state store", e); + } + } + + private void updatePreviousMasterKey(MasterKeyData key) { + previousMasterKey = key; + try { + stateStore.storeContainerTokenPreviousMasterKey(key.getMasterKey()); + } catch (IOException e) { + LOG.error("Unable to update previous master key in state store", e); + } } /** @@ -68,21 +132,16 @@ public class NMContainerTokenSecretManag */ @Private public synchronized void setMasterKey(MasterKey masterKeyRecord) { - LOG.info("Rolling master-key for container-tokens, got key with id " - + masterKeyRecord.getKeyId()); - if (super.currentMasterKey == null) { - super.currentMasterKey = - new MasterKeyData(masterKeyRecord, createSecretKey(masterKeyRecord - .getBytes().array())); - } else { - if (super.currentMasterKey.getMasterKey().getKeyId() != masterKeyRecord - .getKeyId()) { - // Update keys only if the key has changed. - this.previousMasterKey = super.currentMasterKey; - super.currentMasterKey = - new MasterKeyData(masterKeyRecord, createSecretKey(masterKeyRecord - .getBytes().array())); + // Update keys only if the key has changed. 
+ if (super.currentMasterKey == null || super.currentMasterKey.getMasterKey() + .getKeyId() != masterKeyRecord.getKeyId()) { + LOG.info("Rolling master-key for container-tokens, got key with id " + + masterKeyRecord.getKeyId()); + if (super.currentMasterKey != null) { + updatePreviousMasterKey(super.currentMasterKey); } + updateCurrentMasterKey(new MasterKeyData(masterKeyRecord, + createSecretKey(masterKeyRecord.getBytes().array()))); } } @@ -137,14 +196,19 @@ public class NMContainerTokenSecretManag removeAnyContainerTokenIfExpired(); + ContainerId containerId = tokenId.getContainerID(); Long expTime = tokenId.getExpiryTimeStamp(); // We might have multiple containers with same expiration time. if (!recentlyStartedContainerTracker.containsKey(expTime)) { recentlyStartedContainerTracker .put(expTime, new ArrayList<ContainerId>()); } - recentlyStartedContainerTracker.get(expTime).add(tokenId.getContainerID()); - + recentlyStartedContainerTracker.get(expTime).add(containerId); + try { + stateStore.storeContainerToken(containerId, expTime); + } catch (IOException e) { + LOG.error("Unable to store token for container " + containerId, e); + } } protected synchronized void removeAnyContainerTokenIfExpired() { @@ -155,6 +219,13 @@ public class NMContainerTokenSecretManag while (containersI.hasNext()) { Entry<Long, List<ContainerId>> containerEntry = containersI.next(); if (containerEntry.getKey() < currTime) { + for (ContainerId container : containerEntry.getValue()) { + try { + stateStore.removeContainerToken(container); + } catch (IOException e) { + LOG.error("Unable to remove token for container " + container, e); + } + } containersI.remove(); } else { break; Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMTokenSecretManagerInNM.java URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMTokenSecretManagerInNM.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMTokenSecretManagerInNM.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMTokenSecretManagerInNM.java Tue Aug 19 23:49:39 2014 @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.nodemanager.security; +import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -31,6 +32,9 @@ import org.apache.hadoop.yarn.api.record import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.security.NMTokenIdentifier; import org.apache.hadoop.yarn.server.api.records.MasterKey; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredNMTokensState; import org.apache.hadoop.yarn.server.security.BaseNMTokenSecretManager; import org.apache.hadoop.yarn.server.security.MasterKeyData; @@ -45,16 +49,79 @@ public class NMTokenSecretManagerInNM ex private final Map<ApplicationAttemptId, MasterKeyData> oldMasterKeys; private final Map<ApplicationId, List<ApplicationAttemptId>> appToAppAttemptMap; + private final NMStateStoreService stateStore; private NodeId nodeId; - public NMTokenSecretManagerInNM() { + this(new 
NMNullStateStoreService()); + } + + public NMTokenSecretManagerInNM(NMStateStoreService stateStore) { this.oldMasterKeys = new HashMap<ApplicationAttemptId, MasterKeyData>(); appToAppAttemptMap = new HashMap<ApplicationId, List<ApplicationAttemptId>>(); + this.stateStore = stateStore; } + public synchronized void recover() + throws IOException { + RecoveredNMTokensState state = stateStore.loadNMTokensState(); + MasterKey key = state.getCurrentMasterKey(); + if (key != null) { + super.currentMasterKey = + new MasterKeyData(key, createSecretKey(key.getBytes().array())); + } + + key = state.getPreviousMasterKey(); + if (key != null) { + previousMasterKey = + new MasterKeyData(key, createSecretKey(key.getBytes().array())); + } + + // restore the serial number from the current master key + if (super.currentMasterKey != null) { + super.serialNo = super.currentMasterKey.getMasterKey().getKeyId() + 1; + } + + for (Map.Entry<ApplicationAttemptId, MasterKey> entry : + state.getApplicationMasterKeys().entrySet()) { + key = entry.getValue(); + oldMasterKeys.put(entry.getKey(), + new MasterKeyData(key, createSecretKey(key.getBytes().array()))); + } + + // reconstruct app to app attempts map + appToAppAttemptMap.clear(); + for (ApplicationAttemptId attempt : oldMasterKeys.keySet()) { + ApplicationId app = attempt.getApplicationId(); + List<ApplicationAttemptId> attempts = appToAppAttemptMap.get(app); + if (attempts == null) { + attempts = new ArrayList<ApplicationAttemptId>(); + appToAppAttemptMap.put(app, attempts); + } + attempts.add(attempt); + } + } + + private void updateCurrentMasterKey(MasterKeyData key) { + super.currentMasterKey = key; + try { + stateStore.storeNMTokenCurrentMasterKey(key.getMasterKey()); + } catch (IOException e) { + LOG.error("Unable to update current master key in state store", e); + } + } + + private void updatePreviousMasterKey(MasterKeyData key) { + previousMasterKey = key; + try { + stateStore.storeNMTokenPreviousMasterKey(key.getMasterKey()); + 
} catch (IOException e) { + LOG.error("Unable to update previous master key in state store", e); + } + } + /** * Used by NodeManagers to create a token-secret-manager with the key * obtained from the RM. This can happen during registration or when the RM @@ -62,20 +129,16 @@ public class NMTokenSecretManagerInNM ex */ @Private public synchronized void setMasterKey(MasterKey masterKey) { - LOG.info("Rolling master-key for nm-tokens, got key with id :" - + masterKey.getKeyId()); - if (super.currentMasterKey == null) { - super.currentMasterKey = - new MasterKeyData(masterKey, createSecretKey(masterKey.getBytes() - .array())); - } else { - if (super.currentMasterKey.getMasterKey().getKeyId() != masterKey - .getKeyId()) { - this.previousMasterKey = super.currentMasterKey; - super.currentMasterKey = - new MasterKeyData(masterKey, createSecretKey(masterKey.getBytes() - .array())); + // Update keys only if the key has changed. + if (super.currentMasterKey == null || super.currentMasterKey.getMasterKey() + .getKeyId() != masterKey.getKeyId()) { + LOG.info("Rolling master-key for nm-tokens, got key with id " + + masterKey.getKeyId()); + if (super.currentMasterKey != null) { + updatePreviousMasterKey(super.currentMasterKey); } + updateCurrentMasterKey(new MasterKeyData(masterKey, + createSecretKey(masterKey.getBytes().array()))); } } @@ -128,7 +191,7 @@ public class NMTokenSecretManagerInNM ex LOG.debug("Removing application attempts NMToken keys for application " + appId); for (ApplicationAttemptId appAttemptId : appAttemptList) { - this.oldMasterKeys.remove(appAttemptId); + removeAppAttemptKey(appAttemptId); } appToAppAttemptMap.remove(appId); } else { @@ -164,11 +227,11 @@ public class NMTokenSecretManagerInNM ex + identifier.getApplicationAttemptId().toString()); if (identifier.getKeyId() == currentMasterKey.getMasterKey() .getKeyId()) { - oldMasterKeys.put(appAttemptId, currentMasterKey); + updateAppAttemptKey(appAttemptId, currentMasterKey); } else if 
(previousMasterKey != null && identifier.getKeyId() == previousMasterKey.getMasterKey() .getKeyId()) { - oldMasterKeys.put(appAttemptId, previousMasterKey); + updateAppAttemptKey(appAttemptId, previousMasterKey); } else { throw new InvalidToken( "Older NMToken should not be used while starting the container."); @@ -193,4 +256,24 @@ public class NMTokenSecretManagerInNM ex public synchronized NodeId getNodeId() { return this.nodeId; } + + private void updateAppAttemptKey(ApplicationAttemptId attempt, + MasterKeyData key) { + this.oldMasterKeys.put(attempt, key); + try { + stateStore.storeNMTokenApplicationMasterKey(attempt, + key.getMasterKey()); + } catch (IOException e) { + LOG.error("Unable to store master key for application " + attempt, e); + } + } + + private void removeAppAttemptKey(ApplicationAttemptId attempt) { + this.oldMasterKeys.remove(attempt); + try { + stateStore.removeNMTokenApplicationMasterKey(attempt); + } catch (IOException e) { + LOG.error("Unable to remove master key for application " + attempt, e); + } + } } Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerPage.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerPage.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerPage.java (original) +++ 
hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerPage.java Tue Aug 19 23:49:39 2014 @@ -85,6 +85,7 @@ public class ContainerPage extends NMVie ._("Diagnostics", info.getDiagnostics()) ._("User", info.getUser()) ._("TotalMemoryNeeded", info.getMemoryNeeded()) + ._("TotalVCoresNeeded", info.getVCoresNeeded()) ._("logs", info.getShortLogLink(), "Link to logs"); html._(InfoBlock.class); } Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NodePage.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NodePage.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NodePage.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NodePage.java Tue Aug 19 23:49:39 2014 @@ -72,7 +72,9 @@ public class NodePage extends NMView { ._("Total Pmem allocated for Container", StringUtils.byteDesc(info.getTotalPmemAllocated() * BYTES_IN_MB)) ._("Pmem enforcement enabled", - info.isVmemCheckEnabled()) + info.isPmemCheckEnabled()) + ._("Total VCores allocated for Containers", + String.valueOf(info.getTotalVCoresAllocated())) ._("NodeHealthyStatus", info.getHealthStatus()) ._("LastNodeHealthTime", new Date( Modified: 
hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/WebServer.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/WebServer.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/WebServer.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/WebServer.java Tue Aug 19 23:49:39 2014 @@ -55,7 +55,9 @@ public class WebServer extends AbstractS @Override protected void serviceStart() throws Exception { - String bindAddress = WebAppUtils.getNMWebAppURLWithoutScheme(getConfig()); + String bindAddress = WebAppUtils.getWebAppBindURL(getConfig(), + YarnConfiguration.NM_BIND_HOST, + WebAppUtils.getNMWebAppURLWithoutScheme(getConfig())); LOG.info("Instantiating NMWebApp at " + bindAddress); try { Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/dao/ContainerInfo.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/dao/ContainerInfo.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- 
hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/dao/ContainerInfo.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/dao/ContainerInfo.java Tue Aug 19 23:49:39 2014 @@ -42,6 +42,7 @@ public class ContainerInfo { protected String diagnostics; protected String user; protected long totalMemoryNeededMB; + protected long totalVCoresNeeded; protected String containerLogsLink; protected String nodeId; @XmlTransient @@ -76,6 +77,7 @@ public class ContainerInfo { Resource res = container.getResource(); if (res != null) { this.totalMemoryNeededMB = res.getMemory(); + this.totalVCoresNeeded = res.getVirtualCores(); } this.containerLogsShortLink = ujoin("containerlogs", this.id, container.getUser()); @@ -130,4 +132,8 @@ public class ContainerInfo { return this.totalMemoryNeededMB; } + public long getVCoresNeeded() { + return this.totalVCoresNeeded; + } + }
