Author: vinodkv
Date: Tue Jul 16 23:31:29 2013
New Revision: 1503943
URL: http://svn.apache.org/r1503943
Log:
YARN-661. Fixed NM to clean up users' local directories correctly when starting
up. Contributed by Omkar Vinit Joshi.
svn merge --ignore-ancestry -c 1503942 ../../trunk/
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDeletionService.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerReboot.java
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt?rev=1503943&r1=1503942&r2=1503943&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt Tue Jul 16 23:31:29 2013
@@ -45,6 +45,9 @@ Release 2.1.1-beta - UNRELEASED
YARN-523. Modified a test-case to validate container diagnostics on
localization failures. (Jian He via vinodkv)
+ YARN-661. Fixed NM to clean up users' local directories correctly when
+ starting up. (Omkar Vinit Joshi via vinodkv)
+
Release 2.1.0-beta - 2013-07-02
INCOMPATIBLE CHANGES
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java?rev=1503943&r1=1503942&r2=1503943&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java Tue Jul 16 23:31:29 2013
@@ -18,23 +18,30 @@
package org.apache.hadoop.yarn.server.nodemanager;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
-import static java.util.concurrent.TimeUnit.*;
+import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.hadoop.fs.Path;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
public class DeletionService extends AbstractService {
@@ -42,7 +49,8 @@ public class DeletionService extends Abs
private int debugDelay;
private final ContainerExecutor exec;
private ScheduledThreadPoolExecutor sched;
- private final FileContext lfs = getLfs();
+ private static final FileContext lfs = getLfs();
+
static final FileContext getLfs() {
try {
return FileContext.getLocalFSFileContext();
@@ -68,11 +76,23 @@ public class DeletionService extends Abs
public void delete(String user, Path subDir, Path... baseDirs) {
// TODO if parent owned by NM, rename within parent inline
+ // A debugDelay of -1 disables scheduling of deletions entirely.
if (debugDelay != -1) {
- sched.schedule(new FileDeletion(user, subDir, baseDirs), debugDelay,
- TimeUnit.SECONDS);
+ // A null or empty baseDirs array is passed on as a null list;
+ // FileDeletionTask.run() treats null and empty lists identically.
+ if (baseDirs == null || baseDirs.length == 0) {
+ sched.schedule(new FileDeletionTask(this, user, subDir, null),
+ debugDelay, TimeUnit.SECONDS);
+ } else {
+ sched.schedule(
+ new FileDeletionTask(this, user, subDir, Arrays.asList(baseDirs)),
+ debugDelay, TimeUnit.SECONDS);
+ }
}
}
-
+
+ /**
+ * Schedules an already-built deletion task to run after debugDelay
+ * seconds. Does nothing when debugDelay is -1. Tasks are obtained from
+ * createFileDeletionTask (the FileDeletionTask constructor is private).
+ * @param fileDeletionTask the task to schedule
+ */
+ public void scheduleFileDeletionTask(FileDeletionTask fileDeletionTask) {
+ if (debugDelay != -1) {
+ sched.schedule(fileDeletionTask, debugDelay, TimeUnit.SECONDS);
+ }
+ }
+
@Override
protected void serviceInit(Configuration conf) throws Exception {
ThreadFactory tf = new ThreadFactoryBuilder()
@@ -118,46 +138,184 @@ public class DeletionService extends Abs
return getServiceState() == STATE.STOPPED && sched.isTerminated();
}
- private class FileDeletion implements Runnable {
- final String user;
- final Path subDir;
- final Path[] baseDirs;
- FileDeletion(String user, Path subDir, Path[] baseDirs) {
+ /**
+ * A single deletion request that can be linked to other requests.
+ * A task with predecessors is scheduled only after all of them have
+ * finished, and only if none of them failed; a failure is propagated to
+ * all transitive successors instead of scheduling them.
+ * See {@link #addFileDeletionTaskDependency(FileDeletionTask)}.
+ */
+ public static class FileDeletionTask implements Runnable {
+ private final String user;
+ private final Path subDir;
+ private final List<Path> baseDirs;
+ private final AtomicInteger numberOfPendingPredecessorTasks;
+ private final Set<FileDeletionTask> successorTaskSet;
+ private final DeletionService delService;
+ // Every task starts with success=true; it is set to false if this task
+ // fails in run() or if any of its predecessor tasks fails (see
+ // fileDeletionTaskFinished()).
+ private boolean success;
+
+ private FileDeletionTask(DeletionService delService, String user,
+ Path subDir, List<Path> baseDirs) {
+ this.delService = delService;
this.user = user;
this.subDir = subDir;
this.baseDirs = baseDirs;
+ this.successorTaskSet = new HashSet<FileDeletionTask>();
+ this.numberOfPendingPredecessorTasks = new AtomicInteger(0);
+ success = true;
+ }
+
+ /**
+ * Increments and returns the pending predecessor task count.
+ */
+ public int incrementAndGetPendingPredecessorTasks() {
+ return numberOfPendingPredecessorTasks.incrementAndGet();
+ }
+
+ /**
+ * Decrements and returns the pending predecessor task count.
+ */
+ public int decrementAndGetPendingPredecessorTasks() {
+ return numberOfPendingPredecessorTasks.decrementAndGet();
}
+
+ @VisibleForTesting
+ public String getUser() {
+ return this.user;
+ }
+
+ @VisibleForTesting
+ public Path getSubDir() {
+ return this.subDir;
+ }
+
+ @VisibleForTesting
+ public List<Path> getBaseDirs() {
+ return this.baseDirs;
+ }
+
+ public synchronized void setSuccess(boolean success) {
+ this.success = success;
+ }
+
+ /**
+ * @return false if this task or any of its predecessors failed
+ */
+ public synchronized boolean getSuccess() {
+ return this.success;
+ }
+
+ /**
+ * @deprecated misspelled; use {@link #getSuccess()} instead.
+ */
+ @Deprecated
+ public synchronized boolean getSucess() {
+ return getSuccess();
+ }
+
@Override
public void run() {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(this);
+ }
+ boolean error = false;
if (null == user) {
- if (baseDirs == null || baseDirs.length == 0) {
+ if (baseDirs == null || baseDirs.isEmpty()) {
LOG.debug("NM deleting absolute path : " + subDir);
try {
lfs.delete(subDir, true);
} catch (IOException e) {
+ error = true;
LOG.warn("Failed to delete " + subDir);
}
- return;
- }
- for (Path baseDir : baseDirs) {
- Path del = subDir == null? baseDir : new Path(baseDir, subDir);
- LOG.debug("NM deleting path : " + del);
- try {
- lfs.delete(del, true);
- } catch (IOException e) {
- LOG.warn("Failed to delete " + subDir);
+ } else {
+ for (Path baseDir : baseDirs) {
+ Path del = subDir == null? baseDir : new Path(baseDir, subDir);
+ LOG.debug("NM deleting path : " + del);
+ try {
+ lfs.delete(del, true);
+ } catch (IOException e) {
+ error = true;
+ // Report the path that actually failed; subDir may be null here.
+ LOG.warn("Failed to delete " + del, e);
+ }
}
}
} else {
try {
LOG.debug("Deleting path: [" + subDir + "] as user: [" + user + "]");
- exec.deleteAsUser(user, subDir, baseDirs);
+ if (baseDirs == null || baseDirs.isEmpty()) {
+ delService.exec.deleteAsUser(user, subDir, (Path[])null);
+ } else {
+ delService.exec.deleteAsUser(user, subDir,
+ baseDirs.toArray(new Path[0]));
+ }
} catch (IOException e) {
+ error = true;
LOG.warn("Failed to delete as user " + user, e);
} catch (InterruptedException e) {
+ error = true;
+ // Restore the interrupt status instead of silently clearing it.
+ Thread.currentThread().interrupt();
LOG.warn("Failed to delete as user " + user, e);
}
}
+ if (error) {
+ setSuccess(false);
+ }
+ fileDeletionTaskFinished();
}
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder("\nFileDeletionTask : ");
+ sb.append(" user : ").append(this.user);
+ sb.append(" subDir : ").append(
+ subDir == null ? "null" : subDir.toString());
+ sb.append(" baseDir : ");
+ if (baseDirs == null || baseDirs.isEmpty()) {
+ sb.append("null");
+ } else {
+ for (Path baseDir : baseDirs) {
+ sb.append(baseDir.toString()).append(',');
+ }
+ }
+ return sb.toString();
+ }
+
+ /**
+ * Registers {@code successorTask} as a successor of this task. The
+ * successor is scheduled only after this task and every other
+ * predecessor of the successor have finished, and only if all of them
+ * succeeded. For example, if task2 and task3 may start only after task1,
+ * call {@code task1.addFileDeletionTaskDependency(task2)} and
+ * {@code task1.addFileDeletionTaskDependency(task3)}.
+ * Note: dependencies must be defined before any of the involved tasks is
+ * scheduled. A successor added to a task that has already finished is
+ * never scheduled.
+ * @param successorTask task that must wait for this one
+ */
+ public synchronized void addFileDeletionTaskDependency(
+ FileDeletionTask successorTask) {
+ if (successorTaskSet.add(successorTask)) {
+ successorTask.incrementAndGetPendingPredecessorTasks();
+ }
+ }
+
+ /*
+ * This is called when
+ * 1) the current file deletion task ran and finished, or
+ * 2) a predecessor task failed; then it is called directly on this
+ * (never scheduled) task so the failure reaches its successors too.
+ */
+ private synchronized void fileDeletionTaskFinished() {
+ Iterator<FileDeletionTask> successorTaskI =
+ this.successorTaskSet.iterator();
+ while (successorTaskI.hasNext()) {
+ FileDeletionTask successorTask = successorTaskI.next();
+ if (!success) {
+ successorTask.setSuccess(success);
+ }
+ int count = successorTask.decrementAndGetPendingPredecessorTasks();
+ if (count == 0) {
+ if (successorTask.getSuccess()) {
+ successorTask.delService.scheduleFileDeletionTask(successorTask);
+ } else {
+ successorTask.fileDeletionTaskFinished();
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Helper method to create a file deletion task. To be used only if we need
+ * a way to define dependencies between deletion tasks.
+ * @param user user on whose behalf this task is supposed to run
+ * @param subDir sub directory as required in
+ * {@link DeletionService#delete(String, Path, Path...)}
+ * @param baseDirs base directories as required in
+ * {@link DeletionService#delete(String, Path, Path...)}; may be null
+ * @return a new, unscheduled task; schedule it with
+ * {@link #scheduleFileDeletionTask(FileDeletionTask)}
+ */
+ public FileDeletionTask createFileDeletionTask(String user, Path subDir,
+ Path[] baseDirs) {
+ // Arrays.asList(null) would throw NPE; run() already accepts a null list.
+ // An empty array is kept as an empty list (callers' matchers rely on it).
+ List<Path> baseDirList = baseDirs == null ? null : Arrays.asList(baseDirs);
+ return new FileDeletionTask(this, user, subDir, baseDirList);
}
-}
+}
\ No newline at end of file
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java?rev=1503943&r1=1503942&r2=1503943&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java Tue Jul 16 23:31:29 2013
@@ -82,6 +82,7 @@ import org.apache.hadoop.yarn.ipc.YarnRP
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
+import
org.apache.hadoop.yarn.server.nodemanager.DeletionService.FileDeletionTask;
import org.apache.hadoop.yarn.server.nodemanager.api.LocalizationProtocol;
import org.apache.hadoop.yarn.server.nodemanager.api.ResourceLocalizationSpec;
import
org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalResourceStatus;
@@ -1094,7 +1095,8 @@ public class ResourceLocalizationService
try {
if (status.getPath().getName().matches(".*" +
ContainerLocalizer.USERCACHE + "_DEL_.*")) {
- cleanUpFilesFromSubDir(lfs, del, status.getPath());
+ LOG.info("usercache path : " + status.getPath().toString());
+ cleanUpFilesPerUserDir(lfs, del, status.getPath());
} else if (status.getPath().getName()
.matches(".*" + NM_PRIVATE_DIR + "_DEL_.*")
||
@@ -1111,17 +1113,28 @@ public class ResourceLocalizationService
}
}
- private void cleanUpFilesFromSubDir(FileContext lfs, DeletionService del,
- Path dirPath) throws IOException {
- RemoteIterator<FileStatus> fileStatus = lfs.listStatus(dirPath);
- if (fileStatus != null) {
- while (fileStatus.hasNext()) {
- FileStatus status = fileStatus.next();
+ /**
+ * Deletes a renamed usercache directory (usercache_DEL_*). Each entry
+ * directly under it is deleted as its owner; the directory itself is
+ * deleted by the NM only after all per-entry deletions have succeeded.
+ * @param lfs local file context used to list the directory
+ * @param del deletion service that runs the tasks
+ * @param userDirPath the renamed usercache directory
+ * @throws IOException if listing the directory fails
+ */
+ private void cleanUpFilesPerUserDir(FileContext lfs, DeletionService del,
+ Path userDirPath) throws IOException {
+ RemoteIterator<FileStatus> userDirStatus = lfs.listStatus(userDirPath);
+ FileDeletionTask dependentDeletionTask =
+ del.createFileDeletionTask(null, userDirPath, new Path[] {});
+ // If there are no entries, no predecessor task would ever trigger the
+ // dependent task, so it has to be scheduled directly (else branch).
+ if (userDirStatus != null && userDirStatus.hasNext()) {
+ List<FileDeletionTask> deletionTasks = new ArrayList<FileDeletionTask>();
+ while (userDirStatus.hasNext()) {
+ FileStatus status = userDirStatus.next();
String owner = status.getOwner();
- del.delete(owner, status.getPath(), new Path[] {});
+ FileDeletionTask deletionTask =
+ del.createFileDeletionTask(owner, null,
+ new Path[] { status.getPath() });
+ deletionTask.addFileDeletionTaskDependency(dependentDeletionTask);
+ deletionTasks.add(deletionTask);
}
+ // Schedule only after all dependencies are registered.
+ for (FileDeletionTask task : deletionTasks) {
+ del.scheduleFileDeletionTask(task);
+ }
+ } else {
+ del.scheduleFileDeletionTask(dependentDeletionTask);
}
- del.delete(null, dirPath, new Path[] {});
}
}
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDeletionService.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDeletionService.java?rev=1503943&r1=1503942&r2=1503943&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDeletionService.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDeletionService.java Tue Jul 16 23:31:29 2013
@@ -18,6 +18,11 @@
package org.apache.hadoop.yarn.server.nodemanager;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@@ -28,16 +33,11 @@ import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
-import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
-
-
+import
org.apache.hadoop.yarn.server.nodemanager.DeletionService.FileDeletionTask;
import org.junit.AfterClass;
import org.junit.Test;
import org.mockito.Mockito;
-import static org.junit.Assert.*;
-
public class TestDeletionService {
private static final FileContext lfs = getLfs();
@@ -210,4 +210,79 @@ public class TestDeletionService {
}
assertTrue(del.isTerminated());
}
+
+ @Test (timeout=60000)
+ public void testFileDeletionTaskDependency() throws Exception {
+ FakeDefaultContainerExecutor exec = new FakeDefaultContainerExecutor();
+ Configuration conf = new Configuration();
+ exec.setConf(conf);
+ DeletionService del = new DeletionService(exec);
+ del.init(conf);
+ del.start();
+
+ try {
+ Random r = new Random();
+ long seed = r.nextLong();
+ r.setSeed(seed);
+ System.out.println("SEED: " + seed);
+ List<Path> dirs = buildDirs(r, base, 2);
+ createDirs(new Path("."), dirs);
+
+ // First we delete sub directories that actually exist. Their
+ // completion should then trigger deletion of the parent directory.
+ List<Path> subDirs = buildDirs(r, dirs.get(0), 2);
+ createDirs(new Path("."), subDirs);
+
+ FileDeletionTask dependentDeletionTask =
+ del.createFileDeletionTask(null, dirs.get(0), new Path[] {});
+ List<FileDeletionTask> deletionTasks = new ArrayList<FileDeletionTask>();
+ for (Path subDir : subDirs) {
+ FileDeletionTask deletionTask =
+ del.createFileDeletionTask(null, null, new Path[] { subDir });
+ deletionTask.addFileDeletionTaskDependency(dependentDeletionTask);
+ deletionTasks.add(deletionTask);
+ }
+ for (FileDeletionTask task : deletionTasks) {
+ del.scheduleFileDeletionTask(task);
+ }
+
+ int msecToWait = 20 * 1000;
+ while (msecToWait > 0 && (lfs.util().exists(dirs.get(0)))) {
+ Thread.sleep(100);
+ msecToWait -= 100;
+ }
+ assertFalse(lfs.util().exists(dirs.get(0)));
+
+ // Now we delete existing sub directories again, but mark one of the
+ // deletion tasks as failed; the parent directory must then survive.
+ subDirs = buildDirs(r, dirs.get(1), 2);
+ // Create the real sub directories so that the wait below is meaningful.
+ createDirs(new Path("."), subDirs);
+ subDirs.add(new Path(dirs.get(1), "absentFile"));
+
+ dependentDeletionTask =
+ del.createFileDeletionTask(null, dirs.get(1), new Path[] {});
+ deletionTasks = new ArrayList<FileDeletionTask>();
+ for (Path subDir : subDirs) {
+ FileDeletionTask deletionTask =
+ del.createFileDeletionTask(null, null, new Path[] { subDir });
+ deletionTask.addFileDeletionTaskDependency(dependentDeletionTask);
+ deletionTasks.add(deletionTask);
+ }
+ // marking one of the tasks as a failure.
+ deletionTasks.get(2).setSuccess(false);
+ for (FileDeletionTask task : deletionTasks) {
+ del.scheduleFileDeletionTask(task);
+ }
+
+ msecToWait = 20 * 1000;
+ while (msecToWait > 0
+ && (lfs.util().exists(subDirs.get(0)) || lfs.util().exists(
+ subDirs.get(1)))) {
+ Thread.sleep(100);
+ msecToWait -= 100;
+ }
+ // The successful sibling tasks did run ...
+ assertFalse(lfs.util().exists(subDirs.get(0)));
+ assertFalse(lfs.util().exists(subDirs.get(1)));
+ // ... but the failed one prevented the parent from being deleted.
+ assertTrue(lfs.util().exists(dirs.get(1)));
+ } finally {
+ del.stop();
+ }
+ }
}
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerReboot.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerReboot.java?rev=1503943&r1=1503942&r2=1503943&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerReboot.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerReboot.java Tue Jul 16 23:31:29 2013
@@ -54,6 +54,7 @@ import org.apache.hadoop.yarn.conf.YarnC
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.NMTokenIdentifier;
+import
org.apache.hadoop.yarn.server.nodemanager.DeletionService.FileDeletionTask;
import
org.apache.hadoop.yarn.server.nodemanager.containermanager.TestContainerManager;
import
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
@@ -69,8 +70,8 @@ import org.mockito.ArgumentMatcher;
public class TestNodeManagerReboot {
- static final File basedir =
- new File("target", TestNodeManagerReboot.class.getName());
+ static final File basedir = new File("target",
+ TestNodeManagerReboot.class.getName());
static final File logsDir = new File(basedir, "logs");
static final File nmLocalDir = new File(basedir, "nm0");
static final File localResourceDir = new File(basedir, "resource");
@@ -100,7 +101,8 @@ public class TestNodeManagerReboot {
nm = new MyNodeManager();
nm.start();
- final ContainerManagementProtocol containerManager =
nm.getContainerManager();
+ final ContainerManagementProtocol containerManager =
+ nm.getContainerManager();
// create files under fileCache
createFiles(nmLocalDir.getAbsolutePath(), ContainerLocalizer.FILECACHE,
100);
@@ -112,16 +114,13 @@ public class TestNodeManagerReboot {
ContainerId cId = createContainerId();
URL localResourceUri =
- ConverterUtils.getYarnUrlFromPath(localFS
- .makeQualified(new Path(localResourceDir.getAbsolutePath())));
+ ConverterUtils.getYarnUrlFromPath(localFS.makeQualified(new Path(
+ localResourceDir.getAbsolutePath())));
LocalResource localResource =
- Records.newRecord(LocalResource.class);
- localResource.setResource(localResourceUri);
- localResource.setSize(-1);
- localResource.setVisibility(LocalResourceVisibility.APPLICATION);
- localResource.setType(LocalResourceType.FILE);
- localResource.setTimestamp(localResourceDir.lastModified());
+ LocalResource.newInstance(localResourceUri, LocalResourceType.FILE,
+ LocalResourceVisibility.APPLICATION, -1,
+ localResourceDir.lastModified());
String destinationFile = "dest_file";
Map<String, LocalResource> localResources =
new HashMap<String, LocalResource>();
@@ -129,7 +128,7 @@ public class TestNodeManagerReboot {
containerLaunchContext.setLocalResources(localResources);
List<String> commands = new ArrayList<String>();
containerLaunchContext.setCommands(commands);
-
+
final StartContainerRequest startRequest =
Records.newRecord(StartContainerRequest.class);
startRequest.setContainerLaunchContext(containerLaunchContext);
@@ -137,8 +136,9 @@ public class TestNodeManagerReboot {
startRequest.setContainerToken(TestContainerManager.createContainerToken(
cId, 0, nodeId, destinationFile, nm.getNMContext()
.getContainerTokenSecretManager()));
- final UserGroupInformation currentUser = UserGroupInformation
- .createRemoteUser(cId.getApplicationAttemptId().toString());
+ final UserGroupInformation currentUser =
+ UserGroupInformation.createRemoteUser(cId.getApplicationAttemptId()
+ .toString());
NMTokenIdentifier nmIdentifier =
new NMTokenIdentifier(cId.getApplicationAttemptId(), nodeId, user,
123);
currentUser.addTokenIdentifier(nmIdentifier);
@@ -170,27 +170,31 @@ public class TestNodeManagerReboot {
Assert.assertEquals(ContainerState.DONE, container.getContainerState());
- Assert.assertTrue(
- "The container should create a subDir named currentUser: " + user +
- "under localDir/usercache",
+ Assert
+ .assertTrue(
+ "The container should create a subDir named currentUser: " + user
+ + "under localDir/usercache",
numOfLocalDirs(nmLocalDir.getAbsolutePath(),
- ContainerLocalizer.USERCACHE) > 0);
+ ContainerLocalizer.USERCACHE) > 0);
- Assert.assertTrue("There should be files or Dirs under nm_private when " +
- "container is launched", numOfLocalDirs(nmLocalDir.getAbsolutePath(),
+ Assert.assertTrue(
+ "There should be files or Dirs under nm_private when "
+ + "container is launched",
+ numOfLocalDirs(nmLocalDir.getAbsolutePath(),
ResourceLocalizationService.NM_PRIVATE_DIR) > 0);
// restart the NodeManager
nm.stop();
nm = new MyNodeManager();
- nm.start();
+ nm.start();
numTries = 0;
- while ((numOfLocalDirs(nmLocalDir.getAbsolutePath(), ContainerLocalizer
- .USERCACHE) > 0 || numOfLocalDirs(nmLocalDir.getAbsolutePath(),
- ContainerLocalizer.FILECACHE) > 0 || numOfLocalDirs(nmLocalDir
- .getAbsolutePath(), ResourceLocalizationService.NM_PRIVATE_DIR)
- > 0) && numTries < MAX_TRIES) {
+ while ((numOfLocalDirs(nmLocalDir.getAbsolutePath(),
+ ContainerLocalizer.USERCACHE) > 0
+ || numOfLocalDirs(nmLocalDir.getAbsolutePath(),
+ ContainerLocalizer.FILECACHE) > 0 || numOfLocalDirs(
+ nmLocalDir.getAbsolutePath(),
ResourceLocalizationService.NM_PRIVATE_DIR) > 0)
+ && numTries < MAX_TRIES) {
try {
Thread.sleep(500);
} catch (InterruptedException ex) {
@@ -199,21 +203,27 @@ public class TestNodeManagerReboot {
numTries++;
}
- Assert.assertTrue("After NM reboots, all local files should be deleted",
- numOfLocalDirs(nmLocalDir.getAbsolutePath(), ContainerLocalizer
- .USERCACHE) == 0 && numOfLocalDirs(nmLocalDir.getAbsolutePath(),
- ContainerLocalizer.FILECACHE) == 0 && numOfLocalDirs(nmLocalDir
- .getAbsolutePath(), ResourceLocalizationService.NM_PRIVATE_DIR)
- == 0);
+ Assert
+ .assertTrue(
+ "After NM reboots, all local files should be deleted",
+ numOfLocalDirs(nmLocalDir.getAbsolutePath(),
+ ContainerLocalizer.USERCACHE) == 0
+ && numOfLocalDirs(nmLocalDir.getAbsolutePath(),
+ ContainerLocalizer.FILECACHE) == 0
+ && numOfLocalDirs(nmLocalDir.getAbsolutePath(),
+ ResourceLocalizationService.NM_PRIVATE_DIR) == 0);
verify(delService, times(1)).delete(
- (String) isNull(),
- argThat(new PathInclude(ResourceLocalizationService.NM_PRIVATE_DIR
- + "_DEL_")));
+ (String) isNull(),
+ argThat(new PathInclude(ResourceLocalizationService.NM_PRIVATE_DIR
+ + "_DEL_")));
verify(delService, times(1)).delete((String) isNull(),
- argThat(new PathInclude(ContainerLocalizer.FILECACHE + "_DEL_")));
- verify(delService, times(1)).delete((String) isNull(),
- argThat(new PathInclude(ContainerLocalizer.USERCACHE + "_DEL_")));
-
+ argThat(new PathInclude(ContainerLocalizer.FILECACHE + "_DEL_")));
+ verify(delService, times(1)).scheduleFileDeletionTask(
+ argThat(new FileDeletionInclude(user, null,
+ new String[] { destinationFile })));
+ verify(delService, times(1)).scheduleFileDeletionTask(
+ argThat(new FileDeletionInclude(null, ContainerLocalizer.USERCACHE
+ + "_DEL_", new String[] {})));
}
private int numOfLocalDirs(String localDir, String localSubDir) {
@@ -238,7 +248,8 @@ public class TestNodeManagerReboot {
+ // Builds container id 0 under attempt 1 of ApplicationId.newInstance(0, 0).
private ContainerId createContainerId() {
ApplicationId appId = ApplicationId.newInstance(0, 0);
- ApplicationAttemptId appAttemptId =
ApplicationAttemptId.newInstance(appId, 1);
+ ApplicationAttemptId appAttemptId =
+ ApplicationAttemptId.newInstance(appId, 1);
ContainerId containerId = ContainerId.newInstance(appAttemptId, 0);
return containerId;
}
@@ -253,8 +264,8 @@ public class TestNodeManagerReboot {
@Override
protected NodeStatusUpdater createNodeStatusUpdater(Context context,
Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
+ // Swaps in MockNodeStatusUpdater; presumably this avoids talking to a
+ // real ResourceManager during the test -- confirm in MockNodeStatusUpdater.
- MockNodeStatusUpdater myNodeStatusUpdater = new MockNodeStatusUpdater(
- context, dispatcher, healthChecker, metrics);
+ MockNodeStatusUpdater myNodeStatusUpdater =
+ new MockNodeStatusUpdater(context, dispatcher, healthChecker,
metrics);
return myNodeStatusUpdater;
}
@@ -288,4 +299,58 @@ public class TestNodeManagerReboot {
return ((Path) o).getName().indexOf(part) != -1;
}
}
+
+ /**
+ * Mockito matcher for FileDeletionTask arguments. It compares the task's
+ * user with {@code user} (null-aware), then checks that the task's subDir
+ * contains {@code subDirIncludes} and that each base dir contains the
+ * corresponding entry of {@code baseDirIncludes} (same count required).
+ * NOTE(review): when both the task's user and {@code user} are non-null,
+ * matches() returns the user comparison immediately, so subDir/baseDirs
+ * are only checked for null users. Tightening this would change which
+ * verify() calls pass -- review callers' baseDir expectations first.
+ */
+ class FileDeletionInclude extends ArgumentMatcher<FileDeletionTask> {
+ final String user;
+ final String subDirIncludes;
+ final String[] baseDirIncludes;
+
+ public FileDeletionInclude(String user, String subDirIncludes,
+ String [] baseDirIncludes) {
+ this.user = user;
+ this.subDirIncludes = subDirIncludes;
+ this.baseDirIncludes = baseDirIncludes;
+ }
+
+ @Override
+ public boolean matches(Object o) {
+ FileDeletionTask fd = (FileDeletionTask)o;
+ if (fd.getUser() == null && user != null) {
+ return false;
+ } else if (fd.getUser() != null && user == null) {
+ return false;
+ } else if (fd.getUser() != null && user != null) {
+ // NOTE(review): early return -- the path checks below are skipped.
+ return fd.getUser().equals(user);
+ }
+ if (!comparePaths(fd.getSubDir(), subDirIncludes)) {
+ return false;
+ }
+ if (baseDirIncludes == null && fd.getBaseDirs() != null) {
+ return false;
+ } else if (baseDirIncludes != null && fd.getBaseDirs() == null ) {
+ return false;
+ } else if (baseDirIncludes != null && fd.getBaseDirs() != null) {
+ if (baseDirIncludes.length != fd.getBaseDirs().size()) {
+ return false;
+ }
+ for (int i =0 ; i < baseDirIncludes.length; i++) {
+ if (!comparePaths(fd.getBaseDirs().get(i), baseDirIncludes[i])) {
+ return false;
+ }
+ }
+ }
+ return true;
+ }
+
+ // True when both are null, or when p1's URI path contains p2 as a
+ // substring; false when exactly one of them is null.
+ public boolean comparePaths(Path p1, String p2) {
+ if (p1 == null && p2 != null){
+ return false;
+ } else if (p1 != null && p2 == null) {
+ return false;
+ } else if (p1 != null && p2 != null ){
+ return p1.toUri().getPath().contains(p2.toString());
+ }
+ return true;
+ }
+ }
}