Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java Tue Aug 19 23:49:39 2014 @@ -18,11 +18,14 @@ package org.apache.hadoop.yarn.server.resourcemanager; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; + import java.io.File; import java.io.FileOutputStream; import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -33,18 +36,19 @@ import org.apache.hadoop.metrics2.lib.De import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeState; +import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse; @@ -56,17 +60,10 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.YarnVersionInfo; - import org.junit.After; import org.junit.Assert; import org.junit.Test; -import static org.junit.Assert.assertEquals; -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.verify; - public class TestResourceTrackerService { private final static File TEMP_DIR = new File(System.getProperty( @@ -487,33 +484,37 @@ public class TestResourceTrackerService RMApp app = rm.submitApp(1024, true); // Case 1.1: AppAttemptId is null - ContainerStatus status = ContainerStatus.newInstance( - 
ContainerId.newInstance(ApplicationAttemptId.newInstance( - app.getApplicationId(), 2), 1), - ContainerState.COMPLETE, "Dummy Completed", 0); - rm.getResourceTrackerService().handleContainerStatus(status); + NMContainerStatus report = + NMContainerStatus.newInstance( + ContainerId.newInstance( + ApplicationAttemptId.newInstance(app.getApplicationId(), 2), 1), + ContainerState.COMPLETE, Resource.newInstance(1024, 1), + "Dummy Completed", 0, Priority.newInstance(10), 1234); + rm.getResourceTrackerService().handleNMContainerStatus(report); verify(handler, never()).handle((Event) any()); // Case 1.2: Master container is null RMAppAttemptImpl currentAttempt = (RMAppAttemptImpl) app.getCurrentAppAttempt(); currentAttempt.setMasterContainer(null); - status = ContainerStatus.newInstance( - ContainerId.newInstance(currentAttempt.getAppAttemptId(), 0), - ContainerState.COMPLETE, "Dummy Completed", 0); - rm.getResourceTrackerService().handleContainerStatus(status); + report = NMContainerStatus.newInstance( + ContainerId.newInstance(currentAttempt.getAppAttemptId(), 0), + ContainerState.COMPLETE, Resource.newInstance(1024, 1), + "Dummy Completed", 0, Priority.newInstance(10), 1234); + rm.getResourceTrackerService().handleNMContainerStatus(report); verify(handler, never()).handle((Event)any()); // Case 2: Managed AM app = rm.submitApp(1024); // Case 2.1: AppAttemptId is null - status = ContainerStatus.newInstance( - ContainerId.newInstance(ApplicationAttemptId.newInstance( - app.getApplicationId(), 2), 1), - ContainerState.COMPLETE, "Dummy Completed", 0); + report = NMContainerStatus.newInstance( + ContainerId.newInstance( + ApplicationAttemptId.newInstance(app.getApplicationId(), 2), 1), + ContainerState.COMPLETE, Resource.newInstance(1024, 1), + "Dummy Completed", 0, Priority.newInstance(10), 1234); try { - rm.getResourceTrackerService().handleContainerStatus(status); + rm.getResourceTrackerService().handleNMContainerStatus(report); } catch (Exception e) { // expected - ignore } @@ -523,11 +524,12 @@ public class TestResourceTrackerService currentAttempt = (RMAppAttemptImpl) app.getCurrentAppAttempt(); currentAttempt.setMasterContainer(null); - status = ContainerStatus.newInstance( - ContainerId.newInstance(currentAttempt.getAppAttemptId(), 0), - ContainerState.COMPLETE, "Dummy Completed", 0); + report = NMContainerStatus.newInstance( + ContainerId.newInstance(currentAttempt.getAppAttemptId(), 0), + ContainerState.COMPLETE, Resource.newInstance(1024, 1), + "Dummy Completed", 0, Priority.newInstance(10), 1234); try { - rm.getResourceTrackerService().handleContainerStatus(status); + rm.getResourceTrackerService().handleNMContainerStatus(report); } catch (Exception e) { // expected - ignore } @@ -593,7 +595,7 @@ public class TestResourceTrackerService // reconnect of node with changed capability nm1 = rm.registerNode("host2:5678", 10240); dispatcher.await(); - response = nm2.nodeHeartbeat(true); + response = nm1.nodeHeartbeat(true); dispatcher.await(); Assert.assertTrue(NodeAction.NORMAL.equals(response.getNodeAction())); Assert.assertEquals(5120 + 10240, metrics.getAvailableMB());
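For reference, the TestResourceTrackerService hunks above replace the old ContainerStatus-based reports (handled by handleContainerStatus) with NMContainerStatus reports handled by handleNMContainerStatus. A minimal sketch of building such a report, assuming the argument order shown in the hunk (container id, state, allocated resource, diagnostics, exit status, priority, creation time) and using illustrative attempt/container ids:

    import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
    import org.apache.hadoop.yarn.api.records.ApplicationId;
    import org.apache.hadoop.yarn.api.records.ContainerId;
    import org.apache.hadoop.yarn.api.records.ContainerState;
    import org.apache.hadoop.yarn.api.records.Priority;
    import org.apache.hadoop.yarn.api.records.Resource;
    import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;

    // Sketch only: builds a completed-container report the same way the test above does.
    public class NMContainerStatusSketch {
      static NMContainerStatus completedReport(ApplicationId appId) {
        // Illustrative ids, mirroring the "AppAttemptId is null" case in the test.
        ContainerId cid = ContainerId.newInstance(
            ApplicationAttemptId.newInstance(appId, 2), 1);
        return NMContainerStatus.newInstance(
            cid,
            ContainerState.COMPLETE,           // container state
            Resource.newInstance(1024, 1),     // allocated resource (MB, vcores)
            "Dummy Completed",                 // diagnostics
            0,                                 // container exit status
            Priority.newInstance(10),          // allocation priority
            1234);                             // creation time
      }
    }

The allocated resource, priority, and creation time are the fields NMContainerStatus carries beyond the old ContainerStatus path, which is why the test now drives the ResourceTrackerService through handleNMContainerStatus instead.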
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/TestRMApplicationHistoryWriter.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/TestRMApplicationHistoryWriter.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/TestRMApplicationHistoryWriter.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/TestRMApplicationHistoryWriter.java Tue Aug 19 23:49:39 2014 @@ -165,7 +165,7 @@ public class TestRMApplicationHistoryWri when(container.getAllocatedResource()).thenReturn( Resource.newInstance(-1, -1)); when(container.getAllocatedPriority()).thenReturn(Priority.UNDEFINED); - when(container.getStartTime()).thenReturn(0L); + when(container.getCreationTime()).thenReturn(0L); when(container.getFinishTime()).thenReturn(1L); when(container.getDiagnosticsInfo()).thenReturn("test diagnostics info"); when(container.getLogURL()).thenReturn("test log url"); @@ -281,7 +281,7 @@ public class TestRMApplicationHistoryWri Assert.assertEquals(Resource.newInstance(-1, -1), containerHD.getAllocatedResource()); Assert.assertEquals(Priority.UNDEFINED, containerHD.getPriority()); - Assert.assertEquals(0L, container.getStartTime()); + Assert.assertEquals(0L, container.getCreationTime()); writer.containerFinished(container); for (int i = 0; i < MAX_RETRIES; ++i) { @@ -420,7 +420,7 @@ public class TestRMApplicationHistoryWri int waitCount = 0; int allocatedSize = allocated.size(); while (allocatedSize < request && waitCount++ < 200) { - Thread.sleep(100); + Thread.sleep(300); allocated = am.allocate(new ArrayList<ResourceRequest>(), new ArrayList<ContainerId>()).getAllocatedContainers(); Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java Tue Aug 19 23:49:39 2014 @@ -31,10 +31,13 @@ import org.apache.hadoop.yarn.api.record import org.apache.hadoop.yarn.api.records.Container; import 
org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; @@ -159,6 +162,16 @@ public abstract class MockAsm extends Mo public YarnApplicationState createApplicationState() { throw new UnsupportedOperationException("Not supported yet."); } + + @Override + public Set<NodeId> getRanNodes() { + throw new UnsupportedOperationException("Not supported yet."); + } + + @Override + public RMAppMetrics getRMAppMetrics() { + return new RMAppMetrics(Resource.newInstance(0, 0), 0, 0); + } } public static RMApp newApplication(int i) { Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java Tue Aug 19 23:49:39 2014 @@ -19,41 +19,47 @@ package org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; - -import org.junit.Assert; +import java.util.Map; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.resourcemanager.MockAM; import 
org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.util.Records; +import org.junit.Assert; import org.junit.Test; -/** - * Test to restart the AM on failure. - * - */ public class TestAMRestart { - @Test + @Test(timeout = 30000) public void testAMRestartWithExistingContainers() throws Exception { YarnConfiguration conf = new YarnConfiguration(); conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2); @@ -123,9 +129,9 @@ public class TestAMRestart { ContainerId.newInstance(am1.getApplicationAttemptId(), 6); nm1.nodeHeartbeat(true); SchedulerApplicationAttempt schedulerAttempt = - ((CapacityScheduler) rm1.getResourceScheduler()) + ((AbstractYarnScheduler) rm1.getResourceScheduler()) .getCurrentAttemptForContainer(containerId6); - while (schedulerAttempt.getReservedContainers().size() == 0) { + while (schedulerAttempt.getReservedContainers().isEmpty()) { System.out.println("Waiting for container " + containerId6 + " to be reserved."); nm1.nodeHeartbeat(true); @@ -219,7 +225,7 @@ public class TestAMRestart { // record the scheduler attempt for testing. SchedulerApplicationAttempt schedulerNewAttempt = - ((CapacityScheduler) rm1.getResourceScheduler()) + ((AbstractYarnScheduler) rm1.getResourceScheduler()) .getCurrentAttemptForContainer(containerId2); // finish this application MockRM.finishAMAndVerifyAppState(app1, rm1, nm1, am2); @@ -245,7 +251,7 @@ public class TestAMRestart { } } - @Test + @Test(timeout = 30000) public void testNMTokensRebindOnAMRestart() throws Exception { YarnConfiguration conf = new YarnConfiguration(); conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 3); @@ -264,31 +270,36 @@ public class TestAMRestart { nm2.registerNode(); MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); - int NUM_CONTAINERS = 1; List<Container> containers = new ArrayList<Container>(); // nmTokens keeps track of all the nmTokens issued in the allocate call. List<NMToken> expectedNMTokens = new ArrayList<NMToken>(); - // am1 allocate 1 container on nm1. + // am1 allocate 2 container on nm1. 
+ // first container while (true) { AllocateResponse response = - am1.allocate("127.0.0.1", 2000, NUM_CONTAINERS, + am1.allocate("127.0.0.1", 2000, 2, new ArrayList<ContainerId>()); nm1.nodeHeartbeat(true); containers.addAll(response.getAllocatedContainers()); expectedNMTokens.addAll(response.getNMTokens()); - if (containers.size() == NUM_CONTAINERS) { + if (containers.size() == 2) { break; } Thread.sleep(200); System.out.println("Waiting for container to be allocated."); } - // launch the container + // launch the container-2 nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 2, ContainerState.RUNNING); ContainerId containerId2 = ContainerId.newInstance(am1.getApplicationAttemptId(), 2); rm1.waitForState(nm1, containerId2, RMContainerState.RUNNING); - + // launch the container-3 + nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 3, ContainerState.RUNNING); + ContainerId containerId3 = + ContainerId.newInstance(am1.getApplicationAttemptId(), 3); + rm1.waitForState(nm1, containerId3, RMContainerState.RUNNING); + // fail am1 nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 1, ContainerState.COMPLETE); am1.waitForState(RMAppAttemptState.FAILED); @@ -308,12 +319,12 @@ public class TestAMRestart { containers = new ArrayList<Container>(); while (true) { AllocateResponse allocateResponse = - am2.allocate("127.1.1.1", 4000, NUM_CONTAINERS, + am2.allocate("127.1.1.1", 4000, 1, new ArrayList<ContainerId>()); nm2.nodeHeartbeat(true); containers.addAll(allocateResponse.getAllocatedContainers()); expectedNMTokens.addAll(allocateResponse.getNMTokens()); - if (containers.size() == NUM_CONTAINERS) { + if (containers.size() == 1) { break; } Thread.sleep(200); @@ -340,4 +351,237 @@ public class TestAMRestart { Assert.assertTrue(transferredTokens.containsAll(expectedNMTokens)); rm1.stop(); } + + // AM container preempted, nm disk failure + // should not be counted towards AM max retry count. + @Test(timeout = 100000) + public void testShouldNotCountFailureToMaxAttemptRetry() throws Exception { + YarnConfiguration conf = new YarnConfiguration(); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + // explicitly set max-am-retry count as 1. + conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1); + conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true); + conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + MockRM rm1 = new MockRM(conf, memStore); + rm1.start(); + MockNM nm1 = + new MockNM("127.0.0.1:1234", 8000, rm1.getResourceTrackerService()); + nm1.registerNode(); + RMApp app1 = rm1.submitApp(200); + RMAppAttempt attempt1 = app1.getCurrentAppAttempt(); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + CapacityScheduler scheduler = + (CapacityScheduler) rm1.getResourceScheduler(); + ContainerId amContainer = + ContainerId.newInstance(am1.getApplicationAttemptId(), 1); + // Preempt the first attempt; + scheduler.killContainer(scheduler.getRMContainer(amContainer)); + + am1.waitForState(RMAppAttemptState.FAILED); + Assert.assertTrue(! attempt1.shouldCountTowardsMaxAttemptRetry()); + rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED); + ApplicationState appState = + memStore.getState().getApplicationState().get(app1.getApplicationId()); + // AM should be restarted even though max-am-attempt is 1. 
+ MockAM am2 = + rm1.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 2, nm1); + RMAppAttempt attempt2 = app1.getCurrentAppAttempt(); + Assert.assertTrue(((RMAppAttemptImpl) attempt2).mayBeLastAttempt()); + + // Preempt the second attempt. + ContainerId amContainer2 = + ContainerId.newInstance(am2.getApplicationAttemptId(), 1); + scheduler.killContainer(scheduler.getRMContainer(amContainer2)); + + am2.waitForState(RMAppAttemptState.FAILED); + Assert.assertTrue(! attempt2.shouldCountTowardsMaxAttemptRetry()); + rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED); + MockAM am3 = + rm1.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 3, nm1); + RMAppAttempt attempt3 = app1.getCurrentAppAttempt(); + Assert.assertTrue(((RMAppAttemptImpl) attempt3).mayBeLastAttempt()); + + // mimic NM disk_failure + ContainerStatus containerStatus = Records.newRecord(ContainerStatus.class); + containerStatus.setContainerId(attempt3.getMasterContainer().getId()); + containerStatus.setDiagnostics("mimic NM disk_failure"); + containerStatus.setState(ContainerState.COMPLETE); + containerStatus.setExitStatus(ContainerExitStatus.DISKS_FAILED); + Map<ApplicationId, List<ContainerStatus>> conts = + new HashMap<ApplicationId, List<ContainerStatus>>(); + conts.put(app1.getApplicationId(), + Collections.singletonList(containerStatus)); + nm1.nodeHeartbeat(conts, true); + + am3.waitForState(RMAppAttemptState.FAILED); + Assert.assertTrue(! attempt3.shouldCountTowardsMaxAttemptRetry()); + Assert.assertEquals(ContainerExitStatus.DISKS_FAILED, + appState.getAttempt(am3.getApplicationAttemptId()) + .getAMContainerExitStatus()); + + rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED); + MockAM am4 = + rm1.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 4, nm1); + RMAppAttempt attempt4 = app1.getCurrentAppAttempt(); + Assert.assertTrue(((RMAppAttemptImpl) attempt4).mayBeLastAttempt()); + + // create second NM, and register to rm1 + MockNM nm2 = + new MockNM("127.0.0.1:2234", 8000, rm1.getResourceTrackerService()); + nm2.registerNode(); + // nm1 heartbeats to report unhealthy + // This will mimic ContainerExitStatus.ABORT + nm1.nodeHeartbeat(false); + am4.waitForState(RMAppAttemptState.FAILED); + Assert.assertTrue(! attempt4.shouldCountTowardsMaxAttemptRetry()); + Assert.assertEquals(ContainerExitStatus.ABORTED, + appState.getAttempt(am4.getApplicationAttemptId()) + .getAMContainerExitStatus()); + // launch next AM in nm2 + nm2.nodeHeartbeat(true); + MockAM am5 = + rm1.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 5, nm2); + RMAppAttempt attempt5 = app1.getCurrentAppAttempt(); + Assert.assertTrue(((RMAppAttemptImpl) attempt5).mayBeLastAttempt()); + // fail the AM normally + nm2 + .nodeHeartbeat(am5.getApplicationAttemptId(), 1, ContainerState.COMPLETE); + am5.waitForState(RMAppAttemptState.FAILED); + Assert.assertTrue(attempt5.shouldCountTowardsMaxAttemptRetry()); + + // AM should not be restarted. + rm1.waitForState(app1.getApplicationId(), RMAppState.FAILED); + Assert.assertEquals(5, app1.getAppAttempts().size()); + rm1.stop(); + } + + // Test RM restarts after AM container is preempted, new RM should not count + // AM preemption failure towards the max-retry-account and should be able to + // re-launch the AM. 
+ @Test(timeout = 20000) + public void testPreemptedAMRestartOnRMRestart() throws Exception { + YarnConfiguration conf = new YarnConfiguration(); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true); + conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); + // explicitly set max-am-retry count as 1. + conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1); + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + + MockRM rm1 = new MockRM(conf, memStore); + rm1.start(); + MockNM nm1 = + new MockNM("127.0.0.1:1234", 8000, rm1.getResourceTrackerService()); + nm1.registerNode(); + RMApp app1 = rm1.submitApp(200); + RMAppAttempt attempt1 = app1.getCurrentAppAttempt(); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + CapacityScheduler scheduler = + (CapacityScheduler) rm1.getResourceScheduler(); + ContainerId amContainer = + ContainerId.newInstance(am1.getApplicationAttemptId(), 1); + + // Forcibly preempt the am container; + scheduler.killContainer(scheduler.getRMContainer(amContainer)); + + am1.waitForState(RMAppAttemptState.FAILED); + Assert.assertTrue(! attempt1.shouldCountTowardsMaxAttemptRetry()); + rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED); + + // state store has 1 attempt stored. + ApplicationState appState = + memStore.getState().getApplicationState().get(app1.getApplicationId()); + Assert.assertEquals(1, appState.getAttemptCount()); + // attempt stored has the preempted container exit status. + Assert.assertEquals(ContainerExitStatus.PREEMPTED, + appState.getAttempt(am1.getApplicationAttemptId()) + .getAMContainerExitStatus()); + // Restart rm. + MockRM rm2 = new MockRM(conf, memStore); + nm1.setResourceTrackerService(rm2.getResourceTrackerService()); + nm1.registerNode(); + rm2.start(); + + // Restarted RM should re-launch the am. + MockAM am2 = + rm2.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 2, nm1); + MockRM.finishAMAndVerifyAppState(app1, rm2, nm1, am2); + RMAppAttempt attempt2 = + rm2.getRMContext().getRMApps().get(app1.getApplicationId()) + .getCurrentAppAttempt(); + Assert.assertTrue(attempt2.shouldCountTowardsMaxAttemptRetry()); + Assert.assertEquals(ContainerExitStatus.INVALID, + appState.getAttempt(am2.getApplicationAttemptId()) + .getAMContainerExitStatus()); + rm1.stop(); + rm2.stop(); + } + + // Test regular RM restart/failover, new RM should not count + // AM failure towards the max-retry-account and should be able to + // re-launch the AM. + @Test(timeout = 50000) + public void testRMRestartOrFailoverNotCountedForAMFailures() + throws Exception { + YarnConfiguration conf = new YarnConfiguration(); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true); + conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); + // explicitly set max-am-retry count as 1. + conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1); + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + + MockRM rm1 = new MockRM(conf, memStore); + rm1.start(); + MockNM nm1 = + new MockNM("127.0.0.1:1234", 8000, rm1.getResourceTrackerService()); + nm1.registerNode(); + RMApp app1 = rm1.submitApp(200); + // AM should be restarted even though max-am-attempt is 1. 
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + RMAppAttempt attempt1 = app1.getCurrentAppAttempt(); + Assert.assertTrue(((RMAppAttemptImpl) attempt1).mayBeLastAttempt()); + + // Restart rm. + MockRM rm2 = new MockRM(conf, memStore); + rm2.start(); + ApplicationState appState = + memStore.getState().getApplicationState().get(app1.getApplicationId()); + // re-register the NM + nm1.setResourceTrackerService(rm2.getResourceTrackerService()); + NMContainerStatus status = Records.newRecord(NMContainerStatus.class); + status + .setContainerExitStatus(ContainerExitStatus.KILLED_BY_RESOURCEMANAGER); + status.setContainerId(attempt1.getMasterContainer().getId()); + status.setContainerState(ContainerState.COMPLETE); + status.setDiagnostics(""); + nm1.registerNode(Collections.singletonList(status), null); + + rm2.waitForState(attempt1.getAppAttemptId(), RMAppAttemptState.FAILED); + Assert.assertEquals(ContainerExitStatus.KILLED_BY_RESOURCEMANAGER, + appState.getAttempt(am1.getApplicationAttemptId()) + .getAMContainerExitStatus()); + // Will automatically start a new AppAttempt in rm2 + rm2.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED); + MockAM am2 = + rm2.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 2, nm1); + MockRM.finishAMAndVerifyAppState(app1, rm2, nm1, am2); + RMAppAttempt attempt3 = + rm2.getRMContext().getRMApps().get(app1.getApplicationId()) + .getCurrentAppAttempt(); + Assert.assertTrue(attempt3.shouldCountTowardsMaxAttemptRetry()); + Assert.assertEquals(ContainerExitStatus.INVALID, + appState.getAttempt(am2.getApplicationAttemptId()) + .getAMContainerExitStatus()); + + rm1.stop(); + rm2.stop(); + } } Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java Tue Aug 19 23:49:39 2014 @@ -17,6 +17,25 @@ */ package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity; +import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.MAX_IGNORED_OVER_CAPACITY; +import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.MONITORING_INTERVAL; +import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.NATURAL_TERMINATION_FACTOR; +import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.OBSERVE_ONLY; +import static 
org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.TOTAL_PREEMPTION_PER_ROUND; +import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.WAIT_TIME_BEFORE_KILL; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType.KILL_CONTAINER; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType.PREEMPT_CONTAINER; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.argThat; +import static org.mockito.Matchers.isA; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + import java.util.ArrayList; import java.util.Comparator; import java.util.Deque; @@ -27,12 +46,16 @@ import java.util.Random; import java.util.TreeSet; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.service.Service; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor; import org.apache.hadoop.yarn.server.resourcemanager.resource.Priority; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent; @@ -52,22 +75,13 @@ import org.junit.rules.TestName; import org.mockito.ArgumentCaptor; import org.mockito.ArgumentMatcher; -import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.MAX_IGNORED_OVER_CAPACITY; -import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.MONITORING_INTERVAL; -import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.NATURAL_TERMINATION_FACTOR; -import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.OBSERVE_ONLY; -import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.TOTAL_PREEMPTION_PER_ROUND; -import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.WAIT_TIME_BEFORE_KILL; -import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType.KILL_CONTAINER; -import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType.PREEMPT_CONTAINER; -import static org.junit.Assert.*; -import static org.mockito.Mockito.*; - public class TestProportionalCapacityPreemptionPolicy { static final long TS = 3141592653L; int appAlloc = 0; + boolean setAMContainer = false; + float setAMResourcePercent = 0.0f; Random rand = null; Clock mClock = null; Configuration conf = null; @@ -115,6 +129,7 @@ public class TestProportionalCapacityPre int[][] qData = new int[][]{ // / A B C { 100, 40, 40, 20 
}, // abs + { 100, 100, 100, 100 }, // maxCap { 100, 0, 60, 40 }, // used { 0, 0, 0, 0 }, // pending { 0, 0, 0, 0 }, // reserved @@ -133,6 +148,7 @@ public class TestProportionalCapacityPre int[][] qData = new int[][]{ // / A B C D { 100, 10, 40, 20, 30 }, // abs + { 100, 100, 100, 100, 100 }, // maxCap { 100, 30, 60, 10, 0 }, // used { 45, 20, 5, 20, 0 }, // pending { 0, 0, 0, 0, 0 }, // reserved @@ -144,12 +160,33 @@ public class TestProportionalCapacityPre policy.editSchedule(); verify(mDisp, times(16)).handle(argThat(new IsPreemptionRequestFor(appA))); } + + @Test + public void testMaxCap() { + int[][] qData = new int[][]{ + // / A B C + { 100, 40, 40, 20 }, // abs + { 100, 100, 45, 100 }, // maxCap + { 100, 55, 45, 0 }, // used + { 20, 10, 10, 0 }, // pending + { 0, 0, 0, 0 }, // reserved + { 2, 1, 1, 0 }, // apps + { -1, 1, 1, 0 }, // req granularity + { 3, 0, 0, 0 }, // subqueues + }; + ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData); + policy.editSchedule(); + // despite the imbalance, since B is at maxCap, do not correct + verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appA))); + } + @Test public void testPreemptCycle() { int[][] qData = new int[][]{ // / A B C { 100, 40, 40, 20 }, // abs + { 100, 100, 100, 100 }, // maxCap { 100, 0, 60, 40 }, // used { 10, 10, 0, 0 }, // pending { 0, 0, 0, 0 }, // reserved @@ -169,6 +206,7 @@ public class TestProportionalCapacityPre int[][] qData = new int[][]{ // / A B C { 100, 40, 40, 20 }, // abs + { 100, 100, 100, 100 }, // maxCap { 100, 0, 60, 40 }, // used { 10, 10, 0, 0 }, // pending { 0, 0, 0, 0 }, // reserved @@ -205,6 +243,7 @@ public class TestProportionalCapacityPre int[][] qData = new int[][]{ // / A B C { 100, 40, 40, 20 }, // abs + { 100, 100, 100, 100 }, // maxCap { 100, 39, 43, 21 }, // used { 10, 10, 0, 0 }, // pending { 0, 0, 0, 0 }, // reserved @@ -224,6 +263,7 @@ public class TestProportionalCapacityPre int[][] qData = new int[][]{ // / A B C { 100, 40, 40, 20 }, // abs + { 100, 100, 100, 100 }, // maxCap { 100, 55, 45, 0 }, // used { 20, 10, 10, 0 }, // pending { 0, 0, 0, 0 }, // reserved @@ -242,6 +282,7 @@ public class TestProportionalCapacityPre int[][] qData = new int[][]{ // / A B C { 100, 40, 40, 20 }, // abs + { 100, 100, 100, 100 }, // maxCap { 100, 55, 45, 0 }, // used { 20, 10, 10, 0 }, // pending { 0, 0, 0, 0 }, // reserved @@ -261,6 +302,7 @@ public class TestProportionalCapacityPre int[][] qData = new int[][]{ // / A B C { 100, 40, 40, 20 }, // abs + { 100, 100, 100, 100 }, // maxCap { 100, 90, 10, 0 }, // used { 80, 10, 20, 50 }, // pending { 0, 0, 0, 0 }, // reserved @@ -280,6 +322,7 @@ public class TestProportionalCapacityPre int[][] qData = new int[][] { // / A B C D E F { 200, 100, 50, 50, 100, 10, 90 }, // abs + { 200, 200, 200, 200, 200, 200, 200 }, // maxCap { 200, 110, 60, 50, 90, 90, 0 }, // used { 10, 0, 0, 0, 10, 0, 10 }, // pending { 0, 0, 0, 0, 0, 0, 0 }, // reserved @@ -295,10 +338,54 @@ public class TestProportionalCapacityPre } @Test + public void testZeroGuar() { + int[][] qData = new int[][] { + // / A B C D E F + { 200, 100, 0, 99, 100, 10, 90 }, // abs + { 200, 200, 200, 200, 200, 200, 200 }, // maxCap + { 170, 80, 60, 20, 90, 90, 0 }, // used + { 10, 0, 0, 0, 10, 0, 10 }, // pending + { 0, 0, 0, 0, 0, 0, 0 }, // reserved + { 4, 2, 1, 1, 2, 1, 1 }, // apps + { -1, -1, 1, 1, -1, 1, 1 }, // req granularity + { 2, 2, 0, 0, 2, 0, 0 }, // subqueues + }; + ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData); + policy.editSchedule(); + // verify 
capacity taken from A1, not B1 despite B1 being far over + // its absolute guaranteed capacity + verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appA))); + } + + @Test + public void testZeroGuarOverCap() { + int[][] qData = new int[][] { + // / A B C D E F + { 200, 100, 0, 99, 0, 100, 100 }, // abs + { 200, 200, 200, 200, 200, 200, 200 }, // maxCap + { 170, 170, 60, 20, 90, 0, 0 }, // used + { 85, 50, 30, 10, 10, 20, 20 }, // pending + { 0, 0, 0, 0, 0, 0, 0 }, // reserved + { 4, 3, 1, 1, 1, 1, 1 }, // apps + { -1, -1, 1, 1, 1, -1, 1 }, // req granularity + { 2, 3, 0, 0, 0, 1, 0 }, // subqueues + }; + ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData); + policy.editSchedule(); + // we verify both that C has priority on B and D (has it has >0 guarantees) + // and that B and D are force to share their over capacity fairly (as they + // are both zero-guarantees) hence D sees some of its containers preempted + verify(mDisp, times(14)).handle(argThat(new IsPreemptionRequestFor(appC))); + } + + + + @Test public void testHierarchicalLarge() { int[][] qData = new int[][] { // / A B C D E F G H I - { 400, 200, 60,140, 100, 70, 30, 100, 10, 90 }, // abs + { 400, 200, 60, 140, 100, 70, 30, 100, 10, 90 }, // abs + { 400, 400, 400, 400, 400, 400, 400, 400, 400, 400, }, // maxCap { 400, 210, 70,140, 100, 50, 50, 90, 90, 0 }, // used { 10, 0, 0, 0, 0, 0, 0, 0, 0, 15 }, // pending { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 }, // reserved @@ -351,7 +438,138 @@ public class TestProportionalCapacityPre assert containers.get(4).equals(rm5); } - + + @Test + public void testPolicyInitializeAfterSchedulerInitialized() { + Configuration conf = new Configuration(); + conf.set(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES, + ProportionalCapacityPreemptionPolicy.class.getCanonicalName()); + conf.setBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true); + + @SuppressWarnings("resource") + MockRM rm = new MockRM(conf); + rm.init(conf); + + // ProportionalCapacityPreemptionPolicy should be initialized after + // CapacityScheduler initialized. We will + // 1) find SchedulingMonitor from RMActiveService's service list, + // 2) check if ResourceCalculator in policy is null or not. 
+ // If it's not null, we can come to a conclusion that policy initialized + // after scheduler got initialized + for (Service service : rm.getRMActiveService().getServices()) { + if (service instanceof SchedulingMonitor) { + ProportionalCapacityPreemptionPolicy policy = + (ProportionalCapacityPreemptionPolicy) ((SchedulingMonitor) service) + .getSchedulingEditPolicy(); + assertNotNull(policy.getResourceCalculator()); + return; + } + } + + fail("Failed to find SchedulingMonitor service, please check what happened"); + } + + @Test + public void testSkipAMContainer() { + int[][] qData = new int[][] { + // / A B + { 100, 50, 50 }, // abs + { 100, 100, 100 }, // maxcap + { 100, 100, 0 }, // used + { 70, 20, 50 }, // pending + { 0, 0, 0 }, // reserved + { 5, 4, 1 }, // apps + { -1, 1, 1 }, // req granularity + { 2, 0, 0 }, // subqueues + }; + setAMContainer = true; + ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData); + policy.editSchedule(); + + // By skipping AM Container, all other 24 containers of appD will be + // preempted + verify(mDisp, times(24)).handle(argThat(new IsPreemptionRequestFor(appD))); + + // By skipping AM Container, all other 24 containers of appC will be + // preempted + verify(mDisp, times(24)).handle(argThat(new IsPreemptionRequestFor(appC))); + + // Since AM containers of appC and appD are saved, 2 containers from appB + // has to be preempted. + verify(mDisp, times(2)).handle(argThat(new IsPreemptionRequestFor(appB))); + setAMContainer = false; + } + + @Test + public void testPreemptSkippedAMContainers() { + int[][] qData = new int[][] { + // / A B + { 100, 10, 90 }, // abs + { 100, 100, 100 }, // maxcap + { 100, 100, 0 }, // used + { 70, 20, 90 }, // pending + { 0, 0, 0 }, // reserved + { 5, 4, 1 }, // apps + { -1, 5, 5 }, // req granularity + { 2, 0, 0 }, // subqueues + }; + setAMContainer = true; + ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData); + policy.editSchedule(); + + // All 5 containers of appD will be preempted including AM container. + verify(mDisp, times(5)).handle(argThat(new IsPreemptionRequestFor(appD))); + + // All 5 containers of appC will be preempted including AM container. + verify(mDisp, times(5)).handle(argThat(new IsPreemptionRequestFor(appC))); + + // By skipping AM Container, all other 4 containers of appB will be + // preempted + verify(mDisp, times(4)).handle(argThat(new IsPreemptionRequestFor(appB))); + + // By skipping AM Container, all other 4 containers of appA will be + // preempted + verify(mDisp, times(4)).handle(argThat(new IsPreemptionRequestFor(appA))); + setAMContainer = false; + } + + @Test + public void testAMResourcePercentForSkippedAMContainers() { + int[][] qData = new int[][] { + // / A B + { 100, 10, 90 }, // abs + { 100, 100, 100 }, // maxcap + { 100, 100, 0 }, // used + { 70, 20, 90 }, // pending + { 0, 0, 0 }, // reserved + { 5, 4, 1 }, // apps + { -1, 5, 5 }, // req granularity + { 2, 0, 0 }, // subqueues + }; + setAMContainer = true; + setAMResourcePercent = 0.5f; + ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData); + policy.editSchedule(); + + // AMResoucePercent is 50% of cluster and maxAMCapacity will be 5Gb. + // Total used AM container size is 20GB, hence 2 AM container has + // to be preempted as Queue Capacity is 10Gb. 
+ verify(mDisp, times(5)).handle(argThat(new IsPreemptionRequestFor(appD))); + + // Including AM Container, all other 4 containers of appC will be + // preempted + verify(mDisp, times(5)).handle(argThat(new IsPreemptionRequestFor(appC))); + + // By skipping AM Container, all other 4 containers of appB will be + // preempted + verify(mDisp, times(4)).handle(argThat(new IsPreemptionRequestFor(appB))); + + // By skipping AM Container, all other 4 containers of appA will be + // preempted + verify(mDisp, times(4)).handle(argThat(new IsPreemptionRequestFor(appA))); + setAMContainer = false; + } + static class IsPreemptionRequestFor extends ArgumentMatcher<ContainerPreemptEvent> { private final ApplicationAttemptId appAttId; @@ -382,24 +600,25 @@ public class TestProportionalCapacityPre when(mCS.getRootQueue()).thenReturn(mRoot); Resource clusterResources = - Resource.newInstance(leafAbsCapacities(qData[0], qData[6]), 0); - when(mCS.getClusterResources()).thenReturn(clusterResources); + Resource.newInstance(leafAbsCapacities(qData[0], qData[7]), 0); + when(mCS.getClusterResource()).thenReturn(clusterResources); return policy; } ParentQueue buildMockRootQueue(Random r, int[]... queueData) { int[] abs = queueData[0]; - int[] used = queueData[1]; - int[] pending = queueData[2]; - int[] reserved = queueData[3]; - int[] apps = queueData[4]; - int[] gran = queueData[5]; - int[] queues = queueData[6]; + int[] maxCap = queueData[1]; + int[] used = queueData[2]; + int[] pending = queueData[3]; + int[] reserved = queueData[4]; + int[] apps = queueData[5]; + int[] gran = queueData[6]; + int[] queues = queueData[7]; - return mockNested(abs, used, pending, reserved, apps, gran, queues); + return mockNested(abs, maxCap, used, pending, reserved, apps, gran, queues); } - ParentQueue mockNested(int[] abs, int[] used, + ParentQueue mockNested(int[] abs, int[] maxCap, int[] used, int[] pending, int[] reserved, int[] apps, int[] gran, int[] queues) { float tot = leafAbsCapacities(abs, queues); Deque<ParentQueue> pqs = new LinkedList<ParentQueue>(); @@ -407,6 +626,8 @@ public class TestProportionalCapacityPre when(root.getQueueName()).thenReturn("/"); when(root.getAbsoluteUsedCapacity()).thenReturn(used[0] / tot); when(root.getAbsoluteCapacity()).thenReturn(abs[0] / tot); + when(root.getAbsoluteMaximumCapacity()).thenReturn(maxCap[0] / tot); + for (int i = 1; i < queues.length; ++i) { final CSQueue q; final ParentQueue p = pqs.removeLast(); @@ -420,6 +641,7 @@ public class TestProportionalCapacityPre when(q.getQueueName()).thenReturn(queueName); when(q.getAbsoluteUsedCapacity()).thenReturn(used[i] / tot); when(q.getAbsoluteCapacity()).thenReturn(abs[i] / tot); + when(q.getAbsoluteMaximumCapacity()).thenReturn(maxCap[i] / tot); } assert 0 == pqs.size(); return root; @@ -439,7 +661,7 @@ public class TestProportionalCapacityPre return pq; } - LeafQueue mockLeafQueue(ParentQueue p, float tot, int i, int[] abs, + LeafQueue mockLeafQueue(ParentQueue p, float tot, int i, int[] abs, int[] used, int[] pending, int[] reserved, int[] apps, int[] gran) { LeafQueue lq = mock(LeafQueue.class); when(lq.getTotalResourcePending()).thenReturn( @@ -464,6 +686,9 @@ public class TestProportionalCapacityPre } } when(lq.getApplications()).thenReturn(qApps); + if(setAMResourcePercent != 0.0f){ + when(lq.getMaxAMResourcePerQueuePercent()).thenReturn(setAMResourcePercent); + } p.getChildQueues().add(lq); return lq; } @@ -488,7 +713,11 @@ public class TestProportionalCapacityPre List<RMContainer> cLive = new ArrayList<RMContainer>(); for (int 
i = 0; i < used; i += gran) { - cLive.add(mockContainer(appAttId, cAlloc, unit, 1)); + if(setAMContainer && i == 0){ + cLive.add(mockContainer(appAttId, cAlloc, unit, 0)); + }else{ + cLive.add(mockContainer(appAttId, cAlloc, unit, 1)); + } ++cAlloc; } when(app.getLiveContainers()).thenReturn(cLive); @@ -504,6 +733,10 @@ public class TestProportionalCapacityPre RMContainer mC = mock(RMContainer.class); when(mC.getContainerId()).thenReturn(cId); when(mC.getContainer()).thenReturn(c); + when(mC.getApplicationAttemptId()).thenReturn(appAttId); + if(0 == priority){ + when(mC.isAMContainer()).thenReturn(true); + } return mC; } Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java Tue Aug 19 23:49:39 2014 @@ -25,6 +25,7 @@ import static org.junit.Assert.assertTru import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import static org.mockito.Mockito.spy; import java.util.ArrayList; import java.util.HashMap; @@ -34,7 +35,6 @@ import java.util.Map; import javax.crypto.SecretKey; import org.junit.Assert; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -55,18 +55,21 @@ import org.apache.hadoop.yarn.event.Disp import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; +import org.apache.hadoop.yarn.server.records.Version; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMDTSecretManagerState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; -import 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptNewSavedEvent; import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; +import org.apache.hadoop.yarn.server.security.MasterKeyData; import org.apache.hadoop.yarn.util.ConverterUtils; public class RMStateStoreTestBase extends ClientBaseWithFixes{ @@ -74,10 +77,9 @@ public class RMStateStoreTestBase extend public static final Log LOG = LogFactory.getLog(RMStateStoreTestBase.class); static class TestDispatcher implements - Dispatcher, EventHandler<RMAppAttemptNewSavedEvent> { + Dispatcher, EventHandler<RMAppAttemptEvent> { ApplicationAttemptId attemptId; - Exception storedException; boolean notified = false; @@ -88,9 +90,8 @@ public class RMStateStoreTestBase extend } @Override - public void handle(RMAppAttemptNewSavedEvent event) { + public void handle(RMAppAttemptEvent event) { assertEquals(attemptId, event.getApplicationAttemptId()); - assertEquals(storedException, event.getStoredException()); notified = true; synchronized (this) { notifyAll(); @@ -108,8 +109,8 @@ public class RMStateStoreTestBase extend interface RMStateStoreHelper { RMStateStore getRMStateStore() throws Exception; boolean isFinalStateValid() throws Exception; - void writeVersion(RMStateVersion version) throws Exception; - RMStateVersion getCurrentVersion() throws Exception; + void writeVersion(Version version) throws Exception; + Version getCurrentVersion() throws Exception; boolean appExists(RMApp app) throws Exception; } @@ -160,7 +161,6 @@ public class RMStateStoreTestBase extend when(mockAttempt.getClientTokenMasterKey()) .thenReturn(clientTokenMasterKey); dispatcher.attemptId = attemptId; - dispatcher.storedException = null; store.storeNewApplicationAttempt(mockAttempt); waitNotify(dispatcher); return container.getId(); @@ -175,8 +175,15 @@ public class RMStateStoreTestBase extend TestDispatcher dispatcher = new TestDispatcher(); store.setRMDispatcher(dispatcher); + RMContext rmContext = mock(RMContext.class); + when(rmContext.getStateStore()).thenReturn(store); + AMRMTokenSecretManager appTokenMgr = - new AMRMTokenSecretManager(conf); + spy(new AMRMTokenSecretManager(conf, rmContext)); + + MasterKeyData masterKeyData = appTokenMgr.createNewMasterKey(); + when(appTokenMgr.getMasterKey()).thenReturn(masterKeyData); + ClientToAMTokenSecretManagerInRM clientToAMTokenMgr = new ClientToAMTokenSecretManagerInRM(); @@ -188,8 +195,6 @@ public class RMStateStoreTestBase extend // create application token and client token key for attempt1 Token<AMRMTokenIdentifier> appAttemptToken1 = generateAMRMToken(attemptId1, appTokenMgr); - HashSet<Token<?>> attemptTokenSet1 = new HashSet<Token<?>>(); - attemptTokenSet1.add(appAttemptToken1); SecretKey clientTokenKey1 = clientToAMTokenMgr.createMasterKey(attemptId1); @@ -204,8 +209,6 @@ public class RMStateStoreTestBase extend // create application token and client token key for attempt2 Token<AMRMTokenIdentifier> appAttemptToken2 = generateAMRMToken(attemptId2, appTokenMgr); - HashSet<Token<?>> attemptTokenSet2 = new HashSet<Token<?>>(); - attemptTokenSet2.add(appAttemptToken2); SecretKey clientTokenKey2 = clientToAMTokenMgr.createMasterKey(attemptId2); @@ -267,12 +270,9 @@ public class RMStateStoreTestBase extend // attempt1 is loaded correctly assertNotNull(attemptState); assertEquals(attemptId1, attemptState.getAttemptId()); + assertEquals(-1000, 
attemptState.getAMContainerExitStatus()); // attempt1 container is loaded correctly assertEquals(containerId1, attemptState.getMasterContainer().getId()); - // attempt1 applicationToken is loaded correctly - HashSet<Token<?>> savedTokens = new HashSet<Token<?>>(); - savedTokens.addAll(attemptState.getAppAttemptCredentials().getAllTokens()); - assertEquals(attemptTokenSet1, savedTokens); // attempt1 client token master key is loaded correctly assertArrayEquals(clientTokenKey1.getEncoded(), attemptState.getAppAttemptCredentials() @@ -284,10 +284,6 @@ public class RMStateStoreTestBase extend assertEquals(attemptId2, attemptState.getAttemptId()); // attempt2 container is loaded correctly assertEquals(containerId2, attemptState.getMasterContainer().getId()); - // attempt2 applicationToken is loaded correctly - savedTokens.clear(); - savedTokens.addAll(attemptState.getAppAttemptCredentials().getAllTokens()); - assertEquals(attemptTokenSet2, savedTokens); // attempt2 client token master key is loaded correctly assertArrayEquals(clientTokenKey2.getEncoded(), attemptState.getAppAttemptCredentials() @@ -308,7 +304,7 @@ public class RMStateStoreTestBase extend oldAttemptState.getAppAttemptCredentials(), oldAttemptState.getStartTime(), RMAppAttemptState.FINISHED, "myTrackingUrl", "attemptDiagnostics", - FinalApplicationStatus.SUCCEEDED); + FinalApplicationStatus.SUCCEEDED, 100); store.updateApplicationAttemptState(newAttemptState); // test updating the state of an app/attempt whose initial state was not @@ -331,7 +327,7 @@ public class RMStateStoreTestBase extend oldAttemptState.getAppAttemptCredentials(), oldAttemptState.getStartTime(), RMAppAttemptState.FINISHED, "myTrackingUrl", "attemptDiagnostics", - FinalApplicationStatus.SUCCEEDED); + FinalApplicationStatus.SUCCEEDED, 111); store.updateApplicationAttemptState(dummyAttempt); // let things settle down @@ -370,6 +366,7 @@ public class RMStateStoreTestBase extend assertEquals(RMAppAttemptState.FINISHED, updatedAttemptState.getState()); assertEquals("myTrackingUrl", updatedAttemptState.getFinalTrackingUrl()); assertEquals("attemptDiagnostics", updatedAttemptState.getDiagnostics()); + assertEquals(100, updatedAttemptState.getAMContainerExitStatus()); assertEquals(FinalApplicationStatus.SUCCEEDED, updatedAttemptState.getFinalApplicationStatus()); @@ -453,10 +450,8 @@ public class RMStateStoreTestBase extend private Token<AMRMTokenIdentifier> generateAMRMToken( ApplicationAttemptId attemptId, AMRMTokenSecretManager appTokenMgr) { - AMRMTokenIdentifier appTokenId = - new AMRMTokenIdentifier(attemptId); Token<AMRMTokenIdentifier> appToken = - new Token<AMRMTokenIdentifier>(appTokenId, appTokenMgr); + appTokenMgr.createAndGetAMRMToken(attemptId); appToken.setService(new Text("appToken service")); return appToken; } @@ -467,13 +462,13 @@ public class RMStateStoreTestBase extend store.setRMDispatcher(new TestDispatcher()); // default version - RMStateVersion defaultVersion = stateStoreHelper.getCurrentVersion(); + Version defaultVersion = stateStoreHelper.getCurrentVersion(); store.checkVersion(); Assert.assertEquals(defaultVersion, store.loadVersion()); // compatible version - RMStateVersion compatibleVersion = - RMStateVersion.newInstance(defaultVersion.getMajorVersion(), + Version compatibleVersion = + Version.newInstance(defaultVersion.getMajorVersion(), defaultVersion.getMinorVersion() + 2); stateStoreHelper.writeVersion(compatibleVersion); Assert.assertEquals(compatibleVersion, store.loadVersion()); @@ -482,8 +477,8 @@ public class 
RMStateStoreTestBase extend Assert.assertEquals(defaultVersion, store.loadVersion()); // incompatible version - RMStateVersion incompatibleVersion = - RMStateVersion.newInstance(defaultVersion.getMajorVersion() + 2, + Version incompatibleVersion = + Version.newInstance(defaultVersion.getMajorVersion() + 2, defaultVersion.getMinorVersion()); stateStoreHelper.writeVersion(incompatibleVersion); try { @@ -493,21 +488,53 @@ public class RMStateStoreTestBase extend Assert.assertTrue(t instanceof RMStateVersionIncompatibleException); } } + + public void testEpoch(RMStateStoreHelper stateStoreHelper) + throws Exception { + RMStateStore store = stateStoreHelper.getRMStateStore(); + store.setRMDispatcher(new TestDispatcher()); + + int firstTimeEpoch = store.getAndIncrementEpoch(); + Assert.assertEquals(0, firstTimeEpoch); + + int secondTimeEpoch = store.getAndIncrementEpoch(); + Assert.assertEquals(1, secondTimeEpoch); + + int thirdTimeEpoch = store.getAndIncrementEpoch(); + Assert.assertEquals(2, thirdTimeEpoch); + } public void testAppDeletion(RMStateStoreHelper stateStoreHelper) throws Exception { RMStateStore store = stateStoreHelper.getRMStateStore(); store.setRMDispatcher(new TestDispatcher()); - // create and store apps + ArrayList<RMApp> appList = createAndStoreApps(stateStoreHelper, store, 5); + + for (RMApp app : appList) { + // remove the app + store.removeApplication(app); + // wait for app to be removed. + while (true) { + if (!stateStoreHelper.appExists(app)) { + break; + } else { + Thread.sleep(100); + } + } + } + } + + private ArrayList<RMApp> createAndStoreApps( + RMStateStoreHelper stateStoreHelper, RMStateStore store, int numApps) + throws Exception { ArrayList<RMApp> appList = new ArrayList<RMApp>(); - int NUM_APPS = 5; - for (int i = 0; i < NUM_APPS; i++) { + for (int i = 0; i < numApps; i++) { ApplicationId appId = ApplicationId.newInstance(1383183338, i); RMApp app = storeApp(store, appId, 123456789, 987654321); appList.add(app); } - Assert.assertEquals(NUM_APPS, appList.size()); + Assert.assertEquals(numApps, appList.size()); for (RMApp app : appList) { // wait for app to be stored. while (true) { @@ -518,18 +545,17 @@ public class RMStateStoreTestBase extend } } } + return appList; + } + public void testDeleteStore(RMStateStoreHelper stateStoreHelper) + throws Exception { + RMStateStore store = stateStoreHelper.getRMStateStore(); + ArrayList<RMApp> appList = createAndStoreApps(stateStoreHelper, store, 5); + store.deleteStore(); + // verify apps deleted for (RMApp app : appList) { - // remove the app - store.removeApplication(app); - // wait for app to be removed. 
- while (true) { - if (!stateStoreHelper.appExists(app)) { - break; - } else { - Thread.sleep(100); - } - } + Assert.assertFalse(stateStoreHelper.appExists(app)); } } @@ -541,4 +567,65 @@ public class RMStateStoreTestBase extend } + public void testAMRMTokenSecretManagerStateStore( + RMStateStoreHelper stateStoreHelper) throws Exception { + System.out.println("Start testing"); + RMStateStore store = stateStoreHelper.getRMStateStore(); + TestDispatcher dispatcher = new TestDispatcher(); + store.setRMDispatcher(dispatcher); + + RMContext rmContext = mock(RMContext.class); + when(rmContext.getStateStore()).thenReturn(store); + Configuration conf = new YarnConfiguration(); + AMRMTokenSecretManager appTokenMgr = + new AMRMTokenSecretManager(conf, rmContext); + + //create and save the first masterkey + MasterKeyData firstMasterKeyData = appTokenMgr.createNewMasterKey(); + + AMRMTokenSecretManagerState state1 = + AMRMTokenSecretManagerState.newInstance( + firstMasterKeyData.getMasterKey(), null); + rmContext.getStateStore().storeOrUpdateAMRMTokenSecretManagerState(state1, + false); + + // load state + store = stateStoreHelper.getRMStateStore(); + store.setRMDispatcher(dispatcher); + RMState state = store.loadState(); + Assert.assertNotNull(state.getAMRMTokenSecretManagerState()); + Assert.assertEquals(firstMasterKeyData.getMasterKey(), state + .getAMRMTokenSecretManagerState().getCurrentMasterKey()); + Assert.assertNull(state + .getAMRMTokenSecretManagerState().getNextMasterKey()); + + //create and save the second masterkey + MasterKeyData secondMasterKeyData = appTokenMgr.createNewMasterKey(); + AMRMTokenSecretManagerState state2 = + AMRMTokenSecretManagerState + .newInstance(firstMasterKeyData.getMasterKey(), + secondMasterKeyData.getMasterKey()); + rmContext.getStateStore().storeOrUpdateAMRMTokenSecretManagerState(state2, + true); + + // load state + store = stateStoreHelper.getRMStateStore(); + store.setRMDispatcher(dispatcher); + RMState state_2 = store.loadState(); + Assert.assertNotNull(state_2.getAMRMTokenSecretManagerState()); + Assert.assertEquals(firstMasterKeyData.getMasterKey(), state_2 + .getAMRMTokenSecretManagerState().getCurrentMasterKey()); + Assert.assertEquals(secondMasterKeyData.getMasterKey(), state_2 + .getAMRMTokenSecretManagerState().getNextMasterKey()); + + // re-create the masterKeyData based on the recovered masterkey + // should have the same secretKey + appTokenMgr.recover(state_2); + Assert.assertEquals(appTokenMgr.getCurrnetMasterKeyData().getSecretKey(), + firstMasterKeyData.getSecretKey()); + Assert.assertEquals(appTokenMgr.getNextMasterKeyData().getSecretKey(), + secondMasterKeyData.getSecretKey()); + + store.close(); + } } Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java (original) +++ 
hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java Tue Aug 19 23:49:39 2014 @@ -24,7 +24,6 @@ import java.io.IOException; import java.util.concurrent.atomic.AtomicBoolean; import org.junit.Assert; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -37,9 +36,9 @@ import org.apache.hadoop.hdfs.MiniDFSClu import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.RMStateVersionPBImpl; +import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl; +import org.apache.hadoop.yarn.server.records.Version; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.util.ConverterUtils; @@ -71,7 +70,7 @@ public class TestFSRMStateStore extends return new Path(new Path(workingDirPathURI, ROOT_DIR_NAME), VERSION_NODE); } - public RMStateVersion getCurrentVersion() { + public Version getCurrentVersion() { return CURRENT_VERSION_INFO; } @@ -112,13 +111,13 @@ public class TestFSRMStateStore extends } @Override - public void writeVersion(RMStateVersion version) throws Exception { - store.updateFile(store.getVersionNode(), ((RMStateVersionPBImpl) version) + public void writeVersion(Version version) throws Exception { + store.updateFile(store.getVersionNode(), ((VersionPBImpl) version) .getProto().toByteArray()); } @Override - public RMStateVersion getCurrentVersion() throws Exception { + public Version getCurrentVersion() throws Exception { return store.getCurrentVersion(); } @@ -158,7 +157,10 @@ public class TestFSRMStateStore extends .getFileSystem(conf).exists(tempAppAttemptFile)); testRMDTSecretManagerStateStore(fsTester); testCheckVersion(fsTester); + testEpoch(fsTester); testAppDeletion(fsTester); + testDeleteStore(fsTester); + testAMRMTokenSecretManagerStateStore(fsTester); } finally { cluster.shutdown(); } @@ -213,9 +215,8 @@ public class TestFSRMStateStore extends try { store.storeApplicationStateInternal( ApplicationId.newInstance(100L, 1), - (ApplicationStateDataPBImpl) ApplicationStateDataPBImpl - .newApplicationStateData(111, 111, "user", null, - RMAppState.ACCEPTED, "diagnostics", 333)); + ApplicationStateData.newInstance(111, 111, "user", null, + RMAppState.ACCEPTED, "diagnostics", 333)); } catch (Exception e) { // TODO 0 datanode exception will not be retried by dfs client, fix // that separately. 
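
Note: the testCheckVersion flow exercised by both store testers above asserts that a stored version with the same major number but a higher minor number loads cleanly, while a bumped major number is rejected with RMStateVersionIncompatibleException. The following standalone sketch (hypothetical names throughout -- StateVersion, VersionIncompatibleException, checkVersion -- not the Hadoop implementation) illustrates that compatibility rule under those assumptions:

// Standalone sketch of the major/minor compatibility rule implied by
// testCheckVersion. All names here are hypothetical stand-ins.
public class VersionCheckSketch {

  // Minimal stand-in for the Version record the state stores persist.
  static final class StateVersion {
    final int major;
    final int minor;
    StateVersion(int major, int minor) { this.major = major; this.minor = minor; }
  }

  static final class VersionIncompatibleException extends Exception {
    VersionIncompatibleException(String msg) { super(msg); }
  }

  // Same major version: loadable (minor drift tolerated, mirroring
  // defaultVersion vs. compatibleVersion in the test).
  // Different major version: refuse to load, mirroring incompatibleVersion.
  static void checkVersion(StateVersion current, StateVersion loaded)
      throws VersionIncompatibleException {
    if (loaded == null) {
      return; // nothing stored yet; first start-up would write the current version
    }
    if (loaded.major != current.major) {
      throw new VersionIncompatibleException(
          "stored major " + loaded.major + " != current major " + current.major);
    }
  }

  public static void main(String[] args) throws Exception {
    StateVersion current = new StateVersion(1, 0);
    checkVersion(current, new StateVersion(1, 2));   // compatible, as in the test
    try {
      checkVersion(current, new StateVersion(3, 0)); // incompatible, should throw
    } catch (VersionIncompatibleException expected) {
      System.out.println("rejected: " + expected.getMessage());
    }
  }
}
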
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java Tue Aug 19 23:49:39 2014 @@ -32,9 +32,9 @@ import org.apache.hadoop.ha.HAServicePro import org.apache.hadoop.service.Service; import org.apache.hadoop.yarn.conf.HAUtil; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.records.Version; +import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.RMStateVersionPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; @@ -69,7 +69,7 @@ public class TestZKRMStateStore extends return znodeWorkingPath + "/" + ROOT_ZNODE_NAME + "/" + VERSION_NODE; } - public RMStateVersion getCurrentVersion() { + public Version getCurrentVersion() { return CURRENT_VERSION_INFO; } @@ -96,13 +96,13 @@ public class TestZKRMStateStore extends } @Override - public void writeVersion(RMStateVersion version) throws Exception { - client.setData(store.getVersionNode(), ((RMStateVersionPBImpl) version) + public void writeVersion(Version version) throws Exception { + client.setData(store.getVersionNode(), ((VersionPBImpl) version) .getProto().toByteArray(), -1); } @Override - public RMStateVersion getCurrentVersion() throws Exception { + public Version getCurrentVersion() throws Exception { return store.getCurrentVersion(); } @@ -120,7 +120,10 @@ public class TestZKRMStateStore extends testRMAppStateStore(zkTester); testRMDTSecretManagerStateStore(zkTester); testCheckVersion(zkTester); + testEpoch(zkTester); testAppDeletion(zkTester); + testDeleteStore(zkTester); + testAMRMTokenSecretManagerStateStore(zkTester); } private Configuration createHARMConf( Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== 
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java Tue Aug 19 23:49:39 2014 @@ -41,6 +41,7 @@ import java.security.NoSuchAlgorithmExce import java.util.concurrent.CyclicBarrier; import java.util.concurrent.atomic.AtomicBoolean; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -203,7 +204,7 @@ public class TestZKRMStateStoreZKClientC LOG.error(error, e); fail(error); } - Assert.assertEquals("newBytes", new String(ret)); + assertEquals("newBytes", new String(ret)); } @Test(timeout = 20000) @@ -232,7 +233,7 @@ public class TestZKRMStateStoreZKClientC try { byte[] ret = store.getDataWithRetries(path, false); - Assert.assertEquals("bytes", new String(ret)); + assertEquals("bytes", new String(ret)); } catch (Exception e) { String error = "New session creation failed"; LOG.error(error, e); @@ -281,4 +282,24 @@ public class TestZKRMStateStoreZKClientC zkClientTester.getRMStateStore(conf); } + + @Test + public void testZKRetryInterval() throws Exception { + TestZKClient zkClientTester = new TestZKClient(); + YarnConfiguration conf = new YarnConfiguration(); + + ZKRMStateStore store = + (ZKRMStateStore) zkClientTester.getRMStateStore(conf); + assertEquals(YarnConfiguration.DEFAULT_RM_ZK_RETRY_INTERVAL_MS, + store.zkRetryInterval); + store.stop(); + + conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true); + store = + (ZKRMStateStore) zkClientTester.getRMStateStore(conf); + assertEquals(YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS / + YarnConfiguration.DEFAULT_ZK_RM_NUM_RETRIES, + store.zkRetryInterval); + store.stop(); + } } Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java Tue Aug 19 23:49:39 2014 @@ -29,6 +29,8 @@ import org.apache.hadoop.yarn.api.record import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import 
org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -232,4 +234,18 @@ public class MockRMApp implements RMApp public YarnApplicationState createApplicationState() { return null; } + + @Override + public Set<NodeId> getRanNodes() { + return null; + } + + public Resource getResourcePreempted() { + throw new UnsupportedOperationException("Not supported yet."); + } + + @Override + public RMAppMetrics getRMAppMetrics() { + throw new UnsupportedOperationException("Not supported yet."); + } } Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java Tue Aug 19 23:49:39 2014 @@ -51,6 +51,7 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl; +import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils; import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState; @@ -59,7 +60,6 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUpdateSavedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent; @@ -192,7 +192,7 @@ public class TestRMAppTransitions { this.rmContext = new RMContextImpl(rmDispatcher, containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor, - null, new AMRMTokenSecretManager(conf), + null, new AMRMTokenSecretManager(conf, this.rmContext), new RMContainerTokenSecretManager(conf), new NMTokenSecretManagerInRM(conf), new ClientToAMTokenSecretManagerInRM(), @@ -327,15 +327,15 @@ public class TestRMAppTransitions { private void sendAppUpdateSavedEvent(RMApp application) { RMAppEvent event = - new RMAppUpdateSavedEvent(application.getApplicationId(), null); + new 
RMAppEvent(application.getApplicationId(), RMAppEventType.APP_UPDATE_SAVED); application.handle(event); rmDispatcher.await(); } private void sendAttemptUpdateSavedEvent(RMApp application) { application.getCurrentAppAttempt().handle( - new RMAppAttemptUpdateSavedEvent(application.getCurrentAppAttempt() - .getAppAttemptId(), null)); + new RMAppAttemptEvent(application.getCurrentAppAttempt().getAppAttemptId(), + RMAppAttemptEventType.ATTEMPT_UPDATE_SAVED)); } protected RMApp testCreateAppNewSaving( @@ -356,7 +356,7 @@ public class TestRMAppTransitions { RMApp application = testCreateAppNewSaving(submissionContext); // NEW_SAVING => SUBMITTED event RMAppEventType.APP_SAVED RMAppEvent event = - new RMAppNewSavedEvent(application.getApplicationId(), null); + new RMAppEvent(application.getApplicationId(), RMAppEventType.APP_NEW_SAVED); application.handle(event); assertStartTimeSet(application); assertAppState(RMAppState.SUBMITTED, application); @@ -421,7 +421,7 @@ public class TestRMAppTransitions { RMApp application = testCreateAppFinalSaving(submissionContext); // FINAL_SAVING => FINISHING event RMAppEventType.APP_UPDATED RMAppEvent appUpdated = - new RMAppUpdateSavedEvent(application.getApplicationId(), null); + new RMAppEvent(application.getApplicationId(), RMAppEventType.APP_UPDATE_SAVED); application.handle(appUpdated); assertAppState(RMAppState.FINISHING, application); assertTimesAtFinish(application); @@ -762,7 +762,7 @@ public class TestRMAppTransitions { application.handle(event); assertAppState(RMAppState.FINAL_SAVING, application); RMAppEvent appUpdated = - new RMAppUpdateSavedEvent(application.getApplicationId(), null); + new RMAppEvent(application.getApplicationId(), RMAppEventType.APP_UPDATE_SAVED); application.handle(appUpdated); assertAppState(RMAppState.FINISHED, application); @@ -921,6 +921,7 @@ public class TestRMAppTransitions { assertAppState(RMAppState.NEW, app); ApplicationReport report = app.createAndGetApplicationReport(null, true); Assert.assertNotNull(report.getApplicationResourceUsageReport()); + Assert.assertEquals(report.getApplicationResourceUsageReport(),RMServerUtils.DUMMY_APPLICATION_RESOURCE_USAGE_REPORT); report = app.createAndGetApplicationReport("clientuser", true); Assert.assertNotNull(report.getApplicationResourceUsageReport()); }
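
Note: the TestRMAppTransitions hunks above replace dedicated event subclasses (RMAppNewSavedEvent, RMAppUpdateSavedEvent, RMAppAttemptUpdateSavedEvent) with a single generic event carrying an event-type enum. The sketch below (hypothetical names, not YARN's classes) shows that pattern in isolation: one event class keyed by an enum, with transitions registered per type rather than per subclass, which removes the need for a new subclass for every saved/updated notification.

// Standalone sketch of the "generic event + type enum" pattern the diff moves toward.
import java.util.EnumMap;
import java.util.Map;
import java.util.function.Consumer;

public class GenericEventSketch {

  enum AppEventType { APP_NEW_SAVED, APP_UPDATE_SAVED }

  // A single event class identified by its type, mirroring
  // "new RMAppEvent(appId, RMAppEventType.APP_UPDATE_SAVED)" in the diff.
  static final class AppEvent {
    final String appId;
    final AppEventType type;
    AppEvent(String appId, AppEventType type) { this.appId = appId; this.type = type; }
  }

  // Handlers are keyed by event type rather than by event subclass.
  static final class AppStateMachine {
    private final Map<AppEventType, Consumer<AppEvent>> transitions =
        new EnumMap<>(AppEventType.class);

    void register(AppEventType type, Consumer<AppEvent> transition) {
      transitions.put(type, transition);
    }

    void handle(AppEvent event) {
      transitions.getOrDefault(event.type,
          e -> { throw new IllegalStateException("no transition for " + e.type); })
          .accept(event);
    }
  }

  public static void main(String[] args) {
    AppStateMachine sm = new AppStateMachine();
    sm.register(AppEventType.APP_NEW_SAVED,
        e -> System.out.println(e.appId + ": NEW_SAVING -> SUBMITTED"));
    sm.register(AppEventType.APP_UPDATE_SAVED,
        e -> System.out.println(e.appId + ": FINAL_SAVING -> FINISHED"));
    sm.handle(new AppEvent("application_1383183338_0001", AppEventType.APP_UPDATE_SAVED));
  }
}
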
