Modified: hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java?rev=1619293&r1=1619292&r2=1619293&view=diff ============================================================================== --- hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java (original) +++ hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java Thu Aug 21 05:22:10 2014 @@ -171,7 +171,6 @@ public class TestApplicationMasterServic RMAppAttempt attempt1 = app1.getCurrentAppAttempt(); MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId()); am1.registerAppAttempt(); - am1.setAMRMProtocol(rm.getApplicationMasterService()); AllocateRequestPBImpl allocateRequest = new AllocateRequestPBImpl(); List<ContainerId> release = new ArrayList<ContainerId>();
Modified: hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java?rev=1619293&r1=1619292&r2=1619293&view=diff ============================================================================== --- hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java (original) +++ hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java Thu Aug 21 05:22:10 2014 @@ -331,6 +331,10 @@ public class TestRMHA { rm.adminService.transitionToStandby(requestInfo); rm.adminService.transitionToActive(requestInfo); rm.adminService.transitionToStandby(requestInfo); + + MyCountingDispatcher dispatcher = + (MyCountingDispatcher) rm.getRMContext().getDispatcher(); + assertTrue(!dispatcher.isStopped()); rm.adminService.transitionToActive(requestInfo); assertEquals(errorMessageForEventHandler, expectedEventHandlerCount, @@ -339,6 +343,11 @@ public class TestRMHA { assertEquals(errorMessageForService, expectedServiceCount, rm.getServices().size()); + + // Keep the dispatcher reference before transitioning to standby + dispatcher = (MyCountingDispatcher) rm.getRMContext().getDispatcher(); + + rm.adminService.transitionToStandby(requestInfo); assertEquals(errorMessageForEventHandler, expectedEventHandlerCount, ((MyCountingDispatcher) rm.getRMContext().getDispatcher()) @@ -346,6 +355,8 @@ public class TestRMHA { assertEquals(errorMessageForService, expectedServiceCount, rm.getServices().size()); + assertTrue(dispatcher.isStopped()); + rm.stop(); } @@ -492,6 +503,8 @@ public class TestRMHA { private int eventHandlerCount; + private volatile boolean stopped = false; + public MyCountingDispatcher() { super("MyCountingDispatcher"); this.eventHandlerCount = 0; @@ -510,5 +523,15 @@ public class TestRMHA { public int getEventHandlerCount() { return this.eventHandlerCount; } + + @Override + protected void serviceStop() throws Exception { + this.stopped = true; + super.serviceStop(); + } + + public boolean isStopped() { + return this.stopped; + } } } Modified: hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java?rev=1619293&r1=1619292&r2=1619293&view=diff ============================================================================== --- hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java (original) +++ hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java Thu Aug 21 05:22:10 2014 @@ -289,7 +289,7 @@ public class TestRMRestart { // verify old AM is not accepted // change running AM to talk to new RM - am1.setAMRMProtocol(rm2.getApplicationMasterService()); + am1.setAMRMProtocol(rm2.getApplicationMasterService(), rm2.getRMContext()); AllocateResponse allocResponse = am1.allocate( new ArrayList<ResourceRequest>(), new ArrayList<ContainerId>()); @@ -1663,7 +1663,7 @@ public class TestRMRestart { nm1.setResourceTrackerService(rm2.getResourceTrackerService()); // recover app RMApp loadedApp1 = rm2.getRMContext().getRMApps().get(app1.getApplicationId()); - am1.setAMRMProtocol(rm2.getApplicationMasterService()); + am1.setAMRMProtocol(rm2.getApplicationMasterService(), rm2.getRMContext()); am1.allocate(new ArrayList<ResourceRequest>(), new ArrayList<ContainerId>()); nm1.nodeHeartbeat(true); nm1 = new MockNM("127.0.0.1:1234", 15120, rm2.getResourceTrackerService()); Modified: hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java?rev=1619293&r1=1619292&r2=1619293&view=diff ============================================================================== --- hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java (original) +++ hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java Thu Aug 21 05:22:10 2014 @@ -33,10 +33,13 @@ import java.util.Set; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; @@ -72,6 +75,9 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import com.google.common.base.Supplier; + + @SuppressWarnings({"rawtypes", "unchecked"}) @RunWith(value = Parameterized.class) public class TestWorkPreservingRMRestart { @@ -572,8 +578,8 @@ public class TestWorkPreservingRMRestart rm2.waitForState(app0.getApplicationId(), RMAppState.ACCEPTED); rm2.waitForState(am0.getApplicationAttemptId(), RMAppAttemptState.LAUNCHED); - am0.setAMRMProtocol(rm2.getApplicationMasterService()); - am0.registerAppAttempt(false); + am0.setAMRMProtocol(rm2.getApplicationMasterService(), rm2.getRMContext()); + am0.registerAppAttempt(true); rm2.waitForState(app0.getApplicationId(), RMAppState.RUNNING); rm2.waitForState(am0.getApplicationAttemptId(), RMAppAttemptState.RUNNING); @@ -646,6 +652,69 @@ public class TestWorkPreservingRMRestart waitForNumContainersToRecover(2, rm2, am0.getApplicationAttemptId()); } + // Test if RM on recovery receives the container release request from AM + // before it receives the container status reported by NM for recovery. this + // container should not be recovered. + @Test (timeout = 30000) + public void testReleasedContainerNotRecovered() throws Exception { + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + rm1 = new MockRM(conf, memStore); + MockNM nm1 = new MockNM("h1:1234", 15120, rm1.getResourceTrackerService()); + nm1.registerNode(); + rm1.start(); + + RMApp app1 = rm1.submitApp(1024); + final MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + + // Re-start RM + conf.setInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS, 8000); + rm2 = new MockRM(conf, memStore); + rm2.start(); + nm1.setResourceTrackerService(rm2.getResourceTrackerService()); + rm2.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED); + am1.setAMRMProtocol(rm2.getApplicationMasterService(), rm2.getRMContext()); + am1.registerAppAttempt(true); + + // try to release a container before the container is actually recovered. + final ContainerId runningContainer = + ContainerId.newInstance(am1.getApplicationAttemptId(), 2); + am1.allocate(null, Arrays.asList(runningContainer)); + + // send container statuses to recover the containers + List<NMContainerStatus> containerStatuses = + createNMContainerStatusForApp(am1); + nm1.registerNode(containerStatuses, null); + + // only the am container should be recovered. + waitForNumContainersToRecover(1, rm2, am1.getApplicationAttemptId()); + + final AbstractYarnScheduler scheduler = + (AbstractYarnScheduler) rm2.getResourceScheduler(); + // cached release request is cleaned. + // assertFalse(scheduler.getPendingRelease().contains(runningContainer)); + + AllocateResponse response = am1.allocate(null, null); + // AM gets notified of the completed container. + boolean receivedCompletedContainer = false; + for (ContainerStatus status : response.getCompletedContainersStatuses()) { + if (status.getContainerId().equals(runningContainer)) { + receivedCompletedContainer = true; + } + } + assertTrue(receivedCompletedContainer); + + GenericTestUtils.waitFor(new Supplier<Boolean>() { + public Boolean get() { + // release cache is cleaned up and previous running container is not + // recovered + return scheduler.getApplicationAttempt(am1.getApplicationAttemptId()) + .getPendingRelease().isEmpty() + && scheduler.getRMContainer(runningContainer) == null; + } + }, 1000, 20000); + } + private void asserteMetrics(QueueMetrics qm, int appsSubmitted, int appsPending, int appsRunning, int appsCompleted, int allocatedContainers, int availableMB, int availableVirtualCores, @@ -661,7 +730,7 @@ public class TestWorkPreservingRMRestart assertEquals(allocatedVirtualCores, qm.getAllocatedVirtualCores()); } - private void waitForNumContainersToRecover(int num, MockRM rm, + public static void waitForNumContainersToRecover(int num, MockRM rm, ApplicationAttemptId attemptId) throws Exception { AbstractYarnScheduler scheduler = (AbstractYarnScheduler) rm.getResourceScheduler(); @@ -674,7 +743,9 @@ public class TestWorkPreservingRMRestart attempt = scheduler.getApplicationAttempt(attemptId); } while (attempt.getLiveContainers().size() < num) { - System.out.println("Wait for " + num + " containers to recover."); + System.out.println("Wait for " + num + + " containers to recover. currently: " + + attempt.getLiveContainers().size()); Thread.sleep(200); } }
