Author: acmurthy
Date: Mon Aug 1 22:53:08 2011
New Revision: 1152964
URL: http://svn.apache.org/viewvc?rev=1152964&view=rev
Log:
MAPREDUCE-2187. Reporter sends progress during sort/merge. Contributed by Anupam Seth.
Modified: hadoop/common/trunk/mapreduce/CHANGES.txt hadoop/common/trunk/mapreduce/src/java/mapred-default.xml hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/MapTask.java hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/ReduceTask.java hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/Task.java hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/MRJobConfig.java Modified: hadoop/common/trunk/mapreduce/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/CHANGES.txt?rev=1152964&r1=1152963&r2=1152964&view=diff ============================================================================== --- hadoop/common/trunk/mapreduce/CHANGES.txt (original) +++ hadoop/common/trunk/mapreduce/CHANGES.txt Mon Aug 1 22:53:08 2011 @@ -40,6 +40,9 @@ Trunk (unreleased changes) IMPROVEMENTS + MAPREDUCE-2187. Reporter sends progress during sort/merge. (Anupam Seth via + acmurthy) + MAPREDUCE-2365. Add counters to track bytes (read,written) via File(Input,Output)Format. (Siddharth Seth via acmurthy) Modified: hadoop/common/trunk/mapreduce/src/java/mapred-default.xml URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/java/mapred-default.xml?rev=1152964&r1=1152963&r2=1152964&view=diff ============================================================================== --- hadoop/common/trunk/mapreduce/src/java/mapred-default.xml (original) +++ hadoop/common/trunk/mapreduce/src/java/mapred-default.xml Mon Aug 1 22:53:08 2011 @@ -1042,6 +1042,14 @@ <!-- End of TaskTracker DistributedCache configuration --> <property> + <name>mapreduce.task.combine.progress.records</name> + <value>10000</value> + <description> The number of records to process during combine output collection + before sending a progress notification to the TaskTracker. 
+ </description> +</property> + +<property> <name>mapreduce.task.merge.progress.records</name> <value>10000</value> <description> The number of records to process during merge before Modified: hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/MapTask.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/MapTask.java?rev=1152964&r1=1152963&r2=1152964&view=diff ============================================================================== --- hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/MapTask.java (original) +++ hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/MapTask.java Mon Aug 1 22:53:08 2011 @@ -946,7 +946,7 @@ class MapTask extends Task { if (combinerRunner != null) { final Counters.Counter combineOutputCounter = reporter.getCounter(TaskCounter.COMBINE_OUTPUT_RECORDS); - combineCollector= new CombineOutputCollector<K,V>(combineOutputCounter); + combineCollector= new CombineOutputCollector<K,V>(combineOutputCounter, reporter, conf); } else { combineCollector = null; } Modified: hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/ReduceTask.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/ReduceTask.java?rev=1152964&r1=1152963&r2=1152964&view=diff ============================================================================== --- hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/ReduceTask.java (original) +++ hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/ReduceTask.java Mon Aug 1 22:53:08 2011 @@ -352,7 +352,7 @@ public class ReduceTask extends Task { Class combinerClass = conf.getCombinerClass(); CombineOutputCollector combineCollector = (null != combinerClass) ? 
- new CombineOutputCollector(reduceCombineOutputCounter) : null; + new CombineOutputCollector(reduceCombineOutputCounter, reporter, conf) : null; Shuffle shuffle = new Shuffle(getTaskID(), job, FileSystem.getLocal(job), umbilical, Modified: hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/Task.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/Task.java?rev=1152964&r1=1152963&r2=1152964&view=diff ============================================================================== --- hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/Task.java (original) +++ hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/Task.java Mon Aug 1 22:53:08 2011 @@ -58,6 +58,7 @@ import org.apache.hadoop.mapreduce.Outpu import org.apache.hadoop.mapreduce.TaskCounter; import org.apache.hadoop.mapreduce.JobStatus; import org.apache.hadoop.mapreduce.MRConfig; +import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer; import org.apache.hadoop.mapreduce.task.ReduceContextImpl; import org.apache.hadoop.mapreduce.util.ResourceCalculatorPlugin; @@ -79,6 +80,7 @@ abstract public class Task implements Wr LogFactory.getLog(Task.class); public static String MERGED_OUTPUT_PREFIX = ".merged"; + public static final long DEFAULT_COMBINE_RECORDS_BEFORE_PROGRESS = 10000; /** * Counters to measure the usage of the different file systems. 
@@ -1176,16 +1178,26 @@ abstract public class Task implements Wr implements OutputCollector<K, V> { private Writer<K, V> writer; private Counters.Counter outCounter; - public CombineOutputCollector(Counters.Counter outCounter) { + private Progressable progressable; + private long progressBar; + + public CombineOutputCollector(Counters.Counter outCounter, Progressable progressable, Configuration conf) { this.outCounter = outCounter; + this.progressable=progressable; + progressBar = conf.getLong(MRJobConfig.COMBINE_RECORDS_BEFORE_PROGRESS, DEFAULT_COMBINE_RECORDS_BEFORE_PROGRESS); } + public synchronized void setWriter(Writer<K, V> writer) { this.writer = writer; } + public synchronized void collect(K key, V value) throws IOException { outCounter.increment(1); writer.append(key, value); + if ((outCounter.getValue() % progressBar) == 0) { + progressable.progress(); + } } } Modified: hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/MRJobConfig.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/MRJobConfig.java?rev=1152964&r1=1152963&r2=1152964&view=diff ============================================================================== --- hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/MRJobConfig.java (original) +++ hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/MRJobConfig.java Mon Aug 1 22:53:08 2011 @@ -260,6 +260,8 @@ public interface MRJobConfig { public static final String REDUCE_MEMTOMEM_ENABLED = "mapreduce.reduce.merge.memtomem.enabled"; + public static final String COMBINE_RECORDS_BEFORE_PROGRESS = "mapreduce.task.combine.progress.records"; + public static final String JOB_NAMENODES = "mapreduce.job.hdfs-servers"; public static final String JOB_JOBTRACKER_ID = "mapreduce.job.kerberos.jtprinicipal";