Author: tucu Date: Wed Nov 28 23:27:32 2012 New Revision: 1414995 URL: http://svn.apache.org/viewvc?rev=1414995&view=rev Log: Reverting MAPREDUCE-4809 as the committed patch is incorrect
Modified: hadoop/common/branches/MR-2454/hadoop-mapreduce-project/CHANGES.txt hadoop/common/branches/MR-2454/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java hadoop/common/branches/MR-2454/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/SpillRecord.java hadoop/common/branches/MR-2454/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java hadoop/common/branches/MR-2454/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java hadoop/common/branches/MR-2454/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ExceptionReporter.java hadoop/common/branches/MR-2454/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MapHost.java hadoop/common/branches/MR-2454/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MapOutput.java hadoop/common/branches/MR-2454/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java hadoop/common/branches/MR-2454/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java hadoop/common/branches/MR-2454/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleClientMetrics.java hadoop/common/branches/MR-2454/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml Modified: 
hadoop/common/branches/MR-2454/hadoop-mapreduce-project/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2454/hadoop-mapreduce-project/CHANGES.txt?rev=1414995&r1=1414994&r2=1414995&view=diff ============================================================================== --- hadoop/common/branches/MR-2454/hadoop-mapreduce-project/CHANGES.txt (original) +++ hadoop/common/branches/MR-2454/hadoop-mapreduce-project/CHANGES.txt Wed Nov 28 23:27:32 2012 @@ -1,10 +1,5 @@ Hadoop MapReduce Change Log -Branch MR-2454 - - MAPREDUCE-4809. Make internal classes required for MAPREDUCE-2454 to be - java public. (Mariappan Asokan via acmurthy) - Trunk (Unreleased) INCOMPATIBLE CHANGES Modified: hadoop/common/branches/MR-2454/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2454/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java?rev=1414995&r1=1414994&r2=1414995&view=diff ============================================================================== --- hadoop/common/branches/MR-2454/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java (original) +++ hadoop/common/branches/MR-2454/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java Wed Nov 28 23:27:32 2012 @@ -34,8 +34,6 @@ import java.util.concurrent.locks.Reentr import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; @@ -56,6 +54,7 @@ import 
org.apache.hadoop.io.serializer.S import org.apache.hadoop.mapred.IFile.Writer; import org.apache.hadoop.mapred.Merger.Segment; import org.apache.hadoop.mapred.SortedRanges.SkipRangeIterator; +import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.TaskCounter; @@ -73,9 +72,7 @@ import org.apache.hadoop.util.StringInte import org.apache.hadoop.util.StringUtils; /** A Map task. */ -@InterfaceAudience.LimitedPrivate({"MapReduce"}) -@InterfaceStability.Unstable -public class MapTask extends Task { +class MapTask extends Task { /** * The size of each record in the index file for the map-outputs. */ @@ -342,6 +339,10 @@ public class MapTask extends Task { done(umbilical, reporter); } + public Progress getSortPhase() { + return sortPhase; + } + @SuppressWarnings("unchecked") private <T> T getSplitDetails(Path file, long offset) throws IOException { @@ -371,6 +372,22 @@ public class MapTask extends Task { } @SuppressWarnings("unchecked") + private <KEY, VALUE> MapOutputCollector<KEY, VALUE> + createSortingCollector(JobConf job, TaskReporter reporter) + throws IOException, ClassNotFoundException { + MapOutputCollector<KEY, VALUE> collector + = (MapOutputCollector<KEY, VALUE>) + ReflectionUtils.newInstance( + job.getClass(JobContext.MAP_OUTPUT_COLLECTOR_CLASS_ATTR, + MapOutputBuffer.class, MapOutputCollector.class), job); + LOG.info("Map output collector class = " + collector.getClass().getName()); + MapOutputCollector.Context context = + new MapOutputCollector.Context(this, job, reporter); + collector.init(context); + return collector; + } + + @SuppressWarnings("unchecked") private <INKEY,INVALUE,OUTKEY,OUTVALUE> void runOldMapper(final JobConf job, final TaskSplitIndex splitIndex, @@ -392,11 +409,14 @@ public class MapTask extends Task { int numReduceTasks = conf.getNumReduceTasks(); LOG.info("numReduceTasks: " + numReduceTasks); - 
MapOutputCollector collector = null; + MapOutputCollector<OUTKEY, OUTVALUE> collector = null; if (numReduceTasks > 0) { - collector = new MapOutputBuffer(umbilical, job, reporter); + collector = createSortingCollector(job, reporter); } else { - collector = new DirectMapOutputCollector(umbilical, job, reporter); + collector = new DirectMapOutputCollector<OUTKEY, OUTVALUE>(); + MapOutputCollector.Context context = + new MapOutputCollector.Context(this, job, reporter); + collector.init(context); } MapRunnable<INKEY,INVALUE,OUTKEY,OUTVALUE> runner = ReflectionUtils.newInstance(job.getMapRunnerClass(), job); @@ -642,7 +662,7 @@ public class MapTask extends Task { TaskUmbilicalProtocol umbilical, TaskReporter reporter ) throws IOException, ClassNotFoundException { - collector = new MapOutputBuffer<K,V>(umbilical, job, reporter); + collector = createSortingCollector(job, reporter); partitions = jobContext.getNumReduceTasks(); if (partitions > 1) { partitioner = (org.apache.hadoop.mapreduce.Partitioner<K,V>) @@ -738,17 +758,6 @@ public class MapTask extends Task { output.close(mapperContext); } - interface MapOutputCollector<K, V> { - - public void collect(K key, V value, int partition - ) throws IOException, InterruptedException; - public void close() throws IOException, InterruptedException; - - public void flush() throws IOException, InterruptedException, - ClassNotFoundException; - - } - class DirectMapOutputCollector<K, V> implements MapOutputCollector<K, V> { @@ -756,14 +765,18 @@ public class MapTask extends Task { private TaskReporter reporter = null; - private final Counters.Counter mapOutputRecordCounter; - private final Counters.Counter fileOutputByteCounter; - private final List<Statistics> fsStats; + private Counters.Counter mapOutputRecordCounter; + private Counters.Counter fileOutputByteCounter; + private List<Statistics> fsStats; + + public DirectMapOutputCollector() { + } @SuppressWarnings("unchecked") - public 
DirectMapOutputCollector(TaskUmbilicalProtocol umbilical, - JobConf job, TaskReporter reporter) throws IOException { - this.reporter = reporter; + public void init(MapOutputCollector.Context context + ) throws IOException, ClassNotFoundException { + this.reporter = context.getReporter(); + JobConf job = context.getJobConf(); String finalName = getOutputName(getPartition()); FileSystem fs = FileSystem.get(job); @@ -818,27 +831,25 @@ public class MapTask extends Task { } } - @InterfaceAudience.LimitedPrivate({"MapReduce"}) - @InterfaceStability.Unstable - public static class MapOutputBuffer<K extends Object, V extends Object> + private class MapOutputBuffer<K extends Object, V extends Object> implements MapOutputCollector<K, V>, IndexedSortable { - final int partitions; - final JobConf job; - final TaskReporter reporter; - final Class<K> keyClass; - final Class<V> valClass; - final RawComparator<K> comparator; - final SerializationFactory serializationFactory; - final Serializer<K> keySerializer; - final Serializer<V> valSerializer; - final CombinerRunner<K,V> combinerRunner; - final CombineOutputCollector<K, V> combineCollector; + private int partitions; + private JobConf job; + private TaskReporter reporter; + private Class<K> keyClass; + private Class<V> valClass; + private RawComparator<K> comparator; + private SerializationFactory serializationFactory; + private Serializer<K> keySerializer; + private Serializer<V> valSerializer; + private CombinerRunner<K,V> combinerRunner; + private CombineOutputCollector<K, V> combineCollector; // Compression for map-outputs - final CompressionCodec codec; + private CompressionCodec codec; // k/v accounting - final IntBuffer kvmeta; // metadata overlay on backing store + private IntBuffer kvmeta; // metadata overlay on backing store int kvstart; // marks origin of spill metadata int kvend; // marks end of spill metadata int kvindex; // marks end of fully serialized records @@ -862,15 +873,15 @@ public class MapTask extends 
Task { private static final int METASIZE = NMETA * 4; // size in bytes // spill accounting - final int maxRec; - final int softLimit; + private int maxRec; + private int softLimit; boolean spillInProgress;; int bufferRemaining; volatile Throwable sortSpillException = null; int numSpills = 0; - final int minSpillsForCombine; - final IndexedSorter sorter; + private int minSpillsForCombine; + private IndexedSorter sorter; final ReentrantLock spillLock = new ReentrantLock(); final Condition spillDone = spillLock.newCondition(); final Condition spillReady = spillLock.newCondition(); @@ -878,12 +889,12 @@ public class MapTask extends Task { volatile boolean spillThreadRunning = false; final SpillThread spillThread = new SpillThread(); - final FileSystem rfs; + private FileSystem rfs; // Counters - final Counters.Counter mapOutputByteCounter; - final Counters.Counter mapOutputRecordCounter; - final Counters.Counter fileOutputByteCounter; + private Counters.Counter mapOutputByteCounter; + private Counters.Counter mapOutputRecordCounter; + private Counters.Counter fileOutputByteCounter; final ArrayList<SpillRecord> indexCacheList = new ArrayList<SpillRecord>(); @@ -891,12 +902,23 @@ public class MapTask extends Task { private int indexCacheMemoryLimit; private static final int INDEX_CACHE_MEMORY_LIMIT_DEFAULT = 1024 * 1024; + private MapTask mapTask; + private MapOutputFile mapOutputFile; + private Progress sortPhase; + private Counters.Counter spilledRecordsCounter; + + public MapOutputBuffer() { + } + @SuppressWarnings("unchecked") - public MapOutputBuffer(TaskUmbilicalProtocol umbilical, JobConf job, - TaskReporter reporter - ) throws IOException, ClassNotFoundException { - this.job = job; - this.reporter = reporter; + public void init(MapOutputCollector.Context context + ) throws IOException, ClassNotFoundException { + job = context.getJobConf(); + reporter = context.getReporter(); + mapTask = context.getMapTask(); + mapOutputFile = mapTask.getMapOutputFile(); + 
sortPhase = mapTask.getSortPhase(); + spilledRecordsCounter = reporter.getCounter(TaskCounter.SPILLED_RECORDS); partitions = job.getNumReduceTasks(); rfs = ((LocalFileSystem)FileSystem.getLocal(job)).getRaw(); @@ -973,7 +995,7 @@ public class MapTask extends Task { if (combinerRunner != null) { final Counters.Counter combineOutputCounter = reporter.getCounter(TaskCounter.COMBINE_OUTPUT_RECORDS); - combineCollector= new CombineOutputCollector<K,V>(combineOutputCounter, reporter, conf); + combineCollector= new CombineOutputCollector<K,V>(combineOutputCounter, reporter, job); } else { combineCollector = null; } @@ -1124,6 +1146,10 @@ public class MapTask extends Task { } } + private TaskAttemptID getTaskID() { + return mapTask.getTaskID(); + } + /** * Set the point from which meta and serialization data expand. The meta * indices are aligned with the buffer, so metadata never spans the ends of @@ -1496,7 +1522,7 @@ public class MapTask extends Task { if (lspillException instanceof Error) { final String logMsg = "Task " + getTaskID() + " failed : " + StringUtils.stringifyException(lspillException); - reportFatalError(getTaskID(), lspillException, logMsg); + mapTask.reportFatalError(getTaskID(), lspillException, logMsg); } throw new IOException("Spill failed", lspillException); } Modified: hadoop/common/branches/MR-2454/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/SpillRecord.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2454/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/SpillRecord.java?rev=1414995&r1=1414994&r2=1414995&view=diff ============================================================================== --- hadoop/common/branches/MR-2454/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/SpillRecord.java (original) +++ 
hadoop/common/branches/MR-2454/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/SpillRecord.java Wed Nov 28 23:27:32 2012 @@ -26,8 +26,6 @@ import java.util.zip.CheckedInputStream; import java.util.zip.CheckedOutputStream; import java.util.zip.Checksum; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.fs.ChecksumException; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; @@ -36,9 +34,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.util.PureJavaCrc32; -@InterfaceAudience.LimitedPrivate({"MapReduce"}) -@InterfaceStability.Unstable -class SpillRecord { +public class SpillRecord { /** Backing store */ private final ByteBuffer buf; @@ -147,17 +143,3 @@ class SpillRecord { } } - -class IndexRecord { - long startOffset; - long rawLength; - long partLength; - - public IndexRecord() { } - - public IndexRecord(long startOffset, long rawLength, long partLength) { - this.startOffset = startOffset; - this.rawLength = rawLength; - this.partLength = partLength; - } -} Modified: hadoop/common/branches/MR-2454/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2454/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java?rev=1414995&r1=1414994&r2=1414995&view=diff ============================================================================== --- hadoop/common/branches/MR-2454/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java (original) +++ 
hadoop/common/branches/MR-2454/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java Wed Nov 28 23:27:32 2012 @@ -584,9 +584,9 @@ abstract public class Task implements Wr return status; } - @InterfaceAudience.LimitedPrivate({"MapReduce"}) + @InterfaceAudience.Private @InterfaceStability.Unstable - public class TaskReporter + protected class TaskReporter extends org.apache.hadoop.mapreduce.StatusReporter implements Runnable, Reporter { private TaskUmbilicalProtocol umbilical; @@ -1466,9 +1466,9 @@ abstract public class Task implements Wr return reducerContext; } - @InterfaceAudience.LimitedPrivate({"MapReduce"}) + @InterfaceAudience.Private @InterfaceStability.Unstable - public static abstract class CombinerRunner<K,V> { + protected static abstract class CombinerRunner<K,V> { protected final Counters.Counter inputCounter; protected final JobConf job; protected final TaskReporter reporter; @@ -1486,13 +1486,13 @@ abstract public class Task implements Wr * @param iterator the key/value pairs to use as input * @param collector the output collector */ - public abstract void combine(RawKeyValueIterator iterator, + abstract void combine(RawKeyValueIterator iterator, OutputCollector<K,V> collector ) throws IOException, InterruptedException, ClassNotFoundException; @SuppressWarnings("unchecked") - public static <K,V> + static <K,V> CombinerRunner<K,V> create(JobConf job, TaskAttemptID taskId, Counters.Counter inputCounter, @@ -1542,7 +1542,7 @@ abstract public class Task implements Wr } @SuppressWarnings("unchecked") - public void combine(RawKeyValueIterator kvIter, + protected void combine(RawKeyValueIterator kvIter, OutputCollector<K,V> combineCollector ) throws IOException { Reducer<K,V,K,V> combiner = @@ -1611,7 +1611,7 @@ abstract public class Task implements Wr @SuppressWarnings("unchecked") @Override - public void combine(RawKeyValueIterator iterator, + void combine(RawKeyValueIterator iterator, 
OutputCollector<K,V> collector ) throws IOException, InterruptedException, ClassNotFoundException { Modified: hadoop/common/branches/MR-2454/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2454/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java?rev=1414995&r1=1414994&r2=1414995&view=diff ============================================================================== --- hadoop/common/branches/MR-2454/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java (original) +++ hadoop/common/branches/MR-2454/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java Wed Nov 28 23:27:32 2012 @@ -30,6 +30,9 @@ public interface MRJobConfig { public static final String MAP_CLASS_ATTR = "mapreduce.job.map.class"; + public static final String MAP_OUTPUT_COLLECTOR_CLASS_ATTR + = "mapreduce.job.map.output.collector.class"; + public static final String COMBINE_CLASS_ATTR = "mapreduce.job.combine.class"; public static final String REDUCE_CLASS_ATTR = "mapreduce.job.reduce.class"; Modified: hadoop/common/branches/MR-2454/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ExceptionReporter.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2454/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ExceptionReporter.java?rev=1414995&r1=1414994&r2=1414995&view=diff ============================================================================== --- 
hadoop/common/branches/MR-2454/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ExceptionReporter.java (original) +++ hadoop/common/branches/MR-2454/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ExceptionReporter.java Wed Nov 28 23:27:32 2012 @@ -17,15 +17,9 @@ */ package org.apache.hadoop.mapreduce.task.reduce; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; - /** * An interface for reporting exceptions to other threads */ - -@InterfaceAudience.LimitedPrivate({"MapReduce"}) -@InterfaceStability.Unstable -public interface ExceptionReporter { +interface ExceptionReporter { void reportException(Throwable t); } Modified: hadoop/common/branches/MR-2454/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MapHost.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2454/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MapHost.java?rev=1414995&r1=1414994&r2=1414995&view=diff ============================================================================== --- hadoop/common/branches/MR-2454/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MapHost.java (original) +++ hadoop/common/branches/MR-2454/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MapHost.java Wed Nov 28 23:27:32 2012 @@ -20,13 +20,9 @@ package org.apache.hadoop.mapreduce.task import java.util.ArrayList; import java.util.List; -import org.apache.hadoop.classification.InterfaceAudience; -import 
org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.mapreduce.TaskAttemptID; -@InterfaceAudience.LimitedPrivate({"MapReduce"}) -@InterfaceStability.Unstable -public class MapHost { +class MapHost { public static enum State { IDLE, // No map outputs available Modified: hadoop/common/branches/MR-2454/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MapOutput.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2454/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MapOutput.java?rev=1414995&r1=1414994&r2=1414995&view=diff ============================================================================== --- hadoop/common/branches/MR-2454/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MapOutput.java (original) +++ hadoop/common/branches/MR-2454/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MapOutput.java Wed Nov 28 23:27:32 2012 @@ -24,8 +24,6 @@ import java.util.concurrent.atomic.Atomi import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocalDirAllocator; @@ -35,9 +33,7 @@ import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.MapOutputFile; import org.apache.hadoop.mapreduce.TaskAttemptID; -@InterfaceAudience.LimitedPrivate({"MapReduce"}) -@InterfaceStability.Unstable -public abstract class MapOutput<K,V> { +class MapOutput<K,V> { private static final Log LOG = LogFactory.getLog(MapOutput.class); private static 
AtomicInteger ID = new AtomicInteger(0); @@ -66,7 +62,7 @@ public abstract class MapOutput<K,V> { private final boolean primaryMapOutput; - MapOutput(caskAttemptID mapId, MergeManager<K,V> merger, long size, + MapOutput(TaskAttemptID mapId, MergeManager<K,V> merger, long size, JobConf conf, LocalDirAllocator localDirAllocator, int fetcher, boolean primaryMapOutput, MapOutputFile mapOutputFile) throws IOException { Modified: hadoop/common/branches/MR-2454/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2454/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java?rev=1414995&r1=1414994&r2=1414995&view=diff ============================================================================== --- hadoop/common/branches/MR-2454/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java (original) +++ hadoop/common/branches/MR-2454/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java Wed Nov 28 23:27:32 2012 @@ -59,7 +59,7 @@ import org.apache.hadoop.util.Progress; import org.apache.hadoop.util.ReflectionUtils; @SuppressWarnings(value={"unchecked", "deprecation"}) -@InterfaceAudience.LimitedPrivate({"MapReduce"}) +@InterfaceAudience.Private @InterfaceStability.Unstable public class MergeManager<K, V> { Modified: hadoop/common/branches/MR-2454/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/MR-2454/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java?rev=1414995&r1=1414994&r2=1414995&view=diff ============================================================================== --- hadoop/common/branches/MR-2454/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java (original) +++ hadoop/common/branches/MR-2454/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java Wed Nov 28 23:27:32 2012 @@ -38,7 +38,7 @@ import org.apache.hadoop.mapreduce.MRJob import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.util.Progress; -@InterfaceAudience.LimitedPrivate({"MapReduce"}) +@InterfaceAudience.Private @InterfaceStability.Unstable @SuppressWarnings({"unchecked", "rawtypes"}) public class Shuffle<K, V> implements ExceptionReporter { Modified: hadoop/common/branches/MR-2454/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleClientMetrics.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2454/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleClientMetrics.java?rev=1414995&r1=1414994&r2=1414995&view=diff ============================================================================== --- hadoop/common/branches/MR-2454/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleClientMetrics.java (original) +++ 
hadoop/common/branches/MR-2454/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleClientMetrics.java Wed Nov 28 23:27:32 2012 @@ -17,8 +17,6 @@ */ package org.apache.hadoop.mapreduce.task.reduce; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.TaskAttemptID; @@ -27,9 +25,7 @@ import org.apache.hadoop.metrics.Metrics import org.apache.hadoop.metrics.MetricsUtil; import org.apache.hadoop.metrics.Updater; -@InterfaceAudience.LimitedPrivate({"MapReduce"}) -@InterfaceStability.Unstable -public class ShuffleClientMetrics implements Updater { +class ShuffleClientMetrics implements Updater { private MetricsRecord shuffleMetrics = null; private int numFailedFetches = 0; Modified: hadoop/common/branches/MR-2454/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2454/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml?rev=1414995&r1=1414994&r2=1414995&view=diff ============================================================================== --- hadoop/common/branches/MR-2454/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml (original) +++ hadoop/common/branches/MR-2454/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml Wed Nov 28 23:27:32 2012 @@ -928,4 +928,12 @@ <value>jhs/_HOST@REALM.TLD</value> </property> +<property> + <name>mapreduce.job.map.output.collector.class</name> + <value>org.apache.hadoop.mapred.MapTask$MapOutputBuffer</value> + <description> + It defines the 
MapOutputCollector implementation to use. + </description> +</property> + </configuration>