Author: sseth
Date: Mon Mar 25 18:23:59 2013
New Revision: 1460808
URL: http://svn.apache.org/r1460808
Log:
YARN-71. Fix the NodeManager to clean up local-dirs on restart. Contributed by
Xuan Gong.
Added:
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerReboot.java
Modified:
hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt?rev=1460808&r1=1460807&r2=1460808&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt Mon Mar 25 18:23:59 2013
@@ -124,6 +124,9 @@ Release 2.0.5-beta - UNRELEASED
YARN-470. Support a way to disable resource monitoring on the NodeManager.
(Siddharth Seth via hitesh)
+
+ YARN-71. Fix the NodeManager to clean up local-dirs on restart.
+ (Xuan Gong via sseth)
Release 2.0.4-alpha - UNRELEASED
Modified:
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java?rev=1460808&r1=1460807&r2=1460808&view=diff
==============================================================================
---
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
(original)
+++
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
Mon Mar 25 18:23:59 2013
@@ -58,6 +58,8 @@ import org.apache.hadoop.yarn.service.Co
import org.apache.hadoop.yarn.service.Service;
import org.apache.hadoop.yarn.util.Records;
+import com.google.common.annotations.VisibleForTesting;
+
public class NodeManager extends CompositeService
implements EventHandler<NodeManagerEvent> {
@@ -113,6 +115,10 @@ public class NodeManager extends Composi
return new WebServer(nmContext, resourceView, aclsManager, dirsHandler);
}
+  /**
+   * Builds the {@link DeletionService} used by this NodeManager. Kept as a
+   * separate protected factory method so that a subclass can substitute its
+   * own instance.
+   *
+   * @param exec the container executor handed to the deletion service
+   * @return a new deletion service constructed around {@code exec}
+   */
+  protected DeletionService createDeletionService(ContainerExecutor exec) {
+    DeletionService deletionService = new DeletionService(exec);
+    return deletionService;
+  }
+
protected void doSecureLogin() throws IOException {
SecurityUtil.login(getConfig(), YarnConfiguration.NM_KEYTAB,
YarnConfiguration.NM_PRINCIPAL);
@@ -143,7 +149,7 @@ public class NodeManager extends Composi
} catch (IOException e) {
throw new YarnException("Failed to initialize container executor", e);
}
- DeletionService del = new DeletionService(exec);
+ DeletionService del = createDeletionService(exec);
addService(del);
// NodeManager level dispatcher
@@ -351,6 +357,11 @@ public class NodeManager extends Composi
return containerManager;
}
+  /**
+   * Exposes the {@link Context} held by this NodeManager. Package-private and
+   * marked as visible for testing only.
+   *
+   * @return the context of this NodeManager
+   */
+  @VisibleForTesting
+  Context getNMContext() {
+    return context;
+  }
+
public static void main(String[] args) {
Thread.setDefaultUncaughtExceptionHandler(new
YarnUncaughtExceptionHandler());
StringUtils.startupShutdownMessage(NodeManager.class, args, LOG);
Modified:
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java?rev=1460808&r1=1460807&r2=1460808&view=diff
==============================================================================
---
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
(original)
+++
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
Mon Mar 25 18:23:59 2013
@@ -22,6 +22,7 @@ import static org.apache.hadoop.fs.Creat
import java.io.DataOutputStream;
import java.io.File;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URISyntaxException;
@@ -53,8 +54,10 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.security.Credentials;
@@ -175,9 +178,11 @@ public class ResourceLocalizationService
this.recordFactory = RecordFactoryProvider.getRecordFactory(conf);
try {
- // TODO queue deletions here, rather than NM init?
FileContext lfs = getLocalFileContext(conf);
lfs.setUMask(new FsPermission((short)FsPermission.DEFAULT_UMASK));
+
+ cleanUpLocalDir(lfs,delService);
+
List<String> localDirs = dirsHandler.getLocalDirs();
for (String localDir : localDirs) {
// $local/usercache
@@ -926,4 +931,76 @@ public class ResourceLocalizationService
}
+  /**
+   * Cleans up what is left in the local directories. For every configured
+   * local dir the usercache, filecache and NM-private sub-directories are
+   * first renamed to {@code <name>_DEL_<timestamp>}; the renamed directories
+   * are then handed to the deletion service. Failures are logged (with their
+   * cause) and otherwise ignored, so one bad directory does not stop the
+   * cleanup of the remaining ones.
+   *
+   * @param lfs local file context used for the renames and listings
+   * @param del deletion service that receives the delete requests
+   */
+  private void cleanUpLocalDir(FileContext lfs, DeletionService del) {
+    // A single timestamp per pass: every directory renamed by this call
+    // carries the same suffix.
+    long currentTimeStamp = System.currentTimeMillis();
+    String[] subDirsToClean = {
+        ContainerLocalizer.USERCACHE,
+        ContainerLocalizer.FILECACHE,
+        ResourceLocalizationService.NM_PRIVATE_DIR
+    };
+    for (String localDir : dirsHandler.getLocalDirs()) {
+      for (String subDir : subDirsToClean) {
+        renameLocalDir(lfs, localDir, subDir, currentTimeStamp);
+      }
+      try {
+        deleteLocalDir(lfs, del, localDir);
+      } catch (IOException e) {
+        // Best effort: keep going, but do not lose the cause.
+        LOG.warn("Failed to delete localDir: " + localDir, e);
+      }
+    }
+  }
+
+  /**
+   * Renames {@code localDir/localSubDir} to
+   * {@code localDir/localSubDir_DEL_<currentTimeStamp>}. A missing source
+   * directory is ignored; any other failure is logged as a warning together
+   * with its cause and is not propagated.
+   *
+   * @param lfs local file context performing the rename
+   * @param localDir parent local directory
+   * @param localSubDir name of the sub-directory to rename
+   * @param currentTimeStamp value appended after the {@code _DEL_} marker
+   */
+  private void renameLocalDir(FileContext lfs, String localDir,
+      String localSubDir, long currentTimeStamp) {
+    try {
+      lfs.rename(new Path(localDir, localSubDir), new Path(
+          localDir, localSubDir + "_DEL_" + currentTimeStamp));
+    } catch (FileNotFoundException ex) {
+      // Nothing to rename: localSubDir may not exist.
+    } catch (Exception ex) {
+      // Best effort: report the failure, including its cause, and carry on.
+      LOG.warn("Failed to rename the local file under " +
+          localDir + "/" + localSubDir, ex);
+    }
+  }
+
+  /**
+   * Lists {@code localDir} and schedules deletion of every entry whose name
+   * carries one of the {@code <subDir>_DEL_} markers. A renamed usercache
+   * directory is cleaned entry by entry through
+   * {@code cleanUpFilesFromSubDir}; renamed NM-private and filecache
+   * directories are passed to the deletion service directly with a
+   * {@code null} user.
+   *
+   * @param lfs local file context used for the listing
+   * @param del deletion service that receives the delete requests
+   * @param localDir local directory to scan
+   * @throws IOException if listing or iterating {@code localDir} fails
+   */
+  private void deleteLocalDir(FileContext lfs, DeletionService del,
+      String localDir) throws IOException {
+    RemoteIterator<FileStatus> fileStatus = lfs.listStatus(new Path(localDir));
+    if (fileStatus == null) {
+      return;
+    }
+    // Plain substring markers, computed once: no regex has to be rebuilt for
+    // every directory entry, and the names are never interpreted as patterns.
+    final String userCacheMarker = ContainerLocalizer.USERCACHE + "_DEL_";
+    final String nmPrivateMarker = NM_PRIVATE_DIR + "_DEL_";
+    final String fileCacheMarker = ContainerLocalizer.FILECACHE + "_DEL_";
+    while (fileStatus.hasNext()) {
+      FileStatus status = fileStatus.next();
+      Path path = status.getPath();
+      String name = path.getName();
+      try {
+        if (name.contains(userCacheMarker)) {
+          cleanUpFilesFromSubDir(lfs, del, path);
+        } else if (name.contains(nmPrivateMarker)
+            || name.contains(fileCacheMarker)) {
+          del.delete(null, path, new Path[] {});
+        }
+      } catch (IOException ex) {
+        // Best effort: continue with the next entry, but keep the cause.
+        LOG.warn("Failed to delete this local Directory: " + name, ex);
+      }
+    }
+  }
+
+  /**
+   * Schedules deletion of everything directly under {@code dirPath}, each
+   * entry on behalf of the owner reported by its file status, and finally
+   * schedules deletion of {@code dirPath} itself with a {@code null} user.
+   *
+   * @param lfs local file context used for the listing
+   * @param del deletion service that receives the delete requests
+   * @param dirPath directory whose children, and then itself, are deleted
+   * @throws IOException if listing or iterating {@code dirPath} fails
+   */
+  private void cleanUpFilesFromSubDir(FileContext lfs, DeletionService del,
+      Path dirPath) throws IOException {
+    RemoteIterator<FileStatus> entries = lfs.listStatus(dirPath);
+    while (entries != null && entries.hasNext()) {
+      FileStatus entry = entries.next();
+      del.delete(entry.getOwner(), entry.getPath(), new Path[] {});
+    }
+    del.delete(null, dirPath, new Path[] {});
+  }
+
}
Added:
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerReboot.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerReboot.java?rev=1460808&view=auto
==============================================================================
---
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerReboot.java
(added)
+++
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerReboot.java
Mon Mar 25 18:23:59 2013
@@ -0,0 +1,297 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager;
+
+import static org.mockito.Mockito.*;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.UnsupportedFileSystemException;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.URL;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import
org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
+import
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
+import
org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
+import
org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.Records;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentMatcher;
+
+public class TestNodeManagerReboot {
+
+ static final File basedir =
+ new File("target", TestNodeManagerReboot.class.getName());
+ static final File logsDir = new File(basedir, "logs");
+ static final File nmLocalDir = new File(basedir, "nm0");
+ static final File localResourceDir = new File(basedir, "resource");
+
+ static final String user = System.getProperty("user.name");
+ private FileContext localFS;
+
+ private MyNodeManager nm;
+ private DeletionService delService;
+ static final Log LOG = LogFactory.getLog(TestNodeManagerReboot.class);
+
+  /** Obtains the local file-system context used by the test. */
+  @Before
+  public void setup() throws UnsupportedFileSystemException {
+    this.localFS = FileContext.getLocalFSFileContext();
+  }
+
+  /**
+   * Stops the NodeManager (if one was created) and then removes the test's
+   * base directory. The NodeManager is stopped first so that the directories
+   * are not deleted underneath a running instance; the delete sits in a
+   * {@code finally} block so that it still happens when stopping fails.
+   */
+  @After
+  public void tearDown() throws IOException, InterruptedException {
+    try {
+      if (nm != null) {
+        nm.stop();
+      }
+    } finally {
+      localFS.delete(new Path(basedir.getPath()), true);
+    }
+  }
+
+  /**
+   * Starts a NodeManager, puts files under the filecache directory and
+   * launches a container so that the usercache and NM-private directories
+   * get populated. It then sends a REBOOT event and checks that all three
+   * directories end up empty and that the deletion service was asked to
+   * remove the user's directory and the renamed {@code *_DEL_*} directories.
+   */
+  @Test(timeout = 20000)
+  public void testClearLocalDirWhenNodeReboot() throws IOException {
+    nm = new MyNodeManager();
+    nm.start();
+    // create files under fileCache
+    createFiles(nmLocalDir.getAbsolutePath(), ContainerLocalizer.FILECACHE,
+        100);
+    localResourceDir.mkdirs();
+    ContainerManagerImpl containerManager = nm.getContainerManager();
+
+    ContainerLaunchContext containerLaunchContext =
+        Records.newRecord(ContainerLaunchContext.class);
+    // Construct the Container-id
+    ContainerId cId = createContainerId();
+    containerLaunchContext.setContainerId(cId);
+
+    containerLaunchContext.setUser(user);
+
+    URL localResourceUri =
+        ConverterUtils.getYarnUrlFromPath(localFS
+            .makeQualified(new Path(localResourceDir.getAbsolutePath())));
+
+    LocalResource localResource =
+        Records.newRecord(LocalResource.class);
+    localResource.setResource(localResourceUri);
+    localResource.setSize(-1);
+    localResource.setVisibility(LocalResourceVisibility.APPLICATION);
+    localResource.setType(LocalResourceType.FILE);
+    localResource.setTimestamp(localResourceDir.lastModified());
+    String destinationFile = "dest_file";
+    Map<String, LocalResource> localResources =
+        new HashMap<String, LocalResource>();
+    localResources.put(destinationFile, localResource);
+    containerLaunchContext.setLocalResources(localResources);
+    List<String> commands = new ArrayList<String>();
+    containerLaunchContext.setCommands(commands);
+    containerLaunchContext.setResource(Records
+        .newRecord(Resource.class));
+    containerLaunchContext.getResource().setMemory(1024);
+    StartContainerRequest startRequest =
+        Records.newRecord(StartContainerRequest.class);
+    startRequest.setContainerLaunchContext(containerLaunchContext);
+    containerManager.startContainer(startRequest);
+
+    GetContainerStatusRequest request =
+        Records.newRecord(GetContainerStatusRequest.class);
+    request.setContainerId(cId);
+    Container container =
+        nm.getNMContext().getContainers().get(request.getContainerId());
+
+    // Poll until the container reaches DONE or the retry budget is used up.
+    final int MAX_TRIES = 20;
+    int numTries = 0;
+    while (!container.getContainerState().equals(ContainerState.DONE)
+        && numTries <= MAX_TRIES) {
+      try {
+        Thread.sleep(500);
+      } catch (InterruptedException ex) {
+        // Keep the interrupt visible to the caller and stop waiting.
+        Thread.currentThread().interrupt();
+        break;
+      }
+      numTries++;
+    }
+
+    Assert.assertEquals(ContainerState.DONE, container.getContainerState());
+
+    Assert.assertTrue(
+        "The container should create a subDir named currentUser: " + user +
+            " under localDir/usercache",
+        numOfLocalDirs(nmLocalDir.getAbsolutePath(),
+            ContainerLocalizer.USERCACHE) > 0);
+
+    Assert.assertTrue("There should be files or Dirs under nm_private when " +
+        "container is launched", numOfLocalDirs(nmLocalDir.getAbsolutePath(),
+        ResourceLocalizationService.NM_PRIVATE_DIR) > 0);
+
+    nm.handle(new NodeManagerEvent(NodeManagerEventType.REBOOT));
+
+    // Poll until all three local sub-directories are empty, or give up.
+    numTries = 0;
+    while ((numOfLocalDirs(nmLocalDir.getAbsolutePath(), ContainerLocalizer
+        .USERCACHE) > 0 || numOfLocalDirs(nmLocalDir.getAbsolutePath(),
+        ContainerLocalizer.FILECACHE) > 0 || numOfLocalDirs(nmLocalDir
+        .getAbsolutePath(), ResourceLocalizationService.NM_PRIVATE_DIR)
+        > 0) && numTries < MAX_TRIES) {
+      try {
+        Thread.sleep(500);
+      } catch (InterruptedException ex) {
+        // Keep the interrupt visible to the caller and stop waiting.
+        Thread.currentThread().interrupt();
+        break;
+      }
+      numTries++;
+    }
+
+    Assert.assertTrue("After NM reboots, all local files should be deleted",
+        numOfLocalDirs(nmLocalDir.getAbsolutePath(), ContainerLocalizer
+            .USERCACHE) == 0 && numOfLocalDirs(nmLocalDir.getAbsolutePath(),
+            ContainerLocalizer.FILECACHE) == 0 && numOfLocalDirs(nmLocalDir
+            .getAbsolutePath(), ResourceLocalizationService.NM_PRIVATE_DIR)
+            == 0);
+    verify(delService, times(1)).delete(eq(user),
+        argThat(new PathInclude(user)));
+    verify(delService, times(1)).delete(
+        (String) isNull(),
+        argThat(new PathInclude(ResourceLocalizationService.NM_PRIVATE_DIR
+            + "_DEL_")));
+    verify(delService, times(1)).delete((String) isNull(),
+        argThat(new PathInclude(ContainerLocalizer.FILECACHE + "_DEL_")));
+    verify(delService, times(1)).delete((String) isNull(),
+        argThat(new PathInclude(ContainerLocalizer.USERCACHE + "_DEL_")));
+
+  }
+
+  /**
+   * Counts the entries directly under {@code localDir/localSubDir}.
+   *
+   * @param localDir parent directory
+   * @param localSubDir sub-directory whose entries are counted
+   * @return the number of entries, or 0 when the listing returns
+   *         {@code null}
+   */
+  private int numOfLocalDirs(String localDir, String localSubDir) {
+    File[] entries = new File(localDir, localSubDir).listFiles();
+    return entries == null ? 0 : entries.length;
+  }
+
+  /**
+   * Creates {@code numOfFiles} empty files named {@code file_1} ...
+   * {@code file_N} under {@code dir/subDir}. A file that cannot be created
+   * is reported in the log and skipped, so a broken fixture is visible in
+   * the test output instead of being silently ignored.
+   *
+   * @param dir parent directory
+   * @param subDir sub-directory that receives the files
+   * @param numOfFiles number of files to create
+   */
+  private void createFiles(String dir, String subDir, int numOfFiles) {
+    File parent = new File(dir, subDir);
+    for (int i = 1; i <= numOfFiles; i++) {
+      File newFile = new File(parent, "file_" + i);
+      try {
+        newFile.createNewFile();
+      } catch (IOException e) {
+        LOG.warn("Failed to create test file " + newFile, e);
+      }
+    }
+  }
+
+  /**
+   * Builds a {@link ContainerId} for attempt 1 of an application whose id and
+   * cluster timestamp are both 0.
+   *
+   * @return the freshly created container id
+   */
+  private ContainerId createContainerId() {
+    ApplicationId applicationId = Records.newRecord(ApplicationId.class);
+    applicationId.setClusterTimestamp(0);
+    applicationId.setId(0);
+
+    ApplicationAttemptId attemptId =
+        Records.newRecord(ApplicationAttemptId.class);
+    attemptId.setApplicationId(applicationId);
+    attemptId.setAttemptId(1);
+
+    ContainerId result = Records.newRecord(ContainerId.class);
+    result.setApplicationAttemptId(attemptId);
+    return result;
+  }
+
+  /**
+   * NodeManager used by this test. It is initialised with a test
+   * configuration in its constructor, uses a {@link MockNodeStatusUpdater},
+   * wraps its {@link DeletionService} in a Mockito spy that is published
+   * through the enclosing test's {@code delService} field, and handles
+   * SHUTDOWN and REBOOT events itself.
+   */
+  private class MyNodeManager extends NodeManager {
+
+    public MyNodeManager() {
+      super();
+      this.init(createNMConfig());
+    }
+
+    @Override
+    protected NodeStatusUpdater createNodeStatusUpdater(Context context,
+        Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
+      MockNodeStatusUpdater myNodeStatusUpdater = new MockNodeStatusUpdater(
+          context, dispatcher, healthChecker, metrics);
+      return myNodeStatusUpdater;
+    }
+
+    @Override
+    protected DeletionService createDeletionService(ContainerExecutor exec) {
+      // Spy on the real service so the test can verify the delete calls.
+      delService = spy(new DeletionService(exec));
+      return delService;
+    }
+
+    // mimic part of reboot process
+    @Override
+    public void handle(NodeManagerEvent event) {
+      switch (event.getType()) {
+      case SHUTDOWN:
+        this.stop();
+        break;
+      case REBOOT:
+        this.stop();
+        // Store the replacement in the test's nm field so that tearDown()
+        // stops it; an untracked instance would never be stopped.
+        TestNodeManagerReboot.this.nm = createNewMyNodeManager();
+        TestNodeManagerReboot.this.nm.start();
+        break;
+      default:
+        LOG.warn("Invalid shutdown event " + event.getType() + ". Ignoring.");
+      }
+    }
+
+    private MyNodeManager createNewMyNodeManager() {
+      return new MyNodeManager();
+    }
+
+    private YarnConfiguration createNMConfig() {
+      YarnConfiguration conf = new YarnConfiguration();
+      conf.setInt(YarnConfiguration.NM_PMEM_MB, 5 * 1024); // 5GB
+      conf.set(YarnConfiguration.NM_ADDRESS, "127.0.0.1:12345");
+      conf.set(YarnConfiguration.NM_LOCALIZER_ADDRESS, "127.0.0.1:12346");
+      conf.set(YarnConfiguration.NM_LOG_DIRS, logsDir.getAbsolutePath());
+      conf.set(YarnConfiguration.NM_LOCAL_DIRS, nmLocalDir.getAbsolutePath());
+      return conf;
+    }
+  }
+
+  /**
+   * Mockito argument matcher that accepts a {@link Path} whose final name
+   * component contains the given fragment.
+   */
+  class PathInclude extends ArgumentMatcher<Path> {
+
+    final String part;
+
+    PathInclude(String part) {
+      this.part = part;
+    }
+
+    @Override
+    public boolean matches(Object o) {
+      String name = ((Path) o).getName();
+      return name.contains(part);
+    }
+  }
+}