Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java Tue Aug 19 23:49:39 2014 @@ -18,26 +18,34 @@ package org.apache.hadoop.yarn.server.resourcemanager; +import java.net.UnknownHostException; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; -import org.junit.Assert; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import 
org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; @@ -45,13 +53,29 @@ import org.apache.hadoop.yarn.server.uti import org.apache.log4j.Level; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; +import org.junit.Assert; +import org.junit.Before; import org.junit.Test; public class TestApplicationCleanup { private static final Log LOG = LogFactory .getLog(TestApplicationCleanup.class); + + private YarnConfiguration conf; + + @Before + public void setup() throws UnknownHostException { + Logger rootLogger = LogManager.getRootLogger(); + rootLogger.setLevel(Level.DEBUG); + conf = new YarnConfiguration(); + UserGroupInformation.setConfiguration(conf); + conf.set(YarnConfiguration.RECOVERY_ENABLED, "true"); + conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); + Assert.assertTrue(YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS > 1); + } + @SuppressWarnings("resource") @Test public void testAppCleanup() throws Exception { Logger rootLogger = LogManager.getRootLogger(); @@ -130,6 +154,7 @@ public class TestApplicationCleanup { rm.stop(); } + @SuppressWarnings("resource") @Test public void testContainerCleanup() throws Exception { @@ -207,20 +232,7 @@ public class 
TestApplicationCleanup { containerStatuses.put(app.getApplicationId(), containerStatusList); NodeHeartbeatResponse resp = nm1.nodeHeartbeat(containerStatuses, true); - dispatcher.await(); - List<ContainerId> contsToClean = resp.getContainersToCleanup(); - int cleanedConts = contsToClean.size(); - waitCount = 0; - while (cleanedConts < 1 && waitCount++ < 200) { - LOG.info("Waiting to get cleanup events.. cleanedConts: " + cleanedConts); - Thread.sleep(100); - resp = nm1.nodeHeartbeat(true); - dispatcher.await(); - contsToClean = resp.getContainersToCleanup(); - cleanedConts += contsToClean.size(); - } - LOG.info("Got cleanup for " + contsToClean.get(0)); - Assert.assertEquals(1, cleanedConts); + waitForContainerCleanup(dispatcher, nm1, resp); // Now to test the case when RM already gave cleanup, and NM suddenly // realizes that the container is running. @@ -233,24 +245,237 @@ public class TestApplicationCleanup { containerStatuses.put(app.getApplicationId(), containerStatusList); resp = nm1.nodeHeartbeat(containerStatuses, true); - dispatcher.await(); - contsToClean = resp.getContainersToCleanup(); - cleanedConts = contsToClean.size(); // The cleanup list won't be instantaneous as it is given out by scheduler // and not RMNodeImpl. - waitCount = 0; - while (cleanedConts < 1 && waitCount++ < 200) { - LOG.info("Waiting to get cleanup events.. 
cleanedConts: " + cleanedConts); - Thread.sleep(100); - resp = nm1.nodeHeartbeat(true); + waitForContainerCleanup(dispatcher, nm1, resp); + + rm.stop(); + } + + protected void waitForContainerCleanup(DrainDispatcher dispatcher, MockNM nm, + NodeHeartbeatResponse resp) throws Exception { + int waitCount = 0, cleanedConts = 0; + List<ContainerId> contsToClean; + do { dispatcher.await(); contsToClean = resp.getContainersToCleanup(); cleanedConts += contsToClean.size(); + if (cleanedConts >= 1) { + break; + } + Thread.sleep(100); + resp = nm.nodeHeartbeat(true); + } while(waitCount++ < 200); + + if (contsToClean.isEmpty()) { + LOG.error("Failed to get any containers to cleanup"); + } else { + LOG.info("Got cleanup for " + contsToClean.get(0)); } - LOG.info("Got cleanup for " + contsToClean.get(0)); Assert.assertEquals(1, cleanedConts); + } - rm.stop(); + private void waitForAppCleanupMessageRecved(MockNM nm, ApplicationId appId) + throws Exception { + while (true) { + NodeHeartbeatResponse response = nm.nodeHeartbeat(true); + if (response.getApplicationsToCleanup() != null + && response.getApplicationsToCleanup().size() == 1 + && appId.equals(response.getApplicationsToCleanup().get(0))) { + return; + } + + LOG.info("Haven't got application=" + appId.toString() + + " in cleanup list from node heartbeat response, " + + "sleep for a while before next heartbeat"); + Thread.sleep(1000); + } + } + + private MockAM launchAM(RMApp app, MockRM rm, MockNM nm) + throws Exception { + RMAppAttempt attempt = app.getCurrentAppAttempt(); + nm.nodeHeartbeat(true); + MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId()); + am.registerAppAttempt(); + rm.waitForState(app.getApplicationId(), RMAppState.RUNNING); + return am; + } + + @SuppressWarnings("resource") + @Test (timeout = 60000) + public void testAppCleanupWhenRMRestartedAfterAppFinished() throws Exception { + conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1); + MemoryRMStateStore memStore = new MemoryRMStateStore(); + 
memStore.init(conf); + + // start RM + MockRM rm1 = new MockRM(conf, memStore); + rm1.start(); + MockNM nm1 = + new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); + nm1.registerNode(); + + // create app and launch the AM + RMApp app0 = rm1.submitApp(200); + MockAM am0 = launchAM(app0, rm1, nm1); + nm1.nodeHeartbeat(am0.getApplicationAttemptId(), 1, ContainerState.COMPLETE); + rm1.waitForState(app0.getApplicationId(), RMAppState.FAILED); + + // start new RM + MockRM rm2 = new MockRM(conf, memStore); + rm2.start(); + + // nm1 register to rm2, and do a heartbeat + nm1.setResourceTrackerService(rm2.getResourceTrackerService()); + nm1.registerNode(Arrays.asList(app0.getApplicationId())); + rm2.waitForState(app0.getApplicationId(), RMAppState.FAILED); + + // wait for application cleanup message received + waitForAppCleanupMessageRecved(nm1, app0.getApplicationId()); + + rm1.stop(); + rm2.stop(); + } + + @SuppressWarnings("resource") + @Test(timeout = 60000) + public void testAppCleanupWhenRMRestartedBeforeAppFinished() throws Exception { + conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1); + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + + // start RM + MockRM rm1 = new MockRM(conf, memStore); + rm1.start(); + MockNM nm1 = + new MockNM("127.0.0.1:1234", 1024, rm1.getResourceTrackerService()); + nm1.registerNode(); + MockNM nm2 = + new MockNM("127.0.0.1:5678", 1024, rm1.getResourceTrackerService()); + nm2.registerNode(); + + // create app and launch the AM + RMApp app0 = rm1.submitApp(200); + MockAM am0 = launchAM(app0, rm1, nm1); + + // alloc another container on nm2 + AllocateResponse allocResponse = + am0.allocate(Arrays.asList(ResourceRequest.newInstance( + Priority.newInstance(1), "*", Resource.newInstance(1024, 0), 1)), + null); + while (null == allocResponse.getAllocatedContainers() + || allocResponse.getAllocatedContainers().isEmpty()) { + nm2.nodeHeartbeat(true); + allocResponse = am0.allocate(null, null); + 
Thread.sleep(1000); + } + + // start new RM + MockRM rm2 = new MockRM(conf, memStore); + rm2.start(); + + // nm1/nm2 register to rm2, and do a heartbeat + nm1.setResourceTrackerService(rm2.getResourceTrackerService()); + nm1.registerNode(Arrays.asList(NMContainerStatus.newInstance( + ContainerId.newInstance(am0.getApplicationAttemptId(), 1), + ContainerState.COMPLETE, Resource.newInstance(1024, 1), "", 0, + Priority.newInstance(0), 1234)), Arrays.asList(app0.getApplicationId())); + nm2.setResourceTrackerService(rm2.getResourceTrackerService()); + nm2.registerNode(Arrays.asList(app0.getApplicationId())); + + // assert app state has been saved. + rm2.waitForState(app0.getApplicationId(), RMAppState.FAILED); + + // wait for application cleanup message received on NM1 + waitForAppCleanupMessageRecved(nm1, app0.getApplicationId()); + + // wait for application cleanup message received on NM2 + waitForAppCleanupMessageRecved(nm2, app0.getApplicationId()); + + rm1.stop(); + rm2.stop(); + } + + @SuppressWarnings("resource") + @Test (timeout = 60000) + public void testContainerCleanupWhenRMRestartedAppNotRegistered() throws + Exception { + conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1); + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + + // start RM + final DrainDispatcher dispatcher = new DrainDispatcher(); + MockRM rm1 = new MockRM(conf, memStore) { + @Override + protected Dispatcher createDispatcher() { + return dispatcher; + } + }; + rm1.start(); + MockNM nm1 = + new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); + nm1.registerNode(); + + // create app and launch the AM + RMApp app0 = rm1.submitApp(200); + MockAM am0 = launchAM(app0, rm1, nm1); + nm1.nodeHeartbeat(am0.getApplicationAttemptId(), 1, ContainerState.RUNNING); + rm1.waitForState(app0.getApplicationId(), RMAppState.RUNNING); + + // start new RM + final DrainDispatcher dispatcher2 = new DrainDispatcher(); + MockRM rm2 = new MockRM(conf, memStore) { + 
@Override + protected Dispatcher createDispatcher() { + return dispatcher2; + } + }; + rm2.start(); + + // nm1 register to rm2, and do a heartbeat + nm1.setResourceTrackerService(rm2.getResourceTrackerService()); + nm1.registerNode(Arrays.asList(app0.getApplicationId())); + rm2.waitForState(app0.getApplicationId(), RMAppState.ACCEPTED); + + // Add unknown container for application unknown to scheduler + NodeHeartbeatResponse response = nm1.nodeHeartbeat(am0 + .getApplicationAttemptId(), 2, ContainerState.RUNNING); + + waitForContainerCleanup(dispatcher2, nm1, response); + + rm1.stop(); + rm2.stop(); + } + + @Test (timeout = 60000) + public void testAppCleanupWhenNMReconnects() throws Exception { + conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1); + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + + // start RM + MockRM rm1 = new MockRM(conf, memStore); + rm1.start(); + MockNM nm1 = + new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); + nm1.registerNode(); + + // create app and launch the AM + RMApp app0 = rm1.submitApp(200); + MockAM am0 = launchAM(app0, rm1, nm1); + nm1.nodeHeartbeat(am0.getApplicationAttemptId(), 1, ContainerState.COMPLETE); + rm1.waitForState(app0.getApplicationId(), RMAppState.FAILED); + + // wait for application cleanup message received + waitForAppCleanupMessageRecved(nm1, app0.getApplicationId()); + + // reconnect NM with application still active + nm1.registerNode(Arrays.asList(app0.getApplicationId())); + waitForAppCleanupMessageRecved(nm1, app0.getApplicationId()); + + rm1.stop(); } public static void main(String[] args) throws Exception {
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java Tue Aug 19 23:49:39 2014 @@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.api.protoc import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse; import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest; import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse; +import org.apache.hadoop.yarn.api.records.AMCommand; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; @@ -194,28 +195,17 @@ public class TestApplicationMasterLaunch // request for containers int request = 2; - try { - AllocateResponse ar = - am.allocate("h1", 1000, request, new ArrayList<ContainerId>()); - } catch (Exception e) { - Assert.assertEquals("Application Master is trying to allocate before " - + "registering for: " + attempt.getAppAttemptId().getApplicationId(), - e.getMessage()); - thrown = 
true; - } + AllocateResponse ar = + am.allocate("h1", 1000, request, new ArrayList<ContainerId>()); + Assert.assertTrue(ar.getAMCommand() == AMCommand.AM_RESYNC); + // kick the scheduler nm1.nodeHeartbeat(true); - try { - AllocateResponse amrs = - am.allocate(new ArrayList<ResourceRequest>(), - new ArrayList<ContainerId>()); - } catch (Exception e) { - Assert.assertEquals("Application Master is trying to allocate before " - + "registering for: " + attempt.getAppAttemptId().getApplicationId(), - e.getMessage()); - thrown = true; - } - Assert.assertTrue(thrown); + AllocateResponse amrs = + am.allocate(new ArrayList<ResourceRequest>(), + new ArrayList<ContainerId>()); + Assert.assertTrue(ar.getAMCommand() == AMCommand.AM_RESYNC); + am.registerAppAttempt(); thrown = false; try { @@ -228,5 +218,17 @@ public class TestApplicationMasterLaunch thrown = true; } Assert.assertTrue(thrown); + + // Simulate an AM that was disconnected and app attempt was removed + // (responseMap does not contain attemptid) + am.unregisterAppAttempt(); + nm1.nodeHeartbeat(attempt.getAppAttemptId(), 1, + ContainerState.COMPLETE); + am.waitForState(RMAppAttemptState.FINISHED); + + AllocateResponse amrs2 = + am.allocate(new ArrayList<ResourceRequest>(), + new ArrayList<ContainerId>()); + Assert.assertTrue(amrs2.getAMCommand() == AMCommand.AM_SHUTDOWN); } } Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- 
hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java Tue Aug 19 23:49:39 2014 @@ -18,60 +18,33 @@ package org.apache.hadoop.yarn.server.resourcemanager; -import com.google.common.collect.Maps; import org.junit.Assert; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; -import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; -import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; + import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateRequestPBImpl; import org.apache.hadoop.yarn.api.records.*; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.event.Dispatcher; -import org.apache.hadoop.yarn.event.EventHandler; -import org.apache.hadoop.yarn.event.InlineDispatcher; -import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException; +import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException; import org.apache.hadoop.yarn.exceptions.InvalidContainerReleaseException; -import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.*; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStatusupdateEvent; -import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp; -import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.junit.BeforeClass; import org.junit.Test; -import org.mockito.ArgumentCaptor; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.security.PrivilegedExceptionAction; import java.util.ArrayList; -import java.util.Collections; import java.util.List; -import java.util.concurrent.ConcurrentMap; import static java.lang.Thread.sleep; import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyList; -import static org.mockito.Mockito.*; public class TestApplicationMasterService { private static final Log LOG = LogFactory.getLog(TestFifoScheduler.class); @@ -198,7 +171,6 @@ public class TestApplicationMasterServic RMAppAttempt attempt1 = app1.getCurrentAppAttempt(); MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId()); am1.registerAppAttempt(); - am1.setAMRMProtocol(rm.getApplicationMasterService()); 
AllocateRequestPBImpl allocateRequest = new AllocateRequestPBImpl(); List<ContainerId> release = new ArrayList<ContainerId>(); @@ -270,13 +242,17 @@ public class TestApplicationMasterServic } Assert.assertNotNull(cause); Assert - .assertTrue(cause instanceof InvalidApplicationMasterRequestException); + .assertTrue(cause instanceof ApplicationMasterNotRegisteredException); Assert.assertNotNull(cause.getMessage()); Assert .assertTrue(cause .getMessage() .contains( "Application Master is trying to unregister before registering for:")); + + am1.registerAppAttempt(); + + am1.unregisterAppAttempt(req, false); } finally { if (rm != null) { rm.stop(); Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java Tue Aug 19 23:49:39 2014 @@ -44,16 +44,17 @@ import java.util.concurrent.ConcurrentHa import java.util.concurrent.CyclicBarrier; import org.junit.Assert; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; +import 
org.apache.hadoop.security.authentication.util.KerberosName; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.MockApps; import org.apache.hadoop.yarn.api.ApplicationClientProtocol; import org.apache.hadoop.yarn.api.protocolrecords.ApplicationsRequestScope; +import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsRequest; @@ -77,6 +78,7 @@ import org.apache.hadoop.yarn.api.record import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -137,6 +139,10 @@ public class TestClientRMService { private final static String QUEUE_1 = "Q-1"; private final static String QUEUE_2 = "Q-2"; + private final static String kerberosRule = "RULE:[1:$1@$0](.*@EXAMPLE.COM)s/@.*//\nDEFAULT"; + static { + KerberosName.setRules(kerberosRule); + } @BeforeClass public static void setupSecretManager() throws IOException { @@ -259,6 +265,28 @@ public class TestClientRMService { } @Test + public void testGetApplicationResourceUsageReportDummy() throws YarnException, + IOException { + ApplicationAttemptId attemptId = getApplicationAttemptId(1); + YarnScheduler yarnScheduler = mockYarnScheduler(); + RMContext rmContext = mock(RMContext.class); + mockRMContext(yarnScheduler, rmContext); + when(rmContext.getDispatcher().getEventHandler()).thenReturn( + new EventHandler<Event>() 
{ + public void handle(Event event) { + } + }); + ApplicationSubmissionContext asContext = + mock(ApplicationSubmissionContext.class); + YarnConfiguration config = new YarnConfiguration(); + RMAppAttemptImpl rmAppAttemptImpl = new RMAppAttemptImpl(attemptId, + rmContext, yarnScheduler, null, asContext, config, false); + ApplicationResourceUsageReport report = rmAppAttemptImpl + .getApplicationResourceUsageReport(); + assertEquals(report, RMServerUtils.DUMMY_APPLICATION_RESOURCE_USAGE_REPORT); + } + + @Test public void testGetApplicationAttempts() throws YarnException, IOException { ClientRMService rmService = createRMService(); RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); @@ -456,6 +484,17 @@ public class TestClientRMService { UserGroupInformation.createRemoteUser("owner"); private static final UserGroupInformation other = UserGroupInformation.createRemoteUser("other"); + private static final UserGroupInformation tester = + UserGroupInformation.createRemoteUser("tester"); + private static final String testerPrincipal = "tester@EXAMPLE.COM"; + private static final String ownerPrincipal = "owner@EXAMPLE.COM"; + private static final String otherPrincipal = "other@EXAMPLE.COM"; + private static final UserGroupInformation testerKerb = + UserGroupInformation.createRemoteUser(testerPrincipal); + private static final UserGroupInformation ownerKerb = + UserGroupInformation.createRemoteUser(ownerPrincipal); + private static final UserGroupInformation otherKerb = + UserGroupInformation.createRemoteUser(otherPrincipal); @Test public void testTokenRenewalByOwner() throws Exception { @@ -478,9 +517,8 @@ public class TestClientRMService { checkTokenRenewal(owner, other); return null; } catch (YarnException ex) { - Assert.assertTrue(ex.getMessage().contains( - "Client " + owner.getUserName() + - " tries to renew a token with renewer specified as " + + Assert.assertTrue(ex.getMessage().contains(owner.getUserName() + + " tries to renew a token with
renewer " + other.getUserName())); throw ex; } @@ -524,6 +562,147 @@ public class TestClientRMService { rmService.renewDelegationToken(request); } + @Test + public void testTokenCancellationByOwner() throws Exception { + // two tests required - one with a kerberos name + // and with a short name + RMContext rmContext = mock(RMContext.class); + final ClientRMService rmService = + new ClientRMService(rmContext, null, null, null, null, dtsm); + testerKerb.doAs(new PrivilegedExceptionAction<Void>() { + @Override + public Void run() throws Exception { + checkTokenCancellation(rmService, testerKerb, other); + return null; + } + }); + owner.doAs(new PrivilegedExceptionAction<Void>() { + @Override + public Void run() throws Exception { + checkTokenCancellation(owner, other); + return null; + } + }); + } + + @Test + public void testTokenCancellationByRenewer() throws Exception { + // two tests required - one with a kerberos name + // and with a short name + RMContext rmContext = mock(RMContext.class); + final ClientRMService rmService = + new ClientRMService(rmContext, null, null, null, null, dtsm); + testerKerb.doAs(new PrivilegedExceptionAction<Void>() { + @Override + public Void run() throws Exception { + checkTokenCancellation(rmService, owner, testerKerb); + return null; + } + }); + other.doAs(new PrivilegedExceptionAction<Void>() { + @Override + public Void run() throws Exception { + checkTokenCancellation(owner, other); + return null; + } + }); + } + + @Test + public void testTokenCancellationByWrongUser() { + // two sets to test - + // 1. try to cancel tokens of short and kerberos users as a kerberos UGI + // 2. 
try to cancel tokens of short and kerberos users as a simple auth UGI + + RMContext rmContext = mock(RMContext.class); + final ClientRMService rmService = + new ClientRMService(rmContext, null, null, null, null, dtsm); + UserGroupInformation[] kerbTestOwners = + { owner, other, tester, ownerKerb, otherKerb }; + UserGroupInformation[] kerbTestRenewers = + { owner, other, ownerKerb, otherKerb }; + for (final UserGroupInformation tokOwner : kerbTestOwners) { + for (final UserGroupInformation tokRenewer : kerbTestRenewers) { + try { + testerKerb.doAs(new PrivilegedExceptionAction<Void>() { + @Override + public Void run() throws Exception { + try { + checkTokenCancellation(rmService, tokOwner, tokRenewer); + Assert.fail("We should not reach here; token owner = " + + tokOwner.getUserName() + ", renewer = " + + tokRenewer.getUserName()); + return null; + } catch (YarnException e) { + Assert.assertTrue(e.getMessage().contains( + testerKerb.getUserName() + + " is not authorized to cancel the token")); + return null; + } + } + }); + } catch (Exception e) { + Assert.fail("Unexpected exception; " + e.getMessage()); + } + } + } + + UserGroupInformation[] simpleTestOwners = + { owner, other, ownerKerb, otherKerb, testerKerb }; + UserGroupInformation[] simpleTestRenewers = + { owner, other, ownerKerb, otherKerb }; + for (final UserGroupInformation tokOwner : simpleTestOwners) { + for (final UserGroupInformation tokRenewer : simpleTestRenewers) { + try { + tester.doAs(new PrivilegedExceptionAction<Void>() { + @Override + public Void run() throws Exception { + try { + checkTokenCancellation(tokOwner, tokRenewer); + Assert.fail("We should not reach here; token owner = " + + tokOwner.getUserName() + ", renewer = " + + tokRenewer.getUserName()); + return null; + } catch (YarnException ex) { + Assert.assertTrue(ex.getMessage().contains( + tester.getUserName() + + " is not authorized to cancel the token")); + return null; + } + } + }); + } catch (Exception e) { + Assert.fail("Unexpected 
exception; " + e.getMessage()); + } + } + } + } + + private void checkTokenCancellation(UserGroupInformation owner, + UserGroupInformation renewer) throws IOException, YarnException { + RMContext rmContext = mock(RMContext.class); + final ClientRMService rmService = + new ClientRMService(rmContext, null, null, null, null, dtsm); + checkTokenCancellation(rmService, owner, renewer); + } + + private void checkTokenCancellation(ClientRMService rmService, + UserGroupInformation owner, UserGroupInformation renewer) + throws IOException, YarnException { + RMDelegationTokenIdentifier tokenIdentifier = + new RMDelegationTokenIdentifier(new Text(owner.getUserName()), + new Text(renewer.getUserName()), null); + Token<?> token = + new Token<RMDelegationTokenIdentifier>(tokenIdentifier, dtsm); + org.apache.hadoop.yarn.api.records.Token dToken = + BuilderUtils.newDelegationToken(token.getIdentifier(), token.getKind() + .toString(), token.getPassword(), token.getService().toString()); + CancelDelegationTokenRequest request = + Records.newRecord(CancelDelegationTokenRequest.class); + request.setDelegationToken(dToken); + rmService.cancelDelegationToken(request); + } + @Test (timeout = 30000) @SuppressWarnings ("rawtypes") public void testAppSubmit() throws Exception { @@ -647,7 +826,8 @@ public class TestClientRMService { ApplicationId[] appIds = {getApplicationId(101), getApplicationId(102), getApplicationId(103)}; List<String> tags = Arrays.asList("Tag1", "Tag2", "Tag3"); - + + long[] submitTimeMillis = new long[3]; // Submit applications for (int i = 0; i < appIds.length; i++) { ApplicationId appId = appIds[i]; @@ -657,6 +837,7 @@ public class TestClientRMService { appId, appNames[i], queues[i % queues.length], new HashSet<String>(tags.subList(0, i + 1))); rmService.submitApplication(submitRequest); + submitTimeMillis[i] = System.currentTimeMillis(); } // Test different cases of ClientRMService#getApplications() @@ -668,6 +849,24 @@ public class TestClientRMService { 
request.setLimit(1L); assertEquals("Failed to limit applications", 1, rmService.getApplications(request).getApplicationList().size()); + + // Check start range + request = GetApplicationsRequest.newInstance(); + request.setStartRange(submitTimeMillis[0], System.currentTimeMillis()); + + // 2 applications are submitted after first timeMills + assertEquals("Incorrect number of matching start range", + 2, rmService.getApplications(request).getApplicationList().size()); + + // 1 application is submitted after the second timeMills + request.setStartRange(submitTimeMillis[1], System.currentTimeMillis()); + assertEquals("Incorrect number of matching start range", + 1, rmService.getApplications(request).getApplicationList().size()); + + // no application is submitted after the third timeMills + request.setStartRange(submitTimeMillis[2], System.currentTimeMillis()); + assertEquals("Incorrect number of matching start range", + 0, rmService.getApplications(request).getApplicationList().size()); // Check queue request = GetApplicationsRequest.newInstance(); @@ -945,6 +1144,8 @@ public class TestClientRMService { Arrays.asList(getApplicationAttemptId(101), getApplicationAttemptId(102))); when(yarnScheduler.getAppsInQueue(QUEUE_2)).thenReturn( Arrays.asList(getApplicationAttemptId(103))); + ApplicationAttemptId attemptId = getApplicationAttemptId(1); + when(yarnScheduler.getAppResourceUsageReport(attemptId)).thenReturn(null); return yarnScheduler; } } Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMTokens.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMTokens.java?rev=1619012&r1=1619011&r2=1619012&view=diff 
============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMTokens.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMTokens.java Tue Aug 19 23:49:39 2014 @@ -36,6 +36,7 @@ import java.net.InetSocketAddress; import java.security.PrivilegedAction; import java.security.PrivilegedExceptionAction; +import org.apache.hadoop.net.NetUtils; import org.junit.Assert; import org.apache.commons.logging.Log; @@ -235,8 +236,8 @@ public class TestClientRMTokens { @Test public void testShortCircuitRenewCancel() throws IOException, InterruptedException { - InetSocketAddress addr = - new InetSocketAddress(InetAddress.getLocalHost(), 123); + InetSocketAddress addr = NetUtils.createSocketAddr( + InetAddress.getLocalHost().getHostName(), 123, null); checkShortCircuitRenewCancel(addr, addr, true); } @@ -244,17 +245,19 @@ public class TestClientRMTokens { public void testShortCircuitRenewCancelWildcardAddress() throws IOException, InterruptedException { InetSocketAddress rmAddr = new InetSocketAddress(123); + InetSocketAddress serviceAddr = NetUtils.createSocketAddr( + InetAddress.getLocalHost().getHostName(), rmAddr.getPort(), null); checkShortCircuitRenewCancel( rmAddr, - new InetSocketAddress(InetAddress.getLocalHost(), rmAddr.getPort()), + serviceAddr, true); } @Test public void testShortCircuitRenewCancelSameHostDifferentPort() throws IOException, InterruptedException { - InetSocketAddress rmAddr = - new InetSocketAddress(InetAddress.getLocalHost(), 123); + InetSocketAddress rmAddr = NetUtils.createSocketAddr( + InetAddress.getLocalHost().getHostName(), 123, null); checkShortCircuitRenewCancel( rmAddr, new 
InetSocketAddress(rmAddr.getAddress(), rmAddr.getPort()+1), @@ -264,8 +267,8 @@ public class TestClientRMTokens { @Test public void testShortCircuitRenewCancelDifferentHostSamePort() throws IOException, InterruptedException { - InetSocketAddress rmAddr = - new InetSocketAddress(InetAddress.getLocalHost(), 123); + InetSocketAddress rmAddr = NetUtils.createSocketAddr( + InetAddress.getLocalHost().getHostName(), 123, null); checkShortCircuitRenewCancel( rmAddr, new InetSocketAddress("1.1.1.1", rmAddr.getPort()), @@ -275,8 +278,8 @@ public class TestClientRMTokens { @Test public void testShortCircuitRenewCancelDifferentHostDifferentPort() throws IOException, InterruptedException { - InetSocketAddress rmAddr = - new InetSocketAddress(InetAddress.getLocalHost(), 123); + InetSocketAddress rmAddr = NetUtils.createSocketAddr( + InetAddress.getLocalHost().getHostName(), 123, null); checkShortCircuitRenewCancel( rmAddr, new InetSocketAddress("1.1.1.1", rmAddr.getPort()+1), Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java Tue Aug 19 
23:49:39 2014 @@ -18,16 +18,16 @@ package org.apache.hadoop.yarn.server.resourcemanager; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + import java.util.ArrayList; import java.util.Collections; import java.util.List; -import org.junit.Assert; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.api.record import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; @@ -52,21 +53,21 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler; import org.apache.hadoop.yarn.server.utils.BuilderUtils; +import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.log4j.Level; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; +import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; - -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; public class TestFifoScheduler { private static final Log LOG = LogFactory.getLog(TestFifoScheduler.class); private final int GB = 1024; private static YarnConfiguration conf; - + 
@BeforeClass public static void setup() { conf = new YarnConfiguration(); @@ -76,12 +77,12 @@ public class TestFifoScheduler { @Test (timeout = 30000) public void testConfValidation() throws Exception { - ResourceScheduler scheduler = new FifoScheduler(); + FifoScheduler scheduler = new FifoScheduler(); Configuration conf = new YarnConfiguration(); conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 2048); conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 1024); try { - scheduler.reinitialize(conf, null); + scheduler.serviceInit(conf); fail("Exception is expected because the min memory allocation is" + " larger than the max memory allocation."); } catch (YarnRuntimeException e) { @@ -213,6 +214,35 @@ public class TestFifoScheduler { rm.stop(); } + @Test + public void testNodeUpdateBeforeAppAttemptInit() throws Exception { + FifoScheduler scheduler = new FifoScheduler(); + MockRM rm = new MockRM(conf); + scheduler.setRMContext(rm.getRMContext()); + scheduler.init(conf); + scheduler.start(); + scheduler.reinitialize(conf, rm.getRMContext()); + + RMNode node = MockNodes.newNodeInfo(1, + Resources.createResource(1024, 4), 1, "127.0.0.1"); + scheduler.handle(new NodeAddedSchedulerEvent(node)); + + ApplicationId appId = ApplicationId.newInstance(0, 1); + scheduler.addApplication(appId, "queue1", "user1", false); + + NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node); + try { + scheduler.handle(updateEvent); + } catch (NullPointerException e) { + Assert.fail(); + } + + ApplicationAttemptId attId = ApplicationAttemptId.newInstance(appId, 1); + scheduler.addApplicationAttempt(attId, false, false); + + rm.stop(); + } + private void testMinimumAllocation(YarnConfiguration conf, int testAlloc) throws Exception { MockRM rm = new MockRM(conf); @@ -266,7 +296,12 @@ public class TestFifoScheduler { conf.setQueues("default", new String[] {"default"}); conf.setCapacity("default", 100); FifoScheduler fs = new FifoScheduler(); + 
fs.init(conf); + fs.start(); + // mock rmContext to avoid NPE. + RMContext context = mock(RMContext.class); fs.reinitialize(conf, null); + fs.setRMContext(context); RMNode n1 = MockNodes.newNodeInfo(0, MockNodes.newResource(4 * GB), 1, "127.0.0.2"); @@ -286,6 +321,7 @@ public class TestFifoScheduler { fs.handle(new NodeUpdateSchedulerEvent(n1)); Assert.assertEquals(4 * GB, fs.getRootQueueMetrics().getAvailableMB()); + fs.stop(); } @Test (timeout = 50000) Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestMoveApplication.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestMoveApplication.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestMoveApplication.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestMoveApplication.java Tue Aug 19 23:49:39 2014 @@ -43,10 +43,11 @@ import org.junit.Test; public class TestMoveApplication { private ResourceManager resourceManager = null; private static boolean failMove; - + private Configuration conf; + @Before public void setUp() throws Exception { - Configuration conf = new YarnConfiguration(); + conf = new YarnConfiguration(); conf.setClass(YarnConfiguration.RM_SCHEDULER, FifoSchedulerWithMove.class, FifoSchedulerWithMove.class); conf.set(YarnConfiguration.YARN_ADMIN_ACL, " "); @@ -119,28 +120,23 @@ 
public class TestMoveApplication { } } - @Test (timeout = 5000) - public void testMoveSuccessful() throws Exception { - // Submit application - Application application = new Application("user1", resourceManager); - ApplicationId appId = application.getApplicationId(); - application.submit(); - - // Wait for app to be accepted - RMApp app = resourceManager.rmContext.getRMApps().get(appId); - while (app.getState() != RMAppState.ACCEPTED) { - Thread.sleep(100); - } - - ClientRMService clientRMService = resourceManager.getClientRMService(); + @Test (timeout = 10000) + public + void testMoveSuccessful() throws Exception { + MockRM rm1 = new MockRM(conf); + rm1.start(); + RMApp app = rm1.submitApp(1024); + ClientRMService clientRMService = rm1.getClientRMService(); // FIFO scheduler does not support moves - clientRMService.moveApplicationAcrossQueues( - MoveApplicationAcrossQueuesRequest.newInstance(appId, "newqueue")); - - RMApp rmApp = resourceManager.getRMContext().getRMApps().get(appId); + clientRMService + .moveApplicationAcrossQueues(MoveApplicationAcrossQueuesRequest + .newInstance(app.getApplicationId(), "newqueue")); + + RMApp rmApp = rm1.getRMContext().getRMApps().get(app.getApplicationId()); assertEquals("newqueue", rmApp.getQueue()); + rm1.stop(); } - + @Test public void testMoveRejectedByPermissions() throws Exception { failMove = true; Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- 
hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java Tue Aug 19 23:49:39 2014 @@ -28,6 +28,9 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; +import org.junit.After; import org.junit.Assert; import org.apache.commons.logging.Log; @@ -78,7 +81,14 @@ public class TestRM { // Milliseconds to sleep for when waiting for something to happen private final static int WAIT_SLEEP_MS = 100; - + + @After + public void tearDown() { + ClusterMetrics.destroy(); + QueueMetrics.clearQueueMetrics(); + DefaultMetricsSystem.shutdown(); + } + @Test public void testGetNewAppId() throws Exception { Logger rootLogger = LogManager.getRootLogger(); Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAdminService.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAdminService.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAdminService.java (original) +++ 
hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAdminService.java Tue Aug 19 23:49:39 2014 @@ -348,14 +348,14 @@ public class TestRMAdminService { rm.adminService.refreshSuperUserGroupsConfiguration( RefreshSuperUserGroupsConfigurationRequest.newInstance()); - Assert.assertTrue(ProxyUsers.getProxyGroups() + Assert.assertTrue(ProxyUsers.getDefaultImpersonationProvider().getProxyGroups() .get("hadoop.proxyuser.test.groups").size() == 1); - Assert.assertTrue(ProxyUsers.getProxyGroups() + Assert.assertTrue(ProxyUsers.getDefaultImpersonationProvider().getProxyGroups() .get("hadoop.proxyuser.test.groups").contains("test_groups")); - Assert.assertTrue(ProxyUsers.getProxyHosts() + Assert.assertTrue(ProxyUsers.getDefaultImpersonationProvider().getProxyHosts() .get("hadoop.proxyuser.test.hosts").size() == 1); - Assert.assertTrue(ProxyUsers.getProxyHosts() + Assert.assertTrue(ProxyUsers.getDefaultImpersonationProvider().getProxyHosts() .get("hadoop.proxyuser.test.hosts").contains("test_hosts")); } @@ -708,14 +708,14 @@ public class TestRMAdminService { aclsString); // verify ProxyUsers and ProxyHosts - Assert.assertTrue(ProxyUsers.getProxyGroups() + Assert.assertTrue(ProxyUsers.getDefaultImpersonationProvider().getProxyGroups() .get("hadoop.proxyuser.test.groups").size() == 1); - Assert.assertTrue(ProxyUsers.getProxyGroups() + Assert.assertTrue(ProxyUsers.getDefaultImpersonationProvider().getProxyGroups() .get("hadoop.proxyuser.test.groups").contains("test_groups")); - Assert.assertTrue(ProxyUsers.getProxyHosts() + Assert.assertTrue(ProxyUsers.getDefaultImpersonationProvider().getProxyHosts() .get("hadoop.proxyuser.test.hosts").size() == 1); - Assert.assertTrue(ProxyUsers.getProxyHosts() + Assert.assertTrue(ProxyUsers.getDefaultImpersonationProvider().getProxyHosts() .get("hadoop.proxyuser.test.hosts").contains("test_hosts")); // verify 
UserToGroupsMappings Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java Tue Aug 19 23:49:39 2014 @@ -28,7 +28,7 @@ import java.net.InetSocketAddress; import javax.ws.rs.core.MediaType; -import junit.framework.Assert; +import org.junit.Assert; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -37,6 +37,8 @@ import org.apache.hadoop.ha.HAServicePro import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo; import org.apache.hadoop.ha.HealthCheckFailedException; +import org.apache.hadoop.metrics2.MetricsSystem; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.service.AbstractService; @@ -92,6 +94,9 @@ public class TestRMHA { // Enable webapp to test web-services also configuration.setBoolean(MockRM.ENABLE_WEBAPP, true); configuration.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true); + ClusterMetrics.destroy(); + 
QueueMetrics.clearQueueMetrics(); + DefaultMetricsSystem.shutdown(); } private void checkMonitorHealth() throws IOException { @@ -326,6 +331,10 @@ public class TestRMHA { rm.adminService.transitionToStandby(requestInfo); rm.adminService.transitionToActive(requestInfo); rm.adminService.transitionToStandby(requestInfo); + + MyCountingDispatcher dispatcher = + (MyCountingDispatcher) rm.getRMContext().getDispatcher(); + assertTrue(!dispatcher.isStopped()); rm.adminService.transitionToActive(requestInfo); assertEquals(errorMessageForEventHandler, expectedEventHandlerCount, @@ -334,6 +343,11 @@ public class TestRMHA { assertEquals(errorMessageForService, expectedServiceCount, rm.getServices().size()); + + // Keep the dispatcher reference before transitioning to standby + dispatcher = (MyCountingDispatcher) rm.getRMContext().getDispatcher(); + + rm.adminService.transitionToStandby(requestInfo); assertEquals(errorMessageForEventHandler, expectedEventHandlerCount, ((MyCountingDispatcher) rm.getRMContext().getDispatcher()) @@ -341,6 +355,8 @@ public class TestRMHA { assertEquals(errorMessageForService, expectedServiceCount, rm.getServices().size()); + assertTrue(dispatcher.isStopped()); + rm.stop(); } @@ -375,7 +391,19 @@ public class TestRMHA { } @Test - public void testHAWithRMHostName() { + public void testHAWithRMHostName() throws Exception { + innerTestHAWithRMHostName(false); + configuration.clear(); + setUp(); + innerTestHAWithRMHostName(true); + } + + public void innerTestHAWithRMHostName(boolean includeBindHost) { + //this is run two times, with and without a bind host configured + if (includeBindHost) { + configuration.set(YarnConfiguration.RM_BIND_HOST, "9.9.9.9"); + } + //test if both RM_HOSTBANE_{rm_id} and RM_RPCADDRESS_{rm_id} are set //We should only read rpc addresses from RM_RPCADDRESS_{rm_id} configuration configuration.set(HAUtil.addSuffix(YarnConfiguration.RM_HOSTNAME, @@ -395,6 +423,15 @@ public class TestRMHA { RM2_ADDRESS, 
conf.get(HAUtil.addSuffix(confKey, RM2_NODE_ID))); assertEquals("RPC address not set for " + confKey, RM3_ADDRESS, conf.get(HAUtil.addSuffix(confKey, RM3_NODE_ID))); + if (includeBindHost) { + assertEquals("Web address misconfigured WITH bind-host", + rm.webAppAddress.substring(0, 7), "9.9.9.9"); + } else { + //YarnConfiguration tries to figure out which rm host it's on by binding to it, + //which doesn't happen for any of these fake addresses, so we end up with 0.0.0.0 + assertEquals("Web address misconfigured WITHOUT bind-host", + rm.webAppAddress.substring(0, 7), "0.0.0.0"); + } } } catch (YarnRuntimeException e) { fail("Should not throw any exceptions."); @@ -466,6 +503,8 @@ public class TestRMHA { private int eventHandlerCount; + private volatile boolean stopped = false; + public MyCountingDispatcher() { super("MyCountingDispatcher"); this.eventHandlerCount = 0; @@ -484,5 +523,15 @@ public class TestRMHA { public int getEventHandlerCount() { return this.eventHandlerCount; } + + @Override + protected void serviceStop() throws Exception { + this.stopped = true; + super.serviceStop(); + } + + public boolean isStopped() { + return this.stopped; + } } } Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java (original) +++ 
hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java Tue Aug 19 23:49:39 2014 @@ -21,15 +21,14 @@ import static org.mockito.Matchers.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.*; +import static org.mockito.Mockito.when; import java.util.ArrayList; import java.util.Collections; import java.util.List; -import org.junit.Assert; - import org.apache.hadoop.util.HostsFileReader; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -49,6 +48,7 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeReconnectEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStartedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; @@ -61,6 +61,7 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.Records; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.mockito.invocation.InvocationOnMock; @@ -160,7 +161,7 @@ public class TestRMNodeTransitions { @Test (timeout = 5000) public void testExpiredContainer() { // Start the node - node.handle(new RMNodeEvent(null, RMNodeEventType.STARTED)); + node.handle(new 
RMNodeStartedEvent(null, null, null)); verify(scheduler).handle(any(NodeAddedSchedulerEvent.class)); // Expire a container @@ -188,11 +189,11 @@ public class TestRMNodeTransitions { @Test (timeout = 5000) public void testContainerUpdate() throws InterruptedException{ //Start the node - node.handle(new RMNodeEvent(null,RMNodeEventType.STARTED)); + node.handle(new RMNodeStartedEvent(null, null, null)); NodeId nodeId = BuilderUtils.newNodeId("localhost:1", 1); RMNodeImpl node2 = new RMNodeImpl(nodeId, rmContext, null, 0, 0, null, null, null); - node2.handle(new RMNodeEvent(null,RMNodeEventType.STARTED)); + node2.handle(new RMNodeStartedEvent(null, null, null)); ContainerId completedContainerIdFromNode1 = BuilderUtils.newContainerId( BuilderUtils.newApplicationAttemptId( @@ -248,7 +249,7 @@ public class TestRMNodeTransitions { @Test (timeout = 5000) public void testStatusChange(){ //Start the node - node.handle(new RMNodeEvent(null,RMNodeEventType.STARTED)); + node.handle(new RMNodeStartedEvent(null, null, null)); //Add info to the queue first node.setNextHeartBeat(false); @@ -455,12 +456,16 @@ public class TestRMNodeTransitions { } private RMNodeImpl getRunningNode() { + return getRunningNode(null); + } + + private RMNodeImpl getRunningNode(String nmVersion) { NodeId nodeId = BuilderUtils.newNodeId("localhost", 0); Resource capability = Resource.newInstance(4096, 4); RMNodeImpl node = new RMNodeImpl(nodeId, rmContext,null, 0, 0, null, ResourceOption.newInstance(capability, - RMNode.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT), null); - node.handle(new RMNodeEvent(node.getNodeID(), RMNodeEventType.STARTED)); + RMNode.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT), nmVersion); + node.handle(new RMNodeStartedEvent(node.getNodeID(), null, null)); Assert.assertEquals(NodeState.RUNNING, node.getState()); return node; } @@ -491,7 +496,7 @@ public class TestRMNodeTransitions { int initialUnhealthy = cm.getUnhealthyNMs(); int initialDecommissioned = cm.getNumDecommisionedNMs(); int 
initialRebooted = cm.getNumRebootedNMs(); - node.handle(new RMNodeEvent(node.getNodeID(), RMNodeEventType.STARTED)); + node.handle(new RMNodeStartedEvent(node.getNodeID(), null, null)); Assert.assertEquals("Active Nodes", initialActive + 1, cm.getNumActiveNMs()); Assert.assertEquals("Lost Nodes", initialLost, cm.getNumLostNMs()); Assert.assertEquals("Unhealthy Nodes", @@ -515,7 +520,7 @@ public class TestRMNodeTransitions { int initialUnhealthy = cm.getUnhealthyNMs(); int initialDecommissioned = cm.getNumDecommisionedNMs(); int initialRebooted = cm.getNumRebootedNMs(); - node.handle(new RMNodeReconnectEvent(node.getNodeID(), node)); + node.handle(new RMNodeReconnectEvent(node.getNodeID(), node, null)); Assert.assertEquals("Active Nodes", initialActive, cm.getNumActiveNMs()); Assert.assertEquals("Lost Nodes", initialLost, cm.getNumLostNMs()); Assert.assertEquals("Unhealthy Nodes", @@ -530,4 +535,15 @@ public class TestRMNodeTransitions { nodesListManagerEvent.getType()); } + @Test + public void testReconnnectUpdate() { + final String nmVersion1 = "nm version 1"; + final String nmVersion2 = "nm version 2"; + RMNodeImpl node = getRunningNode(nmVersion1); + Assert.assertEquals(nmVersion1, node.getNodeManagerVersion()); + RMNodeImpl reconnectingNode = getRunningNode(nmVersion2); + node.handle(new RMNodeReconnectEvent(node.getNodeID(), reconnectingNode, + null)); + Assert.assertEquals(nmVersion2, node.getNodeManagerVersion()); + } } Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java?rev=1619012&r1=1619011&r2=1619012&view=diff 
============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java Tue Aug 19 23:49:39 2014 @@ -30,6 +30,7 @@ import java.net.InetSocketAddress; import java.net.UnknownHostException; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Arrays; import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; @@ -67,13 +68,15 @@ import org.apache.hadoop.yarn.api.record import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; -import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; +import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.records.NodeAction; import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; @@ -82,8 +85,8 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState; 
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreEvent; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; @@ -100,7 +103,6 @@ import org.junit.Before; import org.junit.Test; public class TestRMRestart { - private final static File TEMP_DIR = new File(System.getProperty( "test.build.data", "/tmp"), "decommision"); private File hostFile = new File(TEMP_DIR + File.separator + "hostFile.txt"); @@ -287,11 +289,11 @@ public class TestRMRestart { // verify old AM is not accepted // change running AM to talk to new RM - am1.setAMRMProtocol(rm2.getApplicationMasterService()); + am1.setAMRMProtocol(rm2.getApplicationMasterService(), rm2.getRMContext()); AllocateResponse allocResponse = am1.allocate( new ArrayList<ResourceRequest>(), new ArrayList<ContainerId>()); - Assert.assertTrue(allocResponse.getAMCommand() == AMCommand.AM_RESYNC); + Assert.assertEquals(AMCommand.AM_SHUTDOWN, allocResponse.getAMCommand()); // NM should be rebooted on heartbeat, even first heartbeat for nm2 NodeHeartbeatResponse hbResponse = nm1.nodeHeartbeat(true); @@ -303,13 +305,11 @@ public class TestRMRestart { nm1 = new MockNM("127.0.0.1:1234", 15120, rm2.getResourceTrackerService()); nm2 = new MockNM("127.0.0.2:5678", 15120, rm2.getResourceTrackerService()); - List<ContainerStatus> containerStatuses = new 
ArrayList<ContainerStatus>(); - ContainerStatus containerStatus = - BuilderUtils.newContainerStatus(BuilderUtils.newContainerId(loadedApp1 - .getCurrentAppAttempt().getAppAttemptId(), 1), - ContainerState.COMPLETE, "Killed AM container", 143); - containerStatuses.add(containerStatus); - nm1.registerNode(containerStatuses); + NMContainerStatus status = + TestRMRestart + .createNMContainerStatus(loadedApp1.getCurrentAppAttempt() + .getAppAttemptId(), 1, ContainerState.COMPLETE); + nm1.registerNode(Arrays.asList(status), null); nm2.registerNode(); rm2.waitForState(loadedApp1.getApplicationId(), RMAppState.ACCEPTED); @@ -392,7 +392,7 @@ public class TestRMRestart { // completed apps are not removed immediately after app finish // And finished app is also loaded back. Assert.assertEquals(4, rmAppState.size()); - } + } @Test (timeout = 60000) public void testRMRestartAppRunningAMFailed() throws Exception { @@ -510,14 +510,11 @@ public class TestRMRestart { Assert.assertEquals(RMAppAttemptState.LAUNCHED, rmApp.getAppAttempts().get(am2.getApplicationAttemptId()) .getAppAttemptState()); - - List<ContainerStatus> containerStatuses = new ArrayList<ContainerStatus>(); - ContainerStatus containerStatus = - BuilderUtils.newContainerStatus( - BuilderUtils.newContainerId(am2.getApplicationAttemptId(), 1), - ContainerState.COMPLETE, "Killed AM container", 143); - containerStatuses.add(containerStatus); - nm1.registerNode(containerStatuses); + + NMContainerStatus status = + TestRMRestart.createNMContainerStatus( + am2.getApplicationAttemptId(), 1, ContainerState.COMPLETE); + nm1.registerNode(Arrays.asList(status), null); rm2.waitForState(am2.getApplicationAttemptId(), RMAppAttemptState.FAILED); launchAM(rmApp, rm2, nm1); Assert.assertEquals(3, rmApp.getAppAttempts().size()); @@ -615,7 +612,7 @@ public class TestRMRestart { @Override public void updateApplicationStateInternal(ApplicationId appId, - ApplicationStateDataPBImpl appStateData) throws Exception { + ApplicationStateData 
appStateData) throws Exception { if (count == 0) { // do nothing; simulate app final state is not saved. LOG.info(appId + " final state is not saved."); @@ -763,14 +760,14 @@ public class TestRMRestart { @Override public synchronized void storeApplicationAttemptStateInternal( ApplicationAttemptId attemptId, - ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception { + ApplicationAttemptStateData attemptStateData) throws Exception { // ignore attempt saving request. } @Override public synchronized void updateApplicationAttemptStateInternal( ApplicationAttemptId attemptId, - ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception { + ApplicationAttemptStateData attemptStateData) throws Exception { // ignore attempt saving request. } }; @@ -1211,18 +1208,13 @@ public class TestRMRestart { Assert.assertEquals(BuilderUtils.newContainerId(attemptId1, 1), attemptState.getMasterContainer().getId()); - // the appToken and clientTokenMasterKey that are generated when + // the clientTokenMasterKey that are generated when // RMAppAttempt is created, - HashSet<Token<?>> tokenSet = new HashSet<Token<?>>(); - tokenSet.add(attempt1.getAMRMToken()); byte[] clientTokenMasterKey = attempt1.getClientTokenMasterKey().getEncoded(); // assert application credentials are saved Credentials savedCredentials = attemptState.getAppAttemptCredentials(); - HashSet<Token<?>> savedTokens = new HashSet<Token<?>>(); - savedTokens.addAll(savedCredentials.getAllTokens()); - Assert.assertEquals(tokenSet, savedTokens); Assert.assertArrayEquals("client token master key not saved", clientTokenMasterKey, savedCredentials.getSecretKey( RMStateStore.AM_CLIENT_TOKEN_MASTER_KEY_NAME)); @@ -1235,11 +1227,8 @@ public class TestRMRestart { rm2.getRMContext().getRMApps().get(app1.getApplicationId()); RMAppAttempt loadedAttempt1 = loadedApp1.getRMAppAttempt(attemptId1); - // assert loaded attempt recovered attempt tokens + // assert loaded attempt recovered 
Assert.assertNotNull(loadedAttempt1); - savedTokens.clear(); - savedTokens.add(loadedAttempt1.getAMRMToken()); - Assert.assertEquals(tokenSet, savedTokens); // assert client token master key is recovered back to api-versioned // client token master key @@ -1638,14 +1627,20 @@ public class TestRMRestart { // create app that gets launched and does allocate before RM restart RMApp app1 = rm1.submitApp(200); - assertQueueMetrics(qm1, 1, 1, 0, 0); - nm1.nodeHeartbeat(true); + // Need to wait first for AppAttempt to be started (RMAppState.ACCEPTED) + // and then for it to reach RMAppAttemptState.SCHEDULED + // inorder to ensure appsPending metric is incremented + rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED); RMAppAttempt attempt1 = app1.getCurrentAppAttempt(); ApplicationAttemptId attemptId1 = attempt1.getAppAttemptId(); + rm1.waitForState(attemptId1, RMAppAttemptState.SCHEDULED); + assertQueueMetrics(qm1, 1, 1, 0, 0); + + nm1.nodeHeartbeat(true); rm1.waitForState(attemptId1, RMAppAttemptState.ALLOCATED); MockAM am1 = rm1.sendAMLaunched(attempt1.getAppAttemptId()); am1.registerAppAttempt(); - am1.allocate("127.0.0.1" , 1000, 1, new ArrayList<ContainerId>()); + am1.allocate("127.0.0.1" , 1000, 1, new ArrayList<ContainerId>()); nm1.nodeHeartbeat(true); List<Container> conts = am1.allocate(new ArrayList<ResourceRequest>(), new ArrayList<ContainerId>()).getAllocatedContainers(); @@ -1660,24 +1655,25 @@ public class TestRMRestart { // PHASE 2: create new RM and start from old state // create new RM to represent restart and recover state MockRM rm2 = new MockRM(conf, memStore); - rm2.start(); - nm1.setResourceTrackerService(rm2.getResourceTrackerService()); QueueMetrics qm2 = rm2.getResourceScheduler().getRootQueueMetrics(); resetQueueMetrics(qm2); assertQueueMetrics(qm2, 0, 0, 0, 0); + + rm2.start(); + nm1.setResourceTrackerService(rm2.getResourceTrackerService()); // recover app RMApp loadedApp1 = rm2.getRMContext().getRMApps().get(app1.getApplicationId()); 
- am1.setAMRMProtocol(rm2.getApplicationMasterService()); + am1.setAMRMProtocol(rm2.getApplicationMasterService(), rm2.getRMContext()); am1.allocate(new ArrayList<ResourceRequest>(), new ArrayList<ContainerId>()); nm1.nodeHeartbeat(true); nm1 = new MockNM("127.0.0.1:1234", 15120, rm2.getResourceTrackerService()); - List<ContainerStatus> containerStatuses = new ArrayList<ContainerStatus>(); - ContainerStatus containerStatus = - BuilderUtils.newContainerStatus(BuilderUtils.newContainerId(loadedApp1 - .getCurrentAppAttempt().getAppAttemptId(), 1), - ContainerState.COMPLETE, "Killed AM container", 143); - containerStatuses.add(containerStatus); - nm1.registerNode(containerStatuses); + + NMContainerStatus status = + TestRMRestart + .createNMContainerStatus(loadedApp1.getCurrentAppAttempt() + .getAppAttemptId(), 1, ContainerState.COMPLETE); + nm1.registerNode(Arrays.asList(status), null); + while (loadedApp1.getAppAttempts().size() != 2) { Thread.sleep(200); } @@ -1801,12 +1797,10 @@ public class TestRMRestart { // ResourceTrackerService is started. 
super.serviceStart(); nm1.setResourceTrackerService(getResourceTrackerService()); - List<ContainerStatus> status = new ArrayList<ContainerStatus>(); - ContainerId amContainer = - ContainerId.newInstance(am0.getApplicationAttemptId(), 1); - status.add(ContainerStatus.newInstance(amContainer, - ContainerState.COMPLETE, "AM container exit", 143)); - nm1.registerNode(status); + NMContainerStatus status = + TestRMRestart.createNMContainerStatus( + am0.getApplicationAttemptId(), 1, ContainerState.COMPLETE); + nm1.registerNode(Arrays.asList(status), null); } }; } @@ -1845,6 +1839,16 @@ public class TestRMRestart { } } + public static NMContainerStatus createNMContainerStatus( + ApplicationAttemptId appAttemptId, int id, ContainerState containerState) { + ContainerId containerId = ContainerId.newInstance(appAttemptId, id); + NMContainerStatus containerReport = + NMContainerStatus.newInstance(containerId, containerState, + Resource.newInstance(1024, 1), "recover container", 0, + Priority.newInstance(0), 0); + return containerReport; + } + public class TestMemoryRMStateStore extends MemoryRMStateStore { int count = 0; public int updateApp = 0; @@ -1852,7 +1856,7 @@ public class TestRMRestart { @Override public void updateApplicationStateInternal(ApplicationId appId, - ApplicationStateDataPBImpl appStateData) throws Exception { + ApplicationStateData appStateData) throws Exception { updateApp = ++count; super.updateApplicationStateInternal(appId, appStateData); } @@ -1861,7 +1865,7 @@ public class TestRMRestart { public synchronized void updateApplicationAttemptStateInternal( ApplicationAttemptId attemptId, - ApplicationAttemptStateDataPBImpl attemptStateData) + ApplicationAttemptStateData attemptStateData) throws Exception { updateAttempt = ++count; super.updateApplicationAttemptStateInternal(attemptId, Modified: 
hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java Tue Aug 19 23:49:39 2014 @@ -27,7 +27,10 @@ import java.util.Collection; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.http.lib.StaticUserWebFilter; import org.apache.hadoop.net.NetworkTopology; +import org.apache.hadoop.security.AuthenticationFilterInitializer; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; @@ -39,8 +42,10 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; +import 
org.apache.hadoop.yarn.server.security.http.RMAuthenticationFilterInitializer; import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -235,4 +240,75 @@ public class TestResourceManager { } } + @Test(timeout = 50000) + public void testFilterOverrides() throws Exception { + String filterInitializerConfKey = "hadoop.http.filter.initializers"; + String[] filterInitializers = + { + AuthenticationFilterInitializer.class.getName(), + RMAuthenticationFilterInitializer.class.getName(), + AuthenticationFilterInitializer.class.getName() + "," + + RMAuthenticationFilterInitializer.class.getName(), + AuthenticationFilterInitializer.class.getName() + ", " + + RMAuthenticationFilterInitializer.class.getName(), + AuthenticationFilterInitializer.class.getName() + ", " + + this.getClass().getName() }; + for (String filterInitializer : filterInitializers) { + resourceManager = new ResourceManager(); + Configuration conf = new YarnConfiguration(); + conf.set(filterInitializerConfKey, filterInitializer); + conf.set("hadoop.security.authentication", "kerberos"); + conf.set("hadoop.http.authentication.type", "kerberos"); + try { + try { + UserGroupInformation.setConfiguration(conf); + } catch (Exception e) { + // ignore we just care about getting true for + // isSecurityEnabled() + LOG.info("Got expected exception"); + } + resourceManager.init(conf); + resourceManager.startWepApp(); + } catch (RuntimeException e) { + // Exceptions are expected because we didn't setup everything + // just want to test filter settings + String tmp = resourceManager.getConfig().get(filterInitializerConfKey); + if (filterInitializer.contains(this.getClass().getName())) { + Assert.assertEquals(RMAuthenticationFilterInitializer.class.getName() + + "," + this.getClass().getName(), tmp); + } else { + Assert.assertEquals( + RMAuthenticationFilterInitializer.class.getName(), tmp); + } + resourceManager.stop(); + } 
+ } + + // simple mode overrides + String[] simpleFilterInitializers = + { "", StaticUserWebFilter.class.getName() }; + for (String filterInitializer : simpleFilterInitializers) { + resourceManager = new ResourceManager(); + Configuration conf = new YarnConfiguration(); + conf.set(filterInitializerConfKey, filterInitializer); + try { + UserGroupInformation.setConfiguration(conf); + resourceManager.init(conf); + resourceManager.startWepApp(); + } catch (RuntimeException e) { + // Exceptions are expected because we didn't setup everything + // just want to test filter settings + String tmp = resourceManager.getConfig().get(filterInitializerConfKey); + if (filterInitializer.equals(StaticUserWebFilter.class.getName())) { + Assert.assertEquals(RMAuthenticationFilterInitializer.class.getName() + + "," + StaticUserWebFilter.class.getName(), tmp); + } else { + Assert.assertEquals( + RMAuthenticationFilterInitializer.class.getName(), tmp); + } + resourceManager.stop(); + } + } + } + }
