Author: szetszwo
Date: Fri Apr 26 01:19:00 2013
New Revision: 1476011

URL: http://svn.apache.org/r1476011
Log:
Merge r1471229 through r1476009 from trunk.
Added:
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/CombineFileRecordReaderWrapper.java
      - copied unchanged from r1476009, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/CombineFileRecordReaderWrapper.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/CombineSequenceFileInputFormat.java
      - copied unchanged from r1476009, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/CombineSequenceFileInputFormat.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/CombineTextInputFormat.java
      - copied unchanged from r1476009, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/CombineTextInputFormat.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileRecordReaderWrapper.java
      - copied unchanged from r1476009, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileRecordReaderWrapper.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineSequenceFileInputFormat.java
      - copied unchanged from r1476009, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineSequenceFileInputFormat.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineTextInputFormat.java
      - copied unchanged from r1476009, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineTextInputFormat.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestCombineSequenceFileInputFormat.java
      - copied unchanged from r1476009, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestCombineSequenceFileInputFormat.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestCombineTextInputFormat.java
      - copied unchanged from r1476009, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestCombineTextInputFormat.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestMapperReducerCleanup.java
      - copied unchanged from r1476009, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestMapperReducerCleanup.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineSequenceFileInputFormat.java
      - copied unchanged from r1476009, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineSequenceFileInputFormat.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineTextInputFormat.java
      - copied unchanged from r1476009, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineTextInputFormat.java

Modified:
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/   (props changed)
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt   (contents, props changed)
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/conf/   (props changed)
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ReduceTask.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Mapper.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Reducer.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml   (props changed)
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/NotRunningJob.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java

Propchange: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-mapreduce-project:r1471229-1476009

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt?rev=1476011&r1=1476010&r2=1476011&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt Fri Apr 26 01:19:00 2013
@@ -209,6 +209,9 @@ Release 2.0.5-beta - UNRELEASED
     MAPREDUCE-5175. Updated MR App to not set envs that will be set by NMs
     anyways after YARN-561. (Xuan Gong via vinodkv)
 
+    MAPREDUCE-5069. add concrete common implementations of
+    CombineFileInputFormat (Sangjin Lee via bobby)
+
   OPTIMIZATIONS
 
     MAPREDUCE-4974. Optimising the LineRecordReader initialize() method
@@ -336,6 +339,23 @@ Release 2.0.5-beta - UNRELEASED
     MAPREDUCE-5146. application classloader may be used too early to load
     classes. (Sangjin Lee via tomwhite)
 
+    MAPREDUCE-4737. Ensure that mapreduce APIs are semantically consistent
+    with mapred API w.r.t Mapper.cleanup and Reducer.cleanup; in the sense that
+    cleanup is now called even if there is an error. The old mapred API
+    already ensures that Mapper.close and Reducer.close are invoked during
+    error handling. Note that it is an incompatible change, however end-users
+    can override Mapper.run and Reducer.run to get the old (inconsistent)
+    behaviour. (acmurthy)
+
+    MAPREDUCE-5166. Fix ConcurrentModificationException due to insufficient
+    synchronization on updates to task Counters. (Sandy Ryza via acmurthy)
+
+    MAPREDUCE-5181. RMCommunicator should not use AMToken from the env.
+    (Vinod Kumar Vavilapalli via sseth)
+
+    MAPREDUCE-5178. Update MR App to set progress in ApplicationReport after
+    YARN-577. (Hitesh Shah via vinodkv)
+
 Release 2.0.4-alpha - UNRELEASED
 
   INCOMPATIBLE CHANGES

Propchange: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt:r1471229-1476009

Propchange: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/conf/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-mapreduce-project/conf:r1471229-1476009
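[Editor's note] The MAPREDUCE-5069 entry above adds concrete CombineFileInputFormat subclasses (CombineTextInputFormat, CombineSequenceFileInputFormat) for both the mapred and mapreduce APIs; the new classes appear in the Added: list at the top of this message. A minimal usage sketch for the new-API CombineTextInputFormat follows. The job name, paths, and the split-size property name are illustrative assumptions, not something this commit prescribes.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class CombineTextExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Assumed property name: cap each combined split at 128 MB so many small
    // files are packed into a bounded number of map tasks.
    conf.setLong("mapreduce.input.fileinputformat.split.maxsize", 128L * 1024 * 1024);

    Job job = Job.getInstance(conf, "combine-text-example");
    job.setJarByClass(CombineTextExample.class);

    // One mapper per combined split rather than one per small file.
    job.setInputFormatClass(CombineTextInputFormat.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    // Default identity mapper/reducer: records pass through unchanged.
    job.setOutputKeyClass(LongWritable.class);
    job.setOutputValueClass(Text.class);

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

The old-API counterparts under org.apache.hadoop.mapred.lib should work analogously via JobConf.setInputFormat.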
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java?rev=1476011&r1=1476010&r2=1476011&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java Fri Apr 26 01:19:00 2013
@@ -38,13 +38,9 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
 import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
 import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
-import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.api.AMRMProtocol;
-import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
@@ -280,24 +276,7 @@ public abstract class RMCommunicator ext
       throw new YarnException(e);
     }
 
-    if (UserGroupInformation.isSecurityEnabled()) {
-      String tokenURLEncodedStr = System.getenv().get(
-          ApplicationConstants.APPLICATION_MASTER_TOKEN_ENV_NAME);
-      Token<? extends TokenIdentifier> token = new Token<TokenIdentifier>();
-
-      try {
-        token.decodeFromUrlString(tokenURLEncodedStr);
-      } catch (IOException e) {
-        throw new YarnException(e);
-      }
-
-      SecurityUtil.setTokenService(token, serviceAddr);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("AppMasterToken is " + token);
-      }
-      currentUser.addToken(token);
-    }
-
+    // CurrentUser should already have AMToken loaded.
     return currentUser.doAs(new PrivilegedAction<AMRMProtocol>() {
       @Override
       public AMRMProtocol run() {
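[Editor's note] The MAPREDUCE-5181 hunk above drops the env-based token handling because YARN is expected to have placed the AM token on the current UGI before the AM starts. If that ever needs verifying while debugging, a small diagnostic along these lines (hypothetical helper, not part of this patch) lists the tokens attached to the current user:

import java.io.IOException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;

public class TokenDebug {
  // Prints the kind and service of every token on the current UGI.
  public static void dumpCurrentUserTokens() throws IOException {
    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
    for (Token<? extends TokenIdentifier> token : ugi.getTokens()) {
      System.out.println("Token kind=" + token.getKind()
          + " service=" + token.getService());
    }
  }
}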
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java?rev=1476011&r1=1476010&r2=1476011&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java Fri Apr 26 01:19:00 2013
@@ -18,6 +18,10 @@ package org.apache.hadoop.mapred;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.ArrayList;
@@ -86,8 +90,6 @@ public class LocalJobRunner implements C
 
   private static final String jobDir = "localRunner/";
 
-  private static final Counters EMPTY_COUNTERS = new Counters();
-
   public long getProtocolVersion(String protocol, long clientVersion) {
     return ClientProtocol.versionID;
   }
@@ -273,10 +275,10 @@ public class LocalJobRunner implements C
       this.partialMapProgress = new float[numMaps];
       this.mapCounters = new Counters[numMaps];
       for (int i = 0; i < numMaps; i++) {
-        this.mapCounters[i] = EMPTY_COUNTERS;
+        this.mapCounters[i] = new Counters();
       }
 
-      this.reduceCounters = EMPTY_COUNTERS;
+      this.reduceCounters = new Counters();
     }
 
     /**
@@ -497,6 +499,15 @@ public class LocalJobRunner implements C
     public synchronized boolean statusUpdate(TaskAttemptID taskId,
         TaskStatus taskStatus) throws IOException, InterruptedException {
+      // Serialize as we would if distributed in order to make deep copy
+      ByteArrayOutputStream baos = new ByteArrayOutputStream();
+      DataOutputStream dos = new DataOutputStream(baos);
+      taskStatus.write(dos);
+      dos.close();
+      taskStatus = TaskStatus.createTaskStatus(taskStatus.getIsMap());
+      taskStatus.readFields(new DataInputStream(
+          new ByteArrayInputStream(baos.toByteArray())));
+
       LOG.info(taskStatus.getStateString());
       int taskIndex = mapIds.indexOf(taskId);
       if (taskIndex >= 0) {                          // mapping
@@ -525,10 +536,10 @@ public class LocalJobRunner implements C
     public synchronized Counters getCurrentCounters() {
       if (null == mapCounters) {
         // Counters not yet initialized for job.
-        return EMPTY_COUNTERS;
+        return new Counters();
       }
 
-      Counters current = EMPTY_COUNTERS;
+      Counters current = new Counters();
       for (Counters c : mapCounters) {
         current = Counters.sum(current, c);
       }
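[Editor's note] The statusUpdate() hunk above makes the local runner behave like the distributed case by round-tripping the TaskStatus through its Writable serialization, so the runner holds a deep copy instead of sharing mutable state (including Counters) with the task thread; that is what MAPREDUCE-5166 relies on. A generic sketch of that copy-by-serialization idiom is shown below; the WritableCopy helper is hypothetical, not part of this patch.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.hadoop.io.Writable;

public final class WritableCopy {
  private WritableCopy() {}

  // Round-trips 'source' through its own Writable serialization and reads the
  // bytes back into 'target', yielding a copy that shares no mutable state
  // with the original -- the same effect an RPC hop would have.
  public static <T extends Writable> T copyInto(T source, T target)
      throws IOException {
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    DataOutputStream dos = new DataOutputStream(baos);
    source.write(dos);
    dos.close();
    target.readFields(new DataInputStream(
        new ByteArrayInputStream(baos.toByteArray())));
    return target;
  }
}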
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java?rev=1476011&r1=1476010&r2=1476011&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java Fri Apr 26 01:19:00 2013
@@ -434,10 +434,15 @@ public class MapTask extends Task {
       }
       statusUpdate(umbilical);
       collector.flush();
-    } finally {
-      //close
-      in.close();                               // close input
+
+      in.close();
+      in = null;
+      collector.close();
+      collector = null;
+    } finally {
+      closeQuietly(in);
+      closeQuietly(collector);
     }
   }
@@ -753,13 +758,20 @@ public class MapTask extends Task {
         new WrappedMapper<INKEY, INVALUE, OUTKEY, OUTVALUE>().getMapContext(
             mapContext);
 
-    input.initialize(split, mapperContext);
-    mapper.run(mapperContext);
-    mapPhase.complete();
-    setPhase(TaskStatus.Phase.SORT);
-    statusUpdate(umbilical);
-    input.close();
-    output.close(mapperContext);
+    try {
+      input.initialize(split, mapperContext);
+      mapper.run(mapperContext);
+      mapPhase.complete();
+      setPhase(TaskStatus.Phase.SORT);
+      statusUpdate(umbilical);
+      input.close();
+      input = null;
+      output.close(mapperContext);
+      output = null;
+    } finally {
+      closeQuietly(input);
+      closeQuietly(output, mapperContext);
+    }
   }
@@ -1949,4 +1961,55 @@ public class MapTask extends Task {
     }
   }
 
+  private <INKEY,INVALUE,OUTKEY,OUTVALUE>
+  void closeQuietly(RecordReader<INKEY, INVALUE> c) {
+    if (c != null) {
+      try {
+        c.close();
+      } catch (IOException ie) {
+        // Ignore
+        LOG.info("Ignoring exception during close for " + c, ie);
+      }
+    }
+  }
+
+  private <OUTKEY, OUTVALUE>
+  void closeQuietly(MapOutputCollector<OUTKEY, OUTVALUE> c) {
+    if (c != null) {
+      try {
+        c.close();
+      } catch (Exception ie) {
+        // Ignore
+        LOG.info("Ignoring exception during close for " + c, ie);
+      }
+    }
+  }
+
+  private <INKEY, INVALUE, OUTKEY, OUTVALUE>
+  void closeQuietly(
+      org.apache.hadoop.mapreduce.RecordReader<INKEY, INVALUE> c) {
+    if (c != null) {
+      try {
+        c.close();
+      } catch (Exception ie) {
+        // Ignore
+        LOG.info("Ignoring exception during close for " + c, ie);
+      }
+    }
+  }
+
+  private <INKEY, INVALUE, OUTKEY, OUTVALUE>
+  void closeQuietly(
+      org.apache.hadoop.mapreduce.RecordWriter<OUTKEY, OUTVALUE> c,
+      org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context
+          mapperContext) {
+    if (c != null) {
+      try {
+        c.close(mapperContext);
+      } catch (Exception ie) {
+        // Ignore
+        LOG.info("Ignoring exception during close for " + c, ie);
+      }
+    }
+  }
 }
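[Editor's note] The MapTask hunks above all follow one idiom: close each resource on the success path, null out the reference, and let a finally block quietly close whatever is still non-null, so a failure inside close() cannot mask the exception that is already propagating. A stand-alone illustration of the idiom with plain java.io streams (hypothetical QuietCloseExample, not project code) follows:

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;

public class QuietCloseExample {

  public static void copyFirstLine(String inPath, String outPath) throws IOException {
    BufferedReader in = null;
    BufferedWriter out = null;
    try {
      in = new BufferedReader(new FileReader(inPath));
      out = new BufferedWriter(new FileWriter(outPath));
      String first = in.readLine();
      if (first != null) {
        out.write(first);
      }

      // Success path: close explicitly and null the references so the
      // finally block below does not close them a second time.
      in.close();
      in = null;
      out.close();
      out = null;
    } finally {
      closeQuietly(in);
      closeQuietly(out);
    }
  }

  // On the error path, close failures are swallowed so they cannot replace
  // the exception that is already propagating to the caller.
  private static void closeQuietly(java.io.Closeable c) {
    if (c != null) {
      try {
        c.close();
      } catch (IOException ignored) {
        // Ignore: the original exception (if any) matters more.
      }
    }
  }
}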
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ReduceTask.java?rev=1476011&r1=1476010&r2=1476011&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ReduceTask.java Fri Apr 26 01:19:00 2013
@@ -37,6 +37,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileSystem.Statistics;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Writable;
@@ -428,14 +429,15 @@ public class ReduceTask extends Task {
     // make output collector
     String finalName = getOutputName(getPartition());
 
-    final RecordWriter<OUTKEY, OUTVALUE> out = new OldTrackingRecordWriter<OUTKEY, OUTVALUE>(
+    RecordWriter<OUTKEY, OUTVALUE> out = new OldTrackingRecordWriter<OUTKEY, OUTVALUE>(
         this, job, reporter, finalName);
-
+    final RecordWriter<OUTKEY, OUTVALUE> finalOut = out;
+
     OutputCollector<OUTKEY,OUTVALUE> collector =
       new OutputCollector<OUTKEY,OUTVALUE>() {
         public void collect(OUTKEY key, OUTVALUE value)
           throws IOException {
-          out.write(key, value);
+          finalOut.write(key, value);
           // indicate that progress update needs to be sent
          reporter.progress();
         }
@@ -466,20 +468,14 @@ public class ReduceTask extends Task {
         values.informReduceProgress();
       }
 
-      //Clean up: repeated in catch block below
       reducer.close();
-      out.close(reporter);
-      //End of clean up.
-    } catch (IOException ioe) {
-      try {
-        reducer.close();
-      } catch (IOException ignored) {}
-
-      try {
-        out.close(reporter);
-      } catch (IOException ignored) {}
+      reducer = null;
 
-      throw ioe;
+      out.close(reporter);
+      out = null;
+    } finally {
+      IOUtils.cleanup(LOG, reducer);
+      closeQuietly(out, reporter);
     }
   }
@@ -645,7 +641,21 @@ public class ReduceTask extends Task {
                                                committer,
                                                reporter, comparator, keyClass,
                                                valueClass);
-    reducer.run(reducerContext);
-    trackedRW.close(reducerContext);
+    try {
+      reducer.run(reducerContext);
+    } finally {
+      trackedRW.close(reducerContext);
+    }
+  }
+
+  private <OUTKEY, OUTVALUE>
+  void closeQuietly(RecordWriter<OUTKEY, OUTVALUE> c, Reporter r) {
+    if (c != null) {
+      try {
+        c.close(r);
+      } catch (Exception e) {
+        LOG.info("Exception in closing " + c, e);
+      }
+    }
   }
 }

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Mapper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Mapper.java?rev=1476011&r1=1476010&r2=1476011&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Mapper.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Mapper.java Fri Apr 26 01:19:00 2013
@@ -140,9 +140,12 @@ public class Mapper<KEYIN, VALUEIN, KEYO
    */
   public void run(Context context) throws IOException, InterruptedException {
     setup(context);
-    while (context.nextKeyValue()) {
-      map(context.getCurrentKey(), context.getCurrentValue(), context);
+    try {
+      while (context.nextKeyValue()) {
+        map(context.getCurrentKey(), context.getCurrentValue(), context);
+      }
+    } finally {
+      cleanup(context);
     }
-    cleanup(context);
   }
 }
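[Editor's note] This Mapper.run() change, together with the matching Reducer.run() change below, is what the MAPREDUCE-4737 note in CHANGES.txt describes: cleanup() now runs even when map() or reduce() throws. As that note points out, a job that depends on the old semantics can override run() itself; a minimal sketch of such an override is shown below (LegacyCleanupMapper and its key/value types are illustrative, not part of this commit).

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Hypothetical user mapper that restores the pre-MAPREDUCE-4737 behaviour:
// cleanup(context) is reached only when every map() call succeeds.
public class LegacyCleanupMapper
    extends Mapper<LongWritable, Text, Text, IntWritable> {

  @Override
  public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    while (context.nextKeyValue()) {
      map(context.getCurrentKey(), context.getCurrentValue(), context);
    }
    // No try/finally: an exception from map() skips cleanup(), as before.
    cleanup(context);
  }

  @Override
  protected void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    // Emit one count per input line; the real map logic is job-specific.
    context.write(new Text(value), new IntWritable(1));
  }
}

The newly added TestMapperReducerCleanup.java in the Added: list above presumably exercises the new default behaviour.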
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Reducer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Reducer.java?rev=1476011&r1=1476010&r2=1476011&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Reducer.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Reducer.java Fri Apr 26 01:19:00 2013
@@ -166,14 +166,17 @@ public class Reducer<KEYIN,VALUEIN,KEYOU
    */
   public void run(Context context) throws IOException, InterruptedException {
     setup(context);
-    while (context.nextKey()) {
-      reduce(context.getCurrentKey(), context.getValues(), context);
-      // If a back up store is used, reset it
-      Iterator<VALUEIN> iter = context.getValues().iterator();
-      if(iter instanceof ReduceContext.ValueIterator) {
-        ((ReduceContext.ValueIterator<VALUEIN>)iter).resetBackupStore();
+    try {
+      while (context.nextKey()) {
+        reduce(context.getCurrentKey(), context.getValues(), context);
+        // If a back up store is used, reset it
+        Iterator<VALUEIN> iter = context.getValues().iterator();
+        if(iter instanceof ReduceContext.ValueIterator) {
+          ((ReduceContext.ValueIterator<VALUEIN>)iter).resetBackupStore();
+        }
       }
+    } finally {
+      cleanup(context);
     }
-    cleanup(context);
   }
 }

Propchange: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml:r1471229-1476009

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/NotRunningJob.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/NotRunningJob.java?rev=1476011&r1=1476010&r2=1476011&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/NotRunningJob.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/NotRunningJob.java Fri Apr 26 01:19:00 2013
@@ -89,7 +89,7 @@ public class NotRunningJob implements MR
     // used for a non running job
     return BuilderUtils.newApplicationReport(unknownAppId, unknownAttemptId,
         "N/A", "N/A", "N/A", "N/A", 0, null, YarnApplicationState.NEW, "N/A",
-        "N/A", 0, 0, FinalApplicationStatus.UNDEFINED, null, "N/A");
+        "N/A", 0, 0, FinalApplicationStatus.UNDEFINED, null, "N/A", 0.0f);
   }
 
   NotRunningJob(ApplicationReport applicationReport, JobState jobState) {
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java?rev=1476011&r1=1476010&r2=1476011&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java Fri Apr 26 01:19:00 2013
@@ -413,7 +413,7 @@ public class TestClientServiceDelegate {
     return BuilderUtils.newApplicationReport(appId, attemptId, "user", "queue",
         "appname", "host", 124, null, YarnApplicationState.FINISHED,
         "diagnostics", "url", 0, 0, FinalApplicationStatus.SUCCEEDED, null,
-        "N/A");
+        "N/A", 0.0f);
   }
 
   private ApplicationReport getRunningApplicationReport(String host, int port) {
@@ -423,7 +423,7 @@ public class TestClientServiceDelegate {
     return BuilderUtils.newApplicationReport(appId, attemptId, "user", "queue",
         "appname", host, port, null, YarnApplicationState.RUNNING,
         "diagnostics", "url", 0, 0, FinalApplicationStatus.UNDEFINED, null,
-        "N/A");
+        "N/A", 0.0f);
   }
 
   private ResourceMgrDelegate getRMDelegate() throws YarnRemoteException {