Author: cdouglas
Date: Fri Mar 14 00:31:08 2014
New Revision: 1577392
URL: http://svn.apache.org/r1577392
Log:
YARN-1771. Reduce the number of NameNode operations during localization of
public resources using a cache. Contributed by Sangjin Lee
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizerContext.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt?rev=1577392&r1=1577391&r2=1577392&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt Fri Mar 14 00:31:08 2014
@@ -273,6 +273,9 @@ Release 2.4.0 - UNRELEASED
expose analogous getApplication(s)/Attempt(s)/Container(s) APIs. (Mayank
Bansal via zjshen)
+ YARN-1771. Reduce the number of NameNode operations during localization of
+ public resources using a cache. (Sangjin Lee via cdouglas)
+
OPTIMIZATIONS
BUG FIXES
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java?rev=1577392&r1=1577391&r2=1577392&view=diff
==============================================================================
---
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java
(original)
+++
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java
Fri Mar 14 00:31:08 2014
@@ -24,6 +24,8 @@ import java.io.IOException;
import java.net.URISyntaxException;
import java.security.PrivilegedExceptionAction;
import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
@@ -43,6 +45,11 @@ import org.apache.hadoop.util.RunJar;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.util.concurrent.Futures;
+
/**
* Download a single URL to the local disk.
*
@@ -56,6 +63,7 @@ public class FSDownload implements Calla
private final UserGroupInformation userUgi;
private Configuration conf;
private LocalResource resource;
+ private final LoadingCache<Path,Future<FileStatus>> statCache;
/** The local FS dir path under which this resource is to be localized to */
private Path destDirPath;
@@ -71,11 +79,18 @@ public class FSDownload implements Calla
public FSDownload(FileContext files, UserGroupInformation ugi, Configuration
conf,
Path destDirPath, LocalResource resource) {
+ // no stat cache supplied: delegate with null, in which case file statuses
+ // are read straight from the filesystem
+ this(files, ugi, conf, destDirPath, resource, null);
+ }
+
+ /**
+ * Creates a download task that can consult a shared file status cache.
+ *
+ * @param files file context stored for use by this download
+ * @param ugi user identity stored for use by this download
+ * @param conf configuration used to resolve filesystems
+ * @param destDirPath local FS dir path under which the resource is localized
+ * @param resource the resource to download
+ * @param statCache cache of file status futures keyed by path, or null to
+ * query the filesystem directly for every status lookup
+ */
+ public FSDownload(FileContext files, UserGroupInformation ugi, Configuration
conf,
+ Path destDirPath, LocalResource resource,
+ LoadingCache<Path,Future<FileStatus>> statCache) {
this.conf = conf;
this.destDirPath = destDirPath;
this.files = files;
this.userUgi = ugi;
this.resource = resource;
+ this.statCache = statCache;
}
LocalResource getResource() {
@@ -90,28 +105,43 @@ public class FSDownload implements Calla
}
/**
- * Returns a boolean to denote whether a cache file is visible to all(public)
+ * Creates the cache loader for the status loading cache. This should be used
+ * to create an instance of the status cache that is passed into the
+ * FSDownload constructor.
+ */
+  public static CacheLoader<Path,Future<FileStatus>>
+      createStatusCacheLoader(final Configuration conf) {
+    return new CacheLoader<Path,Future<FileStatus>>() {
+      // Called by the cache when a path has no entry yet; the future returned
+      // here is the value the cache stores under that path.
+      @Override
+      public Future<FileStatus> load(Path path) {
+        try {
+          // resolve the filesystem that owns the path and stat it
+          FileSystem fs = path.getFileSystem(conf);
+          return Futures.immediateFuture(fs.getFileStatus(path));
+        } catch (Throwable th) {
+          // report failures so it can be memoized: the loader never throws,
+          // it hands back a failed future that is cached like any other value
+          return Futures.immediateFailedFuture(th);
+        }
+      }
+    };
+  }
+
+ /**
+ * Returns a boolean to denote whether a cache file is visible to all (public)
* or not
- * @param conf
- * @param uri
- * @return true if the path in the uri is visible to all, false otherwise
- * @throws IOException
+ *
+ * @return true if the file at the current path is visible to all, false
+ * otherwise
*/
- private static boolean isPublic(FileSystem fs, Path current) throws
IOException {
+ @VisibleForTesting
+ static boolean isPublic(FileSystem fs, Path current, FileStatus sStat,
+ LoadingCache<Path,Future<FileStatus>> statCache) throws IOException {
current = fs.makeQualified(current);
//the leaf level file should be readable by others
- if (!checkPublicPermsForAll(fs, current, FsAction.READ_EXECUTE,
FsAction.READ)) {
+ if (!checkPublicPermsForAll(fs, sStat, FsAction.READ_EXECUTE,
FsAction.READ)) {
return false;
}
- return ancestorsHaveExecutePermissions(fs, current.getParent());
+ return ancestorsHaveExecutePermissions(fs, current.getParent(), statCache);
}
- private static boolean checkPublicPermsForAll(FileSystem fs, Path current,
- FsAction dir, FsAction file)
- throws IOException {
- return checkPublicPermsForAll(fs, fs.getFileStatus(current), dir, file);
- }
-
private static boolean checkPublicPermsForAll(FileSystem fs,
FileStatus status, FsAction dir, FsAction file)
throws IOException {
@@ -137,12 +167,13 @@ public class FSDownload implements Calla
* permission set for all users (i.e. that other users can traverse
* the directory hierarchy to the given path)
*/
- private static boolean ancestorsHaveExecutePermissions(FileSystem fs, Path
path)
- throws IOException {
+ private static boolean ancestorsHaveExecutePermissions(FileSystem fs,
+ Path path, LoadingCache<Path,Future<FileStatus>> statCache)
+ throws IOException {
Path current = path;
while (current != null) {
//the subdirs in the path should have execute permissions for others
- if (!checkPermissionOfOther(fs, current, FsAction.EXECUTE)) {
+ if (!checkPermissionOfOther(fs, current, FsAction.EXECUTE, statCache)) {
return false;
}
current = current.getParent();
@@ -160,14 +191,46 @@ public class FSDownload implements Calla
* @throws IOException
*/
private static boolean checkPermissionOfOther(FileSystem fs, Path path,
- FsAction action) throws IOException {
- FileStatus status = fs.getFileStatus(path);
+ FsAction action, LoadingCache<Path,Future<FileStatus>> statCache)
+ throws IOException {
+ FileStatus status = getFileStatus(fs, path, statCache);
FsPermission perms = status.getPermission();
FsAction otherAction = perms.getOtherAction();
return otherAction.implies(action);
}
-
+ /**
+ * Obtains the file status of the given path, going through the stat cache
+ * when one was supplied and querying the filesystem directly otherwise.
+ *
+ * With a cache, the lookup is a get-or-load: on a miss it is the cache's own
+ * loader that fetches the status and memoizes the resulting future, so this
+ * method never inserts into the cache itself.
+ *
+ * The stat cache is expected to be managed by callers who provided it to
+ * FSDownload.
+ *
+ * @param fs filesystem to query when no stat cache is available
+ * @param path path whose status is requested; also used as the cache key
+ * @param statCache cache of file status futures keyed by path, or null to
+ * bypass caching
+ * @return the file status of the path
+ * @throws IOException if the status could not be obtained; a failure
+ * reported through the cache is rethrown as-is when it is an IOException
+ * and wrapped in a new IOException otherwise
+ */
+ private static FileStatus getFileStatus(final FileSystem fs, final Path path,
+ LoadingCache<Path,Future<FileStatus>> statCache) throws IOException {
+ // if the stat cache does not exist, simply query the filesystem
+ if (statCache == null) {
+ return fs.getFileStatus(path);
+ }
+
+ try {
+ // get or load it from the cache
+ return statCache.get(path).get();
+ } catch (ExecutionException e) {
+ // unwrap so callers see the original failure rather than the wrapper
+ Throwable cause = e.getCause();
+ // the underlying exception should normally be IOException
+ if (cause instanceof IOException) {
+ throw (IOException)cause;
+ } else {
+ throw new IOException(cause);
+ }
+ } catch (InterruptedException e) { // should not happen
+ // restore the interrupt flag before converting to IOException
+ Thread.currentThread().interrupt();
+ throw new IOException(e);
+ }
+ }
+
private Path copy(Path sCopy, Path dstdir) throws IOException {
FileSystem sourceFs = sCopy.getFileSystem(conf);
Path dCopy = new Path(dstdir, "tmp_"+sCopy.getName());
@@ -178,14 +241,15 @@ public class FSDownload implements Calla
", was " + sStat.getModificationTime());
}
if (resource.getVisibility() == LocalResourceVisibility.PUBLIC) {
- if (!isPublic(sourceFs, sCopy)) {
+ if (!isPublic(sourceFs, sCopy, sStat, statCache)) {
throw new IOException("Resource " + sCopy +
" is not publicly accessable and as such cannot be part of the" +
" public cache.");
}
}
-
- sourceFs.copyToLocalFile(sCopy, dCopy);
+
+ FileUtil.copy(sourceFs, sStat, FileSystem.getLocal(conf), dCopy, false,
+ true, conf);
return dCopy;
}
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java?rev=1577392&r1=1577391&r2=1577392&view=diff
==============================================================================
---
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java
(original)
+++
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java
Fri Mar 14 00:31:08 2014
@@ -21,27 +21,35 @@ package org.apache.hadoop.yarn.util;
import static org.apache.hadoop.fs.CreateFlag.CREATE;
import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URISyntaxException;
+import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.jar.JarEntry;
import java.util.jar.JarOutputStream;
import java.util.jar.Manifest;
+import java.util.zip.GZIPOutputStream;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;
-import java.util.zip.GZIPOutputStream;
import junit.framework.Assert;
@@ -64,10 +72,13 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.util.ConverterUtils;
import org.junit.AfterClass;
import org.junit.Test;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+
public class TestFSDownload {
private static final Log LOG = LogFactory.getLog(TestFSDownload.class);
@@ -88,6 +99,18 @@ public class TestFSDownload {
static LocalResource createFile(FileContext files, Path p, int len,
Random r, LocalResourceVisibility vis) throws IOException {
+ // create the file through the four-argument overload, which returns nothing
+ createFile(files, p, len, r);
+ // then describe it as a FILE-type resource with the requested visibility
+ LocalResource ret = recordFactory.newRecordInstance(LocalResource.class);
+ ret.setResource(ConverterUtils.getYarnUrlFromPath(p));
+ ret.setSize(len);
+ ret.setType(LocalResourceType.FILE);
+ ret.setVisibility(vis);
+ // timestamp is the modification time the file context reports for p
+ ret.setTimestamp(files.getFileStatus(p).getModificationTime());
+ return ret;
+ }
+
+ static void createFile(FileContext files, Path p, int len, Random r)
+ throws IOException {
FSDataOutputStream out = null;
try {
byte[] bytes = new byte[len];
@@ -97,13 +120,6 @@ public class TestFSDownload {
} finally {
if (out != null) out.close();
}
- LocalResource ret = recordFactory.newRecordInstance(LocalResource.class);
- ret.setResource(ConverterUtils.getYarnUrlFromPath(p));
- ret.setSize(len);
- ret.setType(LocalResourceType.FILE);
- ret.setVisibility(vis);
- ret.setTimestamp(files.getFileStatus(p).getModificationTime());
- return ret;
}
static LocalResource createJar(FileContext files, Path p,
@@ -285,6 +301,76 @@ public class TestFSDownload {
}
}
+  /**
+   * Runs FSDownload.isPublic() concurrently for several files under the same
+   * base directory and verifies, through a counting cache loader, that the
+   * stat cache invoked its loader exactly once for every path it was asked
+   * about.
+   */
+  @Test (timeout=60000)
+  public void testDownloadPublicWithStatCache() throws IOException,
+      URISyntaxException, InterruptedException, ExecutionException {
+    final Configuration conf = new Configuration();
+    FileContext files = FileContext.getLocalFSFileContext(conf);
+    Path basedir = files.makeQualified(new Path("target",
+        TestFSDownload.class.getSimpleName()));
+    files.mkdir(basedir, null, true);
+    conf.setStrings(TestFSDownload.class.getName(), basedir.toString());
+
+    int size = 512;
+
+    // per-path count of how many times the cache had to call its loader
+    final ConcurrentMap<Path,AtomicInteger> counts =
+        new ConcurrentHashMap<Path,AtomicInteger>();
+    final CacheLoader<Path,Future<FileStatus>> loader =
+        FSDownload.createStatusCacheLoader(conf);
+    final LoadingCache<Path,Future<FileStatus>> statCache =
+        CacheBuilder.newBuilder().build(
+            new CacheLoader<Path,Future<FileStatus>>() {
+      @Override
+      public Future<FileStatus> load(Path path) throws Exception {
+        // increment the count
+        AtomicInteger count = counts.get(path);
+        if (count == null) {
+          count = new AtomicInteger(0);
+          AtomicInteger existing = counts.putIfAbsent(path, count);
+          if (existing != null) {
+            count = existing;
+          }
+        }
+        count.incrementAndGet();
+
+        // use the default loader
+        return loader.load(path);
+      }
+    });
+
+    // test FSDownload.isPublic() concurrently
+    final int fileCount = 3;
+    List<Callable<Boolean>> tasks = new ArrayList<Callable<Boolean>>();
+    for (int i = 0; i < fileCount; i++) {
+      Random rand = new Random();
+      long sharedSeed = rand.nextLong();
+      rand.setSeed(sharedSeed);
+      System.out.println("SEED: " + sharedSeed);
+      final Path path = new Path(basedir, "test-file-" + i);
+      createFile(files, path, size, rand);
+      final FileSystem fs = path.getFileSystem(conf);
+      final FileStatus sStat = fs.getFileStatus(path);
+      tasks.add(new Callable<Boolean>() {
+        @Override
+        public Boolean call() throws IOException {
+          return FSDownload.isPublic(fs, path, sStat, statCache);
+        }
+      });
+    }
+
+    ExecutorService exec = Executors.newFixedThreadPool(fileCount);
+    try {
+      List<Future<Boolean>> futures = exec.invokeAll(tasks);
+      // files should be public
+      for (Future<Boolean> future: futures) {
+        assertTrue(future.get());
+      }
+      // for each path exactly one file status call should be made; compare
+      // by value, since assertSame on autoboxed ints checks object identity
+      for (AtomicInteger count: counts.values()) {
+        assertEquals(1, count.get());
+      }
+    } finally {
+      exec.shutdown();
+    }
+  }
+
@Test (timeout=10000)
public void testDownload() throws IOException, URISyntaxException,
InterruptedException {
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizerContext.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizerContext.java?rev=1577392&r1=1577391&r2=1577392&view=diff
==============================================================================
---
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizerContext.java
(original)
+++
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizerContext.java
Fri Mar 14 00:31:08 2014
@@ -18,20 +18,34 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
+import java.util.concurrent.Future;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import com.google.common.cache.LoadingCache;
+
public class LocalizerContext {
private final String user;
private final ContainerId containerId;
private final Credentials credentials;
+ private final LoadingCache<Path,Future<FileStatus>> statCache;
+ /**
+ * Creates a context without a file status cache; delegates to the
+ * four-argument constructor with a null statCache.
+ */
public LocalizerContext(String user, ContainerId containerId,
Credentials credentials) {
+ this(user, containerId, credentials, null);
+ }
+
+ /**
+ * Creates a context that also carries a file status cache.
+ *
+ * @param user value returned by getUser()
+ * @param containerId id of the container this context belongs to
+ * @param credentials value returned by getCredentials()
+ * @param statCache cache of file status futures keyed by path; may be null,
+ * which is what the three-argument constructor passes
+ */
+ public LocalizerContext(String user, ContainerId containerId,
+ Credentials credentials,
+ LoadingCache<Path,Future<FileStatus>> statCache) {
this.user = user;
this.containerId = containerId;
this.credentials = credentials;
+ this.statCache = statCache;
}
public String getUser() {
@@ -46,4 +60,7 @@ public class LocalizerContext {
return credentials;
}
+ /**
+ * Returns the file status cache given at construction time.
+ *
+ * @return the stat cache, or null if this context was built through the
+ * three-argument constructor
+ */
+ public LoadingCache<Path,Future<FileStatus>> getStatCache() {
+ return statCache;
+ }
}
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java?rev=1577392&r1=1577391&r2=1577392&view=diff
==============================================================================
---
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
(original)
+++
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
Fri Mar 14 00:31:08 2014
@@ -83,8 +83,8 @@ import org.apache.hadoop.yarn.factory.pr
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
-import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import
org.apache.hadoop.yarn.server.nodemanager.DeletionService.FileDeletionTask;
+import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.api.LocalizationProtocol;
import org.apache.hadoop.yarn.server.nodemanager.api.ResourceLocalizationSpec;
import
org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalResourceStatus;
@@ -119,6 +119,8 @@ import org.apache.hadoop.yarn.util.Conve
import org.apache.hadoop.yarn.util.FSDownload;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.LoadingCache;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
public class ResourceLocalizationService extends CompositeService
@@ -362,8 +364,11 @@ public class ResourceLocalizationService
private void handleInitContainerResources(
ContainerLocalizationRequestEvent rsrcReqs) {
Container c = rsrcReqs.getContainer();
+ // create a loading cache for the file statuses
+ LoadingCache<Path,Future<FileStatus>> statCache =
+
CacheBuilder.newBuilder().build(FSDownload.createStatusCacheLoader(getConfig()));
LocalizerContext ctxt = new LocalizerContext(
- c.getUser(), c.getContainerId(), c.getCredentials());
+ c.getUser(), c.getContainerId(), c.getCredentials(), statCache);
Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrcs =
rsrcReqs.getRequestedResources();
for (Map.Entry<LocalResourceVisibility, Collection<LocalResourceRequest>>
e :
@@ -680,7 +685,8 @@ public class ResourceLocalizationService
// completing and being dequeued before pending updated
synchronized (pending) {
pending.put(queue.submit(new FSDownload(lfs, null, conf,
- publicDirDestPath, resource)), request);
+ publicDirDestPath, resource,
request.getContext().getStatCache())),
+ request);
}
} catch (IOException e) {
rsrc.unlock();