Author: vinodkv
Date: Wed Apr 3 05:01:35 2013
New Revision: 1463824
URL: http://svn.apache.org/r1463824
Log:
YARN-467. Modify public distributed cache to localize files such that no local
directory hits unix file count limits and thus prevent job failures.
Contributed by Omkar Vinit Joshi.
svn merge --ignore-ancestry -c 1463823 ../../trunk/
Added:
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalCacheDirectoryManager.java
- copied unchanged from r1463823,
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalCacheDirectoryManager.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalCacheDirectoryManager.java
- copied unchanged from r1463823,
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalCacheDirectoryManager.java
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceRetention.java
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt?rev=1463824&r1=1463823&r2=1463824&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt Wed Apr 3
05:01:35 2013
@@ -120,7 +120,11 @@ Release 2.0.5-beta - UNRELEASED
YARN-382. SchedulerUtils improve way normalizeRequest sets the resource
capabilities. (Zhijie Shen via bikas)
-
+
+ YARN-467. Modify public distributed cache to localize files such that no
+ local directory hits unix file count limits and thus prevent job failures.
+ (Omkar Vinit Joshi via vinodkv)
+
Release 2.0.4-alpha - UNRELEASED
INCOMPATIBLE CHANGES
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml?rev=1463824&r1=1463823&r2=1463824&view=diff
==============================================================================
---
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
(original)
+++
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
Wed Apr 3 05:01:35 2013
@@ -256,4 +256,18 @@
<Bug pattern="IS2_INCONSISTENT_SYNC" />
</Match>
+ <!-- Suppress FindBugs null-dereference warning: the value returned by
+ directoryManagers.get() is never null where it is dereferenced. -->
+ <Match>
+ <Class
name="org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourcesTrackerImpl"
/>
+ <Method name="decrementFileCountForLocalCacheDirectory" />
+ <Bug pattern="NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE" />
+ </Match>
+
+ <!-- Suppress FindBugs null-dereference warning: directoryManagers.get() is
+ only called after putIfAbsent() has inserted the key, so it cannot be null. -->
+ <Match>
+ <Class
name="org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourcesTrackerImpl"
/>
+ <Method name="getPathForLocalization" />
+ <Bug pattern="NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE" />
+ </Match>
+
</FindBugsFilter>
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java?rev=1463824&r1=1463823&r2=1463824&view=diff
==============================================================================
---
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
(original)
+++
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
Wed Apr 3 05:01:35 2013
@@ -340,7 +340,15 @@ public class YarnConfiguration extends C
/**List of directories to store localized files in.*/
public static final String NM_LOCAL_DIRS = NM_PREFIX + "local-dirs";
public static final String DEFAULT_NM_LOCAL_DIRS = "/tmp/nm-local-dir";
-
+
+ /**
+ * Maximum number of entries (files plus the 36 hierarchical sub-directories)
+ * allowed in each local cache directory. Avoid tuning this too low.
+ */
+ public static final String NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY =
+ NM_PREFIX + "local-cache.max-files-per-directory";
+ public static final int DEFAULT_NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY =
8192;
+
/** Address where the localizer IPC is.*/
public static final String NM_LOCALIZER_ADDRESS =
NM_PREFIX + "localizer.address";
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml?rev=1463824&r1=1463823&r2=1463824&view=diff
==============================================================================
---
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
(original)
+++
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
Wed Apr 3 05:01:35 2013
@@ -360,6 +360,25 @@
</property>
<property>
+ <description>Limits the maximum number of files that will be localized
+ in a single local directory. Once the limit is reached, sub-directories
+ are created and new files are localized inside them. The limit also counts
+ the 36 possible sub-directories (0-9 and a-z), so a value less than or
+ equal to 36 causes the NodeManager to fail to start. For example, for the
+ public cache, if this is configured with a value of 40 (4 files +
+ 36 sub-directories) and the local-dir is "/tmp/local-dir1", then 4 files
+ can be created directly inside "/tmp/local-dir1/filecache". Files
+ localized after that go into a sub-directory "0" created inside
+ "/tmp/local-dir1/filecache" until it becomes full. If a file is removed
+ from a sub-directory that is marked full, that sub-directory is reused
+ to localize files.
+ </description>
+ <name>yarn.nodemanager.local-cache.max-files-per-directory</name>
+ <value>8192</value>
+ </property>
+
+ <property>
<description>Address where the localizer IPC is.</description>
<name>yarn.nodemanager.localizer.address</name>
<value>0.0.0.0:8040</value>
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java?rev=1463824&r1=1463823&r2=1463824&view=diff
==============================================================================
---
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java
(original)
+++
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java
Wed Apr 3 05:01:35 2013
@@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
@@ -35,6 +36,11 @@ interface LocalResourcesTracker
boolean remove(LocalizedResource req, DeletionService delService);
+ Path getPathForLocalization(LocalResourceRequest req, Path localDirPath);
+
String getUser();
+ // TODO: Remove this in favour of EventHandler.handle
+ void localizationCompleted(LocalResourceRequest req, boolean success);
+
}
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java?rev=1463824&r1=1463823&r2=1463824&view=diff
==============================================================================
---
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java
(original)
+++
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java
Wed Apr 3 05:01:35 2013
@@ -26,12 +26,13 @@ import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import
org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEvent;
-import
org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEventType;
+
/**
* A collection of {@link LocalizedResource}s all of same
@@ -49,17 +50,43 @@ class LocalResourcesTrackerImpl implemen
private final String user;
private final Dispatcher dispatcher;
private final ConcurrentMap<LocalResourceRequest,LocalizedResource>
localrsrc;
+ private Configuration conf;
+ /*
+ * This flag controls whether this resource tracker uses hierarchical
+ * directories or not. ResourceLocalizationService currently enables it only
+ * for the PUBLIC resource tracker; PRIVATE and APPLICATION resource
+ * trackers are created with it set to false.
+ */
+ private final boolean useLocalCacheDirectoryManager;
+ private ConcurrentHashMap<Path, LocalCacheDirectoryManager>
directoryManagers;
+ /*
+ * It is used to keep track of resource into hierarchical directory
+ * while it is getting downloaded. It is useful for reference counting
+ * in case resource localization fails.
+ */
+ private ConcurrentHashMap<LocalResourceRequest, Path>
+ inProgressLocalResourcesMap;
- public LocalResourcesTrackerImpl(String user, Dispatcher dispatcher) {
+ public LocalResourcesTrackerImpl(String user, Dispatcher dispatcher,
+ boolean useLocalCacheDirectoryManager, Configuration conf) {
this(user, dispatcher,
- new ConcurrentHashMap<LocalResourceRequest,LocalizedResource>());
+ new ConcurrentHashMap<LocalResourceRequest, LocalizedResource>(),
+ useLocalCacheDirectoryManager, conf);
}
LocalResourcesTrackerImpl(String user, Dispatcher dispatcher,
- ConcurrentMap<LocalResourceRequest,LocalizedResource> localrsrc) {
+ ConcurrentMap<LocalResourceRequest,LocalizedResource> localrsrc,
+ boolean useLocalCacheDirectoryManager, Configuration conf) {
this.user = user;
this.dispatcher = dispatcher;
this.localrsrc = localrsrc;
+ this.useLocalCacheDirectoryManager = useLocalCacheDirectoryManager;
+ if ( this.useLocalCacheDirectoryManager) {
+ directoryManagers = new ConcurrentHashMap<Path,
LocalCacheDirectoryManager>();
+ inProgressLocalResourcesMap =
+ new ConcurrentHashMap<LocalResourceRequest, Path>();
+ }
+ this.conf = conf;
}
@Override
@@ -73,6 +100,7 @@ class LocalResourcesTrackerImpl implemen
LOG.info("Resource " + rsrc.getLocalPath()
+ " is missing, localizing it again");
localrsrc.remove(req);
+ decrementFileCountForLocalCacheDirectory(req, rsrc);
rsrc = null;
}
if (null == rsrc) {
@@ -90,7 +118,52 @@ class LocalResourcesTrackerImpl implemen
rsrc.handle(event);
}
- /**
+  /*
+   * Update the file-count statistics for a local cache-directory.
+   * This will retrieve the localized path for the resource from
+   * 1) inProgressLocalResourcesMap if the resource was under localization
+   *    and it failed.
+   * 2) LocalizedResource if the resource is already localized.
+   * From this path it will identify the local directory under which the
+   * resource was localized. Then rest of the path will be used to decrement
+   * file count for the HierarchicalSubDirectory pointing to this relative
+   * path. Does nothing if hierarchical directories are disabled, if no path
+   * can be determined, or if no registered local directory is an ancestor
+   * of the resource path.
+   */
+  private void decrementFileCountForLocalCacheDirectory(
+      LocalResourceRequest req, LocalizedResource rsrc) {
+    if (!useLocalCacheDirectoryManager) {
+      return;
+    }
+    // A single remove() replaces containsKey()+remove(): ConcurrentHashMap
+    // holds no null values, so null simply means "not in progress".
+    Path rsrcPath = inProgressLocalResourcesMap.remove(req);
+    if (rsrcPath == null && rsrc != null && rsrc.getLocalPath() != null) {
+      // assumes layout <hierarchical dir>/<random dir>/<file> - TODO confirm
+      rsrcPath = rsrc.getLocalPath().getParent().getParent();
+    }
+    if (rsrcPath == null) {
+      return;
+    }
+    // Walk up until we reach the local directory that owns a manager.
+    Path parentPath = new Path(rsrcPath.toUri().getRawPath());
+    LocalCacheDirectoryManager dir = directoryManagers.get(parentPath);
+    while (dir == null) {
+      parentPath = parentPath.getParent();
+      if (parentPath == null) {
+        return;
+      }
+      dir = directoryManagers.get(parentPath);
+    }
+    String parentDir = parentPath.toUri().getRawPath();
+    String rsrcDir = rsrcPath.toUri().getRawPath();
+    if (rsrcDir.equals(parentDir)) {
+      dir.decrementFileCountForPath("");
+    } else {
+      // Strip "<parentDir>/" to get the path relative to the local dir.
+      dir.decrementFileCountForPath(rsrcDir.substring(parentDir.length() + 1));
+    }
+  }
+
+/**
* This module checks if the resource which was localized is already present
* or not
*
@@ -100,7 +173,8 @@ class LocalResourcesTrackerImpl implemen
public boolean isResourcePresent(LocalizedResource rsrc) {
boolean ret = true;
if (rsrc.getState() == ResourceState.LOCALIZED) {
- File file = new
File(rsrc.getLocalPath().toUri().getRawPath().toString());
+ File file = new File(rsrc.getLocalPath().toUri().getRawPath().
+ toString());
if (!file.exists()) {
ret = false;
}
@@ -133,11 +207,11 @@ class LocalResourcesTrackerImpl implemen
if (ResourceState.LOCALIZED.equals(rsrc.getState())) {
delService.delete(getUser(), getPathToDelete(rsrc.getLocalPath()));
}
+ decrementFileCountForLocalCacheDirectory(rem.getRequest(), rsrc);
return true;
}
}
-
/**
* Returns the path up to the random directory component.
*/
@@ -163,4 +237,50 @@ class LocalResourcesTrackerImpl implemen
public Iterator<LocalizedResource> iterator() {
return localrsrc.values().iterator();
}
-}
+
+ /**
+ * @return {@link Path} absolute path for localization which includes local
+ * directory path and the relative hierarchical path (if use local
+ * cache directory manager is enabled)
+ *
+ * @param {@link LocalResourceRequest} Resource localization request to
+ * localize the resource.
+ * @param {@link Path} local directory path
+ */
+ @Override
+ public Path
+ getPathForLocalization(LocalResourceRequest req, Path localDirPath) {
+ if (useLocalCacheDirectoryManager && localDirPath != null) {
+
+ if (!directoryManagers.containsKey(localDirPath)) {
+ directoryManagers.putIfAbsent(localDirPath,
+ new LocalCacheDirectoryManager(conf));
+ }
+ LocalCacheDirectoryManager dir = directoryManagers.get(localDirPath);
+
+ Path rPath = localDirPath;
+ String hierarchicalPath = dir.getRelativePathForLocalization();
+ // For most of the scenarios we will get root path only which
+ // is an empty string
+ if (!hierarchicalPath.isEmpty()) {
+ rPath = new Path(localDirPath, hierarchicalPath);
+ }
+ inProgressLocalResourcesMap.put(req, rPath);
+ return rPath;
+ } else {
+ return localDirPath;
+ }
+ }
+
+  /**
+   * Records the outcome of a localization started via getPathForLocalization.
+   *
+   * @param req the request whose localization finished
+   * @param success true if the resource was localized successfully
+   */
+  @Override
+  public void localizationCompleted(LocalResourceRequest req,
+      boolean success) {
+    // Only hierarchical trackers record in-progress requests.
+    if (!useLocalCacheDirectoryManager) {
+      return;
+    }
+    if (success) {
+      // The file now occupies its slot; just stop tracking the request.
+      inProgressLocalResourcesMap.remove(req);
+    } else {
+      // Failed download: give back the file-count slot it reserved.
+      decrementFileCountForLocalCacheDirectory(req, null);
+    }
+  }
+}
\ No newline at end of file
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java?rev=1463824&r1=1463823&r2=1463824&view=diff
==============================================================================
---
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
(original)
+++
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
Wed Apr 3 05:01:35 2013
@@ -64,6 +64,7 @@ import org.apache.hadoop.security.Creden
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.util.DiskChecker;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -130,7 +131,7 @@ public class ResourceLocalizationService
private RecordFactory recordFactory;
private final ScheduledExecutorService cacheCleanup;
- private final LocalResourcesTracker publicRsrc;
+ private LocalResourcesTracker publicRsrc;
private LocalDirsHandlerService dirsHandler;
@@ -158,7 +159,6 @@ public class ResourceLocalizationService
this.delService = delService;
this.dirsHandler = dirsHandler;
- this.publicRsrc = new LocalResourcesTrackerImpl(null, dispatcher);
this.cacheCleanup = new ScheduledThreadPoolExecutor(1,
new ThreadFactoryBuilder()
.setNameFormat("ResourceLocalizationService Cache Cleanup")
@@ -173,8 +173,26 @@ public class ResourceLocalizationService
}
}
+  /**
+   * Validates NodeManager localization settings.
+   *
+   * @param conf configuration to check
+   * @throws YarnException if the per-directory file limit leaves no room for
+   *           files beyond the 36 hierarchical sub-directories (0-9, a-z)
+   */
+  private void validateConf(Configuration conf) {
+    int perDirFileLimit =
+        conf.getInt(YarnConfiguration.NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY,
+            YarnConfiguration.DEFAULT_NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY);
+    if (perDirFileLimit <= 36) {
+      // Include the offending value so operators can see what was read.
+      LOG.error(YarnConfiguration.NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY
+          + " parameter is configured with very low value: "
+          + perDirFileLimit);
+      throw new YarnException(
+          YarnConfiguration.NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY
+          + " parameter is configured with a value less than 37: "
+          + perDirFileLimit);
+    }
+    LOG.info("per directory file limit = " + perDirFileLimit);
+  }
+
@Override
public void init(Configuration conf) {
+ this.validateConf(conf);
+ this.publicRsrc =
+ new LocalResourcesTrackerImpl(null, dispatcher, true, conf);
this.recordFactory = RecordFactoryProvider.getRecordFactory(conf);
try {
@@ -212,6 +230,7 @@ public class ResourceLocalizationService
YarnConfiguration.NM_LOCALIZER_ADDRESS,
YarnConfiguration.DEFAULT_NM_LOCALIZER_ADDRESS,
YarnConfiguration.DEFAULT_NM_LOCALIZER_PORT);
+
localizerTracker = createLocalizerTracker(conf);
addService(localizerTracker);
dispatcher.register(LocalizerEventType.class, localizerTracker);
@@ -306,15 +325,17 @@ public class ResourceLocalizationService
private void handleInitApplicationResources(Application app) {
// 0) Create application tracking structs
String userName = app.getUser();
- privateRsrc.putIfAbsent(userName,
- new LocalResourcesTrackerImpl(userName, dispatcher));
- if (null != appRsrc.putIfAbsent(ConverterUtils.toString(app.getAppId()),
- new LocalResourcesTrackerImpl(app.getUser(), dispatcher))) {
+ privateRsrc.putIfAbsent(userName, new LocalResourcesTrackerImpl(userName,
+ dispatcher, false, super.getConfig()));
+ if (null != appRsrc.putIfAbsent(
+ ConverterUtils.toString(app.getAppId()),
+ new LocalResourcesTrackerImpl(app.getUser(), dispatcher, false, super
+ .getConfig()))) {
LOG.warn("Initializing application " + app + " already present");
assert false; // TODO: FIXME assert doesn't help
// ^ The condition is benign. Tests should fail and it
- // should appear in logs, but it's an internal error
- // that should have no effect on applications
+ // should appear in logs, but it's an internal error
+ // that should have no effect on applications
}
// 1) Signal container init
//
@@ -620,6 +641,13 @@ public class ResourceLocalizationService
Path publicDirDestPath = dirsHandler.getLocalPathForWrite(
"." + Path.SEPARATOR + ContainerLocalizer.FILECACHE,
ContainerLocalizer.getEstimatedSize(resource), true);
+ Path hierarchicalPath =
+ publicRsrc.getPathForLocalization(key, publicDirDestPath);
+ if (!hierarchicalPath.equals(publicDirDestPath)) {
+ publicDirDestPath = hierarchicalPath;
+ DiskChecker.checkDir(
+ new File(publicDirDestPath.toUri().getPath()));
+ }
pending.put(queue.submit(new FSDownload(
lfs, null, conf, publicDirDestPath, resource, new Random())),
request);
@@ -654,19 +682,21 @@ public class ResourceLocalizationService
assoc.getResource().handle(
new ResourceLocalizedEvent(key,
local, FileUtil.getDU(new File(local.toUri()))));
+ publicRsrc.localizationCompleted(key, true);
synchronized (attempts) {
attempts.remove(key);
}
} catch (ExecutionException e) {
LOG.info("Failed to download rsrc " + assoc.getResource(),
e.getCause());
+ LocalResourceRequest req = assoc.getResource().getRequest();
dispatcher.getEventHandler().handle(
new ContainerResourceFailedEvent(
assoc.getContext().getContainerId(),
- assoc.getResource().getRequest(), e.getCause()));
+ req, e.getCause()));
+ publicRsrc.localizationCompleted(req, false);
List<LocalizerResourceRequestEvent> reqs;
synchronized (attempts) {
- LocalResourceRequest req = assoc.getResource().getRequest();
reqs = attempts.get(req);
if (null == reqs) {
LOG.error("Missing pending list for " + req);
@@ -1003,4 +1033,4 @@ public class ResourceLocalizationService
del.delete(null, dirPath, new Path[] {});
}
-}
+}
\ No newline at end of file
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java?rev=1463824&r1=1463823&r2=1463824&view=diff
==============================================================================
---
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java
(original)
+++
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java
Wed Apr 3 05:01:35 2013
@@ -36,6 +36,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
@@ -50,17 +51,17 @@ import org.apache.hadoop.yarn.server.nod
import
org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRequestEvent;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.junit.Test;
-import org.mortbay.log.Log;
public class TestLocalResourcesTrackerImpl {
- @Test
+ @Test(timeout=10000)
@SuppressWarnings("unchecked")
public void test() {
String user = "testuser";
DrainDispatcher dispatcher = null;
try {
- dispatcher = createDispatcher(new Configuration());
+ Configuration conf = new Configuration();
+ dispatcher = createDispatcher(conf);
EventHandler<LocalizerEvent> localizerEventHandler =
mock(EventHandler.class);
EventHandler<LocalizerEvent> containerEventHandler =
@@ -86,7 +87,8 @@ public class TestLocalResourcesTrackerIm
localrsrc.put(req1, lr1);
localrsrc.put(req2, lr2);
LocalResourcesTracker tracker =
- new LocalResourcesTrackerImpl(user, dispatcher, localrsrc);
+ new LocalResourcesTrackerImpl(user, dispatcher, localrsrc, false,
+ conf);
ResourceEvent req11Event =
new ResourceRequestEvent(req1, LocalResourceVisibility.PUBLIC, lc1);
@@ -152,13 +154,14 @@ public class TestLocalResourcesTrackerIm
}
}
- @Test
+ @Test(timeout=10000)
@SuppressWarnings("unchecked")
public void testConsistency() {
String user = "testuser";
DrainDispatcher dispatcher = null;
try {
- dispatcher = createDispatcher(new Configuration());
+ Configuration conf = new Configuration();
+ dispatcher = createDispatcher(conf);
EventHandler<LocalizerEvent> localizerEventHandler =
mock(EventHandler.class);
EventHandler<LocalizerEvent> containerEventHandler =
mock(EventHandler.class);
dispatcher.register(LocalizerEventType.class, localizerEventHandler);
@@ -172,7 +175,7 @@ public class TestLocalResourcesTrackerIm
ConcurrentMap<LocalResourceRequest, LocalizedResource> localrsrc = new
ConcurrentHashMap<LocalResourceRequest, LocalizedResource>();
localrsrc.put(req1, lr1);
LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user,
- dispatcher, localrsrc);
+ dispatcher, localrsrc, false, conf);
ResourceEvent req11Event = new ResourceRequestEvent(req1,
LocalResourceVisibility.PUBLIC, lc1);
@@ -221,6 +224,113 @@ public class TestLocalResourcesTrackerIm
}
}
+ @Test(timeout = 100000)
+ @SuppressWarnings("unchecked")
+ public void testHierarchicalLocalCacheDirectories() {
+ String user = "testuser";
+ DrainDispatcher dispatcher = null;
+ try {
+ Configuration conf = new Configuration();
+ // setting per directory file limit to 1.
+ conf.set(YarnConfiguration.NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY, "37");
+ dispatcher = createDispatcher(conf);
+
+ EventHandler<LocalizerEvent> localizerEventHandler =
+ mock(EventHandler.class);
+ EventHandler<LocalizerEvent> containerEventHandler =
+ mock(EventHandler.class);
+ dispatcher.register(LocalizerEventType.class, localizerEventHandler);
+ dispatcher.register(ContainerEventType.class, containerEventHandler);
+
+ DeletionService mockDelService = mock(DeletionService.class);
+
+ ConcurrentMap<LocalResourceRequest, LocalizedResource> localrsrc =
+ new ConcurrentHashMap<LocalResourceRequest, LocalizedResource>();
+ LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user,
+ dispatcher, localrsrc, true, conf);
+
+ // This is a random path. NO File creation will take place at this place.
+ Path localDir = new Path("/tmp");
+
+ // Container 1 needs lr1 resource
+ ContainerId cId1 = BuilderUtils.newContainerId(1, 1, 1, 1);
+ LocalResourceRequest lr1 = createLocalResourceRequest(user, 1, 1,
+ LocalResourceVisibility.PUBLIC);
+ LocalizerContext lc1 = new LocalizerContext(user, cId1, null);
+
+ // Container 1 requests lr1 to be localized
+ ResourceEvent reqEvent1 = new ResourceRequestEvent(lr1,
+ LocalResourceVisibility.PUBLIC, lc1);
+ tracker.handle(reqEvent1);
+
+ // Simulate the process of localization of lr1
+ Path hierarchicalPath1 = tracker.getPathForLocalization(lr1, localDir);
+ // Simulate lr1 getting localized
+ ResourceLocalizedEvent rle =
+ new ResourceLocalizedEvent(lr1,
+ new Path(hierarchicalPath1.toUri().toString() +
+ Path.SEPARATOR + "file1"), 120);
+ tracker.handle(rle);
+ // Localization successful.
+ tracker.localizationCompleted(lr1, true);
+
+ LocalResourceRequest lr2 = createLocalResourceRequest(user, 3, 3,
+ LocalResourceVisibility.PUBLIC);
+ Path hierarchicalPath2 = tracker.getPathForLocalization(lr2, localDir);
+ // localization failed.
+ tracker.localizationCompleted(lr2, false);
+
+ /*
+ * The path returned for two localization should be different because we
+ * are limiting one file per sub-directory.
+ */
+ Assert.assertNotSame(hierarchicalPath1, hierarchicalPath2);
+
+ LocalResourceRequest lr3 = createLocalResourceRequest(user, 2, 2,
+ LocalResourceVisibility.PUBLIC);
+ ResourceEvent reqEvent3 = new ResourceRequestEvent(lr3,
+ LocalResourceVisibility.PUBLIC, lc1);
+ tracker.handle(reqEvent3);
+ Path hierarchicalPath3 = tracker.getPathForLocalization(lr3, localDir);
+ tracker.localizationCompleted(lr3, true);
+
+ // Verifying that path created is inside the subdirectory
+ Assert.assertEquals(hierarchicalPath3.toUri().toString(),
+ hierarchicalPath1.toUri().toString() + Path.SEPARATOR + "0");
+
+ // Container 1 releases resource lr1
+ ResourceEvent relEvent1 = new ResourceReleaseEvent(lr1, cId1);
+ tracker.handle(relEvent1);
+
+ // Validate the file counts now
+ int resources = 0;
+ Iterator<LocalizedResource> iter = tracker.iterator();
+ while (iter.hasNext()) {
+ iter.next();
+ resources++;
+ }
+ // There should be only two resources lr1 and lr3 now.
+ Assert.assertEquals(2, resources);
+
+ // Now simulate cache cleanup - removes unused resources.
+ iter = tracker.iterator();
+ while (iter.hasNext()) {
+ LocalizedResource rsrc = iter.next();
+ if (rsrc.getRefCount() == 0) {
+ Assert.assertTrue(tracker.remove(rsrc, mockDelService));
+ resources--;
+ }
+ }
+ // lr1 is not used by anyone and will be removed, only lr3 will hang
+ // around
+ Assert.assertEquals(1, resources);
+ } finally {
+ if (dispatcher != null) {
+ dispatcher.stop();
+ }
+ }
+ }
+
private boolean createdummylocalizefile(Path path) {
boolean ret = false;
File file = new File(path.toUri().getRawPath().toString());
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceRetention.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceRetention.java?rev=1463824&r1=1463823&r2=1463824&view=diff
==============================================================================
---
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceRetention.java
(original)
+++
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceRetention.java
Wed Apr 3 05:01:35 2013
@@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.no
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
@@ -76,10 +77,11 @@ public class TestResourceRetention {
LocalResourcesTracker createMockTracker(String user, final long rsrcSize,
long nRsrcs, long timestamp, long tsstep) {
+ Configuration conf = new Configuration();
ConcurrentMap<LocalResourceRequest,LocalizedResource> trackerResources =
new ConcurrentHashMap<LocalResourceRequest,LocalizedResource>();
LocalResourcesTracker ret = spy(new LocalResourcesTrackerImpl(user, null,
- trackerResources));
+ trackerResources, false, conf));
for (int i = 0; i < nRsrcs; ++i) {
final LocalResourceRequest req = new LocalResourceRequest(
new Path("file:///" + user + "/rsrc" + i), timestamp + i * tsstep,