MAPREDUCE-6095. Enable DistributedCache for uber-mode Jobs. Contributed by Gera Shegalov


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/7039b98e
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/7039b98e
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/7039b98e

Branch: refs/heads/HDFS-6581
Commit: 7039b98e1c459e9e0d8caa28cdaa2868e2bde2eb
Parents: 9721e2c
Author: Jason Lowe <jl...@apache.org>
Authored: Mon Sep 22 15:20:59 2014 +0000
Committer: Jason Lowe <jl...@apache.org>
Committed: Mon Sep 22 15:20:59 2014 +0000

----------------------------------------------------------------------
 hadoop-mapreduce-project/CHANGES.txt            |  3 +
 .../org/apache/hadoop/mapred/YarnChild.java     | 63 +-------------------
 .../hadoop/mapreduce/v2/app/MRAppMaster.java    |  1 +
 .../apache/hadoop/mapreduce/v2/util/MRApps.java | 61 +++++++++++++++++--
 .../apache/hadoop/mapreduce/v2/TestUberAM.java  |  7 ---
 5 files changed, 62 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/7039b98e/hadoop-mapreduce-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index eb4c251..815b3fb 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -360,6 +360,9 @@ Release 2.6.0 - UNRELEASED
     ApplicationNotFoundException if the job rolled off the RM view (Sangjin
     Lee via jlowe)
 
+    MAPREDUCE-6095. Enable DistributedCache for uber-mode Jobs (Gera Shegalov
+    via jlowe)
+
 Release 2.5.1 - 2014-09-05
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7039b98e/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java
index 4ba1991..92bbc4a 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java
@@ -23,15 +23,11 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.net.InetSocketAddress;
-import java.net.URI;
 import java.security.PrivilegedExceptionAction;
-import java.util.ArrayList;
-import java.util.List;
 import java.util.concurrent.ScheduledExecutorService;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSError;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalDirAllocator;
@@ -43,7 +39,6 @@ import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.counters.Limits;
-import org.apache.hadoop.mapreduce.filecache.DistributedCache;
 import org.apache.hadoop.mapreduce.security.TokenCache;
 import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
 import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
