Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java?rev=1617450&r1=1617449&r2=1617450&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java (original) +++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java Tue Aug 12 11:02:38 2014 @@ -29,8 +29,10 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto; @@ -59,6 +61,40 @@ public abstract class NMStateStoreServic } } + public enum RecoveredContainerStatus { // lifecycle stage of a container as recorded in the state store + REQUESTED, // a start request was recorded (storeContainer) + LAUNCHED, // a launch was recorded (storeContainerLaunched) + COMPLETED // a completion was recorded (storeContainerCompleted) + } + + public static class RecoveredContainerState { // per-container state returned by loadContainersState() + RecoveredContainerStatus status; // no initializer: null until a store assigns it + int exitCode = ContainerExitStatus.INVALID; // stays INVALID until a completion is recorded + boolean killed = false; // set once a kill request is recorded (storeContainerKilled) + String diagnostics = ""; // empty until diagnostics are stored + StartContainerRequest startRequest; // the request passed to storeContainer + + public RecoveredContainerStatus getStatus() { + return status; + } + + public int getExitCode() { + return exitCode; + } + + public boolean getKilled() { + return killed; + } + + public String getDiagnostics() { + return diagnostics; + } + + public StartContainerRequest getStartRequest() { + return startRequest; + } + } + public static class LocalResourceTrackerState { List<LocalizedResourceProto> localizedResources = new ArrayList<LocalizedResourceProto>(); @@ -176,20 +212,101 @@ public abstract class NMStateStoreServic } + /** + * Load the state of applications + * @return recovered state for applications + * @throws IOException + */ public abstract RecoveredApplicationsState loadApplicationsState() throws IOException; + /** + * Record the start of an application + * @param appId the application ID + * @param p state to store for the application + * @throws IOException + */ public abstract void storeApplication(ApplicationId appId, ContainerManagerApplicationProto p) throws IOException; + /** +
* Record that an application has finished + * @param appId the application ID + * @throws IOException + */ public abstract void storeFinishedApplication(ApplicationId appId) throws IOException; + /** + * Remove records corresponding to an application + * @param appId the application ID + * @throws IOException + */ public abstract void removeApplication(ApplicationId appId) throws IOException; /** + * Load the state of containers + * @return recovered state for containers + * @throws IOException + */ + public abstract List<RecoveredContainerState> loadContainersState() + throws IOException; + + /** + * Record a container start request + * @param containerId the container ID + * @param startRequest the container start request + * @throws IOException + */ + public abstract void storeContainer(ContainerId containerId, + StartContainerRequest startRequest) throws IOException; + + /** + * Record that a container has been launched + * @param containerId the container ID + * @throws IOException + */ + public abstract void storeContainerLaunched(ContainerId containerId) + throws IOException; + + /** + * Record that a container has completed + * @param containerId the container ID + * @param exitCode the exit code from the container + * @throws IOException + */ + public abstract void storeContainerCompleted(ContainerId containerId, + int exitCode) throws IOException; + + /** + * Record a request to kill a container + * @param containerId the container ID + * @throws IOException + */ + public abstract void storeContainerKilled(ContainerId containerId) + throws IOException; + + /** + * Record diagnostics for a container + * @param containerId the container ID + * @param diagnostics the container diagnostics; NOTE(review): callers appear to pass the full accumulated text, and the in-memory store overwrites the previous value with it - confirm for every store + * @throws IOException + */ + public abstract void storeContainerDiagnostics(ContainerId containerId, + StringBuilder diagnostics) throws IOException; + + /** + * Remove records corresponding to a container + * @param containerId the container ID + * @throws IOException +
*/ + public abstract void removeContainer(ContainerId containerId) + throws IOException; + + + /** * Load the state of localized resources * @return recovered localized resource state * @throws IOException @@ -230,43 +347,111 @@ public abstract class NMStateStoreServic ApplicationId appId, Path localPath) throws IOException; + /** + * Load the state of the deletion service + * @return recovered deletion service state + * @throws IOException + */ public abstract RecoveredDeletionServiceState loadDeletionServiceState() throws IOException; + /** + * Record a deletion task + * @param taskId the deletion task ID + * @param taskProto the deletion task protobuf + * @throws IOException + */ public abstract void storeDeletionTask(int taskId, DeletionServiceDeleteTaskProto taskProto) throws IOException; + /** + * Remove records corresponding to a deletion task + * @param taskId the deletion task ID + * @throws IOException + */ public abstract void removeDeletionTask(int taskId) throws IOException; + /** + * Load the state of NM tokens + * @return recovered state of NM tokens + * @throws IOException + */ public abstract RecoveredNMTokensState loadNMTokensState() throws IOException; + /** + * Record the current NM token master key + * @param key the master key + * @throws IOException + */ public abstract void storeNMTokenCurrentMasterKey(MasterKey key) throws IOException; + /** + * Record the previous NM token master key + * @param key the previous master key + * @throws IOException + */ public abstract void storeNMTokenPreviousMasterKey(MasterKey key) throws IOException; + /** + * Record a master key corresponding to an application + * @param attempt the application attempt ID + * @param key the master key + * @throws IOException + */ public abstract void storeNMTokenApplicationMasterKey( ApplicationAttemptId attempt, MasterKey key) throws IOException; + /** + * Remove a master key corresponding to an application + * @param attempt the application attempt ID + * @throws
IOException + */ public abstract void removeNMTokenApplicationMasterKey( ApplicationAttemptId attempt) throws IOException; + /** + * Load the state of container tokens + * @return recovered state of container tokens + * @throws IOException + */ public abstract RecoveredContainerTokensState loadContainerTokensState() throws IOException; + /** + * Record the current container token master key + * @param key the master key + * @throws IOException + */ public abstract void storeContainerTokenCurrentMasterKey(MasterKey key) throws IOException; + /** + * Record the previous container token master key + * @param key the previous master key + * @throws IOException + */ public abstract void storeContainerTokenPreviousMasterKey(MasterKey key) throws IOException; + /** + * Record the expiration time for a container token + * @param containerId the container ID + * @param expirationTime the container token expiration time + * @throws IOException + */ public abstract void storeContainerToken(ContainerId containerId, Long expirationTime) throws IOException; + /** + * Remove records for a container token + * @param containerId the container ID + * @throws IOException + */ public abstract void removeContainerToken(ContainerId containerId) throws IOException;
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/container-executor.c URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/container-executor.c?rev=1617450&r1=1617449&r2=1617450&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/container-executor.c (original) +++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/container-executor.c Tue Aug 12 11:02:38 2014 @@ -33,6 +33,7 @@ #include <limits.h> #include <sys/stat.h> #include <sys/mount.h> +#include <sys/wait.h> static const int DEFAULT_MIN_USERID = 1000; @@ -245,6 +246,91 @@ static int write_pid_to_file_as_nm(const } /** + * Write the exit code of the container into the exit code file + * exit_code_file: Path to exit code file where exit code needs to be written + * Returns 0 on success, -1 on failure. + */ +static int write_exit_code_file(const char* exit_code_file, int exit_code) { + char *tmp_ecode_file = concatenate("%s.tmp", "exit_code_path", 1, + exit_code_file); + if (tmp_ecode_file == NULL) { + return -1; + } + + // create with 700 + int ecode_fd = open(tmp_ecode_file, O_WRONLY|O_CREAT|O_EXCL, S_IRWXU); + if (ecode_fd == -1) { + fprintf(LOGFILE, "Can't open file %s - %s\n", tmp_ecode_file, + strerror(errno)); + free(tmp_ecode_file); + return -1; + } + + char ecode_buf[21]; + snprintf(ecode_buf, sizeof(ecode_buf), "%d", exit_code); + size_t ecode_len = strlen(ecode_buf); + ssize_t written = write(ecode_fd, ecode_buf, ecode_len); + // save errno before close() can overwrite it + int write_errno = errno; + close(ecode_fd); + // a short write would leave a truncated exit code, so treat it as failure + if (written != (ssize_t) ecode_len) { + fprintf(LOGFILE, "Failed to write exit code to file %s - %s\n", +
tmp_ecode_file, written == -1 ? strerror(write_errno) : "short write"); + unlink(tmp_ecode_file); + free(tmp_ecode_file); + return -1; + } + + // rename temp file to actual exit code file + // use rename as atomic + if (rename(tmp_ecode_file, exit_code_file)) { + fprintf(LOGFILE, "Can't move exit code file from %s to %s - %s\n", + tmp_ecode_file, exit_code_file, strerror(errno)); + unlink(tmp_ecode_file); + free(tmp_ecode_file); + return -1; + } + + free(tmp_ecode_file); + return 0; +} + +/** + * Wait for the container process to exit and write the exit code to + * the exit code file (0x80 + signal number if it was killed by a signal). + * Returns the exit code of the container process, or -1 on failure. + */ +static int wait_and_write_exit_code(pid_t pid, const char* exit_code_file) { + int child_status = -1; + int exit_code = -1; + int waitpid_result; + + if (change_effective_user(nm_uid, nm_gid) != 0) { + return -1; + } + do { + waitpid_result = waitpid(pid, &child_status, 0); + } while (waitpid_result == -1 && errno == EINTR); + if (waitpid_result < 0) { + fprintf(LOGFILE, "Error waiting for container process %d - %s\n", + pid, strerror(errno)); + return -1; + } + if (WIFEXITED(child_status)) { + exit_code = WEXITSTATUS(child_status); + } else if (WIFSIGNALED(child_status)) { + exit_code = 0x80 + WTERMSIG(child_status); + } else { + fprintf(LOGFILE, "Unable to determine exit status for pid %d\n", pid); + } + if (write_exit_code_file(exit_code_file, exit_code) < 0) { + return -1; + } + return exit_code; +} + +/** * Change the real and effective user and group to abandon the super user * priviledges.
*/ @@ -337,6 +423,10 @@ char *get_container_work_directory(const nm_root, user, app_id, container_id); } +char *get_exit_code_file(const char* pid_file) { + return concatenate("%s.exitcode", "exit_code_file", 1, pid_file); +} + char *get_container_launcher_file(const char* work_dir) { return concatenate("%s/%s", "container launcher", 2, work_dir, CONTAINER_SCRIPT); } @@ -879,6 +969,8 @@ int launch_container_as_user(const char int exit_code = -1; char *script_file_dest = NULL; char *cred_file_dest = NULL; + char *exit_code_file = NULL; + script_file_dest = get_container_launcher_file(work_dir); if (script_file_dest == NULL) { exit_code = OUT_OF_MEMORY; @@ -889,6 +981,11 @@ int launch_container_as_user(const char exit_code = OUT_OF_MEMORY; goto cleanup; } + exit_code_file = get_exit_code_file(pid_file); + if (NULL == exit_code_file) { + exit_code = OUT_OF_MEMORY; + goto cleanup; + } // open launch script int container_file_source = open_file_as_nm(script_name); @@ -902,6 +999,19 @@ int launch_container_as_user(const char goto cleanup; } + pid_t child_pid = fork(); + if (child_pid == -1) { + fprintf(LOGFILE, "Failed to fork container launch process - %s\n", + strerror(errno)); + exit_code = -1; + goto cleanup; + } + if (child_pid != 0) { + // parent: wait for the container and record its exit code + exit_code = wait_and_write_exit_code(child_pid, exit_code_file); + goto cleanup; + } + // setsid pid_t pid = setsid(); if (pid == -1) { @@ -986,6 +1096,7 @@ int launch_container_as_user(const char exit_code = 0; cleanup: + free(exit_code_file); free(script_file_dest); free(cred_file_dest); return exit_code; Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java?rev=1617450&r1=1617449&r2=1617450&view=diff ============================================================================== ---
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java (original) +++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java Tue Aug 12 11:02:38 2014 @@ -201,6 +201,7 @@ public class TestNodeStatusUpdater { Dispatcher mockDispatcher = mock(Dispatcher.class); EventHandler mockEventHandler = mock(EventHandler.class); when(mockDispatcher.getEventHandler()).thenReturn(mockEventHandler); + NMStateStoreService stateStore = new NMNullStateStoreService(); // NOTE(review): presumably a no-op store, since this heartbeat test does not exercise recovery - confirm nodeStatus.setResponseId(heartBeatID++); Map<ApplicationId, List<ContainerStatus>> appToContainers = getAppToContainerStatusMap(nodeStatus.getContainersStatuses()); @@ -226,9 +227,8 @@ public class TestNodeStatusUpdater { firstContainerID, InetAddress.getByName("localhost") .getCanonicalHostName(), 1234, user, resource, currentTime + 10000, 123, "password".getBytes(), currentTime)); - Container container = - new ContainerImpl(conf, mockDispatcher, launchContext, null, - mockMetrics, containerToken); + Container container = new ContainerImpl(conf, mockDispatcher, + stateStore, launchContext, null, mockMetrics, containerToken); // the state store is now the third ContainerImpl constructor argument this.context.getContainers().put(firstContainerID, container); } else if (heartBeatID == 2) { // Checks on the RM end @@ -257,9 +257,8 @@ public class TestNodeStatusUpdater { secondContainerID, InetAddress.getByName("localhost") .getCanonicalHostName(), 1234, user, resource, currentTime + 10000, 123, "password".getBytes(), currentTime)); - Container container = - new ContainerImpl(conf, mockDispatcher, launchContext, null, - mockMetrics, containerToken); + Container container = new ContainerImpl(conf, mockDispatcher, + stateStore, launchContext, null, mockMetrics, containerToken);
this.context.getContainers().put(secondContainerID, container); } else if (heartBeatID == 3) { // Checks on the RM end @@ -784,7 +783,7 @@ public class TestNodeStatusUpdater { ContainerId cId = ContainerId.newInstance(appAttemptId, 0); - nodeStatusUpdater.updateStoppedContainersInCache(cId); + nodeStatusUpdater.addCompletedContainer(cId); // replaces updateStoppedContainersInCache Assert.assertTrue(nodeStatusUpdater.isContainerRecentlyStopped(cId)); long time1 = System.currentTimeMillis(); Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java?rev=1617450&r1=1617449&r2=1617450&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java (original) +++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java Tue Aug 12 11:02:38 2014 @@ -233,7 +233,7 @@ public abstract class BaseContainerManag protected DeletionService createDeletionService() { return new DeletionService(exec) { @Override - public void delete(String user, Path subDir, Path[] baseDirs) { + public void delete(String user, Path subDir, Path... baseDirs) { // varargs form, presumably matching DeletionService.delete - confirm; NOTE(review): the LOG.info below concatenates the baseDirs array, logging its identity rather than its contents // Don't do any deletions.
LOG.info("Psuedo delete: user - " + user + ", subDir - " + subDir + ", baseDirs - " + baseDirs); Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java?rev=1617450&r1=1617449&r2=1617450&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java (original) +++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java Tue Aug 12 11:02:38 2014 @@ -191,7 +191,8 @@ public class TestAuxServices { ContainerTokenIdentifier cti = new ContainerTokenIdentifier( ContainerId.newInstance(attemptId, 1), "", "", Resource.newInstance(1, 1), 0,0,0, Priority.newInstance(0), 0); - Container container = new ContainerImpl(null, null, null, null, null, cti); + Container container = new ContainerImpl(null, null, null, null, null, + null, cti); // one extra null for the new state-store constructor parameter ContainerId containerId = container.getContainerId(); Resource resource = container.getResource(); event = new AuxServicesEvent(AuxServicesEventType.CONTAINER_INIT,container); Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java?rev=1617450&r1=1617449&r2=1617450&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java (original) +++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java Tue Aug 12 11:02:38 2014 @@ -80,6 +80,7 @@ public class TestContainerManagerRecover public void testApplicationRecovery() throws Exception { YarnConfiguration conf = new YarnConfiguration(); conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true); + conf.set(YarnConfiguration.NM_ADDRESS, "localhost:1234"); // NOTE(review): pins the NM address, presumably so the restarted NM has the same identity - confirm conf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true); conf.set(YarnConfiguration.YARN_ADMIN_ACL, "yarn_admin_user"); NMStateStoreService stateStore = new NMMemoryStateStoreService(); Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java?rev=1617450&r1=1617449&r2=1617450&view=diff ============================================================================== ---
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java (original) +++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java Tue Aug 12 11:02:38 2014 @@ -780,7 +780,8 @@ public class TestContainer { } when(ctxt.getServiceData()).thenReturn(serviceData); - c = new ContainerImpl(conf, dispatcher, ctxt, null, metrics, identifier); + c = new ContainerImpl(conf, dispatcher, new NMNullStateStoreService(), + ctxt, null, metrics, identifier); dispatcher.register(ContainerEventType.class, new EventHandler<ContainerEvent>() { @Override Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java?rev=1617450&r1=1617449&r2=1617450&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java (original) +++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java Tue Aug 12 11:02:38 2014 @@ -22,13 +22,16 @@ import java.io.IOException; import java.util.ArrayList; import
java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto; @@ -40,6 +43,7 @@ import org.apache.hadoop.yarn.server.api public class NMMemoryStateStoreService extends NMStateStoreService { private Map<ApplicationId, ContainerManagerApplicationProto> apps; private Set<ApplicationId> finishedApps; + private Map<ContainerId, RecoveredContainerState> containerStates; private Map<TrackerKey, TrackerState> trackerStates; private Map<Integer, DeletionServiceDeleteTaskProto> deleteTasks; private RecoveredNMTokensState nmTokenState; @@ -53,6 +57,7 @@ public class NMMemoryStateStoreService e protected void initStorage(Configuration conf) { apps = new HashMap<ApplicationId, ContainerManagerApplicationProto>(); finishedApps = new HashSet<ApplicationId>(); + containerStates = new HashMap<ContainerId, RecoveredContainerState>(); nmTokenState = new RecoveredNMTokensState(); nmTokenState.applicationMasterKeys = new HashMap<ApplicationAttemptId, MasterKey>(); @@ -100,6 +105,78 @@ public class NMMemoryStateStoreService e finishedApps.remove(appId); } + @Override + public List<RecoveredContainerState> loadContainersState() + throws IOException { + // return a copy so caller can't modify our state + List<RecoveredContainerState> result = + new ArrayList<RecoveredContainerState>(containerStates.size()); + for (RecoveredContainerState rcs : containerStates.values()) { +
RecoveredContainerState rcsCopy = new RecoveredContainerState(); + rcsCopy.status = rcs.status; + rcsCopy.exitCode = rcs.exitCode; + rcsCopy.killed = rcs.killed; + rcsCopy.diagnostics = rcs.diagnostics; + rcsCopy.startRequest = rcs.startRequest; // shared reference: the request object itself is not deep-copied + result.add(rcsCopy); + } + return result; + } + + @Override + public void storeContainer(ContainerId containerId, + StartContainerRequest startRequest) throws IOException { + RecoveredContainerState rcs = new RecoveredContainerState(); + rcs.startRequest = startRequest; + rcs.status = RecoveredContainerStatus.REQUESTED; // a stored start request is the REQUESTED stage; status has no default + containerStates.put(containerId, rcs); + } + + @Override + public void storeContainerDiagnostics(ContainerId containerId, + StringBuilder diagnostics) throws IOException { + RecoveredContainerState rcs = getRecoveredContainerState(containerId); + rcs.diagnostics = diagnostics.toString(); + } + + @Override + public void storeContainerLaunched(ContainerId containerId) + throws IOException { + RecoveredContainerState rcs = getRecoveredContainerState(containerId); + if (rcs.exitCode != ContainerExitStatus.INVALID) { + throw new IOException("Container already completed"); + } + rcs.status = RecoveredContainerStatus.LAUNCHED; + } + + @Override + public void storeContainerKilled(ContainerId containerId) + throws IOException { + RecoveredContainerState rcs = getRecoveredContainerState(containerId); + rcs.killed = true; + } + + @Override + public void storeContainerCompleted(ContainerId containerId, int exitCode) + throws IOException { + RecoveredContainerState rcs = getRecoveredContainerState(containerId); + rcs.status = RecoveredContainerStatus.COMPLETED; + rcs.exitCode = exitCode; + } + + @Override + public void removeContainer(ContainerId containerId) throws IOException { + containerStates.remove(containerId); + } + + private RecoveredContainerState getRecoveredContainerState( // looks up stored state, failing if no start request was recorded + ContainerId containerId) throws IOException { + RecoveredContainerState rcs = containerStates.get(containerId); + if (rcs == null) { + throw new
IOException("No start request for " + containerId); + } + return rcs; + } private LocalResourceTrackerState loadTrackerState(TrackerState ts) { LocalResourceTrackerState result = new LocalResourceTrackerState(); Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java?rev=1617450&r1=1617449&r2=1617450&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java (original) +++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java Tue Aug 12 11:02:38 2014 @@ -25,18 +25,30 @@ import static org.junit.Assert.assertTru import java.io.File; import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; import org.apache.hadoop.service.ServiceStateException; +import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; +import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import
org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResourceType; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.Token; +import org.apache.hadoop.yarn.api.records.URL; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.LocalResourcePBImpl; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -44,9 +56,12 @@ import org.apache.hadoop.yarn.proto.Yarn import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto; +import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.LocalResourceTrackerState; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredApplicationsState; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerState; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerTokensState; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredDeletionServiceState; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredLocalizationState; 
@@ -193,6 +208,115 @@ public class TestNMLeveldbStateStoreServ } @Test + public void testContainerStorage() throws IOException { + // test empty when no state + List<RecoveredContainerState> recoveredContainers = + stateStore.loadContainersState(); + assertTrue(recoveredContainers.isEmpty()); + + // create a container request + ApplicationId appId = ApplicationId.newInstance(1234, 3); + ApplicationAttemptId appAttemptId = + ApplicationAttemptId.newInstance(appId, 4); + ContainerId containerId = ContainerId.newInstance(appAttemptId, 5); + LocalResource lrsrc = LocalResource.newInstance( + URL.newInstance("hdfs", "somehost", 12345, "/some/path/to/rsrc"), + LocalResourceType.FILE, LocalResourceVisibility.APPLICATION, 123L, + 1234567890L); + Map<String, LocalResource> localResources = + new HashMap<String, LocalResource>(); + localResources.put("rsrc", lrsrc); + Map<String, String> env = new HashMap<String, String>(); + env.put("somevar", "someval"); + List<String> containerCmds = new ArrayList<String>(); + containerCmds.add("somecmd"); + containerCmds.add("somearg"); + Map<String, ByteBuffer> serviceData = new HashMap<String, ByteBuffer>(); + serviceData.put("someservice", + ByteBuffer.wrap(new byte[] { 0x1, 0x2, 0x3 })); + ByteBuffer containerTokens = + ByteBuffer.wrap(new byte[] { 0x7, 0x8, 0x9, 0xa }); + Map<ApplicationAccessType, String> acls = + new HashMap<ApplicationAccessType, String>(); + acls.put(ApplicationAccessType.VIEW_APP, "viewuser"); + acls.put(ApplicationAccessType.MODIFY_APP, "moduser"); + ContainerLaunchContext clc = ContainerLaunchContext.newInstance( + localResources, env, containerCmds, serviceData, containerTokens, + acls); + Resource containerRsrc = Resource.newInstance(1357, 3); + ContainerTokenIdentifier containerTokenId = + new ContainerTokenIdentifier(containerId, "host", "user", + containerRsrc, 9876543210L, 42, 2468, Priority.newInstance(7), + 13579); + Token containerToken = Token.newInstance(containerTokenId.getBytes(), + 
ContainerTokenIdentifier.KIND.toString(), "password".getBytes(), + "tokenservice"); + StartContainerRequest containerReq = + StartContainerRequest.newInstance(clc, containerToken); + + // store a container and verify recovered + stateStore.storeContainer(containerId, containerReq); + restartStateStore(); + recoveredContainers = stateStore.loadContainersState(); + assertEquals(1, recoveredContainers.size()); + RecoveredContainerState rcs = recoveredContainers.get(0); + assertEquals(RecoveredContainerStatus.REQUESTED, rcs.getStatus()); + assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode()); + assertEquals(false, rcs.getKilled()); + assertEquals(containerReq, rcs.getStartRequest()); + assertTrue(rcs.getDiagnostics().isEmpty()); + + // launch the container, add some diagnostics, and verify recovered + StringBuilder diags = new StringBuilder(); + stateStore.storeContainerLaunched(containerId); + diags.append("some diags for container"); + stateStore.storeContainerDiagnostics(containerId, diags); + restartStateStore(); + recoveredContainers = stateStore.loadContainersState(); + assertEquals(1, recoveredContainers.size()); + rcs = recoveredContainers.get(0); + assertEquals(RecoveredContainerStatus.LAUNCHED, rcs.getStatus()); + assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode()); + assertEquals(false, rcs.getKilled()); + assertEquals(containerReq, rcs.getStartRequest()); + assertEquals(diags.toString(), rcs.getDiagnostics()); + + // mark the container killed, add some more diags, and verify recovered + diags.append("some more diags for container"); + stateStore.storeContainerDiagnostics(containerId, diags); + stateStore.storeContainerKilled(containerId); + restartStateStore(); + recoveredContainers = stateStore.loadContainersState(); + assertEquals(1, recoveredContainers.size()); + rcs = recoveredContainers.get(0); + assertEquals(RecoveredContainerStatus.LAUNCHED, rcs.getStatus()); + assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode()); + 
assertTrue(rcs.getKilled()); + assertEquals(containerReq, rcs.getStartRequest()); + assertEquals(diags.toString(), rcs.getDiagnostics()); + + // add yet more diags, mark container completed, and verify recovered + diags.append("some final diags"); + stateStore.storeContainerDiagnostics(containerId, diags); + stateStore.storeContainerCompleted(containerId, 21); + restartStateStore(); + recoveredContainers = stateStore.loadContainersState(); + assertEquals(1, recoveredContainers.size()); + rcs = recoveredContainers.get(0); + assertEquals(RecoveredContainerStatus.COMPLETED, rcs.getStatus()); + assertEquals(21, rcs.getExitCode()); + assertTrue(rcs.getKilled()); + assertEquals(containerReq, rcs.getStartRequest()); + assertEquals(diags.toString(), rcs.getDiagnostics()); + + // remove the container and verify not recovered + stateStore.removeContainer(containerId); + restartStateStore(); + recoveredContainers = stateStore.loadContainersState(); + assertTrue(recoveredContainers.isEmpty()); + } + + @Test public void testStartResourceLocalization() throws IOException { String user = "somebody"; ApplicationId appId = ApplicationId.newInstance(1, 1); Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java?rev=1617450&r1=1617449&r2=1617450&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java (original) +++ 
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java Tue Aug 12 11:02:38 2014 @@ -209,7 +209,7 @@ public class TestNMWebServer { BuilderUtils.newResource(1024, 1), currentTime + 10000L, 123, "password".getBytes(), currentTime); Container container = - new ContainerImpl(conf, dispatcher, launchContext, + new ContainerImpl(conf, dispatcher, stateStore, launchContext, null, metrics, BuilderUtils.newContainerTokenIdentifier(containerToken)) { Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java?rev=1617450&r1=1617449&r2=1617450&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java (original) +++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java Tue Aug 12 11:02:38 2014 @@ -93,9 +93,9 @@ public class RMNodeImpl implements RMNod private final RMContext context; private final String hostName; private final int commandPort; - private final int httpPort; + private int httpPort; private final String nodeAddress; // The containerManager address - private final String httpAddress; + private String httpAddress; private volatile ResourceOption resourceOption; private 
final Node node; @@ -521,37 +521,15 @@ public class RMNodeImpl implements RMNod @Override public void transition(RMNodeImpl rmNode, RMNodeEvent event) { - // Kill containers since node is rejoining. - rmNode.nodeUpdateQueue.clear(); - rmNode.context.getDispatcher().getEventHandler().handle( - new NodeRemovedSchedulerEvent(rmNode)); - RMNodeReconnectEvent reconnectEvent = (RMNodeReconnectEvent) event; RMNode newNode = reconnectEvent.getReconnectedNode(); rmNode.nodeManagerVersion = newNode.getNodeManagerVersion(); - if (rmNode.getTotalCapability().equals(newNode.getTotalCapability()) - && rmNode.getHttpPort() == newNode.getHttpPort()) { - // Reset heartbeat ID since node just restarted. - rmNode.getLastNodeHeartBeatResponse().setResponseId(0); - if (rmNode.getState() != NodeState.UNHEALTHY) { - // Only add new node if old state is not UNHEALTHY - rmNode.context.getDispatcher().getEventHandler().handle( - new NodeAddedSchedulerEvent(rmNode)); - } - } else { - // Reconnected node differs, so replace old node and start new node - switch (rmNode.getState()) { - case RUNNING: - ClusterMetrics.getMetrics().decrNumActiveNodes(); - break; - case UNHEALTHY: - ClusterMetrics.getMetrics().decrNumUnhealthyNMs(); - break; - } - rmNode.context.getRMNodes().put(newNode.getNodeID(), newNode); - rmNode.context.getDispatcher().getEventHandler().handle( - new RMNodeStartedEvent(newNode.getNodeID(), null, null)); - } + rmNode.httpPort = newNode.getHttpPort(); + rmNode.httpAddress = newNode.getHttpAddress(); + rmNode.resourceOption = newNode.getResourceOption(); + + // Reset heartbeat ID since node just restarted. 
+ rmNode.getLastNodeHeartBeatResponse().setResponseId(0); if (null != reconnectEvent.getRunningApplications()) { for (ApplicationId appId : reconnectEvent.getRunningApplications()) { Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java?rev=1617450&r1=1617449&r2=1617450&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java (original) +++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java Tue Aug 12 11:02:38 2014 @@ -153,14 +153,17 @@ public class SchedulerUtils { * @param rmNode RMNode with new resource view * @param clusterResource the cluster's resource that need to update * @param log Scheduler's log for resource change + * @return true if the resources have changed */ - public static void updateResourceIfChanged(SchedulerNode node, + public static boolean updateResourceIfChanged(SchedulerNode node, RMNode rmNode, Resource clusterResource, Log log) { + boolean result = false; Resource oldAvailableResource = node.getAvailableResource(); Resource newAvailableResource = Resources.subtract( rmNode.getTotalCapability(), node.getUsedResource()); if (!newAvailableResource.equals(oldAvailableResource)) { + result = true; Resource deltaResource = 
Resources.subtract(newAvailableResource, oldAvailableResource); // Reflect resource change to scheduler node. @@ -176,6 +179,8 @@ public class SchedulerUtils { + " with delta: CPU: " + deltaResource.getMemory() + "core, Memory: " + deltaResource.getMemory() +"MB"); } + + return result; } /** Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java?rev=1617450&r1=1617449&r2=1617450&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java (original) +++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java Tue Aug 12 11:02:38 2014 @@ -783,7 +783,10 @@ public class CapacityScheduler extends FiCaSchedulerNode node = getNode(nm.getNodeID()); // Update resource if any change - SchedulerUtils.updateResourceIfChanged(node, nm, clusterResource, LOG); + if (SchedulerUtils.updateResourceIfChanged(node, nm, clusterResource, + LOG)) { + root.updateClusterResource(clusterResource); + } List<UpdatedContainerInfo> containerInfoList = nm.pullContainerUpdates(); List<ContainerStatus> newlyLaunchedContainers = new ArrayList<ContainerStatus>(); Modified: 
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java?rev=1617450&r1=1617449&r2=1617450&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java (original) +++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java Tue Aug 12 11:02:38 2014 @@ -595,7 +595,7 @@ public class TestResourceTrackerService // reconnect of node with changed capability nm1 = rm.registerNode("host2:5678", 10240); dispatcher.await(); - response = nm2.nodeHeartbeat(true); + response = nm1.nodeHeartbeat(true); dispatcher.await(); Assert.assertTrue(NodeAction.NORMAL.equals(response.getNodeAction())); Assert.assertEquals(5120 + 10240, metrics.getAvailableMB());
