Author: tucu
Date: Tue Jan 22 14:10:42 2013
New Revision: 1436936

URL: http://svn.apache.org/viewvc?rev=1436936&view=rev
Log: MAPREDUCE-4808. Refactor MapOutput and MergeManager to facilitate reuse by Shuffle implementations. (masokan via tucu)
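In short, this change splits the old concrete MergeManager into a MergeManager interface plus a MergeManagerImpl, and turns MapOutput into an abstract class with InMemoryMapOutput and OnDiskMapOutput subclasses, so that other Shuffle implementations can plug in their own merge and map-output handling. The following is a rough fetcher-side sketch of the new contract, pieced together from the Fetcher.java and MergeManager.java hunks below; the class name FetchSketch and the copyOne method are illustrative only, and the commit/abort calls are shown to round out the MapOutput contract rather than being copied from the hunks.

package org.apache.hadoop.mapreduce.task.reduce;

import java.io.IOException;
import java.io.InputStream;

import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.TaskAttemptID;

// Illustrative sketch only -- not part of this commit.
class FetchSketch<K, V> {
  private final MergeManager<K, V> merger;        // e.g. a MergeManagerImpl<K, V>
  private final ShuffleClientMetrics metrics;
  private final Reporter reporter;

  FetchSketch(MergeManager<K, V> merger, ShuffleClientMetrics metrics,
              Reporter reporter) {
    this.merger = merger;
    this.metrics = metrics;
    this.reporter = reporter;
  }

  /** Copies one map output; returns false if the merger asked us to back off. */
  boolean copyOne(MapHost host, TaskAttemptID mapId, InputStream input,
                  long compressedLength, long decompressedLength, int fetcherId)
      throws IOException, InterruptedException {
    // Block until the merger has resources free (replaces waitForInMemoryMerge()).
    merger.waitForResource();

    // Reserve space for this output; a null return replaces the old Type.WAIT sentinel.
    MapOutput<K, V> mapOutput = merger.reserve(mapId, decompressedLength, fetcherId);
    if (mapOutput == null) {
      return false;                                // retry this map later
    }

    try {
      // The MapOutput subclass decides where the bytes go (memory, disk, ...).
      mapOutput.shuffle(host, input, compressedLength, decompressedLength,
                        metrics, reporter);
      mapOutput.commit();                          // hand the finished output to the merger
      return true;
    } catch (IOException ioe) {
      mapOutput.abort();                           // release the reservation on failure
      throw ioe;
    }
  }
}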
Added:
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/InMemoryMapOutput.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java
Modified:
    hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/trunk/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/InMemoryReader.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MapOutput.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeThread.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMergeManager.java

Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1436936&r1=1436935&r2=1436936&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Tue Jan 22 14:10:42 2013
@@ -19,6 +19,8 @@ Trunk (Unreleased)
     MAPREDUCE-4887. Add RehashPartitioner, to smooth distributions with poor
                     implementations of Object#hashCode(). (Radim Kolar via cutting)
 
+    MAPREDUCE-4808. Refactor MapOutput and MergeManager to facilitate reuse by Shuffle implementations. (masokan via tucu)
+
   IMPROVEMENTS
 
     MAPREDUCE-3787.
[Gridmix] Optimize job monitoring and STRESS mode for Modified: hadoop/common/trunk/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml?rev=1436936&r1=1436935&r2=1436936&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml Tue Jan 22 14:10:42 2013 @@ -268,7 +268,7 @@ This class is unlikely to get subclassed, so ignore --> <Match> - <Class name="org.apache.hadoop.mapreduce.task.reduce.MergeManager" /> + <Class name="org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl" /> <Bug pattern="SC_START_IN_CTOR" /> </Match> Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java?rev=1436936&r1=1436935&r2=1436936&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java Tue Jan 22 14:10:42 2013 @@ -19,8 +19,6 @@ package org.apache.hadoop.mapreduce.task import java.io.DataInputStream; import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; import java.net.ConnectException; import java.net.HttpURLConnection; import java.net.MalformedURLException; @@ -38,12 +36,7 @@ import javax.net.ssl.HttpsURLConnection; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.io.compress.CodecPool; -import org.apache.hadoop.io.compress.CompressionCodec; -import org.apache.hadoop.io.compress.Decompressor; -import org.apache.hadoop.io.compress.DefaultCodec; import org.apache.hadoop.mapred.Counters; -import org.apache.hadoop.mapred.IFileInputStream; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapreduce.MRConfig; @@ -51,9 +44,6 @@ import org.apache.hadoop.mapreduce.MRJob import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.mapreduce.security.SecureShuffleUtils; import org.apache.hadoop.security.ssl.SSLFactory; -import org.apache.hadoop.mapreduce.task.reduce.MapOutput.Type; -import org.apache.hadoop.util.Progressable; -import org.apache.hadoop.util.ReflectionUtils; import com.google.common.annotations.VisibleForTesting; @@ -70,7 +60,7 @@ class Fetcher<K,V> extends Thread { /* Default read timeout (in milliseconds) */ private final static int DEFAULT_READ_TIMEOUT = 3 * 60 * 1000; - private final Progressable reporter; + private final Reporter reporter; private static enum ShuffleErrors{IO_ERROR, WRONG_LENGTH, BAD_ID, WRONG_MAP, CONNECTION, WRONG_REDUCE} @@ -92,15 +82,10 @@ class Fetcher<K,V> extends Thread { private final int connectionTimeout; private final int readTimeout; - // Decompression of map-outputs 
- private final CompressionCodec codec; - private final Decompressor decompressor; private final SecretKey jobTokenSecret; private volatile boolean stopped = false; - private JobConf job; - private static boolean sslShuffle; private static SSLFactory sslFactory; @@ -108,7 +93,6 @@ class Fetcher<K,V> extends Thread { ShuffleScheduler<K,V> scheduler, MergeManager<K,V> merger, Reporter reporter, ShuffleClientMetrics metrics, ExceptionReporter exceptionReporter, SecretKey jobTokenSecret) { - this.job = job; this.reporter = reporter; this.scheduler = scheduler; this.merger = merger; @@ -130,16 +114,6 @@ class Fetcher<K,V> extends Thread { wrongReduceErrs = reporter.getCounter(SHUFFLE_ERR_GRP_NAME, ShuffleErrors.WRONG_REDUCE.toString()); - if (job.getCompressMapOutput()) { - Class<? extends CompressionCodec> codecClass = - job.getMapOutputCompressorClass(DefaultCodec.class); - codec = ReflectionUtils.newInstance(codecClass, job); - decompressor = CodecPool.getDecompressor(codec); - } else { - codec = null; - decompressor = null; - } - this.connectionTimeout = job.getInt(MRJobConfig.SHUFFLE_CONNECT_TIMEOUT, DEFAULT_STALLED_COPY_TIMEOUT); @@ -170,7 +144,7 @@ class Fetcher<K,V> extends Thread { MapHost host = null; try { // If merge is on, block - merger.waitForInMemoryMerge(); + merger.waitForResource(); // Get a host to shuffle from host = scheduler.getHost(); @@ -386,8 +360,8 @@ class Fetcher<K,V> extends Thread { mapOutput = merger.reserve(mapId, decompressedLength, id); // Check if we can shuffle *now* ... - if (mapOutput.getType() == Type.WAIT) { - LOG.info("fetcher#" + id + " - MergerManager returned Status.WAIT ..."); + if (mapOutput == null) { + LOG.info("fetcher#" + id + " - MergeManager returned status WAIT ..."); //Not an error but wait to process data. return EMPTY_ATTEMPT_ID_ARRAY; } @@ -396,13 +370,9 @@ class Fetcher<K,V> extends Thread { LOG.info("fetcher#" + id + " about to shuffle output of map " + mapOutput.getMapId() + " decomp: " + decompressedLength + " len: " + compressedLength + " to " + - mapOutput.getType()); - if (mapOutput.getType() == Type.MEMORY) { - shuffleToMemory(host, mapOutput, input, - (int) decompressedLength, (int) compressedLength); - } else { - shuffleToDisk(host, mapOutput, input, compressedLength); - } + mapOutput.getDescription()); + mapOutput.shuffle(host, input, compressedLength, decompressedLength, + metrics, reporter); // Inform the shuffle scheduler long endTime = System.currentTimeMillis(); @@ -538,84 +508,4 @@ class Fetcher<K,V> extends Thread { } } } - - private void shuffleToMemory(MapHost host, MapOutput<K,V> mapOutput, - InputStream input, - int decompressedLength, - int compressedLength) throws IOException { - IFileInputStream checksumIn = - new IFileInputStream(input, compressedLength, job); - - input = checksumIn; - - // Are map-outputs compressed? 
- if (codec != null) { - decompressor.reset(); - input = codec.createInputStream(input, decompressor); - } - - // Copy map-output into an in-memory buffer - byte[] shuffleData = mapOutput.getMemory(); - - try { - IOUtils.readFully(input, shuffleData, 0, shuffleData.length); - metrics.inputBytes(shuffleData.length); - reporter.progress(); - LOG.info("Read " + shuffleData.length + " bytes from map-output for " + - mapOutput.getMapId()); - } catch (IOException ioe) { - // Close the streams - IOUtils.cleanup(LOG, input); - - // Re-throw - throw ioe; - } - - } - - private void shuffleToDisk(MapHost host, MapOutput<K,V> mapOutput, - InputStream input, - long compressedLength) - throws IOException { - // Copy data to local-disk - OutputStream output = mapOutput.getDisk(); - long bytesLeft = compressedLength; - try { - final int BYTES_TO_READ = 64 * 1024; - byte[] buf = new byte[BYTES_TO_READ]; - while (bytesLeft > 0) { - int n = input.read(buf, 0, (int) Math.min(bytesLeft, BYTES_TO_READ)); - if (n < 0) { - throw new IOException("read past end of stream reading " + - mapOutput.getMapId()); - } - output.write(buf, 0, n); - bytesLeft -= n; - metrics.inputBytes(n); - reporter.progress(); - } - - LOG.info("Read " + (compressedLength - bytesLeft) + - " bytes from map-output for " + - mapOutput.getMapId()); - - output.close(); - } catch (IOException ioe) { - // Close the streams - IOUtils.cleanup(LOG, input, output); - - // Re-throw - throw ioe; - } - - // Sanity check - if (bytesLeft != 0) { - throw new IOException("Incomplete map output received for " + - mapOutput.getMapId() + " from " + - host.getHostName() + " (" + - bytesLeft + " bytes missing of " + - compressedLength + ")" - ); - } - } } Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/InMemoryMapOutput.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/InMemoryMapOutput.java?rev=1436936&view=auto ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/InMemoryMapOutput.java (added) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/InMemoryMapOutput.java Tue Jan 22 14:10:42 2013 @@ -0,0 +1,127 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.mapreduce.task.reduce; + +import java.io.InputStream; +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.apache.hadoop.conf.Configuration; + +import org.apache.hadoop.io.BoundedByteArrayOutputStream; +import org.apache.hadoop.io.IOUtils; + +import org.apache.hadoop.io.compress.CodecPool; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.io.compress.Decompressor; + +import org.apache.hadoop.mapred.IFileInputStream; +import org.apache.hadoop.mapred.Reporter; + +import org.apache.hadoop.mapreduce.TaskAttemptID; + +@InterfaceAudience.Private +@InterfaceStability.Unstable +class InMemoryMapOutput<K, V> extends MapOutput<K, V> { + private static final Log LOG = LogFactory.getLog(InMemoryMapOutput.class); + private Configuration conf; + private final MergeManagerImpl<K, V> merger; + private final byte[] memory; + private BoundedByteArrayOutputStream byteStream; + // Decompression of map-outputs + private final CompressionCodec codec; + private final Decompressor decompressor; + + public InMemoryMapOutput(Configuration conf, TaskAttemptID mapId, + MergeManagerImpl<K, V> merger, + int size, CompressionCodec codec, + boolean primaryMapOutput) { + super(mapId, (long)size, primaryMapOutput); + this.conf = conf; + this.merger = merger; + this.codec = codec; + byteStream = new BoundedByteArrayOutputStream(size); + memory = byteStream.getBuffer(); + if (codec != null) { + decompressor = CodecPool.getDecompressor(codec); + } else { + decompressor = null; + } + } + + public byte[] getMemory() { + return memory; + } + + public BoundedByteArrayOutputStream getArrayStream() { + return byteStream; + } + + @Override + public void shuffle(MapHost host, InputStream input, + long compressedLength, long decompressedLength, + ShuffleClientMetrics metrics, + Reporter reporter) throws IOException { + IFileInputStream checksumIn = + new IFileInputStream(input, compressedLength, conf); + + input = checksumIn; + + // Are map-outputs compressed? 
+ if (codec != null) { + decompressor.reset(); + input = codec.createInputStream(input, decompressor); + } + + try { + IOUtils.readFully(input, memory, 0, memory.length); + metrics.inputBytes(memory.length); + reporter.progress(); + LOG.info("Read " + memory.length + " bytes from map-output for " + + getMapId()); + } catch (IOException ioe) { + // Close the streams + IOUtils.cleanup(LOG, input); + + // Re-throw + throw ioe; + } finally { + CodecPool.returnDecompressor(decompressor); + } + } + + @Override + public void commit() throws IOException { + merger.closeInMemoryFile(this); + } + + @Override + public void abort() { + merger.unreserve(memory.length); + } + + @Override + public String getDescription() { + return "MEMORY"; + } +} Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/InMemoryReader.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/InMemoryReader.java?rev=1436936&r1=1436935&r2=1436936&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/InMemoryReader.java (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/InMemoryReader.java Tue Jan 22 14:10:42 2013 @@ -35,12 +35,12 @@ import org.apache.hadoop.mapreduce.TaskA @InterfaceStability.Unstable public class InMemoryReader<K, V> extends Reader<K, V> { private final TaskAttemptID taskAttemptId; - private final MergeManager<K,V> merger; + private final MergeManagerImpl<K,V> merger; DataInputBuffer memDataIn = new DataInputBuffer(); private int start; private int length; - public InMemoryReader(MergeManager<K,V> merger, TaskAttemptID taskAttemptId, + public InMemoryReader(MergeManagerImpl<K,V> merger, TaskAttemptID taskAttemptId, byte[] data, int start, int length) throws IOException { super(null, null, length - start, null, null); Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MapOutput.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MapOutput.java?rev=1436936&r1=1436935&r2=1436936&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MapOutput.java (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MapOutput.java Tue Jan 22 14:10:42 2013 @@ -17,119 +17,36 @@ */ package org.apache.hadoop.mapreduce.task.reduce; +import java.io.InputStream; import java.io.IOException; -import java.io.OutputStream; + import java.util.Comparator; import java.util.concurrent.atomic.AtomicInteger; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import 
org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.LocalDirAllocator; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.BoundedByteArrayOutputStream; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.MapOutputFile; + +import org.apache.hadoop.mapred.Reporter; + import org.apache.hadoop.mapreduce.TaskAttemptID; @InterfaceAudience.LimitedPrivate({"MapReduce"}) @InterfaceStability.Unstable -public class MapOutput<K,V> { - private static final Log LOG = LogFactory.getLog(MapOutput.class); +public abstract class MapOutput<K, V> { private static AtomicInteger ID = new AtomicInteger(0); - public static enum Type { - WAIT, - MEMORY, - DISK - } - private final int id; - - private final MergeManager<K,V> merger; private final TaskAttemptID mapId; - private final long size; - - private final byte[] memory; - private BoundedByteArrayOutputStream byteStream; - - private final FileSystem localFS; - private final Path tmpOutputPath; - private final Path outputPath; - private final OutputStream disk; - - private final Type type; - private final boolean primaryMapOutput; - public MapOutput(TaskAttemptID mapId, MergeManager<K,V> merger, long size, - JobConf conf, LocalDirAllocator localDirAllocator, - int fetcher, boolean primaryMapOutput, MapOutputFile mapOutputFile) - throws IOException { + public MapOutput(TaskAttemptID mapId, long size, boolean primaryMapOutput) { this.id = ID.incrementAndGet(); this.mapId = mapId; - this.merger = merger; - - type = Type.DISK; - - memory = null; - byteStream = null; - this.size = size; - - this.localFS = FileSystem.getLocal(conf); - outputPath = - mapOutputFile.getInputFileForWrite(mapId.getTaskID(),size); - tmpOutputPath = outputPath.suffix(String.valueOf(fetcher)); - - disk = localFS.create(tmpOutputPath); - this.primaryMapOutput = primaryMapOutput; } - public MapOutput(TaskAttemptID mapId, MergeManager<K,V> merger, int size, - boolean primaryMapOutput) { - this.id = ID.incrementAndGet(); - this.mapId = mapId; - this.merger = merger; - - type = Type.MEMORY; - byteStream = new BoundedByteArrayOutputStream(size); - memory = byteStream.getBuffer(); - - this.size = size; - - localFS = null; - disk = null; - outputPath = null; - tmpOutputPath = null; - - this.primaryMapOutput = primaryMapOutput; - } - - public MapOutput(TaskAttemptID mapId) { - this.id = ID.incrementAndGet(); - this.mapId = mapId; - - type = Type.WAIT; - merger = null; - memory = null; - byteStream = null; - - size = -1; - - localFS = null; - disk = null; - outputPath = null; - tmpOutputPath = null; - - this.primaryMapOutput = false; -} - public boolean isPrimaryMapOutput() { return primaryMapOutput; } @@ -147,62 +64,28 @@ public class MapOutput<K,V> { return id; } - public Path getOutputPath() { - return outputPath; - } - - public byte[] getMemory() { - return memory; - } - - public BoundedByteArrayOutputStream getArrayStream() { - return byteStream; - } - - public OutputStream getDisk() { - return disk; - } - public TaskAttemptID getMapId() { return mapId; } - public Type getType() { - return type; - } - public long getSize() { return size; } - public void commit() throws IOException { - if (type == Type.MEMORY) { - merger.closeInMemoryFile(this); - } else if (type == Type.DISK) { - localFS.rename(tmpOutputPath, outputPath); - merger.closeOnDiskFile(outputPath); - } else { - throw new IOException("Cannot commit MapOutput of type WAIT!"); - } - } - - 
public void abort() { - if (type == Type.MEMORY) { - merger.unreserve(memory.length); - } else if (type == Type.DISK) { - try { - localFS.delete(tmpOutputPath, false); - } catch (IOException ie) { - LOG.info("failure to clean up " + tmpOutputPath, ie); - } - } else { - throw new IllegalArgumentException - ("Cannot commit MapOutput with of type WAIT!"); - } - } + public abstract void shuffle(MapHost host, InputStream input, + long compressedLength, + long decompressedLength, + ShuffleClientMetrics metrics, + Reporter reporter) throws IOException; + + public abstract void commit() throws IOException; + public abstract void abort(); + + public abstract String getDescription(); + public String toString() { - return "MapOutput(" + mapId + ", " + type + ")"; + return "MapOutput(" + mapId + ", " + getDescription() + ")"; } public static class MapOutputComparator<K, V> Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java?rev=1436936&r1=1436935&r2=1436936&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java Tue Jan 22 14:10:42 2013 @@ -15,783 +15,56 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.hadoop.mapreduce.task.reduce; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Comparator; -import java.util.List; -import java.util.Set; -import java.util.TreeSet; +package org.apache.hadoop.mapreduce.task.reduce; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.fs.ChecksumFileSystem; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocalDirAllocator; -import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.DataInputBuffer; -import org.apache.hadoop.io.RawComparator; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.mapred.Counters; -import org.apache.hadoop.mapred.IFile; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.MapOutputFile; -import org.apache.hadoop.mapred.Merger; import org.apache.hadoop.mapred.RawKeyValueIterator; import org.apache.hadoop.mapred.Reducer; import org.apache.hadoop.mapred.Reporter; -import org.apache.hadoop.mapred.Task; -import org.apache.hadoop.mapred.IFile.Reader; -import org.apache.hadoop.mapred.IFile.Writer; -import org.apache.hadoop.mapred.Merger.Segment; import org.apache.hadoop.mapred.Task.CombineOutputCollector; -import org.apache.hadoop.mapred.Task.CombineValuesIterator; -import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.TaskAttemptID; -import org.apache.hadoop.mapreduce.TaskID; -import org.apache.hadoop.mapreduce.task.reduce.MapOutput.MapOutputComparator; import org.apache.hadoop.util.Progress; -import org.apache.hadoop.util.ReflectionUtils; -import com.google.common.annotations.VisibleForTesting; +import java.io.IOException; -@SuppressWarnings(value={"unchecked"}) -@InterfaceAudience.LimitedPrivate({"MapReduce"}) +/** + * An interface for a reduce side merge that works with the default Shuffle + * implementation. 
+ */ +@InterfaceAudience.Private @InterfaceStability.Unstable -public class MergeManager<K, V> { - - private static final Log LOG = LogFactory.getLog(MergeManager.class); - - /* Maximum percentage of the in-memory limit that a single shuffle can - * consume*/ - private static final float DEFAULT_SHUFFLE_MEMORY_LIMIT_PERCENT - = 0.25f; - - private final TaskAttemptID reduceId; - - private final JobConf jobConf; - private final FileSystem localFS; - private final FileSystem rfs; - private final LocalDirAllocator localDirAllocator; - - protected MapOutputFile mapOutputFile; - - Set<MapOutput<K, V>> inMemoryMergedMapOutputs = - new TreeSet<MapOutput<K,V>>(new MapOutputComparator<K, V>()); - private final IntermediateMemoryToMemoryMerger memToMemMerger; - - Set<MapOutput<K, V>> inMemoryMapOutputs = - new TreeSet<MapOutput<K,V>>(new MapOutputComparator<K, V>()); - private final MergeThread<MapOutput<K,V>, K,V> inMemoryMerger; - - Set<Path> onDiskMapOutputs = new TreeSet<Path>(); - private final OnDiskMerger onDiskMerger; - - private final long memoryLimit; - private long usedMemory; - private long commitMemory; - private final long maxSingleShuffleLimit; - - private final int memToMemMergeOutputsThreshold; - private final long mergeThreshold; - - private final int ioSortFactor; - - private final Reporter reporter; - private final ExceptionReporter exceptionReporter; - +public interface MergeManager<K, V> { /** - * Combiner class to run during in-memory merge, if defined. + * To wait until merge has some freed resources available so that it can + * accept shuffled data. This will be called before a network connection is + * established to get the map output. */ - private final Class<? extends Reducer> combinerClass; + public void waitForResource() throws InterruptedException; /** - * Resettable collector used for combine. + * To reserve resources for data to be shuffled. This will be called after + * a network connection is made to shuffle the data. + * @param mapId mapper from which data will be shuffled. + * @param requestedSize size in bytes of data that will be shuffled. + * @param fetcher id of the map output fetcher that will shuffle the data. + * @return a MapOutput object that can be used by shuffle to shuffle data. If + * required resources cannot be reserved immediately, a null can be returned. */ - private final CombineOutputCollector<K,V> combineCollector; - - private final Counters.Counter spilledRecordsCounter; - - private final Counters.Counter reduceCombineInputCounter; - - private final Counters.Counter mergedMapOutputsCounter; - - private final CompressionCodec codec; - - private final Progress mergePhase; - - public MergeManager(TaskAttemptID reduceId, JobConf jobConf, - FileSystem localFS, - LocalDirAllocator localDirAllocator, - Reporter reporter, - CompressionCodec codec, - Class<? 
extends Reducer> combinerClass, - CombineOutputCollector<K,V> combineCollector, - Counters.Counter spilledRecordsCounter, - Counters.Counter reduceCombineInputCounter, - Counters.Counter mergedMapOutputsCounter, - ExceptionReporter exceptionReporter, - Progress mergePhase, MapOutputFile mapOutputFile) { - this.reduceId = reduceId; - this.jobConf = jobConf; - this.localDirAllocator = localDirAllocator; - this.exceptionReporter = exceptionReporter; - - this.reporter = reporter; - this.codec = codec; - this.combinerClass = combinerClass; - this.combineCollector = combineCollector; - this.reduceCombineInputCounter = reduceCombineInputCounter; - this.spilledRecordsCounter = spilledRecordsCounter; - this.mergedMapOutputsCounter = mergedMapOutputsCounter; - this.mapOutputFile = mapOutputFile; - this.mapOutputFile.setConf(jobConf); - - this.localFS = localFS; - this.rfs = ((LocalFileSystem)localFS).getRaw(); - - final float maxInMemCopyUse = - jobConf.getFloat(MRJobConfig.SHUFFLE_INPUT_BUFFER_PERCENT, 0.90f); - if (maxInMemCopyUse > 1.0 || maxInMemCopyUse < 0.0) { - throw new IllegalArgumentException("Invalid value for " + - MRJobConfig.SHUFFLE_INPUT_BUFFER_PERCENT + ": " + - maxInMemCopyUse); - } - - // Allow unit tests to fix Runtime memory - this.memoryLimit = - (long)(jobConf.getLong(MRJobConfig.REDUCE_MEMORY_TOTAL_BYTES, - Math.min(Runtime.getRuntime().maxMemory(), Integer.MAX_VALUE)) - * maxInMemCopyUse); - - this.ioSortFactor = jobConf.getInt(MRJobConfig.IO_SORT_FACTOR, 100); + public MapOutput<K, V> reserve(TaskAttemptID mapId, long requestedSize, + int fetcher) throws IOException; - final float singleShuffleMemoryLimitPercent = - jobConf.getFloat(MRJobConfig.SHUFFLE_MEMORY_LIMIT_PERCENT, - DEFAULT_SHUFFLE_MEMORY_LIMIT_PERCENT); - if (singleShuffleMemoryLimitPercent <= 0.0f - || singleShuffleMemoryLimitPercent > 1.0f) { - throw new IllegalArgumentException("Invalid value for " - + MRJobConfig.SHUFFLE_MEMORY_LIMIT_PERCENT + ": " - + singleShuffleMemoryLimitPercent); - } - - usedMemory = 0L; - commitMemory = 0L; - this.maxSingleShuffleLimit = - (long)(memoryLimit * singleShuffleMemoryLimitPercent); - this.memToMemMergeOutputsThreshold = - jobConf.getInt(MRJobConfig.REDUCE_MEMTOMEM_THRESHOLD, ioSortFactor); - this.mergeThreshold = (long)(this.memoryLimit * - jobConf.getFloat(MRJobConfig.SHUFFLE_MERGE_PERCENT, - 0.90f)); - LOG.info("MergerManager: memoryLimit=" + memoryLimit + ", " + - "maxSingleShuffleLimit=" + maxSingleShuffleLimit + ", " + - "mergeThreshold=" + mergeThreshold + ", " + - "ioSortFactor=" + ioSortFactor + ", " + - "memToMemMergeOutputsThreshold=" + memToMemMergeOutputsThreshold); - - if (this.maxSingleShuffleLimit >= this.mergeThreshold) { - throw new RuntimeException("Invlaid configuration: " - + "maxSingleShuffleLimit should be less than mergeThreshold" - + "maxSingleShuffleLimit: " + this.maxSingleShuffleLimit - + "mergeThreshold: " + this.mergeThreshold); - } - - boolean allowMemToMemMerge = - jobConf.getBoolean(MRJobConfig.REDUCE_MEMTOMEM_ENABLED, false); - if (allowMemToMemMerge) { - this.memToMemMerger = - new IntermediateMemoryToMemoryMerger(this, - memToMemMergeOutputsThreshold); - this.memToMemMerger.start(); - } else { - this.memToMemMerger = null; - } - - this.inMemoryMerger = createInMemoryMerger(); - this.inMemoryMerger.start(); - - this.onDiskMerger = new OnDiskMerger(this); - this.onDiskMerger.start(); - - this.mergePhase = mergePhase; - } - - protected MergeThread<MapOutput<K,V>, K,V> createInMemoryMerger() { - return new InMemoryMerger(this); - } - - 
TaskAttemptID getReduceId() { - return reduceId; - } - - @VisibleForTesting - ExceptionReporter getExceptionReporter() { - return exceptionReporter; - } - - public void waitForInMemoryMerge() throws InterruptedException { - inMemoryMerger.waitForMerge(); - } - - private boolean canShuffleToMemory(long requestedSize) { - return (requestedSize < maxSingleShuffleLimit); - } - - final private MapOutput<K,V> stallShuffle = new MapOutput<K,V>(null); - - public synchronized MapOutput<K,V> reserve(TaskAttemptID mapId, - long requestedSize, - int fetcher - ) throws IOException { - if (!canShuffleToMemory(requestedSize)) { - LOG.info(mapId + ": Shuffling to disk since " + requestedSize + - " is greater than maxSingleShuffleLimit (" + - maxSingleShuffleLimit + ")"); - return new MapOutput<K,V>(mapId, this, requestedSize, jobConf, - localDirAllocator, fetcher, true, - mapOutputFile); - } - - // Stall shuffle if we are above the memory limit - - // It is possible that all threads could just be stalling and not make - // progress at all. This could happen when: - // - // requested size is causing the used memory to go above limit && - // requested size < singleShuffleLimit && - // current used size < mergeThreshold (merge will not get triggered) - // - // To avoid this from happening, we allow exactly one thread to go past - // the memory limit. We check (usedMemory > memoryLimit) and not - // (usedMemory + requestedSize > memoryLimit). When this thread is done - // fetching, this will automatically trigger a merge thereby unlocking - // all the stalled threads - - if (usedMemory > memoryLimit) { - LOG.debug(mapId + ": Stalling shuffle since usedMemory (" + usedMemory - + ") is greater than memoryLimit (" + memoryLimit + ")." + - " CommitMemory is (" + commitMemory + ")"); - return stallShuffle; - } - - // Allow the in-memory shuffle to progress - LOG.debug(mapId + ": Proceeding with shuffle since usedMemory (" - + usedMemory + ") is lesser than memoryLimit (" + memoryLimit + ")." - + "CommitMemory is (" + commitMemory + ")"); - return unconditionalReserve(mapId, requestedSize, true); - } - /** - * Unconditional Reserve is used by the Memory-to-Memory thread - * @return + * Called at the end of shuffle. + * @return a key value iterator object. */ - private synchronized MapOutput<K, V> unconditionalReserve( - TaskAttemptID mapId, long requestedSize, boolean primaryMapOutput) { - usedMemory += requestedSize; - return new MapOutput<K,V>(mapId, this, (int)requestedSize, - primaryMapOutput); - } - - synchronized void unreserve(long size) { - usedMemory -= size; - } - - public synchronized void closeInMemoryFile(MapOutput<K,V> mapOutput) { - inMemoryMapOutputs.add(mapOutput); - LOG.info("closeInMemoryFile -> map-output of size: " + mapOutput.getSize() - + ", inMemoryMapOutputs.size() -> " + inMemoryMapOutputs.size() - + ", commitMemory -> " + commitMemory + ", usedMemory ->" + usedMemory); - - commitMemory+= mapOutput.getSize(); - - // Can hang if mergeThreshold is really low. - if (commitMemory >= mergeThreshold) { - LOG.info("Starting inMemoryMerger's merge since commitMemory=" + - commitMemory + " > mergeThreshold=" + mergeThreshold + - ". Current usedMemory=" + usedMemory); - inMemoryMapOutputs.addAll(inMemoryMergedMapOutputs); - inMemoryMergedMapOutputs.clear(); - inMemoryMerger.startMerge(inMemoryMapOutputs); - commitMemory = 0L; // Reset commitMemory. 
- } - - if (memToMemMerger != null) { - if (inMemoryMapOutputs.size() >= memToMemMergeOutputsThreshold) { - memToMemMerger.startMerge(inMemoryMapOutputs); - } - } - } - - - public synchronized void closeInMemoryMergedFile(MapOutput<K,V> mapOutput) { - inMemoryMergedMapOutputs.add(mapOutput); - LOG.info("closeInMemoryMergedFile -> size: " + mapOutput.getSize() + - ", inMemoryMergedMapOutputs.size() -> " + - inMemoryMergedMapOutputs.size()); - } - - public synchronized void closeOnDiskFile(Path file) { - onDiskMapOutputs.add(file); - - if (onDiskMapOutputs.size() >= (2 * ioSortFactor - 1)) { - onDiskMerger.startMerge(onDiskMapOutputs); - } - } - - public RawKeyValueIterator close() throws Throwable { - // Wait for on-going merges to complete - if (memToMemMerger != null) { - memToMemMerger.close(); - } - inMemoryMerger.close(); - onDiskMerger.close(); - - List<MapOutput<K, V>> memory = - new ArrayList<MapOutput<K, V>>(inMemoryMergedMapOutputs); - memory.addAll(inMemoryMapOutputs); - List<Path> disk = new ArrayList<Path>(onDiskMapOutputs); - return finalMerge(jobConf, rfs, memory, disk); - } - - private class IntermediateMemoryToMemoryMerger - extends MergeThread<MapOutput<K, V>, K, V> { - - public IntermediateMemoryToMemoryMerger(MergeManager<K, V> manager, - int mergeFactor) { - super(manager, mergeFactor, exceptionReporter); - setName("InMemoryMerger - Thread to do in-memory merge of in-memory " + - "shuffled map-outputs"); - setDaemon(true); - } - - @Override - public void merge(List<MapOutput<K, V>> inputs) throws IOException { - if (inputs == null || inputs.size() == 0) { - return; - } - - TaskAttemptID dummyMapId = inputs.get(0).getMapId(); - List<Segment<K, V>> inMemorySegments = new ArrayList<Segment<K, V>>(); - long mergeOutputSize = - createInMemorySegments(inputs, inMemorySegments, 0); - int noInMemorySegments = inMemorySegments.size(); - - MapOutput<K, V> mergedMapOutputs = - unconditionalReserve(dummyMapId, mergeOutputSize, false); - - Writer<K, V> writer = - new InMemoryWriter<K, V>(mergedMapOutputs.getArrayStream()); - - LOG.info("Initiating Memory-to-Memory merge with " + noInMemorySegments + - " segments of total-size: " + mergeOutputSize); - - RawKeyValueIterator rIter = - Merger.merge(jobConf, rfs, - (Class<K>)jobConf.getMapOutputKeyClass(), - (Class<V>)jobConf.getMapOutputValueClass(), - inMemorySegments, inMemorySegments.size(), - new Path(reduceId.toString()), - (RawComparator<K>)jobConf.getOutputKeyComparator(), - reporter, null, null, null); - Merger.writeFile(rIter, writer, reporter, jobConf); - writer.close(); - - LOG.info(reduceId + - " Memory-to-Memory merge of the " + noInMemorySegments + - " files in-memory complete."); - - // Note the output of the merge - closeInMemoryMergedFile(mergedMapOutputs); - } - } - - private class InMemoryMerger extends MergeThread<MapOutput<K,V>, K,V> { - - public InMemoryMerger(MergeManager<K, V> manager) { - super(manager, Integer.MAX_VALUE, exceptionReporter); - setName - ("InMemoryMerger - Thread to merge in-memory shuffled map-outputs"); - setDaemon(true); - } - - @Override - public void merge(List<MapOutput<K,V>> inputs) throws IOException { - if (inputs == null || inputs.size() == 0) { - return; - } - - //name this output file same as the name of the first file that is - //there in the current list of inmem files (this is guaranteed to - //be absent on the disk currently. So we don't overwrite a prev. - //created spill). 
Also we need to create the output file now since - //it is not guaranteed that this file will be present after merge - //is called (we delete empty files as soon as we see them - //in the merge method) - - //figure out the mapId - TaskAttemptID mapId = inputs.get(0).getMapId(); - TaskID mapTaskId = mapId.getTaskID(); - - List<Segment<K, V>> inMemorySegments = new ArrayList<Segment<K, V>>(); - long mergeOutputSize = - createInMemorySegments(inputs, inMemorySegments,0); - int noInMemorySegments = inMemorySegments.size(); - - Path outputPath = - mapOutputFile.getInputFileForWrite(mapTaskId, - mergeOutputSize).suffix( - Task.MERGED_OUTPUT_PREFIX); - - Writer<K,V> writer = - new Writer<K,V>(jobConf, rfs, outputPath, - (Class<K>) jobConf.getMapOutputKeyClass(), - (Class<V>) jobConf.getMapOutputValueClass(), - codec, null); - - RawKeyValueIterator rIter = null; - try { - LOG.info("Initiating in-memory merge with " + noInMemorySegments + - " segments..."); - - rIter = Merger.merge(jobConf, rfs, - (Class<K>)jobConf.getMapOutputKeyClass(), - (Class<V>)jobConf.getMapOutputValueClass(), - inMemorySegments, inMemorySegments.size(), - new Path(reduceId.toString()), - (RawComparator<K>)jobConf.getOutputKeyComparator(), - reporter, spilledRecordsCounter, null, null); - - if (null == combinerClass) { - Merger.writeFile(rIter, writer, reporter, jobConf); - } else { - combineCollector.setWriter(writer); - combineAndSpill(rIter, reduceCombineInputCounter); - } - writer.close(); - - LOG.info(reduceId + - " Merge of the " + noInMemorySegments + - " files in-memory complete." + - " Local file is " + outputPath + " of size " + - localFS.getFileStatus(outputPath).getLen()); - } catch (IOException e) { - //make sure that we delete the ondisk file that we created - //earlier when we invoked cloneFileAttributes - localFS.delete(outputPath, true); - throw e; - } - - // Note the output of the merge - closeOnDiskFile(outputPath); - } - - } - - private class OnDiskMerger extends MergeThread<Path,K,V> { - - public OnDiskMerger(MergeManager<K, V> manager) { - super(manager, Integer.MAX_VALUE, exceptionReporter); - setName("OnDiskMerger - Thread to merge on-disk map-outputs"); - setDaemon(true); - } - - @Override - public void merge(List<Path> inputs) throws IOException { - // sanity check - if (inputs == null || inputs.isEmpty()) { - LOG.info("No ondisk files to merge..."); - return; - } - - long approxOutputSize = 0; - int bytesPerSum = - jobConf.getInt("io.bytes.per.checksum", 512); - - LOG.info("OnDiskMerger: We have " + inputs.size() + - " map outputs on disk. Triggering merge..."); - - // 1. Prepare the list of files to be merged. - for (Path file : inputs) { - approxOutputSize += localFS.getFileStatus(file).getLen(); - } - - // add the checksum length - approxOutputSize += - ChecksumFileSystem.getChecksumLength(approxOutputSize, bytesPerSum); - - // 2. 
Start the on-disk merge process - Path outputPath = - localDirAllocator.getLocalPathForWrite(inputs.get(0).toString(), - approxOutputSize, jobConf).suffix(Task.MERGED_OUTPUT_PREFIX); - Writer<K,V> writer = - new Writer<K,V>(jobConf, rfs, outputPath, - (Class<K>) jobConf.getMapOutputKeyClass(), - (Class<V>) jobConf.getMapOutputValueClass(), - codec, null); - RawKeyValueIterator iter = null; - Path tmpDir = new Path(reduceId.toString()); - try { - iter = Merger.merge(jobConf, rfs, - (Class<K>) jobConf.getMapOutputKeyClass(), - (Class<V>) jobConf.getMapOutputValueClass(), - codec, inputs.toArray(new Path[inputs.size()]), - true, ioSortFactor, tmpDir, - (RawComparator<K>) jobConf.getOutputKeyComparator(), - reporter, spilledRecordsCounter, null, - mergedMapOutputsCounter, null); - - Merger.writeFile(iter, writer, reporter, jobConf); - writer.close(); - } catch (IOException e) { - localFS.delete(outputPath, true); - throw e; - } - - closeOnDiskFile(outputPath); - - LOG.info(reduceId + - " Finished merging " + inputs.size() + - " map output files on disk of total-size " + - approxOutputSize + "." + - " Local output file is " + outputPath + " of size " + - localFS.getFileStatus(outputPath).getLen()); - } - } - - private void combineAndSpill( - RawKeyValueIterator kvIter, - Counters.Counter inCounter) throws IOException { - JobConf job = jobConf; - Reducer combiner = ReflectionUtils.newInstance(combinerClass, job); - Class<K> keyClass = (Class<K>) job.getMapOutputKeyClass(); - Class<V> valClass = (Class<V>) job.getMapOutputValueClass(); - RawComparator<K> comparator = - (RawComparator<K>)job.getOutputKeyComparator(); - try { - CombineValuesIterator values = new CombineValuesIterator( - kvIter, comparator, keyClass, valClass, job, Reporter.NULL, - inCounter); - while (values.more()) { - combiner.reduce(values.getKey(), values, combineCollector, - Reporter.NULL); - values.nextKey(); - } - } finally { - combiner.close(); - } - } - - private long createInMemorySegments(List<MapOutput<K,V>> inMemoryMapOutputs, - List<Segment<K, V>> inMemorySegments, - long leaveBytes - ) throws IOException { - long totalSize = 0L; - // We could use fullSize could come from the RamManager, but files can be - // closed but not yet present in inMemoryMapOutputs - long fullSize = 0L; - for (MapOutput<K,V> mo : inMemoryMapOutputs) { - fullSize += mo.getMemory().length; - } - while(fullSize > leaveBytes) { - MapOutput<K,V> mo = inMemoryMapOutputs.remove(0); - byte[] data = mo.getMemory(); - long size = data.length; - totalSize += size; - fullSize -= size; - Reader<K,V> reader = new InMemoryReader<K,V>(MergeManager.this, - mo.getMapId(), - data, 0, (int)size); - inMemorySegments.add(new Segment<K,V>(reader, true, - (mo.isPrimaryMapOutput() ? 
- mergedMapOutputsCounter : null))); - } - return totalSize; - } - - class RawKVIteratorReader extends IFile.Reader<K,V> { - - private final RawKeyValueIterator kvIter; - - public RawKVIteratorReader(RawKeyValueIterator kvIter, long size) - throws IOException { - super(null, null, size, null, spilledRecordsCounter); - this.kvIter = kvIter; - } - public boolean nextRawKey(DataInputBuffer key) throws IOException { - if (kvIter.next()) { - final DataInputBuffer kb = kvIter.getKey(); - final int kp = kb.getPosition(); - final int klen = kb.getLength() - kp; - key.reset(kb.getData(), kp, klen); - bytesRead += klen; - return true; - } - return false; - } - public void nextRawValue(DataInputBuffer value) throws IOException { - final DataInputBuffer vb = kvIter.getValue(); - final int vp = vb.getPosition(); - final int vlen = vb.getLength() - vp; - value.reset(vb.getData(), vp, vlen); - bytesRead += vlen; - } - public long getPosition() throws IOException { - return bytesRead; - } - - public void close() throws IOException { - kvIter.close(); - } - } - - private RawKeyValueIterator finalMerge(JobConf job, FileSystem fs, - List<MapOutput<K,V>> inMemoryMapOutputs, - List<Path> onDiskMapOutputs - ) throws IOException { - LOG.info("finalMerge called with " + - inMemoryMapOutputs.size() + " in-memory map-outputs and " + - onDiskMapOutputs.size() + " on-disk map-outputs"); - - final float maxRedPer = - job.getFloat(MRJobConfig.REDUCE_INPUT_BUFFER_PERCENT, 0f); - if (maxRedPer > 1.0 || maxRedPer < 0.0) { - throw new IOException(MRJobConfig.REDUCE_INPUT_BUFFER_PERCENT + - maxRedPer); - } - int maxInMemReduce = (int)Math.min( - Runtime.getRuntime().maxMemory() * maxRedPer, Integer.MAX_VALUE); - - - // merge config params - Class<K> keyClass = (Class<K>)job.getMapOutputKeyClass(); - Class<V> valueClass = (Class<V>)job.getMapOutputValueClass(); - boolean keepInputs = job.getKeepFailedTaskFiles(); - final Path tmpDir = new Path(reduceId.toString()); - final RawComparator<K> comparator = - (RawComparator<K>)job.getOutputKeyComparator(); - - // segments required to vacate memory - List<Segment<K,V>> memDiskSegments = new ArrayList<Segment<K,V>>(); - long inMemToDiskBytes = 0; - boolean mergePhaseFinished = false; - if (inMemoryMapOutputs.size() > 0) { - TaskID mapId = inMemoryMapOutputs.get(0).getMapId().getTaskID(); - inMemToDiskBytes = createInMemorySegments(inMemoryMapOutputs, - memDiskSegments, - maxInMemReduce); - final int numMemDiskSegments = memDiskSegments.size(); - if (numMemDiskSegments > 0 && - ioSortFactor > onDiskMapOutputs.size()) { - - // If we reach here, it implies that we have less than io.sort.factor - // disk segments and this will be incremented by 1 (result of the - // memory segments merge). 
Since this total would still be - // <= io.sort.factor, we will not do any more intermediate merges, - // the merge of all these disk segments would be directly fed to the - // reduce method - - mergePhaseFinished = true; - // must spill to disk, but can't retain in-mem for intermediate merge - final Path outputPath = - mapOutputFile.getInputFileForWrite(mapId, - inMemToDiskBytes).suffix( - Task.MERGED_OUTPUT_PREFIX); - final RawKeyValueIterator rIter = Merger.merge(job, fs, - keyClass, valueClass, memDiskSegments, numMemDiskSegments, - tmpDir, comparator, reporter, spilledRecordsCounter, null, - mergePhase); - final Writer<K,V> writer = new Writer<K,V>(job, fs, outputPath, - keyClass, valueClass, codec, null); - try { - Merger.writeFile(rIter, writer, reporter, job); - // add to list of final disk outputs. - onDiskMapOutputs.add(outputPath); - } catch (IOException e) { - if (null != outputPath) { - try { - fs.delete(outputPath, true); - } catch (IOException ie) { - // NOTHING - } - } - throw e; - } finally { - if (null != writer) { - writer.close(); - } - } - LOG.info("Merged " + numMemDiskSegments + " segments, " + - inMemToDiskBytes + " bytes to disk to satisfy " + - "reduce memory limit"); - inMemToDiskBytes = 0; - memDiskSegments.clear(); - } else if (inMemToDiskBytes != 0) { - LOG.info("Keeping " + numMemDiskSegments + " segments, " + - inMemToDiskBytes + " bytes in memory for " + - "intermediate, on-disk merge"); - } - } - - // segments on disk - List<Segment<K,V>> diskSegments = new ArrayList<Segment<K,V>>(); - long onDiskBytes = inMemToDiskBytes; - Path[] onDisk = onDiskMapOutputs.toArray(new Path[onDiskMapOutputs.size()]); - for (Path file : onDisk) { - onDiskBytes += fs.getFileStatus(file).getLen(); - LOG.debug("Disk file: " + file + " Length is " + - fs.getFileStatus(file).getLen()); - diskSegments.add(new Segment<K, V>(job, fs, file, codec, keepInputs, - (file.toString().endsWith( - Task.MERGED_OUTPUT_PREFIX) ? - null : mergedMapOutputsCounter) - )); - } - LOG.info("Merging " + onDisk.length + " files, " + - onDiskBytes + " bytes from disk"); - Collections.sort(diskSegments, new Comparator<Segment<K,V>>() { - public int compare(Segment<K, V> o1, Segment<K, V> o2) { - if (o1.getLength() == o2.getLength()) { - return 0; - } - return o1.getLength() < o2.getLength() ? -1 : 1; - } - }); - - // build final list of segments from merged backed by disk + in-mem - List<Segment<K,V>> finalSegments = new ArrayList<Segment<K,V>>(); - long inMemBytes = createInMemorySegments(inMemoryMapOutputs, - finalSegments, 0); - LOG.info("Merging " + finalSegments.size() + " segments, " + - inMemBytes + " bytes from memory into reduce"); - if (0 != onDiskBytes) { - final int numInMemSegments = memDiskSegments.size(); - diskSegments.addAll(0, memDiskSegments); - memDiskSegments.clear(); - // Pass mergePhase only if there is a going to be intermediate - // merges. See comment where mergePhaseFinished is being set - Progress thisPhase = (mergePhaseFinished) ? 
null : mergePhase; - RawKeyValueIterator diskMerge = Merger.merge( - job, fs, keyClass, valueClass, diskSegments, - ioSortFactor, numInMemSegments, tmpDir, comparator, - reporter, false, spilledRecordsCounter, null, thisPhase); - diskSegments.clear(); - if (0 == finalSegments.size()) { - return diskMerge; - } - finalSegments.add(new Segment<K,V>( - new RawKVIteratorReader(diskMerge, onDiskBytes), true)); - } - return Merger.merge(job, fs, keyClass, valueClass, - finalSegments, finalSegments.size(), tmpDir, - comparator, reporter, spilledRecordsCounter, null, - null); - - } + public RawKeyValueIterator close() throws Throwable; }
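With MapOutput reduced to the abstract contract above, an alternative Shuffle implementation can supply its own map-output handling without touching the fetch path. Below is a minimal, hypothetical sketch assuming only the signatures shown in the MapOutput and MergeManager hunks above; the class name DiscardingMapOutput and its drop-the-data behaviour are invented for illustration and are not something this patch adds.

package org.apache.hadoop.mapreduce.task.reduce;

import java.io.IOException;
import java.io.InputStream;

import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.TaskAttemptID;

// Hypothetical example: a MapOutput that simply discards the shuffled bytes.
// It exists only to show which methods a custom implementation must provide.
class DiscardingMapOutput<K, V> extends MapOutput<K, V> {

  public DiscardingMapOutput(TaskAttemptID mapId, long size, boolean primaryMapOutput) {
    super(mapId, size, primaryMapOutput);
  }

  @Override
  public void shuffle(MapHost host, InputStream input,
                      long compressedLength, long decompressedLength,
                      ShuffleClientMetrics metrics,
                      Reporter reporter) throws IOException {
    // Drain exactly compressedLength bytes from the shuffle connection.
    long remaining = compressedLength;
    byte[] buf = new byte[64 * 1024];
    while (remaining > 0) {
      int n = input.read(buf, 0, (int) Math.min(remaining, buf.length));
      if (n < 0) {
        throw new IOException("read past end of stream reading " + getMapId());
      }
      remaining -= n;
      metrics.inputBytes(n);
      reporter.progress();
    }
  }

  @Override
  public void commit() throws IOException {
    // A real implementation would hand the completed output back to its MergeManager here.
  }

  @Override
  public void abort() {
    // Nothing was reserved, so there is nothing to release.
  }

  @Override
  public String getDescription() {
    return "DISCARD";
  }
}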