@@ -307,7 +302,7 @@ class YarnChild {
     task.localizeConfiguration(job);
 
     // Set up the DistributedCache related configs
-    setupDistributedCacheConfig(job);
+    MRApps.setupDistributedCacheLocal(job);
 
     // Overwrite the localized task jobconf which is linked to in the current
     // work-dir.
@@ -317,62 +312,6 @@ class YarnChild {
     task.setConf(job);
   }
 
-  /**
-   * Set up the DistributedCache related configs to make
-   * {@link DistributedCache#getLocalCacheFiles(Configuration)}
-   * and
-   * {@link DistributedCache#getLocalCacheArchives(Configuration)}
-   * working.
-   * @param job
-   * @throws IOException
-   */
-  private static void setupDistributedCacheConfig(final JobConf job)
-      throws IOException {
-
-    String localWorkDir = System.getenv("PWD");
-    //        ^ ^ all symlinks are created in the current work-dir
-
-    // Update the configuration object with localized archives.
-    URI[] cacheArchives = DistributedCache.getCacheArchives(job);
-    if (cacheArchives != null) {
-      List<String> localArchives = new ArrayList<String>();
-      for (int i = 0; i < cacheArchives.length; ++i) {
-        URI u = cacheArchives[i];
-        Path p = new Path(u);
-        Path name =
-            new Path((null == u.getFragment()) ? p.getName()
-                : u.getFragment());
-        String linkName = name.toUri().getPath();
-        localArchives.add(new Path(localWorkDir, linkName).toUri().getPath());
-      }
-      if (!localArchives.isEmpty()) {
-        job.set(MRJobConfig.CACHE_LOCALARCHIVES, StringUtils
-            .arrayToString(localArchives.toArray(new String[localArchives
-                .size()])));
-      }
-    }
-
-    // Update the configuration object with localized files.
-    URI[] cacheFiles = DistributedCache.getCacheFiles(job);
-    if (cacheFiles != null) {
-      List<String> localFiles = new ArrayList<String>();
-      for (int i = 0; i < cacheFiles.length; ++i) {
-        URI u = cacheFiles[i];
-        Path p = new Path(u);
-        Path name =
-            new Path((null == u.getFragment()) ? p.getName()
-                : u.getFragment());
-        String linkName = name.toUri().getPath();
-        localFiles.add(new Path(localWorkDir, linkName).toUri().getPath());
-      }
-      if (!localFiles.isEmpty()) {
-        job.set(MRJobConfig.CACHE_LOCALFILES,
-            StringUtils.arrayToString(localFiles
-                .toArray(new String[localFiles.size()])));
-      }
-    }
-  }
-
   private static final FsPermission urw_gr =
     FsPermission.createImmutable((short) 0640);
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7039b98e/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
index 59e7249..1cf8b29 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
@@ -826,6 +826,7 @@ public class MRAppMaster extends CompositeService {
     @Override
     protected void serviceStart() throws Exception {
       if (job.isUber()) {
+        MRApps.setupDistributedCacheLocal(getConfig());
         this.containerAllocator = new LocalContainerAllocator(
             this.clientService, this.context, nmHost, nmPort, nmHttpPort
             , containerID);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7039b98e/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java
index 3bd8414..113b445 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java
@@ -26,12 +26,11 @@ import java.net.URISyntaxException;
 import java.security.AccessController;
 import java.security.PrivilegedActionException;
 import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -58,8 +57,6 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.util.ApplicationClassLoader;
-import org.apache.hadoop.util.Shell;
-import org.apache.hadoop.util.StringInterner;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.ContainerLogAppender;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
@@ -469,6 +466,62 @@ public class MRApps extends Apps {
         DistributedCache.getFileVisibilities(conf));
   }
 
+  /**
+   * Set up the DistributedCache related configs to make
+   * {@link DistributedCache#getLocalCacheFiles(Configuration)}
+   * and
+   * {@link DistributedCache#getLocalCacheArchives(Configuration)}
+   * working.
+   * @param conf
+   * @throws java.io.IOException
+   */
+  public static void setupDistributedCacheLocal(Configuration conf)
+      throws IOException {
+
+    String localWorkDir = System.getenv("PWD");
+    //        ^ ^ all symlinks are created in the current work-dir
+
+    // Update the configuration object with localized archives.
+    URI[] cacheArchives = DistributedCache.getCacheArchives(conf);
+    if (cacheArchives != null) {
+      List<String> localArchives = new ArrayList<String>();
+      for (int i = 0; i < cacheArchives.length; ++i) {
+        URI u = cacheArchives[i];
+        Path p = new Path(u);
+        Path name =
+            new Path((null == u.getFragment()) ? p.getName()
+                : u.getFragment());
+        String linkName = name.toUri().getPath();
+        localArchives.add(new Path(localWorkDir, linkName).toUri().getPath());
+      }
+      if (!localArchives.isEmpty()) {
+        conf.set(MRJobConfig.CACHE_LOCALARCHIVES, StringUtils
+            .arrayToString(localArchives.toArray(new String[localArchives
+                .size()])));
+      }
+    }
+
+    // Update the configuration object with localized files.
+    URI[] cacheFiles = DistributedCache.getCacheFiles(conf);
+    if (cacheFiles != null) {
+      List<String> localFiles = new ArrayList<String>();
+      for (int i = 0; i < cacheFiles.length; ++i) {
+        URI u = cacheFiles[i];
+        Path p = new Path(u);
+        Path name =
+            new Path((null == u.getFragment()) ? p.getName()
+                : u.getFragment());
+        String linkName = name.toUri().getPath();
+        localFiles.add(new Path(localWorkDir, linkName).toUri().getPath());
+      }
+      if (!localFiles.isEmpty()) {
+        conf.set(MRJobConfig.CACHE_LOCALFILES,
+            StringUtils.arrayToString(localFiles
+                .toArray(new String[localFiles.size()])));
+      }
+    }
+  }
+
   private static String getResourceDescription(LocalResourceType type) {
     if(type == LocalResourceType.ARCHIVE || type == LocalResourceType.PATTERN) {
       return "cache archive (" + MRJobConfig.CACHE_ARCHIVES + ") ";

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7039b98e/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestUberAM.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestUberAM.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestUberAM.java
index 32199e5..e89a919 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestUberAM.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestUberAM.java
@@ -191,11 +191,4 @@ public class TestUberAM extends TestMRJobs {
   throws IOException, InterruptedException, ClassNotFoundException {
     super.testSleepJobWithSecurityOn();
   }
-
-  // Add a test for distcache when uber mode is enabled. TODO
-  @Override
-  @Test
-  public void testDistributedCache() throws Exception {
-    //
-  }
 }

Reply via email to