Author: tomwhite
Date: Mon Dec 3 12:12:11 2012
New Revision: 1416484
URL: http://svn.apache.org/viewvc?rev=1416484&view=rev
Log:
YARN-72. NM should handle cleaning up containers when it shuts down.
Contributed by Sandy Ryza.
Added:
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/MockNodeStatusUpdater.java
(with props)
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerShutdown.java
(with props)
Modified:
hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/CMgrCompletedContainersEvent.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt?rev=1416484&r1=1416483&r2=1416484&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt Mon Dec 3 12:12:11 2012
@@ -117,7 +117,11 @@ Release 2.0.3-alpha - Unreleased
YARN-229. Remove old unused RM recovery code. (Bikas Saha via acmurthy)
- YARN-187. Add hierarchical queues to the fair scheduler. (Sandy Ryza via
tomwhite)
+ YARN-187. Add hierarchical queues to the fair scheduler.
+ (Sandy Ryza via tomwhite)
+
+ YARN-72. NM should handle cleaning up containers when it shuts down.
+ (Sandy Ryza via tomwhite)
Release 2.0.2-alpha - 2012-09-07
Modified:
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/CMgrCompletedContainersEvent.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/CMgrCompletedContainersEvent.java?rev=1416484&r1=1416483&r2=1416484&view=diff
==============================================================================
---
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/CMgrCompletedContainersEvent.java
(original)
+++
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/CMgrCompletedContainersEvent.java
Mon Dec 3 12:12:11 2012
@@ -25,13 +25,23 @@ import org.apache.hadoop.yarn.api.record
public class CMgrCompletedContainersEvent extends ContainerManagerEvent {
private List<ContainerId> containerToCleanup;
-
- public CMgrCompletedContainersEvent(List<ContainerId> containersToCleanup) {
+ private Reason reason;
+
+ public CMgrCompletedContainersEvent(List<ContainerId> containersToCleanup,
Reason reason) {
super(ContainerManagerEventType.FINISH_CONTAINERS);
this.containerToCleanup = containersToCleanup;
+ this.reason = reason;
}
public List<ContainerId> getContainersToCleanup() {
return this.containerToCleanup;
}
+
+ public Reason getReason() {
+ return reason;
+ }
+
+ public static enum Reason {
+ ON_SHUTDOWN, BY_RESOURCEMANAGER
+ }
}
Modified:
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java?rev=1416484&r1=1416483&r2=1416484&view=diff
==============================================================================
---
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
(original)
+++
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
Mon Dec 3 12:12:11 2012
@@ -19,6 +19,9 @@
package org.apache.hadoop.yarn.server.nodemanager;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
@@ -61,14 +64,24 @@ public class NodeManager extends Composi
* Priority of the NodeManager shutdown hook.
*/
public static final int SHUTDOWN_HOOK_PRIORITY = 30;
+
+ /**
+ * Extra duration to wait for containers to be killed on shutdown.
+ */
+ private static final int SHUTDOWN_CLEANUP_SLOP_MS = 1000;
private static final Log LOG = LogFactory.getLog(NodeManager.class);
protected final NodeManagerMetrics metrics = NodeManagerMetrics.create();
private ApplicationACLsManager aclsManager;
private NodeHealthCheckerService nodeHealthChecker;
private LocalDirsHandlerService dirsHandler;
+ private Context context;
+ private AsyncDispatcher dispatcher;
+ private ContainerManagerImpl containerManager;
private static CompositeServiceShutdownHook nodeManagerShutdownHook;
+ private long waitForContainersOnShutdownMillis;
+
public NodeManager() {
super(NodeManager.class.getName());
}
@@ -115,7 +128,7 @@ public class NodeManager extends Composi
containerTokenSecretManager = new NMContainerTokenSecretManager(conf);
}
- Context context = new NMContext(containerTokenSecretManager);
+ this.context = new NMContext(containerTokenSecretManager);
this.aclsManager = new ApplicationACLsManager(conf);
@@ -131,7 +144,7 @@ public class NodeManager extends Composi
addService(del);
// NodeManager level dispatcher
- AsyncDispatcher dispatcher = new AsyncDispatcher();
+ this.dispatcher = new AsyncDispatcher();
nodeHealthChecker = new NodeHealthCheckerService();
addService(nodeHealthChecker);
@@ -144,7 +157,7 @@ public class NodeManager extends Composi
NodeResourceMonitor nodeResourceMonitor = createNodeResourceMonitor();
addService(nodeResourceMonitor);
- ContainerManagerImpl containerManager =
+ containerManager =
createContainerManager(context, exec, del, nodeStatusUpdater,
this.aclsManager, dirsHandler);
addService(containerManager);
@@ -155,13 +168,20 @@ public class NodeManager extends Composi
dispatcher.register(ContainerManagerEventType.class, containerManager);
addService(dispatcher);
-
+
DefaultMetricsSystem.initialize("NodeManager");
// StatusUpdater should be added last so that it get started last
// so that we make sure everything is up before registering with RM.
addService(nodeStatusUpdater);
-
+
+ waitForContainersOnShutdownMillis =
+ conf.getLong(YarnConfiguration.NM_SLEEP_DELAY_BEFORE_SIGKILL_MS,
+ YarnConfiguration.DEFAULT_NM_SLEEP_DELAY_BEFORE_SIGKILL_MS) +
+ conf.getLong(YarnConfiguration.NM_PROCESS_KILL_WAIT_MS,
+ YarnConfiguration.DEFAULT_NM_PROCESS_KILL_WAIT_MS) +
+ SHUTDOWN_CLEANUP_SLOP_MS;
+
super.init(conf);
// TODO add local dirs to del
}
@@ -178,9 +198,44 @@ public class NodeManager extends Composi
@Override
public void stop() {
+ cleanupContainers();
super.stop();
DefaultMetricsSystem.shutdown();
}
+
+ @SuppressWarnings("unchecked")
+ private void cleanupContainers() {
+ Map<ContainerId, Container> containers = context.getContainers();
+ if (containers.isEmpty()) {
+ return;
+ }
+ LOG.info("Containers still running on shutdown: " + containers.keySet());
+
+ List<ContainerId> containerIds = new
ArrayList<ContainerId>(containers.keySet());
+ dispatcher.getEventHandler().handle(
+ new CMgrCompletedContainersEvent(containerIds,
+ CMgrCompletedContainersEvent.Reason.ON_SHUTDOWN));
+
+ LOG.info("Waiting for containers to be killed");
+
+ long waitStartTime = System.currentTimeMillis();
+ while (!containers.isEmpty() &&
+ System.currentTimeMillis() - waitStartTime <
waitForContainersOnShutdownMillis) {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException ex) {
+ LOG.warn("Interrupted while sleeping on container kill", ex);
+ }
+ }
+
+ // All containers killed
+ if (containers.isEmpty()) {
+ LOG.info("All containers in DONE state");
+ } else {
+ LOG.info("Done waiting for containers to be killed. Still alive: " +
+ containers.keySet());
+ }
+ }
public static class NMContext implements Context {
@@ -282,6 +337,11 @@ public class NodeManager extends Composi
NodeManager createNewNodeManager() {
return new NodeManager();
}
+
+ // For testing
+ ContainerManagerImpl getContainerManager() {
+ return containerManager;
+ }
public static void main(String[] args) {
Thread.setDefaultUncaughtExceptionHandler(new
YarnUncaughtExceptionHandler());
Modified:
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java?rev=1416484&r1=1416483&r2=1416484&view=diff
==============================================================================
---
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
(original)
+++
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
Mon Dec 3 12:12:11 2012
@@ -363,7 +363,8 @@ public class NodeStatusUpdaterImpl exten
.getContainersToCleanupList();
if (containersToCleanup.size() != 0) {
dispatcher.getEventHandler().handle(
- new CMgrCompletedContainersEvent(containersToCleanup));
+ new CMgrCompletedContainersEvent(containersToCleanup,
+ CMgrCompletedContainersEvent.Reason.BY_RESOURCEMANAGER));
}
List<ApplicationId> appsToCleanup =
response.getApplicationsToCleanupList();
Modified:
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java?rev=1416484&r1=1416483&r2=1416484&view=diff
==============================================================================
---
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
(original)
+++
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
Mon Dec 3 12:12:11 2012
@@ -23,6 +23,8 @@ import static org.apache.hadoop.yarn.ser
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -593,9 +595,16 @@ public class ContainerManagerImpl extend
(CMgrCompletedContainersEvent) event;
for (ContainerId container : containersFinishedEvent
.getContainersToCleanup()) {
+ String diagnostic = "";
+ if (containersFinishedEvent.getReason() ==
+ CMgrCompletedContainersEvent.Reason.ON_SHUTDOWN) {
+ diagnostic = "Container Killed on Shutdown";
+ } else if (containersFinishedEvent.getReason() ==
+ CMgrCompletedContainersEvent.Reason.BY_RESOURCEMANAGER) {
+ diagnostic = "Container Killed by ResourceManager";
+ }
this.dispatcher.getEventHandler().handle(
- new ContainerKillEvent(container,
- "Container Killed by ResourceManager"));
+ new ContainerKillEvent(container, diagnostic));
}
break;
default:
Added:
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/MockNodeStatusUpdater.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/MockNodeStatusUpdater.java?rev=1416484&view=auto
==============================================================================
---
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/MockNodeStatusUpdater.java
(added)
+++
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/MockNodeStatusUpdater.java
Mon Dec 3 12:12:11 2012
@@ -0,0 +1,92 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.api.ResourceTracker;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
+import
org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
+import
org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
+import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
+import org.apache.hadoop.yarn.server.api.records.NodeStatus;
+import org.apache.hadoop.yarn.server.api.records.RegistrationResponse;
+import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
+
+/**
+ * This class allows a node manager to run without communicating with a
+ * real RM.
+ */
+public class MockNodeStatusUpdater extends NodeStatusUpdaterImpl {
+ static final Log LOG = LogFactory.getLog(MockNodeStatusUpdater.class);
+
+ private static final RecordFactory recordFactory = RecordFactoryProvider
+ .getRecordFactory(null);
+
+ private ResourceTracker resourceTracker;
+
+ public MockNodeStatusUpdater(Context context, Dispatcher dispatcher,
+ NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
+ super(context, dispatcher, healthChecker, metrics);
+ resourceTracker = new MockResourceTracker();
+ }
+
+ @Override
+ protected ResourceTracker getRMClient() {
+ return resourceTracker;
+ }
+
+ private static class MockResourceTracker implements ResourceTracker {
+ private int heartBeatID;
+
+ @Override
+ public RegisterNodeManagerResponse registerNodeManager(
+ RegisterNodeManagerRequest request) throws YarnRemoteException {
+ RegistrationResponse regResponse = recordFactory
+ .newRecordInstance(RegistrationResponse.class);
+
+ RegisterNodeManagerResponse response = recordFactory
+ .newRecordInstance(RegisterNodeManagerResponse.class);
+ response.setRegistrationResponse(regResponse);
+ return response;
+ }
+
+ @Override
+ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
+ throws YarnRemoteException {
+ NodeStatus nodeStatus = request.getNodeStatus();
+ LOG.info("Got heartbeat number " + heartBeatID);
+ nodeStatus.setResponseId(heartBeatID++);
+
+ HeartbeatResponse response = recordFactory
+ .newRecordInstance(HeartbeatResponse.class);
+ response.setResponseId(heartBeatID);
+
+ NodeHeartbeatResponse nhResponse = recordFactory
+ .newRecordInstance(NodeHeartbeatResponse.class);
+ nhResponse.setHeartbeatResponse(response);
+ return nhResponse;
+ }
+ }
+}
Propchange:
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/MockNodeStatusUpdater.java
------------------------------------------------------------------------------
svn:eol-style = native
Added:
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerShutdown.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerShutdown.java?rev=1416484&view=auto
==============================================================================
---
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerShutdown.java
(added)
+++
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerShutdown.java
Mon Dec 3 12:12:11 2012
@@ -0,0 +1,222 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.UnsupportedFileSystemException;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.URL;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import
org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestNodeManagerShutdown {
+ static final File basedir =
+ new File("target", TestNodeManagerShutdown.class.getName());
+ static final File tmpDir = new File(basedir, "tmpDir");
+ static final File logsDir = new File(basedir, "logs");
+ static final File remoteLogsDir = new File(basedir, "remotelogs");
+ static final File nmLocalDir = new File(basedir, "nm0");
+ static final File processStartFile = new File(tmpDir, "start_file.txt")
+ .getAbsoluteFile();
+
+ static final RecordFactory recordFactory = RecordFactoryProvider
+ .getRecordFactory(null);
+ static final String user = "nobody";
+ private FileContext localFS;
+
+ @Before
+ public void setup() throws UnsupportedFileSystemException {
+ localFS = FileContext.getLocalFSFileContext();
+ tmpDir.mkdirs();
+ logsDir.mkdirs();
+ remoteLogsDir.mkdirs();
+ nmLocalDir.mkdirs();
+ }
+
+ @After
+ public void tearDown() throws IOException, InterruptedException {
+ localFS.delete(new Path(basedir.getPath()), true);
+ }
+
+ @Test
+ public void testKillContainersOnShutdown() throws IOException {
+ NodeManager nm = getNodeManager();
+ nm.init(createNMConfig());
+ nm.start();
+
+ ContainerManagerImpl containerManager = nm.getContainerManager();
+ File scriptFile = createUnhaltingScriptFile();
+
+ ContainerLaunchContext containerLaunchContext =
+ recordFactory.newRecordInstance(ContainerLaunchContext.class);
+
+ // Construct the Container-id
+ ContainerId cId = createContainerId();
+ containerLaunchContext.setContainerId(cId);
+
+ containerLaunchContext.setUser(user);
+
+ URL localResourceUri =
+ ConverterUtils.getYarnUrlFromPath(localFS
+ .makeQualified(new Path(scriptFile.getAbsolutePath())));
+ LocalResource localResource =
+ recordFactory.newRecordInstance(LocalResource.class);
+ localResource.setResource(localResourceUri);
+ localResource.setSize(-1);
+ localResource.setVisibility(LocalResourceVisibility.APPLICATION);
+ localResource.setType(LocalResourceType.FILE);
+ localResource.setTimestamp(scriptFile.lastModified());
+ String destinationFile = "dest_file";
+ Map<String, LocalResource> localResources =
+ new HashMap<String, LocalResource>();
+ localResources.put(destinationFile, localResource);
+ containerLaunchContext.setLocalResources(localResources);
+ containerLaunchContext.setUser(containerLaunchContext.getUser());
+ List<String> commands = new ArrayList<String>();
+ commands.add("/bin/bash");
+ commands.add(scriptFile.getAbsolutePath());
+ containerLaunchContext.setCommands(commands);
+ containerLaunchContext.setResource(recordFactory
+ .newRecordInstance(Resource.class));
+ containerLaunchContext.getResource().setMemory(1024);
+ StartContainerRequest startRequest =
recordFactory.newRecordInstance(StartContainerRequest.class);
+ startRequest.setContainerLaunchContext(containerLaunchContext);
+ containerManager.startContainer(startRequest);
+
+ GetContainerStatusRequest request =
+ recordFactory.newRecordInstance(GetContainerStatusRequest.class);
+ request.setContainerId(cId);
+ ContainerStatus containerStatus =
+ containerManager.getContainerStatus(request).getStatus();
+ Assert.assertEquals(ContainerState.RUNNING, containerStatus.getState());
+
+ try {Thread.sleep(5000);} catch (InterruptedException ex)
{ex.printStackTrace();}
+
+ nm.stop();
+
+ // Now verify the contents of the file
+ // Script generates a message when it receives a sigterm
+ // so we look for that
+ BufferedReader reader =
+ new BufferedReader(new FileReader(processStartFile));
+
+ boolean foundSigTermMessage = false;
+ while (true) {
+ String line = reader.readLine();
+ if (line == null) {
+ break;
+ }
+ if (line.contains("SIGTERM")) {
+ foundSigTermMessage = true;
+ break;
+ }
+ }
+ Assert.assertTrue("Did not find sigterm message", foundSigTermMessage);
+ reader.close();
+ }
+
+ private ContainerId createContainerId() {
+ ApplicationId appId = recordFactory.newRecordInstance(ApplicationId.class);
+ appId.setClusterTimestamp(0);
+ appId.setId(0);
+ ApplicationAttemptId appAttemptId =
+ recordFactory.newRecordInstance(ApplicationAttemptId.class);
+ appAttemptId.setApplicationId(appId);
+ appAttemptId.setAttemptId(1);
+ ContainerId containerId =
+ recordFactory.newRecordInstance(ContainerId.class);
+ containerId.setApplicationAttemptId(appAttemptId);
+ return containerId;
+ }
+
+ private YarnConfiguration createNMConfig() {
+ YarnConfiguration conf = new YarnConfiguration();
+ conf.setInt(YarnConfiguration.NM_PMEM_MB, 5*1024); // 5GB
+ conf.set(YarnConfiguration.NM_ADDRESS, "127.0.0.1:12345");
+ conf.set(YarnConfiguration.NM_LOCALIZER_ADDRESS, "127.0.0.1:12346");
+ conf.set(YarnConfiguration.NM_LOG_DIRS, logsDir.getAbsolutePath());
+ conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
remoteLogsDir.getAbsolutePath());
+ conf.set(YarnConfiguration.NM_LOCAL_DIRS, nmLocalDir.getAbsolutePath());
+ return conf;
+ }
+
+ /**
+ * Creates a script to run a container that will run forever unless
+ * stopped by external means.
+ */
+ private File createUnhaltingScriptFile() throws IOException {
+ File scriptFile = new File(tmpDir, "scriptFile.sh");
+ BufferedWriter fileWriter = new BufferedWriter(new FileWriter(scriptFile));
+ fileWriter.write("#!/bin/bash\n\n");
+ fileWriter.write("echo \"Running testscript for delayed kill\"\n");
+ fileWriter.write("hello=\"Got SIGTERM\"\n");
+ fileWriter.write("umask 0\n");
+ fileWriter.write("trap \"echo $hello >> " + processStartFile + "\"
SIGTERM\n");
+ fileWriter.write("echo \"Writing pid to start file\"\n");
+ fileWriter.write("echo $$ >> " + processStartFile + "\n");
+ fileWriter.write("while true; do\nsleep 1s;\ndone\n");
+
+ fileWriter.close();
+ return scriptFile;
+ }
+
+ private NodeManager getNodeManager() {
+ return new NodeManager() {
+ @Override
+ protected NodeStatusUpdater createNodeStatusUpdater(Context context,
+ Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
+ MockNodeStatusUpdater myNodeStatusUpdater = new MockNodeStatusUpdater(
+ context, dispatcher, healthChecker, metrics);
+ return myNodeStatusUpdater;
+ }
+ };
+ }
+}
Propchange:
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerShutdown.java
------------------------------------------------------------------------------
svn:eol-style = native