Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/container-executor.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/container-executor.c?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/container-executor.c (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/container-executor.c Wed Aug 20 01:34:29 2014
@@ -33,6 +33,7 @@
 #include <limits.h>
 #include <sys/stat.h>
 #include <sys/mount.h>
+#include <sys/wait.h>
 
 static const int DEFAULT_MIN_USERID = 1000;
 
@@ -111,16 +112,16 @@ int check_executor_permissions(char *exe
     return -1;
   }
 
-  // check others do not have read/write/execute permissions
-  if ((filestat.st_mode & S_IROTH) == S_IROTH || (filestat.st_mode & S_IWOTH)
-      == S_IWOTH || (filestat.st_mode & S_IXOTH) == S_IXOTH) {
+  // check others do not have write/execute permissions
+  if ((filestat.st_mode & S_IWOTH) == S_IWOTH ||
+      (filestat.st_mode & S_IXOTH) == S_IXOTH) {
     fprintf(LOGFILE,
-            "The container-executor binary should not have read or write or"
-            " execute for others.\n");
+            "The container-executor binary should not have write or execute "
+            "for others.\n");
     return -1;
   }
 
-  // Binary should be setuid/setgid executable
+  // Binary should be setuid executable
   if ((filestat.st_mode & S_ISUID) == 0) {
     fprintf(LOGFILE, "The container-executor binary should be set setuid.\n");
     return -1;
@@ -245,6 +246,97 @@ static int write_pid_to_file_as_nm(const
 }
 
 /**
+ * Write the exit code of the container into the exit code file.
+ * The code is first written to "<exit_code_file>.tmp", which is then renamed
+ * onto exit_code_file so that the final path only appears once fully written.
+ * exit_code_file: Path to exit code file where exit code needs to be written
+ * exit_code: exit code to record, stored as a decimal string
+ * Returns 0 on success and -1 on failure.
+ */
+static int write_exit_code_file(const char* exit_code_file, int exit_code) {
+  char *tmp_ecode_file = concatenate("%s.tmp", "exit_code_path", 1,
+      exit_code_file);
+  if (tmp_ecode_file == NULL) {
+    return -1;
+  }
+
+  // create with 700
+  int ecode_fd = open(tmp_ecode_file, O_WRONLY|O_CREAT|O_EXCL, S_IRWXU);
+  if (ecode_fd == -1) {
+    fprintf(LOGFILE, "Can't open file %s - %s\n", tmp_ecode_file,
+           strerror(errno));
+    free(tmp_ecode_file);
+    return -1;
+  }
+
+  char ecode_buf[21];
+  snprintf(ecode_buf, sizeof(ecode_buf), "%d", exit_code);
+  size_t ecode_len = strlen(ecode_buf);
+  ssize_t written = write(ecode_fd, ecode_buf, ecode_len);
+  // remember why write() failed: close() below may overwrite errno
+  int write_errno = errno;
+  close(ecode_fd);
+  if (written != (ssize_t) ecode_len) {
+    fprintf(LOGFILE, "Failed to write exit code to file %s - %s\n",
+        tmp_ecode_file,
+        written == -1 ? strerror(write_errno) : "short write");
+    // remove the partial temp file; O_EXCL would make a later attempt fail
+    unlink(tmp_ecode_file);
+    free(tmp_ecode_file);
+    return -1;
+  }
+
+  // rename temp file to actual exit code file
+  // use rename as atomic
+  if (rename(tmp_ecode_file, exit_code_file)) {
+    fprintf(LOGFILE, "Can't move exit code file from %s to %s - %s\n",
+        tmp_ecode_file, exit_code_file, strerror(errno));
+    unlink(tmp_ecode_file);
+    free(tmp_ecode_file);
+    return -1;
+  }
+
+  free(tmp_ecode_file);
+  return 0;
+}
+
+/**
+ * Wait for the container process to exit and write the exit code to
+ * the exit code file.
+ * A normal exit is recorded as the process exit status; termination by a
+ * signal is recorded as 0x80 plus the signal number.
+ * Returns the exit code of the container process, or -1 on failure.
+ */
+static int wait_and_write_exit_code(pid_t pid, const char* exit_code_file) {
+  int child_status = -1;
+  int exit_code = -1;
+  int waitpid_result;
+
+  if (change_effective_user(nm_uid, nm_gid) != 0) {
+    return -1;
+  }
+  do {
+    waitpid_result = waitpid(pid, &child_status, 0);
+  } while (waitpid_result == -1 && errno == EINTR);
+  if (waitpid_result < 0) {
+    fprintf(LOGFILE, "Error waiting for container process %d - %s\n",
+        pid, strerror(errno));
+    return -1;
+  }
+  if (WIFEXITED(child_status)) {
+    exit_code = WEXITSTATUS(child_status);
+  } else if (WIFSIGNALED(child_status)) {
+    exit_code = 0x80 + WTERMSIG(child_status);
+  } else {
+    fprintf(LOGFILE, "Unable to determine exit status for pid %d\n", pid);
+  }
+  if (write_exit_code_file(exit_code_file, exit_code) < 0) {
+    return -1;
+  }
+  return exit_code;
+}
+
+/**
  * Change the real and effective user and group to abandon the super user
  * priviledges.
  */
@@ -337,6 +429,14 @@ char *get_container_work_directory(const
                      nm_root, user, app_id, container_id);
 }
 
+/**
+ * Get the path of the exit code file for a container: "<pid_file>.exitcode".
+ * launch_container_as_user treats NULL as out-of-memory and frees the result.
+ */
+char *get_exit_code_file(const char* pid_file) {
+  return concatenate("%s.exitcode", "exit_code_file", 1, pid_file);
+}
+
 char *get_container_launcher_file(const char* work_dir) {
   return concatenate("%s/%s", "container launcher", 2, work_dir, CONTAINER_SCRIPT);
 }
@@ -879,6 +979,8 @@ int launch_container_as_user(const char
   int exit_code = -1;
   char *script_file_dest = NULL;
   char *cred_file_dest = NULL;
+  char *exit_code_file = NULL;
+
   script_file_dest = get_container_launcher_file(work_dir);
   if (script_file_dest == NULL) {
     exit_code = OUT_OF_MEMORY;
@@ -889,6 +991,11 @@ int launch_container_as_user(const char
     exit_code = OUT_OF_MEMORY;
     goto cleanup;
   }
+  exit_code_file = get_exit_code_file(pid_file);
+  if (NULL == exit_code_file) {
+    exit_code = OUT_OF_MEMORY;
+    goto cleanup;
+  }
 
   // open launch script
   int container_file_source = open_file_as_nm(script_name);
@@ -902,6 +1009,21 @@ int launch_container_as_user(const char
     goto cleanup;
   }
 
+  // fork: the parent waits for the child and records its exit code
+  pid_t child_pid = fork();
+  if (child_pid == -1) {
+    // must not fall into the parent branch: waitpid(-1, ...) waits for any child
+    fprintf(LOGFILE, "Failed to fork container launcher - %s\n",
+        strerror(errno));
+    exit_code = -1;
+    goto cleanup;
+  }
+  if (child_pid != 0) {
+    // parent
+    exit_code = wait_and_write_exit_code(child_pid, exit_code_file);
+    goto cleanup;
+  }
+
   // setsid
   pid_t pid = setsid();
   if (pid == -1) {
@@ -986,6 +1108,7 @@ int launch_container_as_user(const char
   exit_code = 0;
 
  cleanup:
+  free(exit_code_file);
   free(script_file_dest);
   free(cred_file_dest);
   return exit_code;
Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto Wed Aug 20 01:34:29 2014 @@ -24,6 +24,13 @@ package hadoop.yarn; import "yarn_protos.proto"; +message ContainerManagerApplicationProto { + optional ApplicationIdProto id = 1; + optional string user = 2; + optional bytes credentials = 3; + repeated ApplicationACLMapProto acls = 4; +} + message DeletionServiceDeleteTaskProto { optional int32 id = 1; optional string user = 2; @@ -39,8 +46,3 @@ message LocalizedResourceProto { optional int64 size = 3; } -message NMDBSchemaVersionProto { - optional int32 majorVersion = 1; - optional int32 minorVersion = 2; -} - Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- 
hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java Wed Aug 20 01:34:29 2014 @@ -82,6 +82,8 @@ import org.apache.hadoop.yarn.server.nod import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; @@ -91,8 +93,6 @@ import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService; -import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; @SuppressWarnings("rawtypes") public class TestNodeStatusUpdater { @@ -201,6 +201,7 @@ public class TestNodeStatusUpdater { Dispatcher mockDispatcher = mock(Dispatcher.class); EventHandler mockEventHandler = mock(EventHandler.class); when(mockDispatcher.getEventHandler()).thenReturn(mockEventHandler); + NMStateStoreService stateStore = new NMNullStateStoreService(); nodeStatus.setResponseId(heartBeatID++); Map<ApplicationId, List<ContainerStatus>> appToContainers = getAppToContainerStatusMap(nodeStatus.getContainersStatuses()); @@ -226,9 +227,8 @@ public 
class TestNodeStatusUpdater { firstContainerID, InetAddress.getByName("localhost") .getCanonicalHostName(), 1234, user, resource, currentTime + 10000, 123, "password".getBytes(), currentTime)); - Container container = - new ContainerImpl(conf, mockDispatcher, launchContext, null, - mockMetrics, containerToken); + Container container = new ContainerImpl(conf, mockDispatcher, + stateStore, launchContext, null, mockMetrics, containerToken); this.context.getContainers().put(firstContainerID, container); } else if (heartBeatID == 2) { // Checks on the RM end @@ -257,9 +257,8 @@ public class TestNodeStatusUpdater { secondContainerID, InetAddress.getByName("localhost") .getCanonicalHostName(), 1234, user, resource, currentTime + 10000, 123, "password".getBytes(), currentTime)); - Container container = - new ContainerImpl(conf, mockDispatcher, launchContext, null, - mockMetrics, containerToken); + Container container = new ContainerImpl(conf, mockDispatcher, + stateStore, launchContext, null, mockMetrics, containerToken); this.context.getContainers().put(secondContainerID, container); } else if (heartBeatID == 3) { // Checks on the RM end @@ -784,7 +783,7 @@ public class TestNodeStatusUpdater { ContainerId cId = ContainerId.newInstance(appAttemptId, 0); - nodeStatusUpdater.updateStoppedContainersInCache(cId); + nodeStatusUpdater.addCompletedContainer(cId); Assert.assertTrue(nodeStatusUpdater.isContainerRecentlyStopped(cId)); long time1 = System.currentTimeMillis(); Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java Wed Aug 20 01:34:29 2014 @@ -233,7 +233,7 @@ public abstract class BaseContainerManag protected DeletionService createDeletionService() { return new DeletionService(exec) { @Override - public void delete(String user, Path subDir, Path[] baseDirs) { + public void delete(String user, Path subDir, Path... baseDirs) { // Don't do any deletions. 
LOG.info("Psuedo delete: user - " + user + ", subDir - " + subDir + ", baseDirs - " + baseDirs); Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java Wed Aug 20 01:34:29 2014 @@ -191,7 +191,8 @@ public class TestAuxServices { ContainerTokenIdentifier cti = new ContainerTokenIdentifier( ContainerId.newInstance(attemptId, 1), "", "", Resource.newInstance(1, 1), 0,0,0, Priority.newInstance(0), 0); - Container container = new ContainerImpl(null, null, null, null, null, cti); + Container container = new ContainerImpl(null, null, null, null, null, + null, cti); ContainerId containerId = container.getContainerId(); Resource resource = container.getResource(); event = new AuxServicesEvent(AuxServicesEventType.CONTAINER_INIT,container); Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java Wed Aug 20 01:34:29 2014 @@ -88,6 +88,7 @@ import org.apache.hadoop.yarn.server.nod import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEventType; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.junit.Assert; import org.junit.Test; @@ -722,6 +723,8 @@ public class TestContainer { Context context = mock(Context.class); when(context.getApplications()).thenReturn( new ConcurrentHashMap<ApplicationId, Application>()); + NMNullStateStoreService stateStore = new NMNullStateStoreService(); + when(context.getNMStateStore()).thenReturn(stateStore); ContainerExecutor executor = mock(ContainerExecutor.class); launcher = new ContainersLauncher(context, dispatcher, executor, null, null); @@ -777,7 +780,8 @@ public class TestContainer { } when(ctxt.getServiceData()).thenReturn(serviceData); - c = new ContainerImpl(conf, dispatcher, ctxt, null, metrics, 
identifier); + c = new ContainerImpl(conf, dispatcher, new NMNullStateStoreService(), + ctxt, null, metrics, identifier); dispatcher.register(ContainerEventType.class, new EventHandler<ContainerEvent>() { @Override Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java Wed Aug 20 01:34:29 2014 @@ -21,20 +21,29 @@ package org.apache.hadoop.yarn.server.no import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; +import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto; +import 
org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
 import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
 
 public class NMMemoryStateStoreService extends NMStateStoreService {
+  private Map<ApplicationId, ContainerManagerApplicationProto> apps;
+  private Set<ApplicationId> finishedApps;
+  private Map<ContainerId, RecoveredContainerState> containerStates;
   private Map<TrackerKey, TrackerState> trackerStates;
   private Map<Integer, DeletionServiceDeleteTaskProto> deleteTasks;
   private RecoveredNMTokensState nmTokenState;
@@ -44,6 +53,132 @@ public class NMMemoryStateStoreService e
     super(NMMemoryStateStoreService.class.getName());
   }
 
+  @Override
+  protected void initStorage(Configuration conf) {
+    apps = new HashMap<ApplicationId, ContainerManagerApplicationProto>();
+    finishedApps = new HashSet<ApplicationId>();
+    containerStates = new HashMap<ContainerId, RecoveredContainerState>();
+    nmTokenState = new RecoveredNMTokensState();
+    nmTokenState.applicationMasterKeys =
+        new HashMap<ApplicationAttemptId, MasterKey>();
+    containerTokenState = new RecoveredContainerTokensState();
+    containerTokenState.activeTokens = new HashMap<ContainerId, Long>();
+    trackerStates = new HashMap<TrackerKey, TrackerState>();
+    deleteTasks = new HashMap<Integer, DeletionServiceDeleteTaskProto>();
+  }
+
+  @Override
+  protected void startStorage() {
+  }
+
+  @Override
+  protected void closeStorage() {
+  }
+
+
+  @Override
+  public RecoveredApplicationsState loadApplicationsState()
+  throws IOException {
+    RecoveredApplicationsState state = new RecoveredApplicationsState();
+    state.applications = new ArrayList<ContainerManagerApplicationProto>(
+        apps.values());
+    state.finishedApplications = new ArrayList<ApplicationId>(finishedApps);
+    return state;
+  }
+
+  @Override
+  public void storeApplication(ApplicationId appId,
+      ContainerManagerApplicationProto proto) throws IOException {
+    ContainerManagerApplicationProto protoCopy =
+        ContainerManagerApplicationProto.parseFrom(proto.toByteString());
+    apps.put(appId, protoCopy);
+  }
+
+  @Override
+  public void storeFinishedApplication(ApplicationId appId) {
+    finishedApps.add(appId);
+  }
+
+  @Override
+  public void removeApplication(ApplicationId appId) throws IOException {
+    apps.remove(appId);
+    finishedApps.remove(appId);
+  }
+
+  @Override
+  public List<RecoveredContainerState> loadContainersState()
+      throws IOException {
+    // Return copies of the stored entries so the caller can't modify our
+    // state. The copies are shallow: field values are shared with the store.
+    List<RecoveredContainerState> result =
+        new ArrayList<RecoveredContainerState>(containerStates.size());
+    for (RecoveredContainerState rcs : containerStates.values()) {
+      RecoveredContainerState rcsCopy = new RecoveredContainerState();
+      rcsCopy.status = rcs.status;
+      rcsCopy.exitCode = rcs.exitCode;
+      rcsCopy.killed = rcs.killed;
+      rcsCopy.diagnostics = rcs.diagnostics;
+      rcsCopy.startRequest = rcs.startRequest;
+      result.add(rcsCopy);
+    }
+    return result;
+  }
+
+  @Override
+  public void storeContainer(ContainerId containerId,
+      StartContainerRequest startRequest) throws IOException {
+    RecoveredContainerState rcs = new RecoveredContainerState();
+    rcs.startRequest = startRequest;
+    containerStates.put(containerId, rcs);
+  }
+
+  @Override
+  public void storeContainerDiagnostics(ContainerId containerId,
+      StringBuilder diagnostics) throws IOException {
+    RecoveredContainerState rcs = getRecoveredContainerState(containerId);
+    rcs.diagnostics = diagnostics.toString();
+  }
+
+  @Override
+  public void storeContainerLaunched(ContainerId containerId)
+      throws IOException {
+    RecoveredContainerState rcs = getRecoveredContainerState(containerId);
+    if (rcs.exitCode != ContainerExitStatus.INVALID) {
+      throw new IOException("Container already completed");
+    }
+    rcs.status = RecoveredContainerStatus.LAUNCHED;
+  }
+
+  @Override
+  public void storeContainerKilled(ContainerId containerId)
+      throws IOException {
+    RecoveredContainerState rcs = getRecoveredContainerState(containerId);
+    rcs.killed = true;
+  }
+
+  @Override
+  public void storeContainerCompleted(ContainerId containerId, int exitCode)
+      throws IOException {
+    RecoveredContainerState rcs = getRecoveredContainerState(containerId);
+    rcs.status = RecoveredContainerStatus.COMPLETED;
+    rcs.exitCode = exitCode;
+  }
+
+  @Override
+  public void removeContainer(ContainerId containerId) throws IOException {
+    containerStates.remove(containerId);
+  }
+
+  // Looks up the state recorded by storeContainer(); throws if there is none.
+  private RecoveredContainerState getRecoveredContainerState(
+      ContainerId containerId) throws IOException {
+    RecoveredContainerState rcs = containerStates.get(containerId);
+    if (rcs == null) {
+      throw new IOException("No start request for " + containerId);
+    }
+    return rcs;
+  }
+
   private LocalResourceTrackerState loadTrackerState(TrackerState ts) {
     LocalResourceTrackerState result = new LocalResourceTrackerState();
     result.localizedResources.addAll(ts.localizedResources.values());
@@ -117,25 +252,6 @@ public class NMMemoryStateStoreService e
     }
   }
 
-  @Override
-  protected void initStorage(Configuration conf) {
-    nmTokenState = new RecoveredNMTokensState();
-    nmTokenState.applicationMasterKeys =
-        new HashMap<ApplicationAttemptId, MasterKey>();
-    containerTokenState = new RecoveredContainerTokensState();
-    containerTokenState.activeTokens = new HashMap<ContainerId, Long>();
-    trackerStates = new HashMap<TrackerKey, TrackerState>();
-    deleteTasks = new HashMap<Integer, DeletionServiceDeleteTaskProto>();
-  }
-
-  @Override
-  protected void startStorage() {
-  }
-
-  @Override
-  protected void closeStorage() {
-  }
-
 
   @Override
   public RecoveredDeletionServiceState loadDeletionServiceState()
Modified:
hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java Wed Aug 20 01:34:29 2014 @@ -25,31 +25,49 @@ import static org.junit.Assert.assertTru import java.io.File; import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; import org.apache.hadoop.service.ServiceStateException; +import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; +import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.LocalResource; import 
org.apache.hadoop.yarn.api.records.LocalResourceType; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.Token; +import org.apache.hadoop.yarn.api.records.URL; +import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.LocalResourcePBImpl; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto; +import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto; +import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.LocalResourceTrackerState; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredApplicationsState; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerState; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerTokensState; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredDeletionServiceState; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredLocalizationState; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredNMTokensState; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredUserResources; -import 
org.apache.hadoop.yarn.server.nodemanager.recovery.records.NMDBSchemaVersion; +import org.apache.hadoop.yarn.server.records.Version; import org.apache.hadoop.yarn.server.security.BaseContainerTokenSecretManager; import org.apache.hadoop.yarn.server.security.BaseNMTokenSecretManager; import org.apache.hadoop.yarn.server.utils.BuilderUtils; @@ -114,12 +132,12 @@ public class TestNMLeveldbStateStoreServ @Test public void testCheckVersion() throws IOException { // default version - NMDBSchemaVersion defaultVersion = stateStore.getCurrentVersion(); + Version defaultVersion = stateStore.getCurrentVersion(); Assert.assertEquals(defaultVersion, stateStore.loadVersion()); // compatible version - NMDBSchemaVersion compatibleVersion = - NMDBSchemaVersion.newInstance(defaultVersion.getMajorVersion(), + Version compatibleVersion = + Version.newInstance(defaultVersion.getMajorVersion(), defaultVersion.getMinorVersion() + 2); stateStore.storeVersion(compatibleVersion); Assert.assertEquals(compatibleVersion, stateStore.loadVersion()); @@ -128,8 +146,8 @@ public class TestNMLeveldbStateStoreServ Assert.assertEquals(defaultVersion, stateStore.loadVersion()); // incompatible version - NMDBSchemaVersion incompatibleVersion = - NMDBSchemaVersion.newInstance(defaultVersion.getMajorVersion() + 1, + Version incompatibleVersion = + Version.newInstance(defaultVersion.getMajorVersion() + 1, defaultVersion.getMinorVersion()); stateStore.storeVersion(incompatibleVersion); try { @@ -142,6 +160,163 @@ public class TestNMLeveldbStateStoreServ } @Test + public void testApplicationStorage() throws IOException { + // test empty when no state + RecoveredApplicationsState state = stateStore.loadApplicationsState(); + assertTrue(state.getApplications().isEmpty()); + assertTrue(state.getFinishedApplications().isEmpty()); + + // store an application and verify recovered + final ApplicationId appId1 = ApplicationId.newInstance(1234, 1); + ContainerManagerApplicationProto.Builder builder = + 
ContainerManagerApplicationProto.newBuilder(); + builder.setId(((ApplicationIdPBImpl) appId1).getProto()); + builder.setUser("user1"); + ContainerManagerApplicationProto appProto1 = builder.build(); + stateStore.storeApplication(appId1, appProto1); + restartStateStore(); + state = stateStore.loadApplicationsState(); + assertEquals(1, state.getApplications().size()); + assertEquals(appProto1, state.getApplications().get(0)); + assertTrue(state.getFinishedApplications().isEmpty()); + + // finish an application and add a new one + stateStore.storeFinishedApplication(appId1); + final ApplicationId appId2 = ApplicationId.newInstance(1234, 2); + builder = ContainerManagerApplicationProto.newBuilder(); + builder.setId(((ApplicationIdPBImpl) appId2).getProto()); + builder.setUser("user2"); + ContainerManagerApplicationProto appProto2 = builder.build(); + stateStore.storeApplication(appId2, appProto2); + restartStateStore(); + state = stateStore.loadApplicationsState(); + assertEquals(2, state.getApplications().size()); + assertTrue(state.getApplications().contains(appProto1)); + assertTrue(state.getApplications().contains(appProto2)); + assertEquals(1, state.getFinishedApplications().size()); + assertEquals(appId1, state.getFinishedApplications().get(0)); + + // test removing an application + stateStore.storeFinishedApplication(appId2); + stateStore.removeApplication(appId2); + restartStateStore(); + state = stateStore.loadApplicationsState(); + assertEquals(1, state.getApplications().size()); + assertEquals(appProto1, state.getApplications().get(0)); + assertEquals(1, state.getFinishedApplications().size()); + assertEquals(appId1, state.getFinishedApplications().get(0)); + } + + @Test + public void testContainerStorage() throws IOException { + // test empty when no state + List<RecoveredContainerState> recoveredContainers = + stateStore.loadContainersState(); + assertTrue(recoveredContainers.isEmpty()); + + // create a container request + ApplicationId appId = 
ApplicationId.newInstance(1234, 3); + ApplicationAttemptId appAttemptId = + ApplicationAttemptId.newInstance(appId, 4); + ContainerId containerId = ContainerId.newInstance(appAttemptId, 5); + LocalResource lrsrc = LocalResource.newInstance( + URL.newInstance("hdfs", "somehost", 12345, "/some/path/to/rsrc"), + LocalResourceType.FILE, LocalResourceVisibility.APPLICATION, 123L, + 1234567890L); + Map<String, LocalResource> localResources = + new HashMap<String, LocalResource>(); + localResources.put("rsrc", lrsrc); + Map<String, String> env = new HashMap<String, String>(); + env.put("somevar", "someval"); + List<String> containerCmds = new ArrayList<String>(); + containerCmds.add("somecmd"); + containerCmds.add("somearg"); + Map<String, ByteBuffer> serviceData = new HashMap<String, ByteBuffer>(); + serviceData.put("someservice", + ByteBuffer.wrap(new byte[] { 0x1, 0x2, 0x3 })); + ByteBuffer containerTokens = + ByteBuffer.wrap(new byte[] { 0x7, 0x8, 0x9, 0xa }); + Map<ApplicationAccessType, String> acls = + new HashMap<ApplicationAccessType, String>(); + acls.put(ApplicationAccessType.VIEW_APP, "viewuser"); + acls.put(ApplicationAccessType.MODIFY_APP, "moduser"); + ContainerLaunchContext clc = ContainerLaunchContext.newInstance( + localResources, env, containerCmds, serviceData, containerTokens, + acls); + Resource containerRsrc = Resource.newInstance(1357, 3); + ContainerTokenIdentifier containerTokenId = + new ContainerTokenIdentifier(containerId, "host", "user", + containerRsrc, 9876543210L, 42, 2468, Priority.newInstance(7), + 13579); + Token containerToken = Token.newInstance(containerTokenId.getBytes(), + ContainerTokenIdentifier.KIND.toString(), "password".getBytes(), + "tokenservice"); + StartContainerRequest containerReq = + StartContainerRequest.newInstance(clc, containerToken); + + // store a container and verify recovered + stateStore.storeContainer(containerId, containerReq); + restartStateStore(); + recoveredContainers = stateStore.loadContainersState(); + 
assertEquals(1, recoveredContainers.size()); + RecoveredContainerState rcs = recoveredContainers.get(0); + assertEquals(RecoveredContainerStatus.REQUESTED, rcs.getStatus()); + assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode()); + assertEquals(false, rcs.getKilled()); + assertEquals(containerReq, rcs.getStartRequest()); + assertTrue(rcs.getDiagnostics().isEmpty()); + + // launch the container, add some diagnostics, and verify recovered + StringBuilder diags = new StringBuilder(); + stateStore.storeContainerLaunched(containerId); + diags.append("some diags for container"); + stateStore.storeContainerDiagnostics(containerId, diags); + restartStateStore(); + recoveredContainers = stateStore.loadContainersState(); + assertEquals(1, recoveredContainers.size()); + rcs = recoveredContainers.get(0); + assertEquals(RecoveredContainerStatus.LAUNCHED, rcs.getStatus()); + assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode()); + assertEquals(false, rcs.getKilled()); + assertEquals(containerReq, rcs.getStartRequest()); + assertEquals(diags.toString(), rcs.getDiagnostics()); + + // mark the container killed, add some more diags, and verify recovered + diags.append("some more diags for container"); + stateStore.storeContainerDiagnostics(containerId, diags); + stateStore.storeContainerKilled(containerId); + restartStateStore(); + recoveredContainers = stateStore.loadContainersState(); + assertEquals(1, recoveredContainers.size()); + rcs = recoveredContainers.get(0); + assertEquals(RecoveredContainerStatus.LAUNCHED, rcs.getStatus()); + assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode()); + assertTrue(rcs.getKilled()); + assertEquals(containerReq, rcs.getStartRequest()); + assertEquals(diags.toString(), rcs.getDiagnostics()); + + // add yet more diags, mark container completed, and verify recovered + diags.append("some final diags"); + stateStore.storeContainerDiagnostics(containerId, diags); + stateStore.storeContainerCompleted(containerId, 21); + 
restartStateStore(); + recoveredContainers = stateStore.loadContainersState(); + assertEquals(1, recoveredContainers.size()); + rcs = recoveredContainers.get(0); + assertEquals(RecoveredContainerStatus.COMPLETED, rcs.getStatus()); + assertEquals(21, rcs.getExitCode()); + assertTrue(rcs.getKilled()); + assertEquals(containerReq, rcs.getStartRequest()); + assertEquals(diags.toString(), rcs.getDiagnostics()); + + // remove the container and verify not recovered + stateStore.removeContainer(containerId); + restartStateStore(); + recoveredContainers = stateStore.loadContainersState(); + assertTrue(recoveredContainers.isEmpty()); + } + + @Test public void testStartResourceLocalization() throws IOException { String user = "somebody"; ApplicationId appId = ApplicationId.newInstance(1, 1); Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java Wed Aug 20 01:34:29 2014 @@ -209,7 +209,7 @@ public class TestNMWebServer { BuilderUtils.newResource(1024, 1), currentTime + 10000L, 123, "password".getBytes(), currentTime); Container container = - new ContainerImpl(conf, 
dispatcher, launchContext, + new ContainerImpl(conf, dispatcher, stateStore, launchContext, null, metrics, BuilderUtils.newContainerTokenIdentifier(containerToken)) { Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf/capacity-scheduler.xml URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf/capacity-scheduler.xml?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf/capacity-scheduler.xml (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf/capacity-scheduler.xml Wed Aug 20 01:34:29 2014 @@ -108,4 +108,27 @@ </description> </property> + <property> + <name>yarn.scheduler.capacity.queue-mappings</name> + <value></value> + <description> + A list of mappings that will be used to assign jobs to queues + The syntax for this list is [u|g]:[name]:[queue_name][,next mapping]* + Typically this list will be used to map users to queues, + for example, u:%user:%user maps all users to queues with the same name + as the user. + </description> + </property> + + <property> + <name>yarn.scheduler.capacity.queue-mappings-override.enable</name> + <value>false</value> + <description> + If a queue mapping is present, will it override the value specified + by the user? This can be used by administrators to place jobs in queues + that are different than the one specified by the user. + The default is false. 
+ </description> + </property> + </configuration> Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml Wed Aug 20 01:34:29 2014 @@ -244,6 +244,37 @@ </execution> </executions> </plugin> + + <plugin> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-maven-plugins</artifactId> + <executions> + <execution> + <id>compile-protoc</id> + <phase>generate-sources</phase> + <goals> + <goal>protoc</goal> + </goals> + <configuration> + <protocVersion>${protobuf.version}</protocVersion> + <protocCommand>${protoc.path}</protocCommand> + <imports> + <param>${basedir}/../../../../hadoop-common-project/hadoop-common/src/main/proto</param> + <param>${basedir}/../../hadoop-yarn-api/src/main/proto</param> + <param>${basedir}/../hadoop-yarn-server-common/src/main/proto</param> + <param>${basedir}/src/main/proto</param> + </imports> + <source> + <directory>${basedir}/src/main/proto</directory> + <includes> + <include>yarn_server_resourcemanager_recovery.proto</include> + </includes> + </source> + <output>${project.build.directory}/generated-sources/java</output> + </configuration> + </execution> + </executions> + </plugin> </plugins> </build> Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java 
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java Wed Aug 20 01:34:29 2014 @@ -90,7 +90,9 @@ public class AdminService extends Compos private EmbeddedElectorService embeddedElector; private Server server; - private InetSocketAddress masterServiceAddress; + + // Address to use for binding. May be a wildcard address. 
+ private InetSocketAddress masterServiceBindAddress; private AccessControlList adminAcl; private final RecordFactory recordFactory = @@ -114,10 +116,12 @@ public class AdminService extends Compos } } - masterServiceAddress = conf.getSocketAddr( + masterServiceBindAddress = conf.getSocketAddr( + YarnConfiguration.RM_BIND_HOST, YarnConfiguration.RM_ADMIN_ADDRESS, YarnConfiguration.DEFAULT_RM_ADMIN_ADDRESS, YarnConfiguration.DEFAULT_RM_ADMIN_PORT); + adminAcl = new AccessControlList(conf.get( YarnConfiguration.YARN_ADMIN_ACL, YarnConfiguration.DEFAULT_YARN_ADMIN_ACL)); @@ -141,7 +145,7 @@ public class AdminService extends Compos Configuration conf = getConfig(); YarnRPC rpc = YarnRPC.create(conf); this.server = (Server) rpc.getServer( - ResourceManagerAdministrationProtocol.class, this, masterServiceAddress, + ResourceManagerAdministrationProtocol.class, this, masterServiceBindAddress, conf, null, conf.getInt(YarnConfiguration.RM_ADMIN_CLIENT_THREAD_COUNT, YarnConfiguration.DEFAULT_RM_ADMIN_CLIENT_THREAD_COUNT)); @@ -170,8 +174,10 @@ public class AdminService extends Compos } this.server.start(); - conf.updateConnectAddr(YarnConfiguration.RM_ADMIN_ADDRESS, - server.getListenerAddress()); + conf.updateConnectAddr(YarnConfiguration.RM_BIND_HOST, + YarnConfiguration.RM_ADMIN_ADDRESS, + YarnConfiguration.DEFAULT_RM_ADMIN_ADDRESS, + server.getListenerAddress()); } protected void stopServer() throws Exception { Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java?rev=1619019&r1=1619018&r2=1619019&view=diff 
============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java Wed Aug 20 01:34:29 2014 @@ -39,6 +39,7 @@ import org.apache.hadoop.ipc.Server; import org.apache.hadoop.security.SaslRpcServer; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authorize.PolicyProvider; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.util.StringUtils; @@ -80,6 +81,7 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStatusupdateEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent; @@ -89,6 +91,7 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider; +import 
org.apache.hadoop.yarn.server.security.MasterKeyData; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import com.google.common.annotations.VisibleForTesting; @@ -127,6 +130,7 @@ public class ApplicationMasterService ex YarnRPC rpc = YarnRPC.create(conf); InetSocketAddress masterServiceAddress = conf.getSocketAddr( + YarnConfiguration.RM_BIND_HOST, YarnConfiguration.RM_SCHEDULER_ADDRESS, YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS, YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT); @@ -159,7 +163,9 @@ public class ApplicationMasterService ex this.server.start(); this.bindAddress = - conf.updateConnectAddr(YarnConfiguration.RM_SCHEDULER_ADDRESS, + conf.updateConnectAddr(YarnConfiguration.RM_BIND_HOST, + YarnConfiguration.RM_SCHEDULER_ADDRESS, + YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS, server.getListenerAddress()); super.serviceStart(); } @@ -186,7 +192,7 @@ public class ApplicationMasterService ex return result; } - private ApplicationAttemptId authorizeRequest() + private AMRMTokenIdentifier authorizeRequest() throws YarnException { UserGroupInformation remoteUgi; @@ -223,7 +229,7 @@ public class ApplicationMasterService ex throw RPCUtil.getRemoteException(message); } - return appTokenIdentifier.getApplicationAttemptId(); + return appTokenIdentifier; } @Override @@ -231,7 +237,9 @@ public class ApplicationMasterService ex RegisterApplicationMasterRequest request) throws YarnException, IOException { - ApplicationAttemptId applicationAttemptId = authorizeRequest(); + AMRMTokenIdentifier amrmTokenIdentifier = authorizeRequest(); + ApplicationAttemptId applicationAttemptId = + amrmTokenIdentifier.getApplicationAttemptId(); ApplicationId appID = applicationAttemptId.getApplicationId(); AllocateResponseLock lock = responseMap.get(applicationAttemptId); @@ -330,7 +338,8 @@ public class ApplicationMasterService ex FinishApplicationMasterRequest request) throws YarnException, IOException { - ApplicationAttemptId applicationAttemptId = authorizeRequest(); + 
ApplicationAttemptId applicationAttemptId = + authorizeRequest().getApplicationAttemptId(); AllocateResponseLock lock = responseMap.get(applicationAttemptId); if (lock == null) { @@ -405,7 +414,10 @@ public class ApplicationMasterService ex public AllocateResponse allocate(AllocateRequest request) throws YarnException, IOException { - ApplicationAttemptId appAttemptId = authorizeRequest(); + AMRMTokenIdentifier amrmTokenIdentifier = authorizeRequest(); + + ApplicationAttemptId appAttemptId = + amrmTokenIdentifier.getApplicationAttemptId(); this.amLivelinessMonitor.receivedPing(appAttemptId); @@ -554,6 +566,23 @@ public class ApplicationMasterService ex allocateResponse .setPreemptionMessage(generatePreemptionMessage(allocation)); + // update AMRMToken if the token is rolled-up + MasterKeyData nextMasterKey = + this.rmContext.getAMRMTokenSecretManager().getNextMasterKeyData(); + + if (nextMasterKey != null + && nextMasterKey.getMasterKey().getKeyId() != amrmTokenIdentifier + .getKeyId()) { + Token<AMRMTokenIdentifier> amrmToken = + rmContext.getAMRMTokenSecretManager().createAndGetAMRMToken( + appAttemptId); + ((RMAppAttemptImpl)appAttempt).setAMRMToken(amrmToken); + allocateResponse.setAMRMToken(org.apache.hadoop.yarn.api.records.Token + .newInstance(amrmToken.getIdentifier(), amrmToken.getKind() + .toString(), amrmToken.getPassword(), amrmToken.getService() + .toString())); + } + /* * As we are updating the response inside the lock object so we don't * need to worry about unregister call occurring in between (which Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java Wed Aug 20 01:34:29 2014 @@ -199,7 +199,9 @@ public class ClientRMService extends Abs } this.server.start(); - clientBindAddress = conf.updateConnectAddr(YarnConfiguration.RM_ADDRESS, + clientBindAddress = conf.updateConnectAddr(YarnConfiguration.RM_BIND_HOST, + YarnConfiguration.RM_ADDRESS, + YarnConfiguration.DEFAULT_RM_ADDRESS, server.getListenerAddress()); super.serviceStart(); } @@ -213,7 +215,9 @@ public class ClientRMService extends Abs } InetSocketAddress getBindAddress(Configuration conf) { - return conf.getSocketAddr(YarnConfiguration.RM_ADDRESS, + return conf.getSocketAddr( + YarnConfiguration.RM_BIND_HOST, + YarnConfiguration.RM_ADDRESS, YarnConfiguration.DEFAULT_RM_ADDRESS, YarnConfiguration.DEFAULT_RM_PORT); } Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMSecretManagerService.java URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMSecretManagerService.java?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMSecretManagerService.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMSecretManagerService.java Wed Aug 20 01:34:29 2014 @@ -60,7 +60,7 @@ public class RMSecretManagerService exte clientToAMSecretManager = createClientToAMTokenSecretManager(); rmContext.setClientToAMTokenSecretManager(clientToAMSecretManager); - amRmTokenSecretManager = createAMRMTokenSecretManager(conf); + amRmTokenSecretManager = createAMRMTokenSecretManager(conf, this.rmContext); rmContext.setAMRMTokenSecretManager(amRmTokenSecretManager); rmDTSecretManager = @@ -115,8 +115,8 @@ public class RMSecretManagerService exte } protected AMRMTokenSecretManager createAMRMTokenSecretManager( - Configuration conf) { - return new AMRMTokenSecretManager(conf); + Configuration conf, RMContext rmContext) { + return new AMRMTokenSecretManager(conf, rmContext); } protected ClientToAMTokenSecretManagerInRM createClientToAMTokenSecretManager() { Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java Wed Aug 20 01:34:29 2014 @@ -22,6 +22,7 @@ import java.io.IOException; import java.io.InputStream; import java.net.InetSocketAddress; import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; @@ -32,11 +33,14 @@ import org.apache.hadoop.classification. 
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.ha.HAServiceProtocol; import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; +import org.apache.hadoop.http.lib.StaticUserWebFilter; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.metrics2.source.JvmMetrics; +import org.apache.hadoop.security.AuthenticationFilterInitializer; import org.apache.hadoop.security.Groups; import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.authentication.server.KerberosAuthenticationHandler; import org.apache.hadoop.security.authorize.ProxyUsers; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.service.CompositeService; @@ -88,8 +92,11 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer; import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager; +import org.apache.hadoop.yarn.server.resourcemanager.security.RMAuthenticationHandler; import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebApp; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; +import org.apache.hadoop.yarn.server.security.http.RMAuthenticationFilter; +import org.apache.hadoop.yarn.server.security.http.RMAuthenticationFilterInitializer; import org.apache.hadoop.yarn.server.webproxy.AppReportFetcher; import org.apache.hadoop.yarn.server.webproxy.ProxyUriUtils; import org.apache.hadoop.yarn.server.webproxy.WebAppProxy; @@ -150,7 +157,8 @@ public class ResourceManager extends Com private AppReportFetcher fetcher = null; protected ResourceTrackerService resourceTracker; - private String webAppAddress; + @VisibleForTesting + protected String webAppAddress; private ConfigurationProvider configurationProvider = null; /** End of Active services 
*/ @@ -225,7 +233,9 @@ public class ResourceManager extends Com } createAndInitActiveServices(); - webAppAddress = WebAppUtils.getRMWebAppURLWithoutScheme(this.conf); + webAppAddress = WebAppUtils.getWebAppBindURL(this.conf, + YarnConfiguration.RM_BIND_HOST, + WebAppUtils.getRMWebAppURLWithoutScheme(this.conf)); this.rmLoginUGI = UserGroupInformation.getCurrentUser(); @@ -453,7 +463,6 @@ public class ResourceManager extends Com rmDispatcher.register(RMAppManagerEventType.class, rmAppManager); clientRM = createClientRMService(); - rmContext.setClientRMService(clientRM); addService(clientRM); rmContext.setClientRMService(clientRM); @@ -789,6 +798,88 @@ public class ResourceManager extends Com } protected void startWepApp() { + + // Use the customized yarn filter instead of the standard kerberos filter to + // allow users to authenticate using delegation tokens + // 4 conditions need to be satisfied - + // 1. security is enabled + // 2. http auth type is set to kerberos + // 3. "yarn.resourcemanager.webapp.use-yarn-filter" override is set to true + // 4. 
hadoop.http.filter.initializers container AuthenticationFilterInitializer + + Configuration conf = getConfig(); + boolean useYarnAuthenticationFilter = + conf.getBoolean( + YarnConfiguration.RM_WEBAPP_DELEGATION_TOKEN_AUTH_FILTER, + YarnConfiguration.DEFAULT_RM_WEBAPP_DELEGATION_TOKEN_AUTH_FILTER); + String authPrefix = "hadoop.http.authentication."; + String authTypeKey = authPrefix + "type"; + String filterInitializerConfKey = "hadoop.http.filter.initializers"; + String actualInitializers = ""; + Class<?>[] initializersClasses = + conf.getClasses(filterInitializerConfKey); + + boolean hasHadoopAuthFilterInitializer = false; + boolean hasRMAuthFilterInitializer = false; + if (initializersClasses != null) { + for (Class<?> initializer : initializersClasses) { + if (initializer.getName().equals( + AuthenticationFilterInitializer.class.getName())) { + hasHadoopAuthFilterInitializer = true; + } + if (initializer.getName().equals( + RMAuthenticationFilterInitializer.class.getName())) { + hasRMAuthFilterInitializer = true; + } + } + if (UserGroupInformation.isSecurityEnabled() + && useYarnAuthenticationFilter + && hasHadoopAuthFilterInitializer + && conf.get(authTypeKey, "").equals( + KerberosAuthenticationHandler.TYPE)) { + ArrayList<String> target = new ArrayList<String>(); + for (Class<?> filterInitializer : initializersClasses) { + if (filterInitializer.getName().equals( + AuthenticationFilterInitializer.class.getName())) { + if (hasRMAuthFilterInitializer == false) { + target.add(RMAuthenticationFilterInitializer.class.getName()); + } + continue; + } + target.add(filterInitializer.getName()); + } + actualInitializers = StringUtils.join(",", target); + + LOG.info("Using RM authentication filter(kerberos/delegation-token)" + + " for RM webapp authentication"); + RMAuthenticationHandler + .setSecretManager(getClientRMService().rmDTSecretManager); + String yarnAuthKey = + authPrefix + RMAuthenticationFilter.AUTH_HANDLER_PROPERTY; + conf.setStrings(yarnAuthKey, 
RMAuthenticationHandler.class.getName()); + conf.set(filterInitializerConfKey, actualInitializers); + } + } + + // if security is not enabled and the default filter initializer has not + // been set, set the initializer to include the + // RMAuthenticationFilterInitializer which in turn will set up the simple + // auth filter. + + String initializers = conf.get(filterInitializerConfKey); + if (!UserGroupInformation.isSecurityEnabled()) { + if (initializersClasses == null || initializersClasses.length == 0) { + conf.set(filterInitializerConfKey, + RMAuthenticationFilterInitializer.class.getName()); + conf.set(authTypeKey, "simple"); + } else if (initializers.equals(StaticUserWebFilter.class.getName())) { + conf.set(filterInitializerConfKey, + RMAuthenticationFilterInitializer.class.getName() + "," + + initializers); + conf.set(authTypeKey, "simple"); + } + } + Builder<ApplicationMasterService> builder = WebApps .$for("cluster", ApplicationMasterService.class, masterService, @@ -1026,6 +1117,9 @@ public class ResourceManager extends Com // recover RMdelegationTokenSecretManager rmContext.getRMDelegationTokenSecretManager().recover(state); + // recover AMRMTokenSecretManager + rmContext.getAMRMTokenSecretManager().recover(state); + // recover applications rmAppManager.recover(state); } @@ -1067,6 +1161,9 @@ public class ResourceManager extends Com ((Service)dispatcher).init(this.conf); ((Service)dispatcher).start(); removeService((Service)rmDispatcher); + // Need to stop previous rmDispatcher before assigning new dispatcher + // otherwise causes "AsyncDispatcher event handler" thread leak + ((Service) rmDispatcher).stop(); rmDispatcher = dispatcher; addIfService(rmDispatcher); rmContext.setDispatcher(rmDispatcher); Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java Wed Aug 20 01:34:29 2014 @@ -121,6 +121,7 @@ public class ResourceTrackerService exte @Override protected void serviceInit(Configuration conf) throws Exception { resourceTrackerAddress = conf.getSocketAddr( + YarnConfiguration.RM_BIND_HOST, YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS, YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT); @@ -175,9 +176,11 @@ public class ResourceTrackerService exte } refreshServiceAcls(conf, RMPolicyProvider.getInstance()); } - + this.server.start(); - conf.updateConnectAddr(YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, + conf.updateConnectAddr(YarnConfiguration.RM_BIND_HOST, + YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, + YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS, server.getListenerAddress()); } @@ -308,7 +311,8 @@ public class ResourceTrackerService exte LOG.info("Reconnect from the node at: " + host); this.nmLivelinessMonitor.unregister(nodeId); this.rmContext.getDispatcher().getEventHandler().handle( - new RMNodeReconnectEvent(nodeId, rmNode)); + new RMNodeReconnectEvent(nodeId, rmNode, + request.getRunningApplications())); } // On every node manager register we will be clearing 
NMToken keys if // present for any running application. Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java Wed Aug 20 01:34:29 2014 @@ -58,6 +58,7 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent; import org.apache.hadoop.yarn.util.ConverterUtils; @@ -226,7 +227,7 @@ public class AMLauncher implements Runna } // Add AMRMToken - Token<AMRMTokenIdentifier> amrmToken = getAMRMToken(); + Token<AMRMTokenIdentifier> amrmToken = createAndSetAMRMToken(); if (amrmToken != null) { credentials.addToken(amrmToken.getService(), amrmToken); } @@ -236,8 +237,12 @@ public class AMLauncher implements Runna } 
@VisibleForTesting - protected Token<AMRMTokenIdentifier> getAMRMToken() { - return application.getAMRMToken(); + protected Token<AMRMTokenIdentifier> createAndSetAMRMToken() { + Token<AMRMTokenIdentifier> amrmToken = + this.rmContext.getAMRMTokenSecretManager().createAndGetAMRMToken( + application.getAppAttemptId()); + ((RMAppAttemptImpl)application).setAMRMToken(amrmToken); + return amrmToken; } @SuppressWarnings("unchecked") Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java Wed Aug 20 01:34:29 2014 @@ -22,6 +22,7 @@ import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.DataInputStream; import java.io.DataOutputStream; +import java.io.FileNotFoundException; import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -43,20 +44,22 @@ import org.apache.hadoop.security.token. 
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.AMRMTokenSecretManagerStateProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.EpochProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto; -import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RMStateVersionProto; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; +import org.apache.hadoop.yarn.server.records.Version; +import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; - import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.Epoch; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.AMRMTokenSecretManagerStatePBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.EpochPBImpl; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.RMStateVersionPBImpl; import org.apache.hadoop.yarn.util.ConverterUtils; import 
com.google.common.annotations.VisibleForTesting; @@ -68,14 +71,20 @@ import com.google.common.annotations.Vis * FileSystem interface. Does not use directories so that simple key-value * stores can be used. The retry policy for the real filesystem client must be * configured separately to enable retry of filesystem operations when needed. + * + * Changes from 1.1 to 1.2, AMRMTokenSecretManager state has been saved + * separately. The currentMasterkey and nextMasterkey have been stored. + * Also, AMRMToken has been removed from ApplicationAttemptState. */ public class FileSystemRMStateStore extends RMStateStore { public static final Log LOG = LogFactory.getLog(FileSystemRMStateStore.class); protected static final String ROOT_DIR_NAME = "FSRMStateRoot"; - protected static final RMStateVersion CURRENT_VERSION_INFO = RMStateVersion - .newInstance(1, 1); + protected static final Version CURRENT_VERSION_INFO = Version + .newInstance(1, 2); + protected static final String AMRMTOKEN_SECRET_MANAGER_NODE = + "AMRMTokenSecretManagerNode"; protected FileSystem fs; @@ -89,6 +98,7 @@ public class FileSystemRMStateStore exte @VisibleForTesting Path fsWorkingPath; + Path amrmTokenSecretManagerRoot; @Override public synchronized void initInternal(Configuration conf) throws Exception{ @@ -96,6 +106,8 @@ public class FileSystemRMStateStore exte rootDirPath = new Path(fsWorkingPath, ROOT_DIR_NAME); rmDTSecretManagerRoot = new Path(rootDirPath, RM_DT_SECRET_MANAGER_ROOT); rmAppRoot = new Path(rootDirPath, RM_APP_ROOT); + amrmTokenSecretManagerRoot = + new Path(rootDirPath, AMRMTOKEN_SECRET_MANAGER_ROOT); } @Override @@ -113,6 +125,7 @@ public class FileSystemRMStateStore exte fs = fsWorkingPath.getFileSystem(conf); fs.mkdirs(rmDTSecretManagerRoot); fs.mkdirs(rmAppRoot); + fs.mkdirs(amrmTokenSecretManagerRoot); } @Override @@ -121,18 +134,18 @@ public class FileSystemRMStateStore exte } @Override - protected RMStateVersion getCurrentVersion() { + protected Version getCurrentVersion() { 
return CURRENT_VERSION_INFO; } @Override - protected synchronized RMStateVersion loadVersion() throws Exception { + protected synchronized Version loadVersion() throws Exception { Path versionNodePath = getNodePath(rootDirPath, VERSION_NODE); if (fs.exists(versionNodePath)) { FileStatus status = fs.getFileStatus(versionNodePath); byte[] data = readFile(versionNodePath, status.getLen()); - RMStateVersion version = - new RMStateVersionPBImpl(RMStateVersionProto.parseFrom(data)); + Version version = + new VersionPBImpl(VersionProto.parseFrom(data)); return version; } return null; @@ -142,7 +155,7 @@ public class FileSystemRMStateStore exte protected synchronized void storeVersion() throws Exception { Path versionNodePath = getNodePath(rootDirPath, VERSION_NODE); byte[] data = - ((RMStateVersionPBImpl) CURRENT_VERSION_INFO).getProto().toByteArray(); + ((VersionPBImpl) CURRENT_VERSION_INFO).getProto().toByteArray(); if (fs.exists(versionNodePath)) { updateFile(versionNodePath, data); } else { @@ -180,9 +193,32 @@ public class FileSystemRMStateStore exte loadRMDTSecretManagerState(rmState); // recover RM applications loadRMAppState(rmState); + // recover AMRMTokenSecretManager + loadAMRMTokenSecretManagerState(rmState); return rmState; } + private void loadAMRMTokenSecretManagerState(RMState rmState) + throws Exception { + checkAndResumeUpdateOperation(amrmTokenSecretManagerRoot); + Path amrmTokenSecretManagerStateDataDir = + new Path(amrmTokenSecretManagerRoot, AMRMTOKEN_SECRET_MANAGER_NODE); + FileStatus status; + try { + status = fs.getFileStatus(amrmTokenSecretManagerStateDataDir); + assert status.isFile(); + } catch (FileNotFoundException ex) { + return; + } + byte[] data = readFile(amrmTokenSecretManagerStateDataDir, status.getLen()); + AMRMTokenSecretManagerStatePBImpl stateData = + new AMRMTokenSecretManagerStatePBImpl( + AMRMTokenSecretManagerStateProto.parseFrom(data)); + rmState.amrmTokenSecretManagerState = + AMRMTokenSecretManagerState.newInstance( + 
stateData.getCurrentMasterKey(), stateData.getNextMasterKey()); + } + private void loadRMAppState(RMState rmState) throws Exception { try { List<ApplicationAttemptState> attempts = @@ -597,4 +633,25 @@ public class FileSystemRMStateStore exte return new Path(root, nodeName); } + @Override + public synchronized void storeOrUpdateAMRMTokenSecretManagerState( + AMRMTokenSecretManagerState amrmTokenSecretManagerState, + boolean isUpdate){ + Path nodeCreatePath = + getNodePath(amrmTokenSecretManagerRoot, AMRMTOKEN_SECRET_MANAGER_NODE); + AMRMTokenSecretManagerState data = + AMRMTokenSecretManagerState.newInstance(amrmTokenSecretManagerState); + byte[] stateData = data.getProto().toByteArray(); + try { + if (isUpdate) { + updateFile(nodeCreatePath, stateData); + } else { + writeFile(nodeCreatePath, stateData); + } + } catch (Exception ex) { + LOG.info("Error storing info for AMRMTokenSecretManager", ex); + notifyStoreOperationFailed(ex); + } + } + } Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java Wed Aug 
20 01:34:29 2014 @@ -32,9 +32,10 @@ import org.apache.hadoop.yarn.api.record import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; +import org.apache.hadoop.yarn.server.records.Version; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion; import com.google.common.annotations.VisibleForTesting; @@ -72,6 +73,10 @@ public class MemoryRMStateStore extends state.rmSecretManagerState.getTokenState()); returnState.rmSecretManagerState.dtSequenceNumber = state.rmSecretManagerState.dtSequenceNumber; + returnState.amrmTokenSecretManagerState = + state.amrmTokenSecretManagerState == null ? 
null + : AMRMTokenSecretManagerState + .newInstance(state.amrmTokenSecretManagerState); return returnState; } @@ -254,7 +259,7 @@ public class MemoryRMStateStore extends } @Override - protected RMStateVersion loadVersion() throws Exception { + protected Version loadVersion() throws Exception { return null; } @@ -263,11 +268,21 @@ public class MemoryRMStateStore extends } @Override - protected RMStateVersion getCurrentVersion() { + protected Version getCurrentVersion() { return null; } @Override + public void storeOrUpdateAMRMTokenSecretManagerState( + AMRMTokenSecretManagerState amrmTokenSecretManagerState, + boolean isUpdate) { + if (amrmTokenSecretManagerState != null) { + state.amrmTokenSecretManagerState = AMRMTokenSecretManagerState + .newInstance(amrmTokenSecretManagerState); + } + } + + @Override public void deleteStore() throws Exception { } Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java Wed Aug 20 01:34:29 2014 @@ -25,9 +25,10 @@ import org.apache.hadoop.security.token. 
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; +import org.apache.hadoop.yarn.server.records.Version; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion; @Unstable public class NullRMStateStore extends RMStateStore { @@ -122,7 +123,7 @@ public class NullRMStateStore extends RM } @Override - protected RMStateVersion loadVersion() throws Exception { + protected Version loadVersion() throws Exception { // Do nothing return null; } @@ -133,12 +134,18 @@ public class NullRMStateStore extends RM } @Override - protected RMStateVersion getCurrentVersion() { + protected Version getCurrentVersion() { // Do nothing return null; } @Override + public void storeOrUpdateAMRMTokenSecretManagerState( + AMRMTokenSecretManagerState state, boolean isUpdate) { + //DO Nothing + } + + @Override public void deleteStore() throws Exception { // Do nothing }
