Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java?rev=1613514&r1=1613513&r2=1613514&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java Fri Jul 25 20:33:09 2014 @@ -22,6 +22,7 @@ import java.io.IOException; import java.lang.reflect.UndeclaredThrowableException; import java.security.AccessControlException; import java.nio.ByteBuffer; +import java.security.Principal; import java.security.PrivilegedExceptionAction; import java.util.Arrays; import java.util.Collection; @@ -36,6 +37,7 @@ import java.util.concurrent.ConcurrentMa import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import javax.ws.rs.Consumes; +import javax.ws.rs.DELETE; import javax.ws.rs.GET; import javax.ws.rs.POST; import javax.ws.rs.PUT; @@ -57,6 +59,8 @@ import org.apache.hadoop.io.DataOutputBu import org.apache.hadoop.io.Text; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod; +import org.apache.hadoop.security.authentication.server.KerberosAuthenticationHandler; import org.apache.hadoop.security.authorize.AuthorizationException; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; @@ -67,6 +71,13 @@ import org.apache.hadoop.yarn.api.protoc import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse; +import org.apache.hadoop.security.token.SecretManager.InvalidToken; +import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse; +import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; @@ -85,6 +96,7 @@ import org.apache.hadoop.yarn.exceptions import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import 
org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils; @@ -109,6 +121,7 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterMetricsInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CredentialsInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.DelegationToken; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.FairSchedulerInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.FifoSchedulerInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.LocalResourceInfo; @@ -118,6 +131,7 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.StatisticsItemInfo; +import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.webapp.BadRequestException; import org.apache.hadoop.yarn.webapp.NotFoundException; @@ -139,6 +153,9 @@ public class RMWebServices { private final Configuration conf; private @Context HttpServletResponse response; + public final static String DELEGATION_TOKEN_HEADER = + "Hadoop-YARN-RM-Delegation-Token"; + @Inject public RMWebServices(final ResourceManager rm, Configuration conf) { this.rm = rm; @@ -147,11 +164,7 @@ public class RMWebServices { protected Boolean hasAccess(RMApp app, HttpServletRequest hsr) { // Check for the authorization. - String remoteUser = hsr.getRemoteUser(); - UserGroupInformation callerUGI = null; - if (remoteUser != null) { - callerUGI = UserGroupInformation.createRemoteUser(remoteUser); - } + UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true); if (callerUGI != null && !(this.rm.getApplicationACLsManager().checkAccess(callerUGI, ApplicationAccessType.VIEW_APP, app.getUser(), @@ -626,7 +639,7 @@ public class RMWebServices { public AppState getAppState(@Context HttpServletRequest hsr, @PathParam("appid") String appId) throws AuthorizationException { init(); - UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr); + UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true); String userName = ""; if (callerUGI != null) { userName = callerUGI.getUserName(); @@ -661,7 +674,7 @@ public class RMWebServices { IOException { init(); - UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr); + UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true); if (callerUGI == null) { String msg = "Unable to obtain user name, user not authenticated"; throw new AuthorizationException(msg); @@ -771,9 +784,14 @@ public class RMWebServices { } private UserGroupInformation getCallerUserGroupInformation( - HttpServletRequest hsr) { + HttpServletRequest hsr, boolean usePrincipal) { String remoteUser = hsr.getRemoteUser(); + if (usePrincipal) { + Principal princ = hsr.getUserPrincipal(); + remoteUser = princ == null ? 
null : princ.getName(); + } + UserGroupInformation callerUGI = null; if (remoteUser != null) { callerUGI = UserGroupInformation.createRemoteUser(remoteUser); @@ -799,7 +817,7 @@ public class RMWebServices { public Response createNewApplication(@Context HttpServletRequest hsr) throws AuthorizationException, IOException, InterruptedException { init(); - UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr); + UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true); if (callerUGI == null) { throw new AuthorizationException("Unable to obtain user name, " + "user not authenticated"); @@ -835,7 +853,7 @@ public class RMWebServices { IOException, InterruptedException { init(); - UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr); + UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true); if (callerUGI == null) { throw new AuthorizationException("Unable to obtain user name, " + "user not authenticated"); @@ -887,8 +905,8 @@ public class RMWebServices { throw new YarnRuntimeException(msg, e); } NewApplication appId = - new NewApplication(resp.getApplicationId().toString(), new ResourceInfo( - resp.getMaximumResourceCapability())); + new NewApplication(resp.getApplicationId().toString(), + new ResourceInfo(resp.getMaximumResourceCapability())); return appId; } @@ -962,7 +980,8 @@ public class RMWebServices { * @throws IOException */ protected ContainerLaunchContext createContainerLaunchContext( - ApplicationSubmissionContextInfo newApp) throws BadRequestException, IOException { + ApplicationSubmissionContextInfo newApp) throws BadRequestException, + IOException { // create container launch context @@ -1033,4 +1052,238 @@ public class RMWebServices { } return ret; } + + private UserGroupInformation createKerberosUserGroupInformation( + HttpServletRequest hsr) throws AuthorizationException, YarnException { + + UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true); + if (callerUGI == null) { + String msg = "Unable to obtain user name, user not authenticated"; + throw new AuthorizationException(msg); + } + + String authType = hsr.getAuthType(); + if (!KerberosAuthenticationHandler.TYPE.equals(authType)) { + String msg = + "Delegation token operations can only be carried out on a " + + "Kerberos authenticated channel"; + throw new YarnException(msg); + } + + callerUGI.setAuthenticationMethod(AuthenticationMethod.KERBEROS); + return callerUGI; + } + + @POST + @Path("/delegation-token") + @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML }) + @Consumes({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML }) + public Response postDelegationToken(DelegationToken tokenData, + @Context HttpServletRequest hsr) throws AuthorizationException, + IOException, InterruptedException, Exception { + + init(); + UserGroupInformation callerUGI; + try { + callerUGI = createKerberosUserGroupInformation(hsr); + } catch (YarnException ye) { + return Response.status(Status.FORBIDDEN).entity(ye.getMessage()).build(); + } + return createDelegationToken(tokenData, hsr, callerUGI); + } + + @POST + @Path("/delegation-token/expiration") + @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML }) + @Consumes({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML }) + public Response + postDelegationTokenExpiration(@Context HttpServletRequest hsr) + throws AuthorizationException, IOException, InterruptedException, + Exception { + + init(); + UserGroupInformation callerUGI; + try { + callerUGI = 
createKerberosUserGroupInformation(hsr); + } catch (YarnException ye) { + return Response.status(Status.FORBIDDEN).entity(ye.getMessage()).build(); + } + + DelegationToken requestToken = new DelegationToken(); + requestToken.setToken(extractToken(hsr).encodeToUrlString()); + return renewDelegationToken(requestToken, hsr, callerUGI); + } + + private Response createDelegationToken(DelegationToken tokenData, + HttpServletRequest hsr, UserGroupInformation callerUGI) + throws AuthorizationException, IOException, InterruptedException, + Exception { + + final String renewer = tokenData.getRenewer(); + GetDelegationTokenResponse resp; + try { + resp = + callerUGI + .doAs(new PrivilegedExceptionAction<GetDelegationTokenResponse>() { + @Override + public GetDelegationTokenResponse run() throws IOException, + YarnException { + GetDelegationTokenRequest createReq = + GetDelegationTokenRequest.newInstance(renewer); + return rm.getClientRMService().getDelegationToken(createReq); + } + }); + } catch (Exception e) { + LOG.info("Create delegation token request failed", e); + throw e; + } + + Token<RMDelegationTokenIdentifier> tk = + new Token<RMDelegationTokenIdentifier>(resp.getRMDelegationToken() + .getIdentifier().array(), resp.getRMDelegationToken().getPassword() + .array(), new Text(resp.getRMDelegationToken().getKind()), new Text( + resp.getRMDelegationToken().getService())); + RMDelegationTokenIdentifier identifier = tk.decodeIdentifier(); + long currentExpiration = + rm.getRMContext().getRMDelegationTokenSecretManager() + .getRenewDate(identifier); + DelegationToken respToken = + new DelegationToken(tk.encodeToUrlString(), renewer, identifier + .getOwner().toString(), tk.getKind().toString(), currentExpiration, + identifier.getMaxDate()); + return Response.status(Status.OK).entity(respToken).build(); + } + + private Response renewDelegationToken(DelegationToken tokenData, + HttpServletRequest hsr, UserGroupInformation callerUGI) + throws AuthorizationException, IOException, InterruptedException, + Exception { + + Token<RMDelegationTokenIdentifier> token = + extractToken(tokenData.getToken()); + + org.apache.hadoop.yarn.api.records.Token dToken = + BuilderUtils.newDelegationToken(token.getIdentifier(), token.getKind() + .toString(), token.getPassword(), token.getService().toString()); + final RenewDelegationTokenRequest req = + RenewDelegationTokenRequest.newInstance(dToken); + + RenewDelegationTokenResponse resp; + try { + resp = + callerUGI + .doAs(new PrivilegedExceptionAction<RenewDelegationTokenResponse>() { + @Override + public RenewDelegationTokenResponse run() throws IOException, + YarnException { + return rm.getClientRMService().renewDelegationToken(req); + } + }); + } catch (UndeclaredThrowableException ue) { + if (ue.getCause() instanceof YarnException) { + if (ue.getCause().getCause() instanceof InvalidToken) { + throw new BadRequestException(ue.getCause().getCause().getMessage()); + } else if (ue.getCause().getCause() instanceof org.apache.hadoop.security.AccessControlException) { + return Response.status(Status.FORBIDDEN) + .entity(ue.getCause().getCause().getMessage()).build(); + } + LOG.info("Renew delegation token request failed", ue); + throw ue; + } + LOG.info("Renew delegation token request failed", ue); + throw ue; + } catch (Exception e) { + LOG.info("Renew delegation token request failed", e); + throw e; + } + long renewTime = resp.getNextExpirationTime(); + + DelegationToken respToken = new DelegationToken(); + respToken.setNextExpirationTime(renewTime); + return 
Response.status(Status.OK).entity(respToken).build(); + } + + // For cancelling tokens, the encoded token is passed as a header + // There are two reasons for this - + // 1. Passing a request body as part of a DELETE request is not + // allowed by Jetty + // 2. Passing the encoded token as part of the url is not ideal + // since urls tend to get logged and anyone with access to + // the logs can extract tokens which are meant to be secret + @DELETE + @Path("/delegation-token") + @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML }) + public Response cancelDelegationToken(@Context HttpServletRequest hsr) + throws AuthorizationException, IOException, InterruptedException, + Exception { + + init(); + UserGroupInformation callerUGI; + try { + callerUGI = createKerberosUserGroupInformation(hsr); + } catch (YarnException ye) { + return Response.status(Status.FORBIDDEN).entity(ye.getMessage()).build(); + } + + Token<RMDelegationTokenIdentifier> token = extractToken(hsr); + + org.apache.hadoop.yarn.api.records.Token dToken = + BuilderUtils.newDelegationToken(token.getIdentifier(), token.getKind() + .toString(), token.getPassword(), token.getService().toString()); + final CancelDelegationTokenRequest req = + CancelDelegationTokenRequest.newInstance(dToken); + + try { + callerUGI + .doAs(new PrivilegedExceptionAction<CancelDelegationTokenResponse>() { + @Override + public CancelDelegationTokenResponse run() throws IOException, + YarnException { + return rm.getClientRMService().cancelDelegationToken(req); + } + }); + } catch (UndeclaredThrowableException ue) { + if (ue.getCause() instanceof YarnException) { + if (ue.getCause().getCause() instanceof InvalidToken) { + throw new BadRequestException(ue.getCause().getCause().getMessage()); + } else if (ue.getCause().getCause() instanceof org.apache.hadoop.security.AccessControlException) { + return Response.status(Status.FORBIDDEN) + .entity(ue.getCause().getCause().getMessage()).build(); + } + LOG.info("Renew delegation token request failed", ue); + throw ue; + } + LOG.info("Renew delegation token request failed", ue); + throw ue; + } catch (Exception e) { + LOG.info("Renew delegation token request failed", e); + throw e; + } + + return Response.status(Status.OK).build(); + } + + private Token<RMDelegationTokenIdentifier> extractToken( + HttpServletRequest request) { + String encodedToken = request.getHeader(DELEGATION_TOKEN_HEADER); + if (encodedToken == null) { + String msg = + "Header '" + DELEGATION_TOKEN_HEADER + + "' containing encoded token not found"; + throw new BadRequestException(msg); + } + return extractToken(encodedToken); + } + + private Token<RMDelegationTokenIdentifier> extractToken(String encodedToken) { + Token<RMDelegationTokenIdentifier> token = + new Token<RMDelegationTokenIdentifier>(); + try { + token.decodeFromUrlString(encodedToken); + } catch (Exception ie) { + String msg = "Could not decode encoded token"; + throw new BadRequestException(msg); + } + return token; + } }
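
Usage sketch (not part of the commit): the hunks above add three delegation-token REST endpoints to RMWebServices. POST /delegation-token creates a token for a named renewer, POST /delegation-token/expiration renews it, and DELETE /delegation-token cancels it; for renew and cancel the encoded token travels in the Hadoop-YARN-RM-Delegation-Token header, for the two reasons given in the comment before cancelDelegationToken(). The following minimal client sketch is an illustration only; it assumes (not stated in the commit) that the services are mounted under /ws/v1/cluster on the RM web port, that the DelegationToken DAO serializes its renewer field as "renewer", and that the SPNEGO/Kerberos negotiation required by createKerberosUserGroupInformation() is handled outside the snippet.

    // Hypothetical client sketch for the new RM delegation-token REST endpoints.
    // Assumed: base URL /ws/v1/cluster, JSON field name "renewer", and an already
    // Kerberos/SPNEGO-authenticated channel (omitted here).
    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import java.io.OutputStream;
    import java.net.HttpURLConnection;
    import java.net.URL;
    import java.nio.charset.StandardCharsets;

    public class RMDelegationTokenRestSketch {

      private static final String BASE = "http://rm-host:8088/ws/v1/cluster"; // assumed RM address
      private static final String TOKEN_HEADER = "Hadoop-YARN-RM-Delegation-Token";

      // POST /delegation-token with a JSON body naming the renewer.
      static String createToken(String renewer) throws Exception {
        HttpURLConnection conn =
            (HttpURLConnection) new URL(BASE + "/delegation-token").openConnection();
        conn.setRequestMethod("POST");
        conn.setRequestProperty("Content-Type", "application/json");
        conn.setRequestProperty("Accept", "application/json");
        conn.setDoOutput(true);
        byte[] body = ("{\"renewer\":\"" + renewer + "\"}").getBytes(StandardCharsets.UTF_8);
        try (OutputStream out = conn.getOutputStream()) {
          out.write(body);
        }
        // Response body carries the encoded token plus owner, kind and expiration;
        // a single readLine() is enough for this sketch.
        try (BufferedReader in = new BufferedReader(
            new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
          return in.readLine();
        }
      }

      // POST /delegation-token/expiration renews the token identified by the header.
      static int renewToken(String encodedToken) throws Exception {
        HttpURLConnection conn = (HttpURLConnection)
            new URL(BASE + "/delegation-token/expiration").openConnection();
        conn.setRequestMethod("POST");
        conn.setRequestProperty(TOKEN_HEADER, encodedToken);
        conn.setRequestProperty("Accept", "application/json");
        return conn.getResponseCode(); // response body carries the next expiration time
      }

      // DELETE /delegation-token; the encoded token goes in the header, not the
      // body or URL, matching extractToken(HttpServletRequest) above.
      static int cancelToken(String encodedToken) throws Exception {
        HttpURLConnection conn =
            (HttpURLConnection) new URL(BASE + "/delegation-token").openConnection();
        conn.setRequestMethod("DELETE");
        conn.setRequestProperty(TOKEN_HEADER, encodedToken);
        return conn.getResponseCode(); // 200 on success, 400 if the token cannot be decoded
      }
    }
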
Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java?rev=1613514&r1=1613513&r2=1613514&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java Fri Jul 25 20:33:09 2014 @@ -232,20 +232,7 @@ public class TestApplicationCleanup { containerStatuses.put(app.getApplicationId(), containerStatusList); NodeHeartbeatResponse resp = nm1.nodeHeartbeat(containerStatuses, true); - dispatcher.await(); - List<ContainerId> contsToClean = resp.getContainersToCleanup(); - int cleanedConts = contsToClean.size(); - waitCount = 0; - while (cleanedConts < 1 && waitCount++ < 200) { - LOG.info("Waiting to get cleanup events.. cleanedConts: " + cleanedConts); - Thread.sleep(100); - resp = nm1.nodeHeartbeat(true); - dispatcher.await(); - contsToClean = resp.getContainersToCleanup(); - cleanedConts += contsToClean.size(); - } - LOG.info("Got cleanup for " + contsToClean.get(0)); - Assert.assertEquals(1, cleanedConts); + waitForContainerCleanup(dispatcher, nm1, resp); // Now to test the case when RM already gave cleanup, and NM suddenly // realizes that the container is running. @@ -258,26 +245,36 @@ public class TestApplicationCleanup { containerStatuses.put(app.getApplicationId(), containerStatusList); resp = nm1.nodeHeartbeat(containerStatuses, true); - dispatcher.await(); - contsToClean = resp.getContainersToCleanup(); - cleanedConts = contsToClean.size(); // The cleanup list won't be instantaneous as it is given out by scheduler // and not RMNodeImpl. - waitCount = 0; - while (cleanedConts < 1 && waitCount++ < 200) { - LOG.info("Waiting to get cleanup events.. 
cleanedConts: " + cleanedConts); - Thread.sleep(100); - resp = nm1.nodeHeartbeat(true); + waitForContainerCleanup(dispatcher, nm1, resp); + + rm.stop(); + } + + protected void waitForContainerCleanup(DrainDispatcher dispatcher, MockNM nm, + NodeHeartbeatResponse resp) throws Exception { + int waitCount = 0, cleanedConts = 0; + List<ContainerId> contsToClean; + do { dispatcher.await(); contsToClean = resp.getContainersToCleanup(); cleanedConts += contsToClean.size(); + if (cleanedConts >= 1) { + break; + } + Thread.sleep(100); + resp = nm.nodeHeartbeat(true); + } while(waitCount++ < 200); + + if (contsToClean.isEmpty()) { + LOG.error("Failed to get any containers to cleanup"); + } else { + LOG.info("Got cleanup for " + contsToClean.get(0)); } - LOG.info("Got cleanup for " + contsToClean.get(0)); Assert.assertEquals(1, cleanedConts); - - rm.stop(); } - + private void waitForAppCleanupMessageRecved(MockNM nm, ApplicationId appId) throws Exception { while (true) { @@ -400,6 +397,58 @@ public class TestApplicationCleanup { rm2.stop(); } + @SuppressWarnings("resource") + @Test (timeout = 60000) + public void testContainerCleanupWhenRMRestartedAppNotRegistered() throws + Exception { + conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1); + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + + // start RM + final DrainDispatcher dispatcher = new DrainDispatcher(); + MockRM rm1 = new MockRM(conf, memStore) { + @Override + protected Dispatcher createDispatcher() { + return dispatcher; + } + }; + rm1.start(); + MockNM nm1 = + new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); + nm1.registerNode(); + + // create app and launch the AM + RMApp app0 = rm1.submitApp(200); + MockAM am0 = launchAM(app0, rm1, nm1); + nm1.nodeHeartbeat(am0.getApplicationAttemptId(), 1, ContainerState.RUNNING); + rm1.waitForState(app0.getApplicationId(), RMAppState.RUNNING); + + // start new RM + final DrainDispatcher dispatcher2 = new DrainDispatcher(); + MockRM rm2 = new MockRM(conf, memStore) { + @Override + protected Dispatcher createDispatcher() { + return dispatcher2; + } + }; + rm2.start(); + + // nm1 register to rm2, and do a heartbeat + nm1.setResourceTrackerService(rm2.getResourceTrackerService()); + nm1.registerNode(Arrays.asList(app0.getApplicationId())); + rm2.waitForState(app0.getApplicationId(), RMAppState.ACCEPTED); + + // Add unknown container for application unknown to scheduler + NodeHeartbeatResponse response = nm1.nodeHeartbeat(am0 + .getApplicationAttemptId(), 2, ContainerState.RUNNING); + + waitForContainerCleanup(dispatcher2, nm1, response); + + rm1.stop(); + rm2.stop(); + } + public static void main(String[] args) throws Exception { TestApplicationCleanup t = new TestApplicationCleanup(); t.testAppCleanup(); Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java?rev=1613514&r1=1613513&r2=1613514&view=diff ============================================================================== --- 
hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java Fri Jul 25 20:33:09 2014 @@ -228,7 +228,7 @@ public class TestFifoScheduler { scheduler.handle(new NodeAddedSchedulerEvent(node)); ApplicationId appId = ApplicationId.newInstance(0, 1); - scheduler.addApplication(appId, "queue1", "user1"); + scheduler.addApplication(appId, "queue1", "user1", false); NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node); try { @@ -238,7 +238,7 @@ public class TestFifoScheduler { } ApplicationAttemptId attId = ApplicationAttemptId.newInstance(appId, 1); - scheduler.addApplicationAttempt(attId, false, true); + scheduler.addApplicationAttempt(attId, false, false); rm.stop(); } Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java?rev=1613514&r1=1613513&r2=1613514&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java Fri Jul 25 20:33:09 2014 @@ -1250,10 +1250,11 @@ public class TestRMRestart { .getEncoded()); // assert AMRMTokenSecretManager also knows about the AMRMToken password - Token<AMRMTokenIdentifier> amrmToken = loadedAttempt1.getAMRMToken(); - Assert.assertArrayEquals(amrmToken.getPassword(), - rm2.getRMContext().getAMRMTokenSecretManager().retrievePassword( - amrmToken.decodeIdentifier())); + // TODO: fix this on YARN-2211 +// Token<AMRMTokenIdentifier> amrmToken = loadedAttempt1.getAMRMToken(); +// Assert.assertArrayEquals(amrmToken.getPassword(), +// rm2.getRMContext().getAMRMTokenSecretManager().retrievePassword( +// amrmToken.decodeIdentifier())); rm1.stop(); rm2.stop(); } Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java?rev=1613514&r1=1613513&r2=1613514&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java (original) +++ 
hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java Fri Jul 25 20:33:09 2014 @@ -29,11 +29,13 @@ import java.util.Arrays; import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -44,6 +46,7 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; @@ -164,6 +167,11 @@ public class TestWorkPreservingRMRestart // Wait for RM to settle down on recovering containers; waitForNumContainersToRecover(2, rm2, am1.getApplicationAttemptId()); + Set<ContainerId> launchedContainers = + ((RMNodeImpl) rm2.getRMContext().getRMNodes().get(nm1.getNodeId())) + .getLaunchedContainers(); + assertTrue(launchedContainers.contains(amContainer.getContainerId())); + assertTrue(launchedContainers.contains(runningContainer.getContainerId())); // check RMContainers are re-recreated and the container state is correct. rm2.waitForState(nm1, amContainer.getContainerId(), @@ -602,6 +610,36 @@ public class TestWorkPreservingRMRestart attempt0.getMasterContainer().getId()).isAMContainer()); } + @Test (timeout = 20000) + public void testRecoverSchedulerAppAndAttemptSynchronously() throws Exception { + // start RM + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + rm1 = new MockRM(conf, memStore); + rm1.start(); + MockNM nm1 = + new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); + nm1.registerNode(); + + // create app and launch the AM + RMApp app0 = rm1.submitApp(200); + MockAM am0 = MockRM.launchAndRegisterAM(app0, rm1, nm1); + + rm2 = new MockRM(conf, memStore); + rm2.start(); + nm1.setResourceTrackerService(rm2.getResourceTrackerService()); + // scheduler app/attempt is immediately available after RM is re-started. + Assert.assertNotNull(rm2.getResourceScheduler().getSchedulerAppInfo( + am0.getApplicationAttemptId())); + + // getTransferredContainers should not throw NPE. 
+ ((AbstractYarnScheduler) rm2.getResourceScheduler()) + .getTransferredContainers(am0.getApplicationAttemptId()); + + List<NMContainerStatus> containers = createNMContainerStatusForApp(am0); + nm1.registerNode(containers, null); + waitForNumContainersToRecover(2, rm2, am0.getApplicationAttemptId()); + } private void asserteMetrics(QueueMetrics qm, int appsSubmitted, int appsPending, int appsRunning, int appsCompleted, Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java?rev=1613514&r1=1613513&r2=1613514&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java Fri Jul 25 20:33:09 2014 @@ -25,6 +25,7 @@ import static org.junit.Assert.assertTru import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import static org.mockito.Mockito.spy; import java.util.ArrayList; import java.util.HashMap; @@ -34,7 +35,6 @@ import java.util.Map; import javax.crypto.SecretKey; import org.junit.Assert; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -67,6 +67,7 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptNewSavedEvent; import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; +import org.apache.hadoop.yarn.server.security.MasterKeyData; import org.apache.hadoop.yarn.util.ConverterUtils; public class RMStateStoreTestBase extends ClientBaseWithFixes{ @@ -175,8 +176,11 @@ public class RMStateStoreTestBase extend TestDispatcher dispatcher = new TestDispatcher(); store.setRMDispatcher(dispatcher); - AMRMTokenSecretManager appTokenMgr = - new AMRMTokenSecretManager(conf); + AMRMTokenSecretManager appTokenMgr = spy( + new AMRMTokenSecretManager(conf)); + MasterKeyData masterKeyData = appTokenMgr.createNewMasterKey(); + when(appTokenMgr.getMasterKey()).thenReturn(masterKeyData); + ClientToAMTokenSecretManagerInRM clientToAMTokenMgr = new ClientToAMTokenSecretManagerInRM(); @@ -455,10 +459,8 @@ public class RMStateStoreTestBase extend private Token<AMRMTokenIdentifier> generateAMRMToken( ApplicationAttemptId attemptId, AMRMTokenSecretManager appTokenMgr) { - AMRMTokenIdentifier appTokenId = - new AMRMTokenIdentifier(attemptId); Token<AMRMTokenIdentifier> appToken = - new Token<AMRMTokenIdentifier>(appTokenId, appTokenMgr); + appTokenMgr.createAndGetAMRMToken(attemptId); appToken.setService(new Text("appToken service")); return appToken; } Modified: 
hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java?rev=1613514&r1=1613513&r2=1613514&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java Fri Jul 25 20:33:09 2014 @@ -97,6 +97,7 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; +import org.apache.hadoop.yarn.server.security.MasterKeyData; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.webproxy.ProxyUriUtils; import org.apache.hadoop.yarn.webapp.util.WebAppUtils; @@ -224,6 +225,8 @@ public class TestRMAppAttemptTransitions amLivelinessMonitor = mock(AMLivelinessMonitor.class); amFinishingMonitor = mock(AMLivelinessMonitor.class); writer = mock(RMApplicationHistoryWriter.class); + MasterKeyData masterKeyData = amRMTokenManager.createNewMasterKey(); + when(amRMTokenManager.getMasterKey()).thenReturn(masterKeyData); rmContext = new RMContextImpl(rmDispatcher, containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor, @@ -820,7 +823,9 @@ public class TestRMAppAttemptTransitions applicationAttempt.getAppAttemptState()); verifyTokenCount(applicationAttempt.getAppAttemptId(), 1); verifyApplicationAttemptFinished(RMAppAttemptState.FAILED); - verifyAMCrashAtAllocatedDiagnosticInfo(applicationAttempt.getDiagnostics()); + boolean shouldCheckURL = (applicationAttempt.getTrackingUrl() != null); + verifyAMCrashAtAllocatedDiagnosticInfo(applicationAttempt.getDiagnostics(), + exitCode, shouldCheckURL); } @Test @@ -1238,11 +1243,18 @@ public class TestRMAppAttemptTransitions verifyApplicationAttemptFinished(RMAppAttemptState.FAILED); } - private void verifyAMCrashAtAllocatedDiagnosticInfo(String diagnostics) { - assertTrue("Diagnostic information does not contain application proxy URL", - diagnostics.contains(applicationAttempt.getWebProxyBase())); + private void verifyAMCrashAtAllocatedDiagnosticInfo(String diagnostics, + int exitCode, boolean shouldCheckURL) { assertTrue("Diagnostic information does not point the logs to the users", diagnostics.contains("logs")); + assertTrue("Diagnostic information does not contain application attempt id", + diagnostics.contains(applicationAttempt.getAppAttemptId().toString())); + assertTrue("Diagnostic information does not contain application exit code", + diagnostics.contains("exitCode: " + exitCode)); + if (shouldCheckURL) { + assertTrue("Diagnostic information does not 
contain application proxy URL", + diagnostics.contains(applicationAttempt.getWebProxyBase())); + } } private void verifyTokenCount(ApplicationAttemptId appAttemptId, int count) { Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java?rev=1613514&r1=1613513&r2=1613514&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java Fri Jul 25 20:33:09 2014 @@ -26,6 +26,9 @@ import static org.mockito.Mockito.reset; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import java.util.ArrayList; + +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; @@ -36,17 +39,24 @@ import org.apache.hadoop.yarn.api.record import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.server.resourcemanager.MockAM; +import org.apache.hadoop.yarn.server.resourcemanager.MockNM; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.utils.BuilderUtils; +import org.junit.Assert; import org.junit.Test; import org.mockito.ArgumentCaptor; @@ -204,4 +214,36 @@ public class TestRMContainerImpl { assertEquals(RMContainerState.RUNNING, rmContainer.getState()); verify(writer, never()).containerFinished(any(RMContainer.class)); } + + @Test + public void testExistenceOfResourceRequestInRMContainer() throws Exception { + Configuration conf = new Configuration(); + MockRM rm1 = new MockRM(conf); + rm1.start(); + MockNM nm1 = 
rm1.registerNode("unknownhost:1234", 8000); + RMApp app1 = rm1.submitApp(1024); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + ResourceScheduler scheduler = rm1.getResourceScheduler(); + + // request a container. + am1.allocate("127.0.0.1", 1024, 1, new ArrayList<ContainerId>()); + ContainerId containerId2 = ContainerId.newInstance( + am1.getApplicationAttemptId(), 2); + rm1.waitForState(nm1, containerId2, RMContainerState.ALLOCATED); + + // Verify whether list of ResourceRequest is present in RMContainer + // while moving to ALLOCATED state + Assert.assertNotNull(scheduler.getRMContainer(containerId2) + .getResourceRequests()); + + // Allocate container + am1.allocate(new ArrayList<ResourceRequest>(), new ArrayList<ContainerId>()) + .getAllocatedContainers(); + rm1.waitForState(nm1, containerId2, RMContainerState.ACQUIRED); + + // After RMContainer moving to ACQUIRED state, list of ResourceRequest will + // be empty + Assert.assertNull(scheduler.getRMContainer(containerId2) + .getResourceRequests()); + } } Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java?rev=1613514&r1=1613513&r2=1613514&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java Fri Jul 25 20:33:09 2014 @@ -27,6 +27,7 @@ import static org.mockito.Mockito.when; import java.io.IOException; import java.net.InetSocketAddress; import java.security.PrivilegedAction; +import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; @@ -79,6 +80,8 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; @@ -87,6 +90,7 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; @@ -947,4 +951,67 @@ public class TestCapacityScheduler { rm1.stop(); } + + @Test(timeout = 30000) + public void testRecoverRequestAfterPreemption() throws Exception { + Configuration conf = new Configuration(); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + MockRM rm1 = new MockRM(conf); + rm1.start(); + MockNM nm1 = rm1.registerNode("127.0.0.1:1234", 8000); + RMApp app1 = rm1.submitApp(1024); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); + + // request a container. + am1.allocate("127.0.0.1", 1024, 1, new ArrayList<ContainerId>()); + ContainerId containerId1 = ContainerId.newInstance( + am1.getApplicationAttemptId(), 2); + rm1.waitForState(nm1, containerId1, RMContainerState.ALLOCATED); + + RMContainer rmContainer = cs.getRMContainer(containerId1); + List<ResourceRequest> requests = rmContainer.getResourceRequests(); + FiCaSchedulerApp app = cs.getApplicationAttempt(am1 + .getApplicationAttemptId()); + + FiCaSchedulerNode node = cs.getNode(rmContainer.getAllocatedNode()); + for (ResourceRequest request : requests) { + // Skip the OffRack and RackLocal resource requests. + if (request.getResourceName().equals(node.getRackName()) + || request.getResourceName().equals(ResourceRequest.ANY)) { + continue; + } + + // Already the node local resource request is cleared from RM after + // allocation. + Assert.assertNull(app.getResourceRequest(request.getPriority(), + request.getResourceName())); + } + + // Call killContainer to preempt the container + cs.killContainer(rmContainer); + + Assert.assertEquals(3, requests.size()); + for (ResourceRequest request : requests) { + // Resource request must have added back in RM after preempt event + // handling. + Assert.assertEquals( + 1, + app.getResourceRequest(request.getPriority(), + request.getResourceName()).getNumContainers()); + } + + // New container will be allocated and will move to ALLOCATED state + ContainerId containerId2 = ContainerId.newInstance( + am1.getApplicationAttemptId(), 3); + rm1.waitForState(nm1, containerId2, RMContainerState.ALLOCATED); + + // allocate container + List<Container> containers = am1.allocate(new ArrayList<ResourceRequest>(), + new ArrayList<ContainerId>()).getAllocatedContainers(); + + // Now with updated ResourceRequest, a container is allocated for AM. 
+ Assert.assertTrue(containers.size() == 1); + } } Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java?rev=1613514&r1=1613513&r2=1613514&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java Fri Jul 25 20:33:09 2014 @@ -147,11 +147,11 @@ public class FairSchedulerTestBase { int memory, int vcores, String queueId, String userId, int numContainers, int priority) { ApplicationAttemptId id = createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++); - scheduler.addApplication(id.getApplicationId(), queueId, userId); + scheduler.addApplication(id.getApplicationId(), queueId, userId, false); // This conditional is for testAclSubmitApplication where app is rejected // and no app is added. if (scheduler.getSchedulerApplications().containsKey(id.getApplicationId())) { - scheduler.addApplicationAttempt(id, false, true); + scheduler.addApplicationAttempt(id, false, false); } List<ResourceRequest> ask = new ArrayList<ResourceRequest>(); ResourceRequest request = createResourceRequest(memory, vcores, ResourceRequest.ANY, @@ -167,6 +167,27 @@ public class FairSchedulerTestBase { .put(id.getApplicationId(), rmApp); return id; } + + protected ApplicationAttemptId createSchedulingRequest(String queueId, + String userId, List<ResourceRequest> ask) { + ApplicationAttemptId id = createAppAttemptId(this.APP_ID++, + this.ATTEMPT_ID++); + scheduler.addApplication(id.getApplicationId(), queueId, userId, false); + // This conditional is for testAclSubmitApplication where app is rejected + // and no app is added. 
+ if (scheduler.getSchedulerApplications().containsKey(id.getApplicationId())) { + scheduler.addApplicationAttempt(id, false, false); + } + scheduler.allocate(id, ask, new ArrayList<ContainerId>(), null, null); + RMApp rmApp = mock(RMApp.class); + RMAppAttempt rmAppAttempt = mock(RMAppAttempt.class); + when(rmApp.getCurrentAppAttempt()).thenReturn(rmAppAttempt); + when(rmAppAttempt.getRMAppAttemptMetrics()).thenReturn( + new RMAppAttemptMetrics(id)); + resourceManager.getRMContext().getRMApps() + .put(id.getApplicationId(), rmApp); + return id; + } protected void createSchedulingRequestExistingApplication( int memory, int priority, ApplicationAttemptId attId) { Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java?rev=1613514&r1=1613513&r2=1613514&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java Fri Jul 25 20:33:09 2014 @@ -53,10 +53,13 @@ import org.apache.hadoop.yarn.MockApps; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl; @@ -77,11 +80,13 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; @@ -788,14 +793,14 @@ public class TestFairScheduler extends F scheduler.reinitialize(conf, resourceManager.getRMContext()); ApplicationAttemptId id11 = createAppAttemptId(1, 1); - scheduler.addApplication(id11.getApplicationId(), "root.queue1", "user1"); - scheduler.addApplicationAttempt(id11, false, true); + scheduler.addApplication(id11.getApplicationId(), "root.queue1", "user1", false); + scheduler.addApplicationAttempt(id11, false, false); ApplicationAttemptId id21 = createAppAttemptId(2, 1); - scheduler.addApplication(id21.getApplicationId(), "root.queue2", "user1"); - scheduler.addApplicationAttempt(id21, false, true); + scheduler.addApplication(id21.getApplicationId(), "root.queue2", "user1", false); + scheduler.addApplicationAttempt(id21, false, false); ApplicationAttemptId id22 = createAppAttemptId(2, 2); - scheduler.addApplication(id22.getApplicationId(), "root.queue2", "user1"); - scheduler.addApplicationAttempt(id22, false, true); + scheduler.addApplication(id22.getApplicationId(), "root.queue2", "user1", false); + scheduler.addApplicationAttempt(id22, false, false); int minReqSize = FairSchedulerConfiguration.DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_MB; @@ -1216,6 +1221,79 @@ public class TestFairScheduler extends F scheduler.getSchedulerApp(app4).getPreemptionContainers().isEmpty()); } + @Test + public void testPreemptionIsNotDelayedToNextRound() throws Exception { + conf.setLong(FairSchedulerConfiguration.PREEMPTION_INTERVAL, 5000); + conf.setLong(FairSchedulerConfiguration.WAIT_TIME_BEFORE_KILL, 10000); + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); + conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "false"); + + MockClock clock = new MockClock(); + scheduler.setClock(clock); + + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println("<?xml version=\"1.0\"?>"); + out.println("<allocations>"); + out.println("<queue name=\"queueA\">"); + out.println("<weight>8</weight>"); + out.println("<queue name=\"queueA1\" />"); + out.println("<queue name=\"queueA2\" />"); + out.println("</queue>"); + out.println("<queue name=\"queueB\">"); + out.println("<weight>2</weight>"); + out.println("</queue>"); + out.print("<fairSharePreemptionTimeout>10</fairSharePreemptionTimeout>"); + out.println("</allocations>"); + out.close(); + + scheduler.init(conf); + scheduler.start(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + // Add a node of 8G + RMNode node1 = MockNodes.newNodeInfo(1, + Resources.createResource(8 * 1024, 8), 1, "127.0.0.1"); + NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); + scheduler.handle(nodeEvent1); + + // Run apps in queueA.A1 and queueB + ApplicationAttemptId app1 = createSchedulingRequest(1 * 1024, 1, + "queueA.queueA1", "user1", 7, 1); + // createSchedulingRequestExistingApplication(1 * 1024, 1, 2, app1); + ApplicationAttemptId app2 = createSchedulingRequest(1 * 1024, 1, "queueB", + "user2", 1, 1); + + scheduler.update(); + + NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1); + for (int i = 0; i < 8; i++) { + scheduler.handle(nodeUpdate1); + } + + // verify if the apps got the containers they requested + assertEquals(7, scheduler.getSchedulerApp(app1).getLiveContainers().size()); + 
assertEquals(1, scheduler.getSchedulerApp(app2).getLiveContainers().size()); + + // Now submit an app in queueA.queueA2 + ApplicationAttemptId app3 = createSchedulingRequest(1 * 1024, 1, + "queueA.queueA2", "user3", 7, 1); + scheduler.update(); + + // Let 11 sec pass + clock.tick(11); + + scheduler.update(); + Resource toPreempt = scheduler.resToPreempt(scheduler.getQueueManager() + .getLeafQueue("queueA.queueA2", false), clock.getTime()); + assertEquals(2980, toPreempt.getMemory()); + + // verify if the 3 containers required by queueA2 are preempted in the same + // round + scheduler.preemptResources(toPreempt); + assertEquals(3, scheduler.getSchedulerApp(app1).getPreemptionContainers() + .size()); + } + @Test (timeout = 5000) /** * Tests the timing of decision to preempt tasks. @@ -1556,8 +1634,8 @@ public class TestFairScheduler extends F scheduler.handle(nodeEvent2); ApplicationAttemptId appId = createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++); - scheduler.addApplication(appId.getApplicationId(), "queue1", "user1"); - scheduler.addApplicationAttempt(appId, false, true); + scheduler.addApplication(appId.getApplicationId(), "queue1", "user1", false); + scheduler.addApplicationAttempt(appId, false, false); // 1 request with 2 nodes on the same rack. another request with 1 node on // a different rack @@ -1838,7 +1916,7 @@ public class TestFairScheduler extends F ApplicationAttemptId attId = ApplicationAttemptId.newInstance(applicationId, this.ATTEMPT_ID++); - scheduler.addApplication(attId.getApplicationId(), queue, user); + scheduler.addApplication(attId.getApplicationId(), queue, user, false); numTries = 0; while (application.getFinishTime() == 0 && numTries < MAX_TRIES) { @@ -2715,8 +2793,8 @@ public class TestFairScheduler extends F // send application request ApplicationAttemptId appAttemptId = createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++); - fs.addApplication(appAttemptId.getApplicationId(), "queue11", "user11"); - fs.addApplicationAttempt(appAttemptId, false, true); + fs.addApplication(appAttemptId.getApplicationId(), "queue11", "user11", false); + fs.addApplicationAttempt(appAttemptId, false, false); List<ResourceRequest> ask = new ArrayList<ResourceRequest>(); ResourceRequest request = createResourceRequest(1024, 1, ResourceRequest.ANY, 1, 1, true); @@ -2758,7 +2836,43 @@ public class TestFairScheduler extends F Assert.assertEquals(2, nodes.size()); } - + @Test + public void testContinuousSchedulingWithNodeRemoved() throws Exception { + // Disable continuous scheduling, will invoke continuous scheduling once manually + scheduler.init(conf); + scheduler.start(); + Assert.assertTrue("Continuous scheduling should be disabled.", + !scheduler.isContinuousSchedulingEnabled()); + + // Add two nodes + RMNode node1 = + MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 1, + "127.0.0.1"); + NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); + scheduler.handle(nodeEvent1); + RMNode node2 = + MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 2, + "127.0.0.2"); + NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2); + scheduler.handle(nodeEvent2); + Assert.assertEquals("We should have two alive nodes.", + 2, scheduler.getNumClusterNodes()); + + // Remove one node + NodeRemovedSchedulerEvent removeNode1 = new NodeRemovedSchedulerEvent(node1); + scheduler.handle(removeNode1); + Assert.assertEquals("We should only have one alive node.", + 1, scheduler.getNumClusterNodes()); + + // Invoke the continuous scheduling once 
+    try {
+      scheduler.continuousSchedulingAttempt();
+    } catch (Exception e) {
+      fail("Exception happened when doing continuous scheduling. "
+          + e.toString());
+    }
+  }
+
   @Test
   public void testDontAllowUndeclaredPools() throws Exception{
     conf.setBoolean(FairSchedulerConfiguration.ALLOW_UNDECLARED_POOLS, false);
@@ -2831,6 +2945,87 @@ public class TestFairScheduler extends F
       }
     }
   }
+
+  @Test(timeout=5000)
+  public void testRecoverRequestAfterPreemption() throws Exception {
+    conf.setLong(FairSchedulerConfiguration.WAIT_TIME_BEFORE_KILL, 10);
+
+    MockClock clock = new MockClock();
+    scheduler.setClock(clock);
+    scheduler.init(conf);
+    scheduler.start();
+    scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+    Priority priority = Priority.newInstance(20);
+    String host = "127.0.0.1";
+    int GB = 1024;
+
+    // Create Node and raised Node Added event
+    RMNode node = MockNodes.newNodeInfo(1,
+        Resources.createResource(16 * 1024, 4), 0, host);
+    NodeAddedSchedulerEvent nodeEvent = new NodeAddedSchedulerEvent(node);
+    scheduler.handle(nodeEvent);
+
+    // Create 3 container requests and place it in ask
+    List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
+    ResourceRequest nodeLocalRequest = createResourceRequest(GB, 1, host,
+        priority.getPriority(), 1, true);
+    ResourceRequest rackLocalRequest = createResourceRequest(GB, 1,
+        node.getRackName(), priority.getPriority(), 1, true);
+    ResourceRequest offRackRequest = createResourceRequest(GB, 1,
+        ResourceRequest.ANY, priority.getPriority(), 1, true);
+    ask.add(nodeLocalRequest);
+    ask.add(rackLocalRequest);
+    ask.add(offRackRequest);
+
+    // Create Request and update
+    ApplicationAttemptId appAttemptId = createSchedulingRequest("queueA",
+        "user1", ask);
+    scheduler.update();
+
+    // Sufficient node check-ins to fully schedule containers
+    NodeUpdateSchedulerEvent nodeUpdate = new NodeUpdateSchedulerEvent(node);
+    scheduler.handle(nodeUpdate);
+
+    assertEquals(1, scheduler.getSchedulerApp(appAttemptId).getLiveContainers()
+        .size());
+    FSSchedulerApp app = scheduler.getSchedulerApp(appAttemptId);
+
+    // ResourceRequest will be empty once NodeUpdate is completed
+    Assert.assertNull(app.getResourceRequest(priority, host));
+
+    ContainerId containerId1 = ContainerId.newInstance(appAttemptId, 1);
+    RMContainer rmContainer = app.getRMContainer(containerId1);
+
+    // Create a preempt event and register for preemption
+    scheduler.warnOrKillContainer(rmContainer);
+
+    // Wait for few clock ticks
+    clock.tick(5);
+
+    // preempt now
+    scheduler.warnOrKillContainer(rmContainer);
+
+    List<ResourceRequest> requests = rmContainer.getResourceRequests();
+    // Once recovered, resource request will be present again in app
+    Assert.assertEquals(3, requests.size());
+    for (ResourceRequest request : requests) {
+      Assert.assertEquals(1,
+          app.getResourceRequest(priority, request.getResourceName())
+              .getNumContainers());
+    }
+
+    // Send node heartbeat
+    scheduler.update();
+    scheduler.handle(nodeUpdate);
+
+    List<Container> containers = scheduler.allocate(appAttemptId,
+        Collections.<ResourceRequest> emptyList(),
+        Collections.<ContainerId> emptyList(), null, null).getContainers();
+
+    // Now with updated ResourceRequest, a container is allocated for AM.
+    Assert.assertTrue(containers.size() == 1);
+  }

   @SuppressWarnings("resource")
   @Test

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java Fri Jul 25 20:33:09 2014
@@ -94,7 +94,7 @@ public class TestFairSchedulerPreemption
     scheduler = (FairScheduler)resourceManager.getResourceScheduler();
     scheduler.setClock(clock);
-    scheduler.UPDATE_INTERVAL = 60 * 1000;
+    scheduler.updateInterval = 60 * 1000;
   }

   private void registerNodeAndSubmitApp(

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestAMRMTokens.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestAMRMTokens.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestAMRMTokens.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestAMRMTokens.java Fri Jul 25 20:33:09 2014
@@ -23,13 +23,12 @@ import java.security.PrivilegedAction;
 import java.util.Arrays;
 import java.util.Collection;

-import javax.crypto.SecretKey;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
@@ -41,7 +40,9 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
 import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MockRMWithAMS;
@@ -50,6 +51,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
+import org.apache.hadoop.yarn.server.security.MasterKeyData;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.Records;
 import org.junit.Assert;
@@ -65,6 +67,8 @@ public class TestAMRMTokens {
   private final Configuration conf;
   private static final int maxWaitAttempts = 50;
+  private static final int rolling_interval_sec = 13;
+  private static final long am_expire_ms = 4000;

   @Parameters
   public static Collection<Object[]> configs() {
@@ -201,15 +205,22 @@ public class TestAMRMTokens {
   @Test
   public void testMasterKeyRollOver() throws Exception {
+    conf.setLong(
+        YarnConfiguration.RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS,
+        rolling_interval_sec);
+    conf.setLong(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, am_expire_ms);
     MyContainerManager containerManager = new MyContainerManager();
     final MockRMWithAMS rm = new MockRMWithAMS(conf, containerManager);
     rm.start();
-
+    Long startTime = System.currentTimeMillis();
     final Configuration conf = rm.getConfig();
     final YarnRPC rpc = YarnRPC.create(conf);
     ApplicationMasterProtocol rmClient = null;
-
+    AMRMTokenSecretManager appTokenSecretManager =
+        rm.getRMContext().getAMRMTokenSecretManager();
+    MasterKeyData oldKey = appTokenSecretManager.getMasterKey();
+    Assert.assertNotNull(oldKey);
     try {
       MockNM nm1 = rm.registerNode("localhost:1234", 5120);
@@ -218,7 +229,7 @@ public class TestAMRMTokens {
       nm1.nodeHeartbeat(true);

       int waitCount = 0;
-      while (containerManager.containerTokens == null && waitCount++ < 20) {
+      while (containerManager.containerTokens == null && waitCount++ < maxWaitAttempts) {
         LOG.info("Waiting for AM Launch to happen..");
         Thread.sleep(1000);
       }
@@ -250,21 +261,65 @@ public class TestAMRMTokens {
       Assert.assertTrue(
           rmClient.allocate(allocateRequest).getAMCommand() == null);

-      // Simulate a master-key-roll-over
-      AMRMTokenSecretManager appTokenSecretManager =
-          rm.getRMContext().getAMRMTokenSecretManager();
-      SecretKey oldKey = appTokenSecretManager.getMasterKey();
-      appTokenSecretManager.rollMasterKey();
-      SecretKey newKey = appTokenSecretManager.getMasterKey();
+      // Wait for enough time and make sure the roll_over happens
+      // At mean time, the old AMRMToken should continue to work
+      while(System.currentTimeMillis() - startTime < rolling_interval_sec*1000) {
+        rmClient.allocate(allocateRequest);
+        Thread.sleep(500);
+      }
+
+      MasterKeyData newKey = appTokenSecretManager.getMasterKey();
+      Assert.assertNotNull(newKey);
       Assert.assertFalse("Master key should have changed!",
           oldKey.equals(newKey));

+      // Another allocate call with old AMRMToken. Should continue to work.
+      rpc.stopProxy(rmClient, conf); // To avoid using cached client
+      rmClient = createRMClient(rm, conf, rpc, currentUser);
+      Assert
+          .assertTrue(rmClient.allocate(allocateRequest).getAMCommand() == null);
+
+      waitCount = 0;
+      while(waitCount++ <= maxWaitAttempts) {
+        if (appTokenSecretManager.getCurrnetMasterKeyData() != oldKey) {
+          break;
+        }
+        try {
+          rmClient.allocate(allocateRequest);
+        } catch (Exception ex) {
+          break;
+        }
+        Thread.sleep(200);
+      }
+      // active the nextMasterKey, and replace the currentMasterKey
+      Assert.assertTrue(appTokenSecretManager.getCurrnetMasterKeyData().equals(newKey));
+      Assert.assertTrue(appTokenSecretManager.getMasterKey().equals(newKey));
+      Assert.assertTrue(appTokenSecretManager.getNextMasterKeyData() == null);
+
+      // Create a new Token
+      Token<AMRMTokenIdentifier> newToken =
+          appTokenSecretManager.createAndGetAMRMToken(applicationAttemptId);
+      SecurityUtil.setTokenService(newToken, rmBindAddress);
+      currentUser.addToken(newToken);
       // Another allocate call. Should continue to work.
       rpc.stopProxy(rmClient, conf); // To avoid using cached client
       rmClient = createRMClient(rm, conf, rpc, currentUser);
       allocateRequest = Records.newRecord(AllocateRequest.class);
-      Assert.assertTrue(
-          rmClient.allocate(allocateRequest).getAMCommand() == null);
+      Assert
+          .assertTrue(rmClient.allocate(allocateRequest).getAMCommand() == null);
+
+      // Should not work by using the old AMRMToken.
+      rpc.stopProxy(rmClient, conf); // To avoid using cached client
+      try {
+        currentUser.addToken(amRMToken);
+        rmClient = createRMClient(rm, conf, rpc, currentUser);
+        allocateRequest = Records.newRecord(AllocateRequest.class);
+        Assert
+            .assertTrue(rmClient.allocate(allocateRequest).getAMCommand() == null);
+        Assert.fail("The old Token should not work");
+      } catch (Exception ex) {
+        // expect exception
+      }
     } finally {
       rm.stop();
       if (rmClient != null) {

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java Fri Jul 25 20:33:09 2014
@@ -24,6 +24,7 @@ import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -673,7 +674,40 @@ public class TestDelegationTokenRenewer
       Thread.sleep(200);
     }
   }
-
+
+  @Test(timeout=20000)
+  public void testDTRonAppSubmission()
+      throws IOException, InterruptedException, BrokenBarrierException {
+    final Credentials credsx = new Credentials();
+    final Token<?> tokenx = mock(Token.class);
+    credsx.addToken(new Text("token"), tokenx);
+    doReturn(true).when(tokenx).isManaged();
+    doThrow(new IOException("boom"))
+        .when(tokenx).renew(any(Configuration.class));
+    // fire up the renewer
+    final DelegationTokenRenewer dtr =
+        createNewDelegationTokenRenewer(conf, counter);
+    RMContext mockContext = mock(RMContext.class);
+    ClientRMService mockClientRMService = mock(ClientRMService.class);
+    when(mockContext.getClientRMService()).thenReturn(mockClientRMService);
+    InetSocketAddress sockAddr =
+        InetSocketAddress.createUnresolved("localhost", 1234);
+    when(mockClientRMService.getBindAddress()).thenReturn(sockAddr);
+    dtr.setRMContext(mockContext);
+    when(mockContext.getDelegationTokenRenewer()).thenReturn(dtr);
+    dtr.init(conf);
+    dtr.start();
+
+    try {
+      dtr.addApplicationSync(mock(ApplicationId.class), credsx, false);
+      fail("Catch IOException on app submission");
+    } catch (IOException e){
+      Assert.assertTrue(e.getMessage().contains(tokenx.toString()));
+      Assert.assertTrue(e.getCause().toString().contains("boom"));
+    }
+
+  }
+
   @Test(timeout=20000)
   public void testConcurrentAddApplication()
       throws IOException, InterruptedException, BrokenBarrierException {

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm Fri Jul 25 20:33:09 2014
@@ -205,6 +205,12 @@ Properties that can be placed in yarn-si
     instead. Defaults to true. If a queue placement policy is given in the
     allocations file, this property is ignored.

+ * <<<yarn.scheduler.fair.update-interval-ms>>>
+
+    * The interval at which to lock the scheduler and recalculate fair shares,
+      recalculate demand, and check whether anything is due for preemption.
+      Defaults to 500 ms.
+
 Allocation file format

   The allocation file must be in XML format. The format contains five types of
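
As a rough illustration of the <<<yarn.scheduler.fair.update-interval-ms>>> property documented in the FairScheduler.apt.vm hunk above, the minimal sketch below sets and reads the key through the generic org.apache.hadoop.conf.Configuration API. The class name FairSchedulerUpdateIntervalSketch and the 2000 ms value are illustrative only, and the raw key string is used because the matching FairSchedulerConfiguration constant is not part of this diff.

    // Sketch only: raises the Fair Scheduler update interval from the
    // documented 500 ms default to 2 seconds. The property key string is
    // taken from the FairScheduler.apt.vm hunk above; everything else is
    // standard hadoop-common Configuration usage.
    import org.apache.hadoop.conf.Configuration;

    public class FairSchedulerUpdateIntervalSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Recalculate fair shares/demand and run the preemption check every 2000 ms.
        conf.setLong("yarn.scheduler.fair.update-interval-ms", 2000L);
        // 500 is used as the fallback here, matching the documented default.
        System.out.println("update interval = "
            + conf.getLong("yarn.scheduler.fair.update-interval-ms", 500L) + " ms");
      }
    }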
