Author: sseth Date: Sun Oct 13 20:17:53 2013 New Revision: 1531741 URL: http://svn.apache.org/r1531741 Log: MAPREDUCE-5329. Allow MR applications to use additional AuxServices, which are compatible with the default MapReduce shuffle. Contributed by Avner BenHanoch.
Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestShuffleProvider.java Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1531741&r1=1531740&r2=1531741&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Sun Oct 13 20:17:53 2013 @@ -198,6 +198,10 @@ Release 2.2.1 - UNRELEASED NEW FEATURES IMPROVEMENTS + + MAPREDUCE-5329. Allow MR applications to use additional AuxServices, + which are compatible with the default MapReduce shuffle. + (Avner BenHanoch via sseth) OPTIMIZATIONS Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java?rev=1531741&r1=1531740&r2=1531741&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java Sun Oct 13 20:17:53 2013 @@ -31,6 +31,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; +import java.util.Collection; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; @@ -722,6 +723,32 @@ public abstract class TaskAttemptImpl im serviceData.put(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID, ShuffleHandler.serializeServiceData(shuffleToken)); + // add external shuffle-providers - if any + Collection<String> shuffleProviders = conf.getStringCollection( + MRJobConfig.MAPREDUCE_JOB_SHUFFLE_PROVIDER_SERVICES); + if (! shuffleProviders.isEmpty()) { + Collection<String> auxNames = conf.getStringCollection( + YarnConfiguration.NM_AUX_SERVICES); + + for (final String shuffleProvider : shuffleProviders) { + if (shuffleProvider.equals(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID)) { + continue; // skip built-in shuffle-provider that was already inserted with shuffle secret key + } + if (auxNames.contains(shuffleProvider)) { + LOG.info("Adding ShuffleProvider Service: " + shuffleProvider + " to serviceData"); + // This only serves for INIT_APP notifications + // The shuffle service needs to be able to work with the host:port information provided by the AM + // (i.e. shuffle services which require custom location / other configuration are not supported) + serviceData.put(shuffleProvider, ByteBuffer.allocate(0)); + } + else { + throw new YarnRuntimeException("ShuffleProvider Service: " + shuffleProvider + + " was NOT found in the list of aux-services that are available in this NM." + + " You may need to specify this ShuffleProvider as an aux-service in your yarn-site.xml"); + } + } + } + Apps.addToEnvironment( environment, Environment.CLASSPATH.name(), Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestShuffleProvider.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestShuffleProvider.java?rev=1531741&view=auto ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestShuffleProvider.java (added) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestShuffleProvider.java Sun Oct 13 20:17:53 2013 @@ -0,0 +1,159 @@ +/** +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapreduce.v2.app.job.impl; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.Map; +import java.nio.ByteBuffer; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RawLocalFileSystem; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.MapTaskAttemptImpl; +import org.apache.hadoop.mapred.WrappedJvmID; +import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.hadoop.mapreduce.TypeConverter; +import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier; +import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo; +import org.apache.hadoop.mapreduce.v2.api.records.JobId; +import org.apache.hadoop.mapreduce.v2.api.records.TaskId; +import org.apache.hadoop.mapreduce.v2.api.records.TaskType; +import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener; +import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.util.SystemClock; +import org.apache.hadoop.yarn.server.api.AuxiliaryService; +import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext; +import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.junit.Test; +import org.junit.Assert; + +public class TestShuffleProvider { + + @Test + public void testShuffleProviders() throws Exception { + ApplicationId appId = ApplicationId.newInstance(1, 1); + JobId jobId = MRBuilderUtils.newJobId(appId, 1); + TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP); + Path jobFile = mock(Path.class); + + EventHandler eventHandler = mock(EventHandler.class); + TaskAttemptListener taListener = mock(TaskAttemptListener.class); + when(taListener.getAddress()).thenReturn(new InetSocketAddress("localhost", 0)); + + JobConf jobConf = new JobConf(); + jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class); + jobConf.setBoolean("fs.file.impl.disable.cache", true); + jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, ""); + + jobConf.set(YarnConfiguration.NM_AUX_SERVICES, + TestShuffleHandler1.MAPREDUCE_TEST_SHUFFLE_SERVICEID + "," + + TestShuffleHandler2.MAPREDUCE_TEST_SHUFFLE_SERVICEID); + + String serviceName = TestShuffleHandler1.MAPREDUCE_TEST_SHUFFLE_SERVICEID; + String serviceStr = String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, serviceName); + jobConf.set(serviceStr, TestShuffleHandler1.class.getName()); + + serviceName = TestShuffleHandler2.MAPREDUCE_TEST_SHUFFLE_SERVICEID; + serviceStr = String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, serviceName); + jobConf.set(serviceStr, TestShuffleHandler2.class.getName()); + + jobConf.set(MRJobConfig.MAPREDUCE_JOB_SHUFFLE_PROVIDER_SERVICES, + TestShuffleHandler1.MAPREDUCE_TEST_SHUFFLE_SERVICEID + + "," + TestShuffleHandler2.MAPREDUCE_TEST_SHUFFLE_SERVICEID); + + Credentials credentials = new Credentials(); + Token<JobTokenIdentifier> jobToken = new Token<JobTokenIdentifier>( + ("tokenid").getBytes(), ("tokenpw").getBytes(), + new Text("tokenkind"), new Text("tokenservice")); + TaskAttemptImpl taImpl = + new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1, + mock(TaskSplitMetaInfo.class), jobConf, taListener, + jobToken, credentials, + new SystemClock(), null); + + jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, taImpl.getID().toString()); + + ContainerLaunchContext launchCtx = + TaskAttemptImpl.createContainerLaunchContext(null, + jobConf, jobToken, taImpl.createRemoteTask(), + TypeConverter.fromYarn(jobId), + mock(WrappedJvmID.class), taListener, + credentials); + + Map<String, ByteBuffer> serviceDataMap = launchCtx.getServiceData(); + Assert.assertNotNull("TestShuffleHandler1 is missing", serviceDataMap.get(TestShuffleHandler1.MAPREDUCE_TEST_SHUFFLE_SERVICEID)); + Assert.assertNotNull("TestShuffleHandler2 is missing", serviceDataMap.get(TestShuffleHandler2.MAPREDUCE_TEST_SHUFFLE_SERVICEID)); + Assert.assertTrue("mismatch number of services in map", serviceDataMap.size() == 3); // 2 that we entered + 1 for the built-in shuffle-provider + } + + static public class StubbedFS extends RawLocalFileSystem { + @Override + public FileStatus getFileStatus(Path f) throws IOException { + return new FileStatus(1, false, 1, 1, 1, f); + } + } + + static public class TestShuffleHandler1 extends AuxiliaryService { + public static final String MAPREDUCE_TEST_SHUFFLE_SERVICEID = "test_shuffle1"; + public TestShuffleHandler1() { + super("testshuffle1"); + } + @Override + public void initializeApplication(ApplicationInitializationContext context) { + } + @Override + public void stopApplication(ApplicationTerminationContext context) { + } + @Override + public synchronized ByteBuffer getMetaData() { + return ByteBuffer.allocate(0); // Don't 'return null' because of YARN-1256 + } + } + + static public class TestShuffleHandler2 extends AuxiliaryService { + public static final String MAPREDUCE_TEST_SHUFFLE_SERVICEID = "test_shuffle2"; + public TestShuffleHandler2() { + super("testshuffle2"); + } + @Override + public void initializeApplication(ApplicationInitializationContext context) { + } + @Override + public void stopApplication(ApplicationTerminationContext context) { + } + @Override + public synchronized ByteBuffer getMetaData() { + return ByteBuffer.allocate(0); // Don't 'return null' because of YARN-1256 + } + } +} Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java?rev=1531741&r1=1531740&r2=1531741&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java Sun Oct 13 20:17:53 2013 @@ -133,6 +133,13 @@ public interface MRJobConfig { public static final String MAPREDUCE_JOB_CLASSLOADER = "mapreduce.job.classloader"; + /** + * A comma-separated list of services that function as ShuffleProvider aux-services + * (in addition to the built-in ShuffleHandler). + * These services can serve shuffle requests from reducetasks. + */ + public static final String MAPREDUCE_JOB_SHUFFLE_PROVIDER_SERVICES = "mapreduce.job.shuffle.provider.services"; + public static final String MAPREDUCE_JOB_CLASSLOADER_SYSTEM_CLASSES = "mapreduce.job.classloader.system.classes"; public static final String MAPREDUCE_JVM_SYSTEM_PROPERTIES_TO_LOG = "mapreduce.jvm.system-properties-to-log";