Author: cnauroth Date: Mon Mar 24 19:03:09 2014 New Revision: 1580996 URL: http://svn.apache.org/r1580996 Log: MAPREDUCE-5791. Merging change r1580994 from trunk to branch-2.
Added: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestFadvisedFileRegion.java - copied unchanged from r1580994, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestFadvisedFileRegion.java Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/FadvisedFileRegion.java hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt?rev=1580996&r1=1580995&r2=1580996&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt (original) +++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt Mon Mar 24 19:03:09 2014 @@ -117,6 +117,10 @@ Release 2.4.0 - UNRELEASED override HADOOP_ROOT_LOGGER or HADOOP_CLIENT_OPTS. (Varun Vasudev via vinodkv) + MAPREDUCE-5791. Shuffle phase is slow in Windows - + FadviseFileRegion::transferTo does not read disks efficiently. 
+ (Nikola Vujic via cnauroth) + Release 2.3.1 - UNRELEASED INCOMPATIBLE CHANGES Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml?rev=1580996&r1=1580995&r2=1580996&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml (original) +++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml Mon Mar 24 19:03:09 2014 @@ -599,6 +599,30 @@ </property> <property> + <name>mapreduce.shuffle.transferTo.allowed</name> + <value></value> + <description>This option can enable/disable using the NIO transferTo method in + the shuffle phase. NIO transferTo does not perform well on Windows in the + shuffle phase. Thus, with this configuration property it is possible to + disable it, in which case a custom transfer method will be used. Recommended + value is false when running Hadoop on Windows. For Linux, it is recommended + to set it to true. If nothing is set then the default value is false for + Windows, and true for Linux. + </description> +</property> + +<property> + <name>mapreduce.shuffle.transfer.buffer.size</name> + <value>131072</value> + <description>This property is used only if + mapreduce.shuffle.transferTo.allowed is set to false. In that case, + this property defines the size of the buffer used in the buffer copy code + for the shuffle phase. The size of this buffer determines the size of the IO + requests. 
+ </description> +</property> + +<property> <name>mapreduce.reduce.markreset.buffer.percent</name> <value>0.0</value> <description>The percentage of memory -relative to the maximum heap size- to Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/FadvisedFileRegion.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/FadvisedFileRegion.java?rev=1580996&r1=1580995&r2=1580996&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/FadvisedFileRegion.java (original) +++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/FadvisedFileRegion.java Mon Mar 24 19:03:09 2014 @@ -21,6 +21,8 @@ package org.apache.hadoop.mapred; import java.io.FileDescriptor; import java.io.IOException; import java.io.RandomAccessFile; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; import java.nio.channels.WritableByteChannel; import org.apache.commons.logging.Log; @@ -30,6 +32,8 @@ import org.apache.hadoop.io.ReadaheadPoo import org.apache.hadoop.io.nativeio.NativeIO; import org.jboss.netty.channel.DefaultFileRegion; +import com.google.common.annotations.VisibleForTesting; + public class FadvisedFileRegion extends DefaultFileRegion { private static final Log LOG = LogFactory.getLog(FadvisedFileRegion.class); @@ -39,18 +43,29 @@ public class FadvisedFileRegion extends private final ReadaheadPool readaheadPool; private final FileDescriptor fd; private final String identifier; - + private final long count; + private final long position; + private 
final int shuffleBufferSize; + private final boolean shuffleTransferToAllowed; + private final FileChannel fileChannel; + private ReadaheadRequest readaheadRequest; public FadvisedFileRegion(RandomAccessFile file, long position, long count, boolean manageOsCache, int readaheadLength, ReadaheadPool readaheadPool, - String identifier) throws IOException { + String identifier, int shuffleBufferSize, + boolean shuffleTransferToAllowed) throws IOException { super(file.getChannel(), position, count); this.manageOsCache = manageOsCache; this.readaheadLength = readaheadLength; this.readaheadPool = readaheadPool; this.fd = file.getFD(); this.identifier = identifier; + this.fileChannel = file.getChannel(); + this.count = count; + this.position = position; + this.shuffleBufferSize = shuffleBufferSize; + this.shuffleTransferToAllowed = shuffleTransferToAllowed; } @Override @@ -61,9 +76,69 @@ public class FadvisedFileRegion extends getPosition() + position, readaheadLength, getPosition() + getCount(), readaheadRequest); } - return super.transferTo(target, position); + + if(this.shuffleTransferToAllowed) { + return super.transferTo(target, position); + } else { + return customShuffleTransfer(target, position); + } + } + + /** + * This method transfers data using local buffer. It transfers data from + * a disk to a local buffer in memory, and then it transfers data from the + * buffer to the target. This is used only if transferTo is disallowed in + * the configuration file. super.TransferTo does not perform well on Windows + * due to a small IO request generated. customShuffleTransfer can control + * the size of the IO requests by changing the size of the intermediate + * buffer. 
+ */ + @VisibleForTesting + long customShuffleTransfer(WritableByteChannel target, long position) + throws IOException { + long actualCount = this.count - position; + if (actualCount < 0 || position < 0) { + throw new IllegalArgumentException( + "position out of range: " + position + + " (expected: 0 - " + (this.count - 1) + ')'); + } + if (actualCount == 0) { + return 0L; + } + + long trans = actualCount; + int readSize; + ByteBuffer byteBuffer = ByteBuffer.allocate(this.shuffleBufferSize); + + while(trans > 0L && + (readSize = fileChannel.read(byteBuffer, this.position+position)) > 0) { + //adjust counters and buffer limit + if(readSize < trans) { + trans -= readSize; + position += readSize; + byteBuffer.flip(); + } else { + //We can read more than we need if the actualCount is not multiple + //of the byteBuffer size and file is big enough. In that case we cannot + //use flip method but we need to set buffer limit manually to trans. + byteBuffer.limit((int)trans); + byteBuffer.position(0); + position += trans; + trans = 0; + } + + //write data to the target + while(byteBuffer.hasRemaining()) { + target.write(byteBuffer); + } + + byteBuffer.clear(); + } + + return actualCount - trans; } + @Override public void releaseExternalResources() { if (readaheadRequest != null) { Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java?rev=1580996&r1=1580995&r2=1580996&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java (original) +++ 
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java Mon Mar 24 19:03:09 2014 @@ -74,6 +74,7 @@ import org.apache.hadoop.metrics2.lib.Mu import org.apache.hadoop.metrics2.lib.MutableGaugeInt; import org.apache.hadoop.security.ssl.SSLFactory; import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.util.Shell; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext; @@ -144,6 +145,8 @@ public class ShuffleHandler extends Auxi private boolean manageOsCache; private int readaheadLength; private int maxShuffleConnections; + private int shuffleBufferSize; + private boolean shuffleTransferToAllowed; private ReadaheadPool readaheadPool = ReadaheadPool.getInstance(); public static final String MAPREDUCE_SHUFFLE_SERVICEID = @@ -183,6 +186,17 @@ public class ShuffleHandler extends Auxi public static final String MAX_SHUFFLE_THREADS = "mapreduce.shuffle.max.threads"; // 0 implies Netty default of 2 * number of available processors public static final int DEFAULT_MAX_SHUFFLE_THREADS = 0; + + public static final String SHUFFLE_BUFFER_SIZE = + "mapreduce.shuffle.transfer.buffer.size"; + public static final int DEFAULT_SHUFFLE_BUFFER_SIZE = 128 * 1024; + + public static final String SHUFFLE_TRANSFERTO_ALLOWED = + "mapreduce.shuffle.transferTo.allowed"; + public static final boolean DEFAULT_SHUFFLE_TRANSFERTO_ALLOWED = true; + public static final boolean WINDOWS_DEFAULT_SHUFFLE_TRANSFERTO_ALLOWED = + false; + boolean connectionKeepAliveEnabled = false; int connectionKeepAliveTimeOut; int mapOutputMetaInfoCacheSize; @@ -310,6 +324,13 @@ public class ShuffleHandler extends Auxi if (maxShuffleThreads == 0) { maxShuffleThreads = 2 * Runtime.getRuntime().availableProcessors(); } + + shuffleBufferSize = 
conf.getInt(SHUFFLE_BUFFER_SIZE, + DEFAULT_SHUFFLE_BUFFER_SIZE); + + shuffleTransferToAllowed = conf.getBoolean(SHUFFLE_TRANSFERTO_ALLOWED, + (Shell.WINDOWS)?WINDOWS_DEFAULT_SHUFFLE_TRANSFERTO_ALLOWED: + DEFAULT_SHUFFLE_TRANSFERTO_ALLOWED); ThreadFactory bossFactory = new ThreadFactoryBuilder() .setNameFormat("ShuffleHandler Netty Boss #%d") @@ -746,7 +767,8 @@ public class ShuffleHandler extends Auxi if (ch.getPipeline().get(SslHandler.class) == null) { final FadvisedFileRegion partition = new FadvisedFileRegion(spill, info.startOffset, info.partLength, manageOsCache, readaheadLength, - readaheadPool, spillfile.getAbsolutePath()); + readaheadPool, spillfile.getAbsolutePath(), + shuffleBufferSize, shuffleTransferToAllowed); writeFuture = ch.write(partition); writeFuture.addListener(new ChannelFutureListener() { // TODO error handling; distinguish IO/connection failures,