Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java?rev=1617532&r1=1617531&r2=1617532&view=diff ============================================================================== --- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java (original) +++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java Tue Aug 12 17:02:07 2014 @@ -35,6 +35,8 @@ import org.apache.hadoop.conf.Configurat import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StartContainerRequestPBImpl; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -45,6 +47,7 @@ import org.apache.hadoop.yarn.proto.Yarn import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto; +import 
org.apache.hadoop.yarn.proto.YarnServiceProtos.StartContainerRequestProto; import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl; import org.apache.hadoop.yarn.server.records.Version; @@ -90,6 +93,14 @@ public class NMLeveldbStateStoreService private static final String LOCALIZATION_FILECACHE_SUFFIX = "filecache/"; private static final String LOCALIZATION_APPCACHE_SUFFIX = "appcache/"; + private static final String CONTAINERS_KEY_PREFIX = + "ContainerManager/containers/"; + private static final String CONTAINER_REQUEST_KEY_SUFFIX = "/request"; + private static final String CONTAINER_DIAGS_KEY_SUFFIX = "/diagnostics"; + private static final String CONTAINER_LAUNCHED_KEY_SUFFIX = "/launched"; + private static final String CONTAINER_KILLED_KEY_SUFFIX = "/killed"; + private static final String CONTAINER_EXIT_CODE_KEY_SUFFIX = "/exitcode"; + private static final String CURRENT_MASTER_KEY_SUFFIX = "CurrentMasterKey"; private static final String PREV_MASTER_KEY_SUFFIX = "PreviousMasterKey"; private static final String NM_TOKENS_KEY_PREFIX = "NMTokens/"; @@ -104,6 +115,8 @@ public class NMLeveldbStateStoreService private static final String CONTAINER_TOKENS_PREV_MASTER_KEY = CONTAINER_TOKENS_KEY_PREFIX + PREV_MASTER_KEY_SUFFIX; + private static final byte[] EMPTY_VALUE = new byte[0]; + private DB db; public NMLeveldbStateStoreService() { @@ -123,6 +136,160 @@ public class NMLeveldbStateStoreService @Override + public List<RecoveredContainerState> loadContainersState() + throws IOException { + ArrayList<RecoveredContainerState> containers = + new ArrayList<RecoveredContainerState>(); + LeveldbIterator iter = null; + try { + iter = new LeveldbIterator(db); + iter.seek(bytes(CONTAINERS_KEY_PREFIX)); + + while (iter.hasNext()) { + Entry<byte[],byte[]> entry = iter.peekNext(); + String key = asString(entry.getKey()); + if (!key.startsWith(CONTAINERS_KEY_PREFIX)) { + break; + } + + int idEndPos = 
key.indexOf('/', CONTAINERS_KEY_PREFIX.length()); + if (idEndPos < 0) { + throw new IOException("Unable to determine container in key: " + key); + } + ContainerId containerId = ConverterUtils.toContainerId( + key.substring(CONTAINERS_KEY_PREFIX.length(), idEndPos)); + String keyPrefix = key.substring(0, idEndPos+1); + containers.add(loadContainerState(containerId, iter, keyPrefix)); + } + } catch (DBException e) { + throw new IOException(e); + } finally { + if (iter != null) { + iter.close(); + } + } + + return containers; + } + + private RecoveredContainerState loadContainerState(ContainerId containerId, + LeveldbIterator iter, String keyPrefix) throws IOException { + RecoveredContainerState rcs = new RecoveredContainerState(); + rcs.status = RecoveredContainerStatus.REQUESTED; + while (iter.hasNext()) { + Entry<byte[],byte[]> entry = iter.peekNext(); + String key = asString(entry.getKey()); + if (!key.startsWith(keyPrefix)) { + break; + } + iter.next(); + + String suffix = key.substring(keyPrefix.length()-1); // start with '/' + if (suffix.equals(CONTAINER_REQUEST_KEY_SUFFIX)) { + rcs.startRequest = new StartContainerRequestPBImpl( + StartContainerRequestProto.parseFrom(entry.getValue())); + } else if (suffix.equals(CONTAINER_DIAGS_KEY_SUFFIX)) { + rcs.diagnostics = asString(entry.getValue()); + } else if (suffix.equals(CONTAINER_LAUNCHED_KEY_SUFFIX)) { + if (rcs.status == RecoveredContainerStatus.REQUESTED) { + rcs.status = RecoveredContainerStatus.LAUNCHED; + } + } else if (suffix.equals(CONTAINER_KILLED_KEY_SUFFIX)) { + rcs.killed = true; + } else if (suffix.equals(CONTAINER_EXIT_CODE_KEY_SUFFIX)) { + rcs.status = RecoveredContainerStatus.COMPLETED; + rcs.exitCode = Integer.parseInt(asString(entry.getValue())); + } else { + throw new IOException("Unexpected container state key: " + key); + } + } + return rcs; + } + + @Override + public void storeContainer(ContainerId containerId, + StartContainerRequest startRequest) throws IOException { + String key = 
CONTAINERS_KEY_PREFIX + containerId.toString() + + CONTAINER_REQUEST_KEY_SUFFIX; + try { + db.put(bytes(key), + ((StartContainerRequestPBImpl) startRequest).getProto().toByteArray()); + } catch (DBException e) { + throw new IOException(e); + } + } + + @Override + public void storeContainerDiagnostics(ContainerId containerId, + StringBuilder diagnostics) throws IOException { + String key = CONTAINERS_KEY_PREFIX + containerId.toString() + + CONTAINER_DIAGS_KEY_SUFFIX; + try { + db.put(bytes(key), bytes(diagnostics.toString())); + } catch (DBException e) { + throw new IOException(e); + } + } + + @Override + public void storeContainerLaunched(ContainerId containerId) + throws IOException { + String key = CONTAINERS_KEY_PREFIX + containerId.toString() + + CONTAINER_LAUNCHED_KEY_SUFFIX; + try { + db.put(bytes(key), EMPTY_VALUE); + } catch (DBException e) { + throw new IOException(e); + } + } + + @Override + public void storeContainerKilled(ContainerId containerId) + throws IOException { + String key = CONTAINERS_KEY_PREFIX + containerId.toString() + + CONTAINER_KILLED_KEY_SUFFIX; + try { + db.put(bytes(key), EMPTY_VALUE); + } catch (DBException e) { + throw new IOException(e); + } + } + + @Override + public void storeContainerCompleted(ContainerId containerId, + int exitCode) throws IOException { + String key = CONTAINERS_KEY_PREFIX + containerId.toString() + + CONTAINER_EXIT_CODE_KEY_SUFFIX; + try { + db.put(bytes(key), bytes(Integer.toString(exitCode))); + } catch (DBException e) { + throw new IOException(e); + } + } + + @Override + public void removeContainer(ContainerId containerId) + throws IOException { + String keyPrefix = CONTAINERS_KEY_PREFIX + containerId.toString(); + try { + WriteBatch batch = db.createWriteBatch(); + try { + batch.delete(bytes(keyPrefix + CONTAINER_REQUEST_KEY_SUFFIX)); + batch.delete(bytes(keyPrefix + CONTAINER_DIAGS_KEY_SUFFIX)); + batch.delete(bytes(keyPrefix + CONTAINER_LAUNCHED_KEY_SUFFIX)); + batch.delete(bytes(keyPrefix + 
CONTAINER_KILLED_KEY_SUFFIX)); + batch.delete(bytes(keyPrefix + CONTAINER_EXIT_CODE_KEY_SUFFIX)); + db.write(batch); + } finally { + batch.close(); + } + } catch (DBException e) { + throw new IOException(e); + } + } + + + @Override public RecoveredApplicationsState loadApplicationsState() throws IOException { RecoveredApplicationsState state = new RecoveredApplicationsState();
Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java?rev=1617532&r1=1617531&r2=1617532&view=diff ============================================================================== --- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java (original) +++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java Tue Aug 12 17:02:07 2014 @@ -19,9 +19,11 @@ package org.apache.hadoop.yarn.server.nodemanager.recovery; import java.io.IOException; +import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -63,6 +65,42 @@ public class NMNullStateStoreService ext } @Override + public List<RecoveredContainerState> loadContainersState() + throws IOException { + throw new UnsupportedOperationException( + "Recovery not supported by this state store"); + } + + @Override + public void storeContainer(ContainerId containerId, + StartContainerRequest startRequest) throws IOException { + } + + @Override + public void storeContainerDiagnostics(ContainerId containerId, + 
StringBuilder diagnostics) throws IOException { + } + + @Override + public void storeContainerLaunched(ContainerId containerId) + throws IOException { + } + + @Override + public void storeContainerKilled(ContainerId containerId) + throws IOException { + } + + @Override + public void storeContainerCompleted(ContainerId containerId, int exitCode) + throws IOException { + } + + @Override + public void removeContainer(ContainerId containerId) throws IOException { + } + + @Override public RecoveredLocalizationState loadLocalizationState() throws IOException { throw new UnsupportedOperationException( Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java?rev=1617532&r1=1617531&r2=1617532&view=diff ============================================================================== --- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java (original) +++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java Tue Aug 12 17:02:07 2014 @@ -29,8 +29,10 @@ import org.apache.hadoop.classification. 
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto; @@ -59,6 +61,40 @@ public abstract class NMStateStoreServic } } + public enum RecoveredContainerStatus { + REQUESTED, + LAUNCHED, + COMPLETED + } + + public static class RecoveredContainerState { + RecoveredContainerStatus status; + int exitCode = ContainerExitStatus.INVALID; + boolean killed = false; + String diagnostics = ""; + StartContainerRequest startRequest; + + public RecoveredContainerStatus getStatus() { + return status; + } + + public int getExitCode() { + return exitCode; + } + + public boolean getKilled() { + return killed; + } + + public String getDiagnostics() { + return diagnostics; + } + + public StartContainerRequest getStartRequest() { + return startRequest; + } + } + public static class LocalResourceTrackerState { List<LocalizedResourceProto> localizedResources = new ArrayList<LocalizedResourceProto>(); @@ -176,20 +212,101 @@ public abstract class NMStateStoreServic } + /** + * Load the state of applications + * @return recovered state for applications + * @throws IOException + */ public abstract RecoveredApplicationsState loadApplicationsState() throws IOException; + /** + * Record the start of an application + * @param appId the application ID + * @param p state to store for the application + * @throws IOException + */ public abstract void storeApplication(ApplicationId appId, ContainerManagerApplicationProto p) throws IOException; + /** + 
* Record that an application has finished + * @param appId the application ID + * @throws IOException + */ public abstract void storeFinishedApplication(ApplicationId appId) throws IOException; + /** + * Remove records corresponding to an application + * @param appId the application ID + * @throws IOException + */ public abstract void removeApplication(ApplicationId appId) throws IOException; /** + * Load the state of containers + * @return recovered state for containers + * @throws IOException + */ + public abstract List<RecoveredContainerState> loadContainersState() + throws IOException; + + /** + * Record a container start request + * @param containerId the container ID + * @param startRequest the container start request + * @throws IOException + */ + public abstract void storeContainer(ContainerId containerId, + StartContainerRequest startRequest) throws IOException; + + /** + * Record that a container has been launched + * @param containerId the container ID + * @throws IOException + */ + public abstract void storeContainerLaunched(ContainerId containerId) + throws IOException; + + /** + * Record that a container has completed + * @param containerId the container ID + * @param exitCode the exit code from the container + * @throws IOException + */ + public abstract void storeContainerCompleted(ContainerId containerId, + int exitCode) throws IOException; + + /** + * Record a request to kill a container + * @param containerId the container ID + * @throws IOException + */ + public abstract void storeContainerKilled(ContainerId containerId) + throws IOException; + + /** + * Record diagnostics for a container + * @param containerId the container ID + * @param diagnostics the container diagnostics + * @throws IOException + */ + public abstract void storeContainerDiagnostics(ContainerId containerId, + StringBuilder diagnostics) throws IOException; + + /** + * Remove records corresponding to a container + * @param containerId the container ID + * @throws IOException + 
*/ + public abstract void removeContainer(ContainerId containerId) + throws IOException; + + + /** * Load the state of localized resources * @return recovered localized resource state * @throws IOException @@ -230,43 +347,111 @@ public abstract class NMStateStoreServic ApplicationId appId, Path localPath) throws IOException; + /** + * Load the state of the deletion service + * @return recovered deletion service state + * @throws IOException + */ public abstract RecoveredDeletionServiceState loadDeletionServiceState() throws IOException; + /** + * Record a deletion task + * @param taskId the deletion task ID + * @param taskProto the deletion task protobuf + * @throws IOException + */ public abstract void storeDeletionTask(int taskId, DeletionServiceDeleteTaskProto taskProto) throws IOException; + /** + * Remove records corresponding to a deletion task + * @param taskId the deletion task ID + * @throws IOException + */ public abstract void removeDeletionTask(int taskId) throws IOException; + /** + * Load the state of NM tokens + * @return recovered state of NM tokens + * @throws IOException + */ public abstract RecoveredNMTokensState loadNMTokensState() throws IOException; + /** + * Record the current NM token master key + * @param key the master key + * @throws IOException + */ public abstract void storeNMTokenCurrentMasterKey(MasterKey key) throws IOException; + /** + * Record the previous NM token master key + * @param key the previous master key + * @throws IOException + */ public abstract void storeNMTokenPreviousMasterKey(MasterKey key) throws IOException; + /** + * Record a master key corresponding to an application + * @param attempt the application attempt ID + * @param key the master key + * @throws IOException + */ public abstract void storeNMTokenApplicationMasterKey( ApplicationAttemptId attempt, MasterKey key) throws IOException; + /** + * Remove a master key corresponding to an application + * @param attempt the application attempt ID + * @throws 
IOException + */ public abstract void removeNMTokenApplicationMasterKey( ApplicationAttemptId attempt) throws IOException; + /** + * Load the state of container tokens + * @return recovered state of container tokens + * @throws IOException + */ public abstract RecoveredContainerTokensState loadContainerTokensState() throws IOException; + /** + * Record the current container token master key + * @param key the master key + * @throws IOException + */ public abstract void storeContainerTokenCurrentMasterKey(MasterKey key) throws IOException; + /** + * Record the previous container token master key + * @param key the previous master key + * @throws IOException + */ public abstract void storeContainerTokenPreviousMasterKey(MasterKey key) throws IOException; + /** + * Record the expiration time for a container token + * @param containerId the container ID + * @param expirationTime the container token expiration time + * @throws IOException + */ public abstract void storeContainerToken(ContainerId containerId, Long expirationTime) throws IOException; + /** + * Remove records for a container token + * @param containerId the container ID + * @throws IOException + */ public abstract void removeContainerToken(ContainerId containerId) throws IOException; Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/container-executor.c URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/container-executor.c?rev=1617532&r1=1617531&r2=1617532&view=diff ============================================================================== --- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/container-executor.c (original) +++ 
hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/container-executor.c Tue Aug 12 17:02:07 2014 @@ -33,6 +33,7 @@ #include <limits.h> #include <sys/stat.h> #include <sys/mount.h> +#include <sys/wait.h> static const int DEFAULT_MIN_USERID = 1000; @@ -245,6 +246,85 @@ static int write_pid_to_file_as_nm(const } /** + * Write the exit code of the container into the exit code file + * exit_code_file: Path to exit code file where exit code needs to be written + */ +static int write_exit_code_file(const char* exit_code_file, int exit_code) { + char *tmp_ecode_file = concatenate("%s.tmp", "exit_code_path", 1, + exit_code_file); + if (tmp_ecode_file == NULL) { + return -1; + } + + // create with 700 + int ecode_fd = open(tmp_ecode_file, O_WRONLY|O_CREAT|O_EXCL, S_IRWXU); + if (ecode_fd == -1) { + fprintf(LOGFILE, "Can't open file %s - %s\n", tmp_ecode_file, + strerror(errno)); + free(tmp_ecode_file); + return -1; + } + + char ecode_buf[21]; + snprintf(ecode_buf, sizeof(ecode_buf), "%d", exit_code); + ssize_t written = write(ecode_fd, ecode_buf, strlen(ecode_buf)); + close(ecode_fd); + if (written == -1) { + fprintf(LOGFILE, "Failed to write exit code to file %s - %s\n", + tmp_ecode_file, strerror(errno)); + free(tmp_ecode_file); + return -1; + } + + // rename temp file to actual exit code file + // use rename as atomic + if (rename(tmp_ecode_file, exit_code_file)) { + fprintf(LOGFILE, "Can't move exit code file from %s to %s - %s\n", + tmp_ecode_file, exit_code_file, strerror(errno)); + unlink(tmp_ecode_file); + free(tmp_ecode_file); + return -1; + } + + free(tmp_ecode_file); + return 0; +} + +/** + * Wait for the container process to exit and write the exit code to + * the exit code file. + * Returns the exit code of the container process. 
+ */ +static int wait_and_write_exit_code(pid_t pid, const char* exit_code_file) { + int child_status = -1; + int exit_code = -1; + int waitpid_result; + + if (change_effective_user(nm_uid, nm_gid) != 0) { + return -1; + } + do { + waitpid_result = waitpid(pid, &child_status, 0); + } while (waitpid_result == -1 && errno == EINTR); + if (waitpid_result < 0) { + fprintf(LOGFILE, "Error waiting for container process %d - %s\n", + pid, strerror(errno)); + return -1; + } + if (WIFEXITED(child_status)) { + exit_code = WEXITSTATUS(child_status); + } else if (WIFSIGNALED(child_status)) { + exit_code = 0x80 + WTERMSIG(child_status); + } else { + fprintf(LOGFILE, "Unable to determine exit status for pid %d\n", pid); + } + if (write_exit_code_file(exit_code_file, exit_code) < 0) { + return -1; + } + return exit_code; +} + +/** * Change the real and effective user and group to abandon the super user * priviledges. */ @@ -337,6 +417,10 @@ char *get_container_work_directory(const nm_root, user, app_id, container_id); } +char *get_exit_code_file(const char* pid_file) { + return concatenate("%s.exitcode", "exit_code_file", 1, pid_file); +} + char *get_container_launcher_file(const char* work_dir) { return concatenate("%s/%s", "container launcher", 2, work_dir, CONTAINER_SCRIPT); } @@ -879,6 +963,8 @@ int launch_container_as_user(const char int exit_code = -1; char *script_file_dest = NULL; char *cred_file_dest = NULL; + char *exit_code_file = NULL; + script_file_dest = get_container_launcher_file(work_dir); if (script_file_dest == NULL) { exit_code = OUT_OF_MEMORY; @@ -889,6 +975,11 @@ int launch_container_as_user(const char exit_code = OUT_OF_MEMORY; goto cleanup; } + exit_code_file = get_exit_code_file(pid_file); + if (NULL == exit_code_file) { + exit_code = OUT_OF_MEMORY; + goto cleanup; + } // open launch script int container_file_source = open_file_as_nm(script_name); @@ -902,6 +993,13 @@ int launch_container_as_user(const char goto cleanup; } + pid_t child_pid = fork(); + if 
(child_pid != 0) { + // parent + exit_code = wait_and_write_exit_code(child_pid, exit_code_file); + goto cleanup; + } + // setsid pid_t pid = setsid(); if (pid == -1) { @@ -986,6 +1084,7 @@ int launch_container_as_user(const char exit_code = 0; cleanup: + free(exit_code_file); free(script_file_dest); free(cred_file_dest); return exit_code; Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java?rev=1617532&r1=1617531&r2=1617532&view=diff ============================================================================== --- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java (original) +++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java Tue Aug 12 17:02:07 2014 @@ -201,6 +201,7 @@ public class TestNodeStatusUpdater { Dispatcher mockDispatcher = mock(Dispatcher.class); EventHandler mockEventHandler = mock(EventHandler.class); when(mockDispatcher.getEventHandler()).thenReturn(mockEventHandler); + NMStateStoreService stateStore = new NMNullStateStoreService(); nodeStatus.setResponseId(heartBeatID++); Map<ApplicationId, List<ContainerStatus>> appToContainers = getAppToContainerStatusMap(nodeStatus.getContainersStatuses()); @@ -226,9 +227,8 @@ public class TestNodeStatusUpdater { firstContainerID, InetAddress.getByName("localhost") .getCanonicalHostName(), 1234, user, resource, currentTime + 
10000, 123, "password".getBytes(), currentTime)); - Container container = - new ContainerImpl(conf, mockDispatcher, launchContext, null, - mockMetrics, containerToken); + Container container = new ContainerImpl(conf, mockDispatcher, + stateStore, launchContext, null, mockMetrics, containerToken); this.context.getContainers().put(firstContainerID, container); } else if (heartBeatID == 2) { // Checks on the RM end @@ -257,9 +257,8 @@ public class TestNodeStatusUpdater { secondContainerID, InetAddress.getByName("localhost") .getCanonicalHostName(), 1234, user, resource, currentTime + 10000, 123, "password".getBytes(), currentTime)); - Container container = - new ContainerImpl(conf, mockDispatcher, launchContext, null, - mockMetrics, containerToken); + Container container = new ContainerImpl(conf, mockDispatcher, + stateStore, launchContext, null, mockMetrics, containerToken); this.context.getContainers().put(secondContainerID, container); } else if (heartBeatID == 3) { // Checks on the RM end @@ -784,7 +783,7 @@ public class TestNodeStatusUpdater { ContainerId cId = ContainerId.newInstance(appAttemptId, 0); - nodeStatusUpdater.updateStoppedContainersInCache(cId); + nodeStatusUpdater.addCompletedContainer(cId); Assert.assertTrue(nodeStatusUpdater.isContainerRecentlyStopped(cId)); long time1 = System.currentTimeMillis(); Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java?rev=1617532&r1=1617531&r2=1617532&view=diff ============================================================================== --- 
hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java (original) +++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java Tue Aug 12 17:02:07 2014 @@ -233,7 +233,7 @@ public abstract class BaseContainerManag protected DeletionService createDeletionService() { return new DeletionService(exec) { @Override - public void delete(String user, Path subDir, Path[] baseDirs) { + public void delete(String user, Path subDir, Path... baseDirs) { // Don't do any deletions. LOG.info("Psuedo delete: user - " + user + ", subDir - " + subDir + ", baseDirs - " + baseDirs); Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java?rev=1617532&r1=1617531&r2=1617532&view=diff ============================================================================== --- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java (original) +++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java Tue Aug 12 17:02:07 2014 @@ -191,7 +191,8 @@ public class 
TestAuxServices { ContainerTokenIdentifier cti = new ContainerTokenIdentifier( ContainerId.newInstance(attemptId, 1), "", "", Resource.newInstance(1, 1), 0,0,0, Priority.newInstance(0), 0); - Container container = new ContainerImpl(null, null, null, null, null, cti); + Container container = new ContainerImpl(null, null, null, null, null, + null, cti); ContainerId containerId = container.getContainerId(); Resource resource = container.getResource(); event = new AuxServicesEvent(AuxServicesEventType.CONTAINER_INIT,container); Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java?rev=1617532&r1=1617531&r2=1617532&view=diff ============================================================================== --- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java (original) +++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java Tue Aug 12 17:02:07 2014 @@ -80,6 +80,7 @@ public class TestContainerManagerRecover public void testApplicationRecovery() throws Exception { YarnConfiguration conf = new YarnConfiguration(); conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true); + conf.set(YarnConfiguration.NM_ADDRESS, "localhost:1234"); conf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true); 
conf.set(YarnConfiguration.YARN_ADMIN_ACL, "yarn_admin_user"); NMStateStoreService stateStore = new NMMemoryStateStoreService(); Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java?rev=1617532&r1=1617531&r2=1617532&view=diff ============================================================================== --- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java (original) +++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java Tue Aug 12 17:02:07 2014 @@ -780,7 +780,8 @@ public class TestContainer { } when(ctxt.getServiceData()).thenReturn(serviceData); - c = new ContainerImpl(conf, dispatcher, ctxt, null, metrics, identifier); + c = new ContainerImpl(conf, dispatcher, new NMNullStateStoreService(), + ctxt, null, metrics, identifier); dispatcher.register(ContainerEventType.class, new EventHandler<ContainerEvent>() { @Override Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java?rev=1617532&r1=1617531&r2=1617532&view=diff ============================================================================== --- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java (original) +++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java Tue Aug 12 17:02:07 2014 @@ -22,13 +22,16 @@ import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto; @@ -40,6 +43,7 @@ import org.apache.hadoop.yarn.server.api public class NMMemoryStateStoreService extends NMStateStoreService { private Map<ApplicationId, ContainerManagerApplicationProto> apps; private Set<ApplicationId> finishedApps; + private Map<ContainerId, RecoveredContainerState> containerStates; private Map<TrackerKey, TrackerState> trackerStates; private Map<Integer, 
DeletionServiceDeleteTaskProto> deleteTasks; private RecoveredNMTokensState nmTokenState; @@ -53,6 +57,7 @@ public class NMMemoryStateStoreService e protected void initStorage(Configuration conf) { apps = new HashMap<ApplicationId, ContainerManagerApplicationProto>(); finishedApps = new HashSet<ApplicationId>(); + containerStates = new HashMap<ContainerId, RecoveredContainerState>(); nmTokenState = new RecoveredNMTokensState(); nmTokenState.applicationMasterKeys = new HashMap<ApplicationAttemptId, MasterKey>(); @@ -100,6 +105,77 @@ public class NMMemoryStateStoreService e finishedApps.remove(appId); } + @Override + public List<RecoveredContainerState> loadContainersState() + throws IOException { + // return a copy so caller can't modify our state + List<RecoveredContainerState> result = + new ArrayList<RecoveredContainerState>(containerStates.size()); + for (RecoveredContainerState rcs : containerStates.values()) { + RecoveredContainerState rcsCopy = new RecoveredContainerState(); + rcsCopy.status = rcs.status; + rcsCopy.exitCode = rcs.exitCode; + rcsCopy.killed = rcs.killed; + rcsCopy.diagnostics = rcs.diagnostics; + rcsCopy.startRequest = rcs.startRequest; + result.add(rcsCopy); + } + return result; + } + + @Override + public void storeContainer(ContainerId containerId, + StartContainerRequest startRequest) throws IOException { + RecoveredContainerState rcs = new RecoveredContainerState(); + rcs.startRequest = startRequest; + containerStates.put(containerId, rcs); + } + + @Override + public void storeContainerDiagnostics(ContainerId containerId, + StringBuilder diagnostics) throws IOException { + RecoveredContainerState rcs = getRecoveredContainerState(containerId); + rcs.diagnostics = diagnostics.toString(); + } + + @Override + public void storeContainerLaunched(ContainerId containerId) + throws IOException { + RecoveredContainerState rcs = getRecoveredContainerState(containerId); + if (rcs.exitCode !=
ContainerExitStatus.INVALID) { + throw new IOException("Container already completed"); + } + rcs.status = RecoveredContainerStatus.LAUNCHED; + } + + @Override + public void storeContainerKilled(ContainerId containerId) + throws IOException { + RecoveredContainerState rcs = getRecoveredContainerState(containerId); + rcs.killed = true; + } + + @Override + public void storeContainerCompleted(ContainerId containerId, int exitCode) + throws IOException { + RecoveredContainerState rcs = getRecoveredContainerState(containerId); + rcs.status = RecoveredContainerStatus.COMPLETED; + rcs.exitCode = exitCode; + } + + @Override + public void removeContainer(ContainerId containerId) throws IOException { + containerStates.remove(containerId); + } + + private RecoveredContainerState getRecoveredContainerState( + ContainerId containerId) throws IOException { + RecoveredContainerState rcs = containerStates.get(containerId); + if (rcs == null) { + throw new IOException("No start request for " + containerId); + } + return rcs; + } private LocalResourceTrackerState loadTrackerState(TrackerState ts) { LocalResourceTrackerState result = new LocalResourceTrackerState(); Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java?rev=1617532&r1=1617531&r2=1617532&view=diff ============================================================================== --- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java 
(original) +++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java Tue Aug 12 17:02:07 2014 @@ -25,18 +25,30 @@ import static org.junit.Assert.assertTru import java.io.File; import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; import org.apache.hadoop.service.ServiceStateException; +import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; +import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResourceType; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.Token; +import org.apache.hadoop.yarn.api.records.URL; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.LocalResourcePBImpl; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -44,9 +56,12 @@ import org.apache.hadoop.yarn.proto.Yarn import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto; import 
org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto; +import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.LocalResourceTrackerState; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredApplicationsState; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerState; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerTokensState; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredDeletionServiceState; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredLocalizationState; @@ -193,6 +208,115 @@ public class TestNMLeveldbStateStoreServ } @Test + public void testContainerStorage() throws IOException { + // test empty when no state + List<RecoveredContainerState> recoveredContainers = + stateStore.loadContainersState(); + assertTrue(recoveredContainers.isEmpty()); + + // create a container request + ApplicationId appId = ApplicationId.newInstance(1234, 3); + ApplicationAttemptId appAttemptId = + ApplicationAttemptId.newInstance(appId, 4); + ContainerId containerId = ContainerId.newInstance(appAttemptId, 5); + LocalResource lrsrc = LocalResource.newInstance( + URL.newInstance("hdfs", "somehost", 12345, "/some/path/to/rsrc"), + LocalResourceType.FILE, LocalResourceVisibility.APPLICATION, 123L, + 1234567890L); + Map<String, LocalResource> localResources = + new HashMap<String, LocalResource>(); + localResources.put("rsrc", lrsrc); + Map<String, String> env = new HashMap<String, String>(); + env.put("somevar", "someval"); + List<String> containerCmds = new ArrayList<String>(); + 
containerCmds.add("somecmd"); + containerCmds.add("somearg"); + Map<String, ByteBuffer> serviceData = new HashMap<String, ByteBuffer>(); + serviceData.put("someservice", + ByteBuffer.wrap(new byte[] { 0x1, 0x2, 0x3 })); + ByteBuffer containerTokens = + ByteBuffer.wrap(new byte[] { 0x7, 0x8, 0x9, 0xa }); + Map<ApplicationAccessType, String> acls = + new HashMap<ApplicationAccessType, String>(); + acls.put(ApplicationAccessType.VIEW_APP, "viewuser"); + acls.put(ApplicationAccessType.MODIFY_APP, "moduser"); + ContainerLaunchContext clc = ContainerLaunchContext.newInstance( + localResources, env, containerCmds, serviceData, containerTokens, + acls); + Resource containerRsrc = Resource.newInstance(1357, 3); + ContainerTokenIdentifier containerTokenId = + new ContainerTokenIdentifier(containerId, "host", "user", + containerRsrc, 9876543210L, 42, 2468, Priority.newInstance(7), + 13579); + Token containerToken = Token.newInstance(containerTokenId.getBytes(), + ContainerTokenIdentifier.KIND.toString(), "password".getBytes(), + "tokenservice"); + StartContainerRequest containerReq = + StartContainerRequest.newInstance(clc, containerToken); + + // store a container and verify recovered + stateStore.storeContainer(containerId, containerReq); + restartStateStore(); + recoveredContainers = stateStore.loadContainersState(); + assertEquals(1, recoveredContainers.size()); + RecoveredContainerState rcs = recoveredContainers.get(0); + assertEquals(RecoveredContainerStatus.REQUESTED, rcs.getStatus()); + assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode()); + assertEquals(false, rcs.getKilled()); + assertEquals(containerReq, rcs.getStartRequest()); + assertTrue(rcs.getDiagnostics().isEmpty()); + + // launch the container, add some diagnostics, and verify recovered + StringBuilder diags = new StringBuilder(); + stateStore.storeContainerLaunched(containerId); + diags.append("some diags for container"); + stateStore.storeContainerDiagnostics(containerId, diags); + 
restartStateStore(); + recoveredContainers = stateStore.loadContainersState(); + assertEquals(1, recoveredContainers.size()); + rcs = recoveredContainers.get(0); + assertEquals(RecoveredContainerStatus.LAUNCHED, rcs.getStatus()); + assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode()); + assertEquals(false, rcs.getKilled()); + assertEquals(containerReq, rcs.getStartRequest()); + assertEquals(diags.toString(), rcs.getDiagnostics()); + + // mark the container killed, add some more diags, and verify recovered + diags.append("some more diags for container"); + stateStore.storeContainerDiagnostics(containerId, diags); + stateStore.storeContainerKilled(containerId); + restartStateStore(); + recoveredContainers = stateStore.loadContainersState(); + assertEquals(1, recoveredContainers.size()); + rcs = recoveredContainers.get(0); + assertEquals(RecoveredContainerStatus.LAUNCHED, rcs.getStatus()); + assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode()); + assertTrue(rcs.getKilled()); + assertEquals(containerReq, rcs.getStartRequest()); + assertEquals(diags.toString(), rcs.getDiagnostics()); + + // add yet more diags, mark container completed, and verify recovered + diags.append("some final diags"); + stateStore.storeContainerDiagnostics(containerId, diags); + stateStore.storeContainerCompleted(containerId, 21); + restartStateStore(); + recoveredContainers = stateStore.loadContainersState(); + assertEquals(1, recoveredContainers.size()); + rcs = recoveredContainers.get(0); + assertEquals(RecoveredContainerStatus.COMPLETED, rcs.getStatus()); + assertEquals(21, rcs.getExitCode()); + assertTrue(rcs.getKilled()); + assertEquals(containerReq, rcs.getStartRequest()); + assertEquals(diags.toString(), rcs.getDiagnostics()); + + // remove the container and verify not recovered + stateStore.removeContainer(containerId); + restartStateStore(); + recoveredContainers = stateStore.loadContainersState(); + assertTrue(recoveredContainers.isEmpty()); + } + + @Test public void 
testStartResourceLocalization() throws IOException { String user = "somebody"; ApplicationId appId = ApplicationId.newInstance(1, 1); Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java?rev=1617532&r1=1617531&r2=1617532&view=diff ============================================================================== --- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java (original) +++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java Tue Aug 12 17:02:07 2014 @@ -209,7 +209,7 @@ public class TestNMWebServer { BuilderUtils.newResource(1024, 1), currentTime + 10000L, 123, "password".getBytes(), currentTime); Container container = - new ContainerImpl(conf, dispatcher, launchContext, + new ContainerImpl(conf, dispatcher, stateStore, launchContext, null, metrics, BuilderUtils.newContainerTokenIdentifier(containerToken)) { Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java?rev=1617532&r1=1617531&r2=1617532&view=diff ============================================================================== --- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java (original) +++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java Tue Aug 12 17:02:07 2014 @@ -39,6 +39,7 @@ import org.apache.hadoop.ipc.Server; import org.apache.hadoop.security.SaslRpcServer; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authorize.PolicyProvider; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.util.StringUtils; @@ -80,6 +81,7 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStatusupdateEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent; @@ -89,6 +91,7 @@ import 
org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider; +import org.apache.hadoop.yarn.server.security.MasterKeyData; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import com.google.common.annotations.VisibleForTesting; @@ -189,7 +192,7 @@ public class ApplicationMasterService ex return result; } - private ApplicationAttemptId authorizeRequest() + private AMRMTokenIdentifier authorizeRequest() throws YarnException { UserGroupInformation remoteUgi; @@ -226,7 +229,7 @@ public class ApplicationMasterService ex throw RPCUtil.getRemoteException(message); } - return appTokenIdentifier.getApplicationAttemptId(); + return appTokenIdentifier; } @Override @@ -234,7 +237,9 @@ public class ApplicationMasterService ex RegisterApplicationMasterRequest request) throws YarnException, IOException { - ApplicationAttemptId applicationAttemptId = authorizeRequest(); + AMRMTokenIdentifier amrmTokenIdentifier = authorizeRequest(); + ApplicationAttemptId applicationAttemptId = + amrmTokenIdentifier.getApplicationAttemptId(); ApplicationId appID = applicationAttemptId.getApplicationId(); AllocateResponseLock lock = responseMap.get(applicationAttemptId); @@ -333,7 +338,8 @@ public class ApplicationMasterService ex FinishApplicationMasterRequest request) throws YarnException, IOException { - ApplicationAttemptId applicationAttemptId = authorizeRequest(); + ApplicationAttemptId applicationAttemptId = + authorizeRequest().getApplicationAttemptId(); AllocateResponseLock lock = responseMap.get(applicationAttemptId); if (lock == null) { @@ -408,7 +414,10 @@ public class ApplicationMasterService ex public AllocateResponse allocate(AllocateRequest request) throws YarnException, IOException { - ApplicationAttemptId appAttemptId = authorizeRequest(); + 
AMRMTokenIdentifier amrmTokenIdentifier = authorizeRequest(); + + ApplicationAttemptId appAttemptId = + amrmTokenIdentifier.getApplicationAttemptId(); this.amLivelinessMonitor.receivedPing(appAttemptId); @@ -557,6 +566,23 @@ public class ApplicationMasterService ex allocateResponse .setPreemptionMessage(generatePreemptionMessage(allocation)); + // update AMRMToken if the token is rolled-up + MasterKeyData nextMasterKey = + this.rmContext.getAMRMTokenSecretManager().getNextMasterKeyData(); + + if (nextMasterKey != null + && nextMasterKey.getMasterKey().getKeyId() != amrmTokenIdentifier + .getKeyId()) { + Token<AMRMTokenIdentifier> amrmToken = + rmContext.getAMRMTokenSecretManager().createAndGetAMRMToken( + appAttemptId); + ((RMAppAttemptImpl)appAttempt).setAMRMToken(amrmToken); + allocateResponse.setAMRMToken(org.apache.hadoop.yarn.api.records.Token + .newInstance(amrmToken.getIdentifier(), amrmToken.getKind() + .toString(), amrmToken.getPassword(), amrmToken.getService() + .toString())); + } + /* * As we are updating the response inside the lock object so we don't * need to worry about unregister call occurring in between (which Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java?rev=1617532&r1=1617531&r2=1617532&view=diff ============================================================================== --- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java (original) +++ 
hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java Tue Aug 12 17:02:07 2014 @@ -461,7 +461,6 @@ public class ResourceManager extends Com rmDispatcher.register(RMAppManagerEventType.class, rmAppManager); clientRM = createClientRMService(); - rmContext.setClientRMService(clientRM); addService(clientRM); rmContext.setClientRMService(clientRM); Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java?rev=1617532&r1=1617531&r2=1617532&view=diff ============================================================================== --- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java (original) +++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java Tue Aug 12 17:02:07 2014 @@ -58,6 +58,7 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl; import 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent; import org.apache.hadoop.yarn.util.ConverterUtils; @@ -226,7 +227,7 @@ public class AMLauncher implements Runna } // Add AMRMToken - Token<AMRMTokenIdentifier> amrmToken = getAMRMToken(); + Token<AMRMTokenIdentifier> amrmToken = createAndSetAMRMToken(); if (amrmToken != null) { credentials.addToken(amrmToken.getService(), amrmToken); } @@ -236,8 +237,12 @@ public class AMLauncher implements Runna } @VisibleForTesting - protected Token<AMRMTokenIdentifier> getAMRMToken() { - return application.getAMRMToken(); + protected Token<AMRMTokenIdentifier> createAndSetAMRMToken() { + Token<AMRMTokenIdentifier> amrmToken = + this.rmContext.getAMRMTokenSecretManager().createAndGetAMRMToken( + application.getAppAttemptId()); + ((RMAppAttemptImpl)application).setAMRMToken(amrmToken); + return amrmToken; } @SuppressWarnings("unchecked") Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java?rev=1617532&r1=1617531&r2=1617532&view=diff ============================================================================== --- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java (original) +++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java 
Tue Aug 12 17:02:07 2014 @@ -71,6 +71,10 @@ import com.google.common.annotations.Vis * FileSystem interface. Does not use directories so that simple key-value * stores can be used. The retry policy for the real filesystem client must be * configured separately to enable retry of filesystem operations when needed. + * + * Changes from 1.1 to 1.2, AMRMTokenSecretManager state has been saved + * separately. The currentMasterkey and nextMasterkey have been stored. + * Also, AMRMToken has been removed from ApplicationAttemptState. */ public class FileSystemRMStateStore extends RMStateStore { @@ -78,7 +82,7 @@ public class FileSystemRMStateStore exte protected static final String ROOT_DIR_NAME = "FSRMStateRoot"; protected static final Version CURRENT_VERSION_INFO = Version - .newInstance(1, 1); + .newInstance(1, 2); protected static final String AMRMTOKEN_SECRET_MANAGER_NODE = "AMRMTokenSecretManagerNode"; Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java?rev=1617532&r1=1617531&r2=1617532&view=diff ============================================================================== --- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java (original) +++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java Tue Aug 12 17:02:07 2014 @@ -32,7 +32,6 @@ import 
org.apache.hadoop.classification. import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; import org.apache.hadoop.security.Credentials; -import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.delegation.DelegationKey; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -45,7 +44,6 @@ import org.apache.hadoop.yarn.api.record import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventHandler; -import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.records.Version; import org.apache.hadoop.yarn.server.resourcemanager.RMFatalEvent; @@ -54,13 +52,13 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNewSavedEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppUpdateSavedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; -import 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptNewSavedEvent; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUpdateSavedEvent; import org.apache.hadoop.yarn.state.InvalidStateTransitonException; import org.apache.hadoop.yarn.state.SingleArcTransition; import org.apache.hadoop.yarn.state.StateMachine; @@ -134,7 +132,8 @@ public abstract class RMStateStore exten LOG.info("Storing info for app: " + appId); try { store.storeApplicationStateInternal(appId, appStateData); - store.notifyDoneStoringApplication(appId, null); + store.notifyApplication(new RMAppEvent(appId, + RMAppEventType.APP_NEW_SAVED)); } catch (Exception e) { LOG.error("Error storing app: " + appId, e); store.notifyStoreOperationFailed(e); @@ -158,7 +157,8 @@ public abstract class RMStateStore exten LOG.info("Updating info for app: " + appId); try { store.updateApplicationStateInternal(appId, appStateData); - store.notifyDoneUpdatingApplication(appId, null); + store.notifyApplication(new RMAppEvent(appId, + RMAppEventType.APP_UPDATE_SAVED)); } catch (Exception e) { LOG.error("Error updating app: " + appId, e); store.notifyStoreOperationFailed(e); @@ -207,8 +207,9 @@ public abstract class RMStateStore exten } store.storeApplicationAttemptStateInternal(attemptState.getAttemptId(), attemptStateData); - store.notifyDoneStoringApplicationAttempt(attemptState.getAttemptId(), - null); + store.notifyApplicationAttempt(new RMAppAttemptEvent + (attemptState.getAttemptId(), + RMAppAttemptEventType.ATTEMPT_NEW_SAVED)); } catch (Exception e) { LOG.error("Error storing appAttempt: " + attemptState.getAttemptId(), e); store.notifyStoreOperationFailed(e); @@ -235,8 +236,9 @@ public abstract class RMStateStore exten } store.updateApplicationAttemptStateInternal(attemptState.getAttemptId(), attemptStateData); - store.notifyDoneUpdatingApplicationAttempt(attemptState.getAttemptId(), - null); + store.notifyApplicationAttempt(new RMAppAttemptEvent + 
(attemptState.getAttemptId(), + RMAppAttemptEventType.ATTEMPT_UPDATE_SAVED)); } catch (Exception e) { LOG.error("Error updating appAttempt: " + attemptState.getAttemptId(), e); store.notifyStoreOperationFailed(e); @@ -769,10 +771,7 @@ public abstract class RMStateStore exten public Credentials getCredentialsFromAppAttempt(RMAppAttempt appAttempt) { Credentials credentials = new Credentials(); - Token<AMRMTokenIdentifier> appToken = appAttempt.getAMRMToken(); - if(appToken != null){ - credentials.addToken(AM_RM_TOKEN_SERVICE, appToken); - } + SecretKey clientTokenMasterKey = appAttempt.getClientTokenMasterKey(); if(clientTokenMasterKey != null){ @@ -806,47 +805,28 @@ public abstract class RMStateStore exten } rmDispatcher.getEventHandler().handle(new RMFatalEvent(type, failureCause)); } - + @SuppressWarnings("unchecked") /** - * In (@link handleStoreEvent}, this method is called to notify the - * application that new application is stored in state store - * @param appId id of the application that has been saved - * @param storedException the exception that is thrown when storing the - * application - */ - private void notifyDoneStoringApplication(ApplicationId appId, - Exception storedException) { - rmDispatcher.getEventHandler().handle( - new RMAppNewSavedEvent(appId, storedException)); - } - - @SuppressWarnings("unchecked") - private void notifyDoneUpdatingApplication(ApplicationId appId, - Exception storedException) { - rmDispatcher.getEventHandler().handle( - new RMAppUpdateSavedEvent(appId, storedException)); + * This method is called to notify the application that + * new application is stored or updated in state store + * @param event App event containing the app id and event type + */ + private void notifyApplication(RMAppEvent event) { + rmDispatcher.getEventHandler().handle(event); } - + @SuppressWarnings("unchecked") /** - * In (@link handleStoreEvent}, this method is called to notify the - * application attempt that new attempt is stored in state store - 
* @param appAttempt attempt that has been saved - */ - private void notifyDoneStoringApplicationAttempt(ApplicationAttemptId attemptId, - Exception storedException) { - rmDispatcher.getEventHandler().handle( - new RMAppAttemptNewSavedEvent(attemptId, storedException)); - } - - @SuppressWarnings("unchecked") - private void notifyDoneUpdatingApplicationAttempt(ApplicationAttemptId attemptId, - Exception updatedException) { - rmDispatcher.getEventHandler().handle( - new RMAppAttemptUpdateSavedEvent(attemptId, updatedException)); + * This method is called to notify the application attempt + * that new attempt is stored or updated in state store + * @param event App attempt event containing the app attempt + * id and event type + */ + private void notifyApplicationAttempt(RMAppAttemptEvent event) { + rmDispatcher.getEventHandler().handle(event); } - + /** * EventHandler implementation which forward events to the FSRMStateStore * This hides the EventHandle methods of the store from its public interface Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java?rev=1617532&r1=1617531&r2=1617532&view=diff ============================================================================== --- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java (original) +++ 
hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java Tue Aug 12 17:02:07 2014 @@ -78,6 +78,11 @@ import org.apache.zookeeper.server.auth. import com.google.common.annotations.VisibleForTesting; +/** + * Changes from 1.1 to 1.2, AMRMTokenSecretManager state has been saved + * separately. The currentMasterkey and nextMasterkey have been stored. + * Also, AMRMToken has been removed from ApplicationAttemptState. + */ @Private @Unstable public class ZKRMStateStore extends RMStateStore { @@ -87,7 +92,7 @@ public class ZKRMStateStore extends RMSt protected static final String ROOT_ZNODE_NAME = "ZKRMStateRoot"; protected static final Version CURRENT_VERSION_INFO = Version - .newInstance(1, 1); + .newInstance(1, 2); private static final String RM_DELEGATION_TOKENS_ROOT_ZNODE_NAME = "RMDelegationTokensRoot"; private static final String RM_DT_SEQUENTIAL_NUMBER_ZNODE_NAME = Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java?rev=1617532&r1=1617531&r2=1617532&view=diff ============================================================================== --- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java (original) +++ 
hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java Tue Aug 12 17:02:07 2014 @@ -820,17 +820,6 @@ public class RMAppImpl implements RMApp, RMAppTransition { @Override public void transition(RMAppImpl app, RMAppEvent event) { - if (event instanceof RMAppNewSavedEvent) { - RMAppNewSavedEvent storeEvent = (RMAppNewSavedEvent) event; - // For HA this exception needs to be handled by giving up - // master status if we got fenced - if (((RMAppNewSavedEvent) event).getStoredException() != null) { - LOG.error( - "Failed to store application: " + storeEvent.getApplicationId(), - storeEvent.getStoredException()); - ExitUtil.terminate(1, storeEvent.getStoredException()); - } - } app.handler.handle(new AppAddedSchedulerEvent(app.applicationId, app.submissionContext.getQueue(), app.user)); } @@ -848,13 +837,6 @@ public class RMAppImpl implements RMApp, @Override public RMAppState transition(RMAppImpl app, RMAppEvent event) { - RMAppUpdateSavedEvent storeEvent = (RMAppUpdateSavedEvent) event; - if (storeEvent.getUpdatedException() != null) { - LOG.error("Failed to update the final state of application" - + storeEvent.getApplicationId(), storeEvent.getUpdatedException()); - ExitUtil.terminate(1, storeEvent.getUpdatedException()); - } - if (app.transitionTodo instanceof SingleArcTransition) { ((SingleArcTransition) app.transitionTodo).transition(app, app.eventCausingFinalSaving); Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java?rev=1617532&r1=1617531&r2=1617532&view=diff ============================================================================== --- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java (original) +++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java Tue Aug 12 17:02:07 2014 @@ -37,6 +37,7 @@ import org.apache.commons.lang.StringUti import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; @@ -79,11 +80,9 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAllocatedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptNewSavedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStatusupdateEvent; import 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUpdateSavedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; @@ -397,7 +396,6 @@ public class RMAppAttemptImpl implements RMAppAttemptState.KILLED, RMAppAttemptState.KILLED, EnumSet.of(RMAppAttemptEventType.ATTEMPT_ADDED, - RMAppAttemptEventType.EXPIRE, RMAppAttemptEventType.LAUNCHED, RMAppAttemptEventType.LAUNCH_FAILED, RMAppAttemptEventType.EXPIRE, @@ -559,7 +557,22 @@ public class RMAppAttemptImpl implements @Override public Token<AMRMTokenIdentifier> getAMRMToken() { - return this.amrmToken; + this.readLock.lock(); + try { + return this.amrmToken; + } finally { + this.readLock.unlock(); + } + } + + @Private + public void setAMRMToken(Token<AMRMTokenIdentifier> lastToken) { + this.writeLock.lock(); + try { + this.amrmToken = lastToken; + } finally { + this.writeLock.unlock(); + } } @Override @@ -713,7 +726,8 @@ public class RMAppAttemptImpl implements this.attemptMetrics.setIsPreempted(); } setMasterContainer(attemptState.getMasterContainer()); - recoverAppAttemptCredentials(attemptState.getAppAttemptCredentials()); + recoverAppAttemptCredentials(attemptState.getAppAttemptCredentials(), + attemptState.getState()); this.recoveredFinalState = attemptState.getState(); this.originalTrackingUrl = attemptState.getFinalTrackingUrl(); this.proxiedTrackingUrl = generateProxyUriWithScheme(originalTrackingUrl); @@ -725,9 +739,11 @@ public class RMAppAttemptImpl implements this.justFinishedContainers = attempt.getJustFinishedContainers(); } - private void recoverAppAttemptCredentials(Credentials appAttemptTokens) - throws IOException { - if (appAttemptTokens == null) { + private void 
recoverAppAttemptCredentials(Credentials appAttemptTokens, + RMAppAttemptState state) throws IOException { + if (appAttemptTokens == null || state == RMAppAttemptState.FAILED + || state == RMAppAttemptState.FINISHED + || state == RMAppAttemptState.KILLED) { return; } @@ -738,12 +754,9 @@ public class RMAppAttemptImpl implements .registerMasterKey(applicationAttemptId, clientTokenMasterKeyBytes); } - // Only one AMRMToken is stored per-attempt, so this should be fine. Can't - // use TokenSelector as service may change - think fail-over. this.amrmToken = - (Token<AMRMTokenIdentifier>) appAttemptTokens - .getToken(RMStateStore.AM_RM_TOKEN_SERVICE); - rmContext.getAMRMTokenSecretManager().addPersistedPassword(this.amrmToken); + rmContext.getAMRMTokenSecretManager().createAndGetAMRMToken( + applicationAttemptId); } private static class BaseTransition implements @@ -779,11 +792,6 @@ public class RMAppAttemptImpl implements .createMasterKey(appAttempt.applicationAttemptId); } - // create AMRMToken - appAttempt.amrmToken = - appAttempt.rmContext.getAMRMTokenSecretManager().createAndGetAMRMToken( - appAttempt.applicationAttemptId); - // Add the applicationAttempt to the scheduler and inform the scheduler // whether to transfer the state from previous attempt. 
appAttempt.eventHandler.handle(new AppAttemptAddedSchedulerEvent( @@ -895,7 +903,6 @@ public class RMAppAttemptImpl implements @Override public void transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) { - appAttempt.checkAttemptStoreError(event); appAttempt.launchAttempt(); } } @@ -1047,14 +1054,6 @@ public class RMAppAttemptImpl implements @Override public RMAppAttemptState transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) { - RMAppAttemptUpdateSavedEvent storeEvent = (RMAppAttemptUpdateSavedEvent) event; - if (storeEvent.getUpdatedException() != null) { - LOG.error("Failed to update the final state of application attempt: " - + storeEvent.getApplicationAttemptId(), - storeEvent.getUpdatedException()); - ExitUtil.terminate(1, storeEvent.getUpdatedException()); - } - RMAppAttemptEvent causeEvent = appAttempt.eventCausingFinalSaving; if (appAttempt.transitionTodo instanceof SingleArcTransition) { @@ -1184,12 +1183,11 @@ public class RMAppAttemptImpl implements @Override public void transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) { - appAttempt.checkAttemptStoreError(event); - // TODO Today unmanaged AM client is waiting for app state to be Accepted to - // launch the AM. This is broken since we changed to start the attempt - // after the application is Accepted. We may need to introduce an attempt - // report that client can rely on to query the attempt state and choose to - // launch the unmanaged AM. 
+ // create AMRMToken + appAttempt.amrmToken = + appAttempt.rmContext.getAMRMTokenSecretManager().createAndGetAMRMToken( + appAttempt.applicationAttemptId); + super.transition(appAttempt, event); } } @@ -1677,18 +1675,6 @@ public class RMAppAttemptImpl implements rmContext.getAMLivelinessMonitor().register(getAppAttemptId()); } - private void checkAttemptStoreError(RMAppAttemptEvent event) { - RMAppAttemptNewSavedEvent storeEvent = (RMAppAttemptNewSavedEvent) event; - if(storeEvent.getStoredException() != null) - { - // This needs to be handled for HA and give up master status if we got - // fenced - LOG.error("Failed to store attempt: " + getAppAttemptId(), - storeEvent.getStoredException()); - ExitUtil.terminate(1, storeEvent.getStoredException()); - } - } - private void storeAttempt() { // store attempt data in a non-blocking manner to prevent dispatcher // thread starvation and wait for state to be saved Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java?rev=1617532&r1=1617531&r2=1617532&view=diff ============================================================================== --- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java (original) +++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java Tue Aug 12 17:02:07 2014 @@ -93,9 +93,9 @@ public class RMNodeImpl 
implements RMNod private final RMContext context; private final String hostName; private final int commandPort; - private final int httpPort; + private int httpPort; private final String nodeAddress; // The containerManager address - private final String httpAddress; + private String httpAddress; private volatile ResourceOption resourceOption; private final Node node; @@ -521,37 +521,15 @@ public class RMNodeImpl implements RMNod @Override public void transition(RMNodeImpl rmNode, RMNodeEvent event) { - // Kill containers since node is rejoining. - rmNode.nodeUpdateQueue.clear(); - rmNode.context.getDispatcher().getEventHandler().handle( - new NodeRemovedSchedulerEvent(rmNode)); - RMNodeReconnectEvent reconnectEvent = (RMNodeReconnectEvent) event; RMNode newNode = reconnectEvent.getReconnectedNode(); rmNode.nodeManagerVersion = newNode.getNodeManagerVersion(); - if (rmNode.getTotalCapability().equals(newNode.getTotalCapability()) - && rmNode.getHttpPort() == newNode.getHttpPort()) { - // Reset heartbeat ID since node just restarted. - rmNode.getLastNodeHeartBeatResponse().setResponseId(0); - if (rmNode.getState() != NodeState.UNHEALTHY) { - // Only add new node if old state is not UNHEALTHY - rmNode.context.getDispatcher().getEventHandler().handle( - new NodeAddedSchedulerEvent(rmNode)); - } - } else { - // Reconnected node differs, so replace old node and start new node - switch (rmNode.getState()) { - case RUNNING: - ClusterMetrics.getMetrics().decrNumActiveNodes(); - break; - case UNHEALTHY: - ClusterMetrics.getMetrics().decrNumUnhealthyNMs(); - break; - } - rmNode.context.getRMNodes().put(newNode.getNodeID(), newNode); - rmNode.context.getDispatcher().getEventHandler().handle( - new RMNodeStartedEvent(newNode.getNodeID(), null, null)); - } + rmNode.httpPort = newNode.getHttpPort(); + rmNode.httpAddress = newNode.getHttpAddress(); + rmNode.resourceOption = newNode.getResourceOption(); + + // Reset heartbeat ID since node just restarted. 
+ rmNode.getLastNodeHeartBeatResponse().setResponseId(0); if (null != reconnectEvent.getRunningApplications()) { for (ApplicationId appId : reconnectEvent.getRunningApplications()) {
