Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java?rev=1603664&r1=1603663&r2=1603664&view=diff ============================================================================== --- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java (original) +++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java Wed Jun 18 23:15:04 2014 @@ -55,6 +55,8 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEvent; import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEventType; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRunningOnNodeEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; @@ -473,7 +475,13 @@ public class RMNodeImpl implements RMNod } else { // Increment activeNodes explicitly because this is a new node. ClusterMetrics.getMetrics().incrNumActiveNodes(); - containers = startEvent.getContainerRecoveryReports(); + containers = startEvent.getNMContainerStatuses(); + } + + if (null != startEvent.getRunningApplications()) { + for (ApplicationId appId : startEvent.getRunningApplications()) { + handleRunningAppOnNode(rmNode, rmNode.context, appId, rmNode.nodeId); + } } rmNode.context.getDispatcher().getEventHandler() @@ -482,6 +490,24 @@ public class RMNodeImpl implements RMNod new NodesListManagerEvent( NodesListManagerEventType.NODE_USABLE, rmNode)); } + + void handleRunningAppOnNode(RMNodeImpl rmNode, RMContext context, + ApplicationId appId, NodeId nodeId) { + RMApp app = context.getRMApps().get(appId); + + // if we failed getting app by appId, maybe something wrong happened, just + // add the app to the finishedApplications list so that the app can be + // cleaned up on the NM + if (null == app) { + LOG.warn("Cannot get RMApp by appId=" + appId + + ", just added it to finishedApplications list for cleanup"); + rmNode.finishedApplications.add(appId); + return; + } + + context.getDispatcher().getEventHandler() + .handle(new RMAppRunningOnNodeEvent(appId, nodeId)); + } } public static class ReconnectNodeTransition implements @@ -517,7 +543,7 @@ public class RMNodeImpl implements RMNod } rmNode.context.getRMNodes().put(newNode.getNodeID(), newNode); rmNode.context.getDispatcher().getEventHandler().handle( - new RMNodeStartedEvent(newNode.getNodeID(), null)); + new RMNodeStartedEvent(newNode.getNodeID(), null, null)); } rmNode.context.getDispatcher().getEventHandler().handle( new NodesListManagerEvent(
Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStartedEvent.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStartedEvent.java?rev=1603664&r1=1603663&r2=1603664&view=diff ============================================================================== --- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStartedEvent.java (original) +++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStartedEvent.java Wed Jun 18 23:15:04 2014 @@ -20,19 +20,28 @@ package org.apache.hadoop.yarn.server.re import java.util.List; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; public class RMNodeStartedEvent extends RMNodeEvent { - private List<NMContainerStatus> containerReports; + private List<NMContainerStatus> containerStatuses; + private List<ApplicationId> runningApplications; - public RMNodeStartedEvent(NodeId nodeId, List<NMContainerStatus> containerReports) { + public RMNodeStartedEvent(NodeId nodeId, + List<NMContainerStatus> containerReports, + List<ApplicationId> runningApplications) { super(nodeId, RMNodeEventType.STARTED); - this.containerReports = containerReports; + this.containerStatuses = containerReports; + this.runningApplications = runningApplications; } - public List<NMContainerStatus> getContainerRecoveryReports() { - return this.containerReports; + public List<NMContainerStatus> getNMContainerStatuses() { + return this.containerStatuses; + } + + public List<ApplicationId> getRunningApplications() { + return runningApplications; } } Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java?rev=1603664&r1=1603663&r2=1603664&view=diff ============================================================================== --- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java (original) +++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java Wed Jun 18 23:15:04 2014 @@ -130,9 +130,9 @@ public abstract class SchedulerNode { LOG.info("Assigned container " + container.getId() + " of capacity " + container.getResource() + " on host " + rmNode.getNodeAddress() - + ", which currently has " + numContainers + " containers, " + + ", which has " + numContainers + " containers, " + getUsedResource() + " used and " + getAvailableResource() - + " available"); + + " available after allocation"); } /** Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java?rev=1603664&r1=1603663&r2=1603664&view=diff ============================================================================== --- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java (original) +++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java Wed Jun 18 23:15:04 2014 @@ -100,11 +100,17 @@ public class MockNM { } public RegisterNodeManagerResponse registerNode() throws Exception { - return registerNode(null); + return registerNode(null, null); + } + + public RegisterNodeManagerResponse registerNode( + List<ApplicationId> runningApplications) throws Exception { + return registerNode(null, runningApplications); } public RegisterNodeManagerResponse registerNode( - List<NMContainerStatus> containerReports) throws Exception{ + List<NMContainerStatus> containerReports, + List<ApplicationId> runningApplications) throws Exception { RegisterNodeManagerRequest req = Records.newRecord( RegisterNodeManagerRequest.class); req.setNodeId(nodeId); @@ -113,6 +119,7 @@ public class MockNM { req.setResource(resource); req.setContainerStatuses(containerReports); req.setNMVersion(version); + req.setRunningApplications(runningApplications); RegisterNodeManagerResponse registrationResponse = resourceTracker.registerNodeManager(req); this.currentContainerTokenMasterKey = Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java?rev=1603664&r1=1603663&r2=1603664&view=diff ============================================================================== --- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java (original) +++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java Wed Jun 18 23:15:04 2014 @@ -78,6 +78,7 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.util.Records; +import org.apache.hadoop.yarn.util.YarnVersionInfo; import org.apache.log4j.Level; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -350,11 +351,20 @@ public class MockRM extends ResourceMana nm.registerNode(); return nm; } + + public MockNM registerNode(String nodeIdStr, int memory, int vCores, + List<ApplicationId> runningApplications) throws Exception { + MockNM nm = + new MockNM(nodeIdStr, memory, vCores, getResourceTrackerService(), + YarnVersionInfo.getVersion()); + nm.registerNode(runningApplications); + return nm; + } public void sendNodeStarted(MockNM nm) throws Exception { RMNodeImpl node = (RMNodeImpl) getRMContext().getRMNodes().get( nm.getNodeId()); - node.handle(new RMNodeStartedEvent(nm.getNodeId(), null)); + node.handle(new RMNodeStartedEvent(nm.getNodeId(), null, null)); } public void sendNodeLost(MockNM nm) throws Exception { Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java?rev=1603664&r1=1603663&r2=1603664&view=diff ============================================================================== --- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java (original) +++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java Wed Jun 18 23:15:04 2014 @@ -18,26 +18,30 @@ package org.apache.hadoop.yarn.server.resourcemanager; +import java.net.UnknownHostException; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; -import org.junit.Assert; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; @@ -45,13 +49,29 @@ import org.apache.hadoop.yarn.server.uti import org.apache.log4j.Level; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; +import org.junit.Assert; +import org.junit.Before; import org.junit.Test; public class TestApplicationCleanup { private static final Log LOG = LogFactory .getLog(TestApplicationCleanup.class); + + private YarnConfiguration conf; + + @Before + public void setup() throws UnknownHostException { + Logger rootLogger = LogManager.getRootLogger(); + rootLogger.setLevel(Level.DEBUG); + conf = new YarnConfiguration(); + UserGroupInformation.setConfiguration(conf); + conf.set(YarnConfiguration.RECOVERY_ENABLED, "true"); + conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); + Assert.assertTrue(YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS > 1); + } + @SuppressWarnings("resource") @Test public void testAppCleanup() throws Exception { Logger rootLogger = LogManager.getRootLogger(); @@ -130,6 +150,7 @@ public class TestApplicationCleanup { rm.stop(); } + @SuppressWarnings("resource") @Test public void testContainerCleanup() throws Exception { @@ -252,6 +273,69 @@ public class TestApplicationCleanup { rm.stop(); } + + private void waitForAppCleanupMessageRecved(MockNM nm, ApplicationId appId) + throws Exception { + while (true) { + NodeHeartbeatResponse response = nm.nodeHeartbeat(true); + if (response.getApplicationsToCleanup() != null + && response.getApplicationsToCleanup().size() == 1 + && appId.equals(response.getApplicationsToCleanup().get(0))) { + return; + } + + LOG.info("Haven't got application=" + appId.toString() + + " in cleanup list from node heartbeat response, " + + "sleep for a while before next heartbeat"); + Thread.sleep(1000); + } + } + + private MockAM launchAM(RMApp app, MockRM rm, MockNM nm) + throws Exception { + RMAppAttempt attempt = app.getCurrentAppAttempt(); + nm.nodeHeartbeat(true); + MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId()); + am.registerAppAttempt(); + rm.waitForState(app.getApplicationId(), RMAppState.RUNNING); + return am; + } + + @SuppressWarnings("resource") + @Test (timeout = 60000) + public void testAppCleanupWhenRestartedAfterAppFinished() throws Exception { + conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1); + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + + // start RM + MockRM rm1 = new MockRM(conf, memStore); + rm1.start(); + MockNM nm1 = + new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); + nm1.registerNode(); + + // create app and launch the AM + RMApp app0 = rm1.submitApp(200); + MockAM am0 = launchAM(app0, rm1, nm1); + nm1.nodeHeartbeat(am0.getApplicationAttemptId(), 1, ContainerState.COMPLETE); + rm1.waitForState(app0.getApplicationId(), RMAppState.FAILED); + + // start new RM + MockRM rm2 = new MockRM(conf, memStore); + rm2.start(); + + // nm1 register to rm2, and do a heartbeat + nm1.setResourceTrackerService(rm2.getResourceTrackerService()); + nm1.registerNode(Arrays.asList(app0.getApplicationId())); + rm2.waitForState(app0.getApplicationId(), RMAppState.FAILED); + + // wait for application cleanup message received + waitForAppCleanupMessageRecved(nm1, app0.getApplicationId()); + + rm1.stop(); + rm2.stop(); + } public static void main(String[] args) throws Exception { TestApplicationCleanup t = new TestApplicationCleanup(); Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java?rev=1603664&r1=1603663&r2=1603664&view=diff ============================================================================== --- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java (original) +++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java Wed Jun 18 23:15:04 2014 @@ -161,7 +161,7 @@ public class TestRMNodeTransitions { @Test (timeout = 5000) public void testExpiredContainer() { // Start the node - node.handle(new RMNodeStartedEvent(null, null)); + node.handle(new RMNodeStartedEvent(null, null, null)); verify(scheduler).handle(any(NodeAddedSchedulerEvent.class)); // Expire a container @@ -189,11 +189,11 @@ public class TestRMNodeTransitions { @Test (timeout = 5000) public void testContainerUpdate() throws InterruptedException{ //Start the node - node.handle(new RMNodeStartedEvent(null, null)); + node.handle(new RMNodeStartedEvent(null, null, null)); NodeId nodeId = BuilderUtils.newNodeId("localhost:1", 1); RMNodeImpl node2 = new RMNodeImpl(nodeId, rmContext, null, 0, 0, null, null, null); - node2.handle(new RMNodeStartedEvent(null, null)); + node2.handle(new RMNodeStartedEvent(null, null, null)); ContainerId completedContainerIdFromNode1 = BuilderUtils.newContainerId( BuilderUtils.newApplicationAttemptId( @@ -249,7 +249,7 @@ public class TestRMNodeTransitions { @Test (timeout = 5000) public void testStatusChange(){ //Start the node - node.handle(new RMNodeStartedEvent(null, null)); + node.handle(new RMNodeStartedEvent(null, null, null)); //Add info to the queue first node.setNextHeartBeat(false); @@ -465,7 +465,7 @@ public class TestRMNodeTransitions { RMNodeImpl node = new RMNodeImpl(nodeId, rmContext,null, 0, 0, null, ResourceOption.newInstance(capability, RMNode.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT), nmVersion); - node.handle(new RMNodeStartedEvent(node.getNodeID(), null)); + node.handle(new RMNodeStartedEvent(node.getNodeID(), null, null)); Assert.assertEquals(NodeState.RUNNING, node.getState()); return node; } @@ -496,7 +496,7 @@ public class TestRMNodeTransitions { int initialUnhealthy = cm.getUnhealthyNMs(); int initialDecommissioned = cm.getNumDecommisionedNMs(); int initialRebooted = cm.getNumRebootedNMs(); - node.handle(new RMNodeStartedEvent(node.getNodeID(), null)); + node.handle(new RMNodeStartedEvent(node.getNodeID(), null, null)); Assert.assertEquals("Active Nodes", initialActive + 1, cm.getNumActiveNMs()); Assert.assertEquals("Lost Nodes", initialLost, cm.getNumLostNMs()); Assert.assertEquals("Unhealthy Nodes", Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java?rev=1603664&r1=1603663&r2=1603664&view=diff ============================================================================== --- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java (original) +++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java Wed Jun 18 23:15:04 2014 @@ -102,7 +102,6 @@ import org.junit.Before; import org.junit.Test; public class TestRMRestart { - private final static File TEMP_DIR = new File(System.getProperty( "test.build.data", "/tmp"), "decommision"); private File hostFile = new File(TEMP_DIR + File.separator + "hostFile.txt"); @@ -309,7 +308,7 @@ public class TestRMRestart { TestRMRestart .createNMContainerStatus(loadedApp1.getCurrentAppAttempt() .getAppAttemptId(), 1, ContainerState.COMPLETE); - nm1.registerNode(Arrays.asList(status)); + nm1.registerNode(Arrays.asList(status), null); nm2.registerNode(); rm2.waitForState(loadedApp1.getApplicationId(), RMAppState.ACCEPTED); @@ -392,7 +391,7 @@ public class TestRMRestart { // completed apps are not removed immediately after app finish // And finished app is also loaded back. Assert.assertEquals(4, rmAppState.size()); - } + } @Test (timeout = 60000) public void testRMRestartAppRunningAMFailed() throws Exception { @@ -514,7 +513,7 @@ public class TestRMRestart { NMContainerStatus status = TestRMRestart.createNMContainerStatus( am2.getApplicationAttemptId(), 1, ContainerState.COMPLETE); - nm1.registerNode(Arrays.asList(status)); + nm1.registerNode(Arrays.asList(status), null); rm2.waitForState(am2.getApplicationAttemptId(), RMAppAttemptState.FAILED); launchAM(rmApp, rm2, nm1); Assert.assertEquals(3, rmApp.getAppAttempts().size()); @@ -1680,7 +1679,8 @@ public class TestRMRestart { TestRMRestart .createNMContainerStatus(loadedApp1.getCurrentAppAttempt() .getAppAttemptId(), 1, ContainerState.COMPLETE); - nm1.registerNode(Arrays.asList(status)); + nm1.registerNode(Arrays.asList(status), null); + while (loadedApp1.getAppAttempts().size() != 2) { Thread.sleep(200); } @@ -1807,7 +1807,7 @@ public class TestRMRestart { NMContainerStatus status = TestRMRestart.createNMContainerStatus( am0.getApplicationAttemptId(), 1, ContainerState.COMPLETE); - nm1.registerNode(Arrays.asList(status)); + nm1.registerNode(Arrays.asList(status), null); } }; } Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java?rev=1603664&r1=1603663&r2=1603664&view=diff ============================================================================== --- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java (original) +++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java Wed Jun 18 23:15:04 2014 @@ -159,7 +159,7 @@ public class TestWorkPreservingRMRestart ContainerState.COMPLETE); nm1.registerNode(Arrays.asList(amContainer, runningContainer, - completedContainer)); + completedContainer), null); // Wait for RM to settle down on recovering containers; waitForNumContainersToRecover(2, rm2, am1.getApplicationAttemptId()); @@ -383,11 +383,11 @@ public class TestWorkPreservingRMRestart List<NMContainerStatus> am1_2Containers = createNMContainerStatusForApp(am1_2); am1_1Containers.addAll(am1_2Containers); - nm1.registerNode(am1_1Containers); + nm1.registerNode(am1_1Containers, null); List<NMContainerStatus> am2Containers = createNMContainerStatusForApp(am2); - nm2.registerNode(am2Containers); + nm2.registerNode(am2Containers, null); // Wait for RM to settle down on recovering containers; waitForNumContainersToRecover(2, rm2, am1_1.getApplicationAttemptId()); @@ -482,7 +482,7 @@ public class TestWorkPreservingRMRestart TestRMRestart.createNMContainerStatus(am1.getApplicationAttemptId(), 3, ContainerState.COMPLETE); nm1.registerNode(Arrays.asList(amContainer, runningContainer, - completedContainer)); + completedContainer), null); rm2.waitForState(am1.getApplicationAttemptId(), RMAppAttemptState.FAILED); // Wait for RM to settle down on recovering containers; Thread.sleep(3000); @@ -519,7 +519,7 @@ public class TestWorkPreservingRMRestart NMContainerStatus completedContainer = TestRMRestart.createNMContainerStatus(am1.getApplicationAttemptId(), 3, ContainerState.COMPLETE); - nm1.registerNode(Arrays.asList(runningContainer, completedContainer)); + nm1.registerNode(Arrays.asList(runningContainer, completedContainer), null); RMApp recoveredApp1 = rm2.getRMContext().getRMApps().get(app1.getApplicationId()); assertEquals(RMAppState.FINISHED, recoveredApp1.getState()); Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java?rev=1603664&r1=1603663&r2=1603664&view=diff ============================================================================== --- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java (original) +++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java Wed Jun 18 23:15:04 2014 @@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.api.record import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; @@ -159,6 +160,11 @@ public abstract class MockAsm extends Mo public YarnApplicationState createApplicationState() { throw new UnsupportedOperationException("Not supported yet."); } + + @Override + public Set<NodeId> getRanNodes() { + throw new UnsupportedOperationException("Not supported yet."); + } } public static RMApp newApplication(int i) { Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java?rev=1603664&r1=1603663&r2=1603664&view=diff ============================================================================== --- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java (original) +++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java Wed Jun 18 23:15:04 2014 @@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.api.record import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -232,4 +233,9 @@ public class MockRMApp implements RMApp public YarnApplicationState createApplicationState() { return null; } + + @Override + public Set<NodeId> getRanNodes() { + return null; + } } Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java?rev=1603664&r1=1603663&r2=1603664&view=diff ============================================================================== --- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java (original) +++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java Wed Jun 18 23:15:04 2014 @@ -56,6 +56,7 @@ import org.apache.hadoop.yarn.api.record import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState; import org.apache.hadoop.yarn.event.AsyncDispatcher; @@ -75,8 +76,8 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppFailedAttemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRunningOnNodeEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAcquiredEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAllocatedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent; @@ -315,7 +316,7 @@ public class TestRMAppAttemptTransitions assertEquals(0,applicationAttempt.getJustFinishedContainers().size()); assertNull(applicationAttempt.getMasterContainer()); assertEquals(0.0, (double)applicationAttempt.getProgress(), 0.0001); - assertEquals(0, applicationAttempt.getRanNodes().size()); + assertEquals(0, application.getRanNodes().size()); assertNull(applicationAttempt.getFinalApplicationStatus()); assertNotNull(applicationAttempt.getTrackingUrl()); assertFalse("N/A".equals(applicationAttempt.getTrackingUrl())); @@ -331,7 +332,7 @@ public class TestRMAppAttemptTransitions assertEquals(0,applicationAttempt.getJustFinishedContainers().size()); assertNull(applicationAttempt.getMasterContainer()); assertEquals(0.0, (double)applicationAttempt.getProgress(), 0.0001); - assertEquals(0, applicationAttempt.getRanNodes().size()); + assertEquals(0, application.getRanNodes().size()); assertNull(applicationAttempt.getFinalApplicationStatus()); if (UserGroupInformation.isSecurityEnabled()) { verify(clientToAMTokenManager).createMasterKey( @@ -359,7 +360,7 @@ public class TestRMAppAttemptTransitions assertEquals(0,applicationAttempt.getJustFinishedContainers().size()); assertNull(applicationAttempt.getMasterContainer()); assertEquals(0.0, (double)applicationAttempt.getProgress(), 0.0001); - assertEquals(0, applicationAttempt.getRanNodes().size()); + assertEquals(0, application.getRanNodes().size()); assertNull(applicationAttempt.getFinalApplicationStatus()); // Check events @@ -385,7 +386,7 @@ public class TestRMAppAttemptTransitions assertEquals(0,applicationAttempt.getJustFinishedContainers().size()); assertEquals(amContainer, applicationAttempt.getMasterContainer()); assertEquals(0.0, (double)applicationAttempt.getProgress(), 0.0001); - assertEquals(0, applicationAttempt.getRanNodes().size()); + assertEquals(0, application.getRanNodes().size()); assertNull(applicationAttempt.getFinalApplicationStatus()); verifyTokenCount(applicationAttempt.getAppAttemptId(), 1); verifyAttemptFinalStateSaved(); @@ -425,7 +426,7 @@ public class TestRMAppAttemptTransitions assertEquals(0,applicationAttempt.getJustFinishedContainers().size()); assertNull(applicationAttempt.getMasterContainer()); assertEquals(0.0, (double)applicationAttempt.getProgress(), 0.0001); - assertEquals(0, applicationAttempt.getRanNodes().size()); + assertEquals(0, application.getRanNodes().size()); assertNull(applicationAttempt.getFinalApplicationStatus()); } @@ -461,7 +462,7 @@ public class TestRMAppAttemptTransitions assertEquals(0,applicationAttempt.getJustFinishedContainers().size()); assertEquals(container, applicationAttempt.getMasterContainer()); assertEquals(0.0, (double)applicationAttempt.getProgress(), 0.0001); - assertEquals(0, applicationAttempt.getRanNodes().size()); + assertEquals(0, application.getRanNodes().size()); // Check events verify(application, times(1)).handle(any(RMAppFailedAttemptEvent.class)); @@ -666,8 +667,10 @@ public class TestRMAppAttemptTransitions runApplicationAttempt(null, "host", 8042, url, true); // complete a container - applicationAttempt.handle(new RMAppAttemptContainerAcquiredEvent( - applicationAttempt.getAppAttemptId(), mock(Container.class))); + Container container = mock(Container.class); + when(container.getNodeId()).thenReturn(NodeId.newInstance("host", 1234)); + application.handle(new RMAppRunningOnNodeEvent(application.getApplicationId(), + container.getNodeId())); applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent( applicationAttempt.getAppAttemptId(), mock(ContainerStatus.class))); // complete AM @@ -845,7 +848,7 @@ public class TestRMAppAttemptTransitions applicationAttempt.getAppAttemptState()); assertEquals(0,applicationAttempt.getJustFinishedContainers().size()); assertEquals(amContainer, applicationAttempt.getMasterContainer()); - assertEquals(0, applicationAttempt.getRanNodes().size()); + assertEquals(0, application.getRanNodes().size()); String rmAppPageUrl = pjoin(RM_WEBAPP_ADDR, "cluster", "app", applicationAttempt.getAppAttemptId().getApplicationId()); assertEquals(rmAppPageUrl, applicationAttempt.getOriginalTrackingUrl()); @@ -882,7 +885,7 @@ public class TestRMAppAttemptTransitions applicationAttempt.getAppAttemptState()); assertEquals(0,applicationAttempt.getJustFinishedContainers().size()); assertEquals(amContainer, applicationAttempt.getMasterContainer()); - assertEquals(0, applicationAttempt.getRanNodes().size()); + assertEquals(0, application.getRanNodes().size()); String rmAppPageUrl = pjoin(RM_WEBAPP_ADDR, "cluster", "app", applicationAttempt.getAppAttemptId().getApplicationId()); assertEquals(rmAppPageUrl, applicationAttempt.getOriginalTrackingUrl());
