Author: sseth Date: Thu Aug 2 21:57:42 2012 New Revision: 1368722 URL: http://svn.apache.org/viewvc?rev=1368722&view=rev Log: Merge MAPREDUCE-3289 from trunk. Make use of fadvise in the NM's shuffle handler. (Contributed by Todd Lipcon and Siddharth Seth)
Added: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/FadvisedChunkedFile.java - copied unchanged from r1368718, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/FadvisedChunkedFile.java hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/FadvisedFileRegion.java - copied unchanged from r1368718, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/FadvisedFileRegion.java Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt?rev=1368722&r1=1368721&r2=1368722&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt (original) +++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt Thu Aug 2 21:57:42 2012 @@ -68,6 +68,9 @@ Release 2.1.0-alpha - Unreleased MAPREDUCE-4427. Added an 'unmanaged' mode for AMs so as to ease development of new applications. (Bikas Saha via acmurthy) + MAPREDUCE-3289. Make use of fadvise in the NM's shuffle handler. + (Todd Lipcon and Siddharth Seth via sseth) + OPTIMIZATIONS BUG FIXES Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java?rev=1368722&r1=1368721&r2=1368722&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java (original) +++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java Thu Aug 2 21:57:42 2012 @@ -55,6 +55,7 @@ import org.apache.hadoop.fs.LocalDirAllo import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DataInputByteBuffer; import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.io.ReadaheadPool; import org.apache.hadoop.mapreduce.MRConfig; import org.apache.hadoop.mapreduce.security.SecureShuffleUtils; import org.apache.hadoop.security.ssl.SSLFactory; @@ -86,9 +87,7 @@ import org.jboss.netty.channel.ChannelHa import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelPipelineFactory; import org.jboss.netty.channel.Channels; -import org.jboss.netty.channel.DefaultFileRegion; import org.jboss.netty.channel.ExceptionEvent; -import org.jboss.netty.channel.FileRegion; import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.channel.SimpleChannelUpstreamHandler; import org.jboss.netty.channel.group.ChannelGroup; @@ -104,7 +103,6 @@ import org.jboss.netty.handler.codec.htt import org.jboss.netty.handler.codec.http.HttpResponseStatus; import org.jboss.netty.handler.codec.http.QueryStringDecoder; import org.jboss.netty.handler.ssl.SslHandler; -import org.jboss.netty.handler.stream.ChunkedFile; import org.jboss.netty.handler.stream.ChunkedWriteHandler; import org.jboss.netty.util.CharsetUtil; @@ -114,6 +112,12 @@ public class ShuffleHandler extends Abst implements AuxServices.AuxiliaryService { private static final Log LOG = LogFactory.getLog(ShuffleHandler.class); + + public static final String SHUFFLE_MANAGE_OS_CACHE = "mapreduce.shuffle.manage.os.cache"; + public static final boolean DEFAULT_SHUFFLE_MANAGE_OS_CACHE = true; + + public static final String SHUFFLE_READAHEAD_BYTES = "mapreduce.shuffle.readahead.bytes"; + public static final int DEFAULT_SHUFFLE_READAHEAD_BYTES = 4 * 1024 * 1024; private int port; private ChannelFactory selector; @@ -121,6 +125,15 @@ public class ShuffleHandler extends Abst private HttpPipelineFactory pipelineFact; private int sslFileBufferSize; + /** + * Should the shuffle use posix_fadvise calls to manage the OS cache during + * sendfile + */ + private boolean manageOsCache; + private int readaheadLength; + private ReadaheadPool readaheadPool = ReadaheadPool.getInstance(); + + public static final String MAPREDUCE_SHUFFLE_SERVICEID = "mapreduce.shuffle"; @@ -242,6 +255,12 @@ public class ShuffleHandler extends Abst @Override public synchronized void init(Configuration conf) { + manageOsCache = conf.getBoolean(SHUFFLE_MANAGE_OS_CACHE, + DEFAULT_SHUFFLE_MANAGE_OS_CACHE); + + readaheadLength = conf.getInt(SHUFFLE_READAHEAD_BYTES, + DEFAULT_SHUFFLE_READAHEAD_BYTES); + ThreadFactory bossFactory = new ThreadFactoryBuilder() .setNameFormat("ShuffleHandler Netty Boss #%d") .build(); @@ -503,14 +522,14 @@ public class ShuffleHandler extends Abst base + "/file.out", conf); LOG.debug("DEBUG1 " + base + " : " + mapOutputFileName + " : " + indexFileName); - IndexRecord info = + final IndexRecord info = indexCache.getIndexInformation(mapId, reduce, indexFileName, user); final ShuffleHeader header = new ShuffleHeader(mapId, info.partLength, info.rawLength, reduce); final DataOutputBuffer dob = new DataOutputBuffer(); header.write(dob); ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength())); - File spillfile = new File(mapOutputFileName.toString()); + final File spillfile = new File(mapOutputFileName.toString()); RandomAccessFile spill; try { spill = new RandomAccessFile(spillfile, "r"); @@ -520,22 +539,25 @@ public class ShuffleHandler extends Abst } ChannelFuture writeFuture; if (ch.getPipeline().get(SslHandler.class) == null) { - final FileRegion partition = new DefaultFileRegion( - spill.getChannel(), info.startOffset, info.partLength); + final FadvisedFileRegion partition = new FadvisedFileRegion(spill, + info.startOffset, info.partLength, manageOsCache, readaheadLength, + readaheadPool, spillfile.getAbsolutePath()); writeFuture = ch.write(partition); writeFuture.addListener(new ChannelFutureListener() { // TODO error handling; distinguish IO/connection failures, // attribute to appropriate spill output - @Override - public void operationComplete(ChannelFuture future) { - partition.releaseExternalResources(); - } - }); + @Override + public void operationComplete(ChannelFuture future) { + partition.releaseExternalResources(); + } + }); } else { // HTTPS cannot be done with zero copy. - writeFuture = ch.write(new ChunkedFile(spill, info.startOffset, - info.partLength, - sslFileBufferSize)); + final FadvisedChunkedFile chunk = new FadvisedChunkedFile(spill, + info.startOffset, info.partLength, sslFileBufferSize, + manageOsCache, readaheadLength, readaheadPool, + spillfile.getAbsolutePath()); + writeFuture = ch.write(chunk); } metrics.shuffleConnections.incr(); metrics.shuffleOutputBytes.incr(info.partLength); // optimistic