Author: tucu Date: Thu Jul 26 13:25:41 2012 New Revision: 1365982 URL: http://svn.apache.org/viewvc?rev=1365982&view=rev Log: Merge -r 1365978:1365979 from trunk to branch. FIXES: MAPREDUCE-4417
Added: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/security/ssl/ - copied from r1365979, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/security/ssl/ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/security/ssl/TestEncryptedShuffle.java - copied unchanged from r1365979, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/security/ssl/TestEncryptedShuffle.java hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/EncryptedShuffle.apt.vm - copied unchanged from r1365979, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/EncryptedShuffle.apt.vm Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt hadoop/common/branches/branch-2/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml 
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/index.apt.vm Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt?rev=1365982&r1=1365981&r2=1365982&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt (original) +++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt Thu Jul 26 13:25:41 2012 @@ -9,6 +9,8 @@ Branch-2 ( Unreleased changes ) MAPREDUCE-987. Exposing MiniDFS and MiniMR clusters as a single process command-line. (ahmed via tucu) + MAPREDUCE-4417. add support for encrypted shuffle (tucu) + IMPROVEMENTS MAPREDUCE-4157. ResourceManager should not kill apps that are well behaved Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml?rev=1365982&r1=1365981&r2=1365982&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml (original) +++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml Thu Jul 26 13:25:41 2012 @@ -473,5 +473,10 @@ <!-- The above 2 fields are accessed locally and only via methods that are synchronized. 
--> - + + <Match> + <Class name="org.apache.hadoop.mapred.ShuffleHandler" /> + <Field name="sslFileBufferSize" /> + <Bug pattern="IS2_INCONSISTENT_SYNC" /> + </Match> </FindBugsFilter> Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java?rev=1365982&r1=1365981&r2=1365982&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java (original) +++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java Thu Jul 26 13:25:41 2012 @@ -34,6 +34,7 @@ import org.apache.commons.logging.LogFac import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapreduce.Counters; +import org.apache.hadoop.mapreduce.MRConfig; import org.apache.hadoop.mapreduce.OutputCommitter; import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.mapreduce.TypeConverter; @@ -43,6 +44,7 @@ import org.apache.hadoop.mapreduce.jobhi import org.apache.hadoop.mapreduce.jobhistory.TaskFailedEvent; import org.apache.hadoop.mapreduce.jobhistory.TaskFinishedEvent; import org.apache.hadoop.mapreduce.jobhistory.TaskStartedEvent; +import org.apache.hadoop.security.ssl.SSLFactory; import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier; import org.apache.hadoop.mapreduce.v2.api.records.JobId; import 
org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent; @@ -108,7 +110,8 @@ public abstract class TaskImpl implement private long scheduledTime; private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); - + + protected boolean encryptedShuffle; protected Credentials credentials; protected Token<JobTokenIdentifier> jobToken; @@ -274,6 +277,8 @@ public abstract class TaskImpl implement this.jobToken = jobToken; this.metrics = metrics; this.appContext = appContext; + this.encryptedShuffle = conf.getBoolean(MRConfig.SHUFFLE_SSL_ENABLED_KEY, + MRConfig.SHUFFLE_SSL_ENABLED_DEFAULT); // See if this is from a previous generation. if (completedTasksFromPreviousRun != null @@ -637,9 +642,10 @@ public abstract class TaskImpl implement TaskAttemptCompletionEvent tce = recordFactory .newRecordInstance(TaskAttemptCompletionEvent.class); tce.setEventId(-1); - tce.setMapOutputServerAddress("http://" - + attempt.getNodeHttpAddress().split(":")[0] + ":" - + attempt.getShufflePort()); + String scheme = (encryptedShuffle) ? 
"https://" : "http://"; + tce.setMapOutputServerAddress(scheme + + attempt.getNodeHttpAddress().split(":")[0] + ":" + + attempt.getShufflePort()); tce.setStatus(status); tce.setAttemptId(attempt.getID()); int runTime = 0; Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java?rev=1365982&r1=1365981&r2=1365982&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java (original) +++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java Thu Jul 26 13:25:41 2012 @@ -79,4 +79,9 @@ public interface MRConfig { public static final int MAX_BLOCK_LOCATIONS_DEFAULT = 10; public static final String MAX_BLOCK_LOCATIONS_KEY = "mapreduce.job.max.split.locations"; + + public static final String SHUFFLE_SSL_ENABLED_KEY = + "mapreduce.shuffle.ssl.enabled"; + + public static final boolean SHUFFLE_SSL_ENABLED_DEFAULT = false; } Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java?rev=1365982&r1=1365981&r2=1365982&view=diff ============================================================================== --- 
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java (original) +++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java Thu Jul 26 13:25:41 2012 @@ -25,11 +25,13 @@ import java.net.MalformedURLException; import java.net.URL; import java.net.HttpURLConnection; import java.net.URLConnection; +import java.security.GeneralSecurityException; import java.util.HashSet; import java.util.List; import java.util.Set; import javax.crypto.SecretKey; +import javax.net.ssl.HttpsURLConnection; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -42,9 +44,11 @@ import org.apache.hadoop.mapred.Counters import org.apache.hadoop.mapred.IFileInputStream; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapreduce.MRConfig; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.mapreduce.security.SecureShuffleUtils; +import org.apache.hadoop.security.ssl.SSLFactory; import org.apache.hadoop.mapreduce.task.reduce.MapOutput.Type; import org.apache.hadoop.util.Progressable; import org.apache.hadoop.util.ReflectionUtils; @@ -92,6 +96,9 @@ class Fetcher<K,V> extends Thread { private volatile boolean stopped = false; + private static boolean sslShuffle; + private static SSLFactory sslFactory; + public Fetcher(JobConf job, TaskAttemptID reduceId, ShuffleScheduler<K,V> scheduler, MergeManager<K,V> merger, Reporter reporter, ShuffleClientMetrics metrics, @@ -135,6 +142,20 @@ class Fetcher<K,V> extends Thread { setName("fetcher#" + id); setDaemon(true); + + synchronized (Fetcher.class) { + sslShuffle = job.getBoolean(MRConfig.SHUFFLE_SSL_ENABLED_KEY, + MRConfig.SHUFFLE_SSL_ENABLED_DEFAULT); + 
if (sslShuffle && sslFactory == null) { + sslFactory = new SSLFactory(SSLFactory.Mode.CLIENT, job); + try { + sslFactory.init(); + } catch (Exception ex) { + sslFactory.destroy(); + throw new RuntimeException(ex); + } + } + } } public void run() { @@ -173,8 +194,25 @@ class Fetcher<K,V> extends Thread { } catch (InterruptedException ie) { LOG.warn("Got interrupt while joining " + getName(), ie); } + if (sslFactory != null) { + sslFactory.destroy(); + } } + protected HttpURLConnection openConnection(URL url) throws IOException { + HttpURLConnection conn = (HttpURLConnection) url.openConnection(); + if (sslShuffle) { + HttpsURLConnection httpsConn = (HttpsURLConnection) conn; + try { + httpsConn.setSSLSocketFactory(sslFactory.createSSLSocketFactory()); + } catch (GeneralSecurityException ex) { + throw new IOException(ex); + } + httpsConn.setHostnameVerifier(sslFactory.getHostnameVerifier()); + } + return conn; + } + /** * The crux of the matter... * @@ -205,7 +243,7 @@ class Fetcher<K,V> extends Thread { try { URL url = getMapOutputURL(host, maps); - HttpURLConnection connection = (HttpURLConnection)url.openConnection(); + HttpURLConnection connection = openConnection(url); // generate hash of the url String msgToEncode = SecureShuffleUtils.buildMsgFrom(url); Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml?rev=1365982&r1=1365981&r2=1365982&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml (original) +++ 
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml Thu Jul 26 13:25:41 2012 @@ -513,6 +513,21 @@ </property> <property> + <name>mapreduce.shuffle.ssl.enabled</name> + <value>false</value> + <description> + Whether to use SSL for the Shuffle HTTP endpoints. + </description> +</property> + +<property> + <name>mapreduce.shuffle.ssl.file.buffer.size</name> + <value>65536</value> + <description>Buffer size for reading spills from file when using SSL. + </description> +</property> + +<property> <name>mapreduce.reduce.markreset.buffer.percent</name> <value>0.0</value> <description>The percentage of memory -relative to the maximum heap size- to Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java?rev=1365982&r1=1365981&r2=1365982&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java (original) +++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java Thu Jul 26 13:25:41 2012 @@ -55,7 +55,9 @@ import org.apache.hadoop.fs.LocalDirAllo import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DataInputByteBuffer; import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.mapreduce.MRConfig; import org.apache.hadoop.mapreduce.security.SecureShuffleUtils; +import org.apache.hadoop.security.ssl.SSLFactory; 
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier; import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager; import org.apache.hadoop.mapreduce.task.reduce.ShuffleHeader; @@ -101,6 +103,8 @@ import org.jboss.netty.handler.codec.htt import org.jboss.netty.handler.codec.http.HttpResponseEncoder; import org.jboss.netty.handler.codec.http.HttpResponseStatus; import org.jboss.netty.handler.codec.http.QueryStringDecoder; +import org.jboss.netty.handler.ssl.SslHandler; +import org.jboss.netty.handler.stream.ChunkedFile; import org.jboss.netty.handler.stream.ChunkedWriteHandler; import org.jboss.netty.util.CharsetUtil; @@ -114,6 +118,8 @@ public class ShuffleHandler extends Abst private int port; private ChannelFactory selector; private final ChannelGroup accepted = new DefaultChannelGroup(); + private HttpPipelineFactory pipelineFact; + private int sslFileBufferSize; public static final String MAPREDUCE_SHUFFLE_SERVICEID = "mapreduce.shuffle"; @@ -126,6 +132,11 @@ public class ShuffleHandler extends Abst public static final String SHUFFLE_PORT_CONFIG_KEY = "mapreduce.shuffle.port"; public static final int DEFAULT_SHUFFLE_PORT = 8080; + public static final String SUFFLE_SSL_FILE_BUFFER_SIZE_KEY = + "mapreduce.shuffle.ssl.file.buffer.size"; + + public static final int DEFAULT_SUFFLE_SSL_FILE_BUFFER_SIZE = 60 * 1024; + @Metrics(about="Shuffle output metrics", context="mapred") static class ShuffleMetrics implements ChannelFutureListener { @Metric("Shuffle output in bytes") @@ -249,7 +260,11 @@ public class ShuffleHandler extends Abst public synchronized void start() { Configuration conf = getConfig(); ServerBootstrap bootstrap = new ServerBootstrap(selector); - HttpPipelineFactory pipelineFact = new HttpPipelineFactory(conf); + try { + pipelineFact = new HttpPipelineFactory(conf); + } catch (Exception ex) { + throw new RuntimeException(ex); + } bootstrap.setPipelineFactory(pipelineFact); port = conf.getInt(SHUFFLE_PORT_CONFIG_KEY, 
DEFAULT_SHUFFLE_PORT); Channel ch = bootstrap.bind(new InetSocketAddress(port)); @@ -259,6 +274,9 @@ public class ShuffleHandler extends Abst pipelineFact.SHUFFLE.setPort(port); LOG.info(getName() + " listening on port " + port); super.start(); + + sslFileBufferSize = conf.getInt(SUFFLE_SSL_FILE_BUFFER_SIZE_KEY, + DEFAULT_SUFFLE_SSL_FILE_BUFFER_SIZE); } @Override @@ -266,6 +284,7 @@ public class ShuffleHandler extends Abst accepted.close().awaitUninterruptibly(10, TimeUnit.SECONDS); ServerBootstrap bootstrap = new ServerBootstrap(selector); bootstrap.releaseExternalResources(); + pipelineFact.destroy(); super.stop(); } @@ -283,22 +302,38 @@ public class ShuffleHandler extends Abst class HttpPipelineFactory implements ChannelPipelineFactory { final Shuffle SHUFFLE; + private SSLFactory sslFactory; - public HttpPipelineFactory(Configuration conf) { + public HttpPipelineFactory(Configuration conf) throws Exception { SHUFFLE = new Shuffle(conf); + if (conf.getBoolean(MRConfig.SHUFFLE_SSL_ENABLED_KEY, + MRConfig.SHUFFLE_SSL_ENABLED_DEFAULT)) { + sslFactory = new SSLFactory(SSLFactory.Mode.SERVER, conf); + sslFactory.init(); + } + } + + public void destroy() { + if (sslFactory != null) { + sslFactory.destroy(); + } } @Override public ChannelPipeline getPipeline() throws Exception { - return Channels.pipeline( - new HttpRequestDecoder(), - new HttpChunkAggregator(1 << 16), - new HttpResponseEncoder(), - new ChunkedWriteHandler(), - SHUFFLE); - // TODO factor security manager into pipeline - // TODO factor out encode/decode to permit binary shuffle - // TODO factor out decode of index to permit alt. 
models + ChannelPipeline pipeline = Channels.pipeline(); + if (sslFactory != null) { + pipeline.addLast("ssl", new SslHandler(sslFactory.createSSLEngine())); + } + pipeline.addLast("decoder", new HttpRequestDecoder()); + pipeline.addLast("aggregator", new HttpChunkAggregator(1 << 16)); + pipeline.addLast("encoder", new HttpResponseEncoder()); + pipeline.addLast("chunking", new ChunkedWriteHandler()); + pipeline.addLast("shuffle", SHUFFLE); + return pipeline; + // TODO factor security manager into pipeline + // TODO factor out encode/decode to permit binary shuffle + // TODO factor out decode of index to permit alt. models } } @@ -483,17 +518,25 @@ public class ShuffleHandler extends Abst LOG.info(spillfile + " not found"); return null; } - final FileRegion partition = new DefaultFileRegion( - spill.getChannel(), info.startOffset, info.partLength); - ChannelFuture writeFuture = ch.write(partition); - writeFuture.addListener(new ChannelFutureListener() { - // TODO error handling; distinguish IO/connection failures, - // attribute to appropriate spill output - @Override - public void operationComplete(ChannelFuture future) { - partition.releaseExternalResources(); - } - }); + ChannelFuture writeFuture; + if (ch.getPipeline().get(SslHandler.class) == null) { + final FileRegion partition = new DefaultFileRegion( + spill.getChannel(), info.startOffset, info.partLength); + writeFuture = ch.write(partition); + writeFuture.addListener(new ChannelFutureListener() { + // TODO error handling; distinguish IO/connection failures, + // attribute to appropriate spill output + @Override + public void operationComplete(ChannelFuture future) { + partition.releaseExternalResources(); + } + }); + } else { + // HTTPS cannot be done with zero copy. 
+ writeFuture = ch.write(new ChunkedFile(spill, info.startOffset, + info.partLength, + sslFileBufferSize)); + } metrics.shuffleConnections.incr(); metrics.shuffleOutputBytes.incr(info.partLength); // optimistic return writeFuture; Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/index.apt.vm URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/index.apt.vm?rev=1365982&r1=1365981&r2=1365982&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/index.apt.vm (original) +++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/index.apt.vm Thu Jul 26 13:25:41 2012 @@ -51,3 +51,5 @@ MapReduce NextGen aka YARN aka MRv2 * {{{./CLIMiniCluster.html}CLI MiniCluster}} + * {{{./EncryptedShuffle.html}Encrypted Shuffle}} +