Author: acmurthy
Date: Wed Apr 24 17:39:28 2013
New Revision: 1471557

URL: http://svn.apache.org/r1471557
Log:
Merge -c 1471556 from trunk to branch-2 to fix MAPREDUCE-4737. Ensure that
the mapreduce APIs are semantically consistent with the mapred API w.r.t.
Mapper.cleanup and Reducer.cleanup, in the sense that cleanup is now called
even if there is an error. The old mapred API already ensures that
Mapper.close and Reducer.close are invoked during error handling. Note that
this is an incompatible change; however, end-users can override Mapper.run
and Reducer.run to get the old (inconsistent) behaviour. Contributed by
Arun C. Murthy.
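For end-users who do want the old semantics back, the workaround the log
suggests is to override run(). A minimal sketch of such an override
(hypothetical user code, not part of this commit; the class name
OldStyleCleanupMapper is invented, and the loop body mirrors the pre-patch
Mapper.run shown in the diff below):

    import java.io.IOException;
    import org.apache.hadoop.mapreduce.Mapper;

    // Hypothetical user mapper restoring the old (inconsistent) behaviour:
    // cleanup() runs only when the record loop completes without error.
    public class OldStyleCleanupMapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
        extends Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {

      @Override
      public void run(Context context)
          throws IOException, InterruptedException {
        setup(context);
        // No try/finally here: if map() throws, cleanup() is skipped,
        // exactly as before this change.
        while (context.nextKeyValue()) {
          map(context.getCurrentKey(), context.getCurrentValue(), context);
        }
        cleanup(context);
      }
    }

The analogous override of Reducer.run would restore the old behaviour on the
reduce side.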
Added:
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestMapperReducerCleanup.java
      - copied unchanged from r1471556, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestMapperReducerCleanup.java
Modified:
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ReduceTask.java
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Mapper.java
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Reducer.java

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt?rev=1471557&r1=1471556&r2=1471557&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt Wed Apr 24 17:39:28 2013
@@ -180,6 +180,14 @@ Release 2.0.5-beta - UNRELEASED
     MAPREDUCE-5146. application classloader may be used too early to load
     classes. (Sangjin Lee via tomwhite)
 
+    MAPREDUCE-4737. Ensure that mapreduce APIs are semantically consistent
+    with mapred API w.r.t Mapper.cleanup and Reducer.cleanup; in the sense that
+    cleanup is now called even if there is an error. The old mapred API
+    already ensures that Mapper.close and Reducer.close are invoked during
+    error handling. Note that it is an incompatible change, however end-users
+    can override Mapper.run and Reducer.run to get the old (inconsistent)
+    behaviour. (acmurthy)
+
 Release 2.0.4-alpha - UNRELEASED
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java?rev=1471557&r1=1471556&r2=1471557&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java Wed Apr 24 17:39:28 2013
@@ -434,10 +434,15 @@ public class MapTask extends Task {
       }
       statusUpdate(umbilical);
       collector.flush();
-    } finally {
-      //close
-      in.close();                               // close input
+
+      in.close();
+      in = null;
+      collector.close();
+      collector = null;
+    } finally {
+      closeQuietly(in);
+      closeQuietly(collector);
     }
   }
@@ -753,13 +758,20 @@ public class MapTask extends Task {
           new WrappedMapper<INKEY, INVALUE, OUTKEY, OUTVALUE>().getMapContext(
               mapContext);
 
-    input.initialize(split, mapperContext);
-    mapper.run(mapperContext);
-    mapPhase.complete();
-    setPhase(TaskStatus.Phase.SORT);
-    statusUpdate(umbilical);
-    input.close();
-    output.close(mapperContext);
+    try {
+      input.initialize(split, mapperContext);
+      mapper.run(mapperContext);
+      mapPhase.complete();
+      setPhase(TaskStatus.Phase.SORT);
+      statusUpdate(umbilical);
+      input.close();
+      input = null;
+      output.close(mapperContext);
+      output = null;
+    } finally {
+      closeQuietly(input);
+      closeQuietly(output, mapperContext);
+    }
   }
 
   class DirectMapOutputCollector<K, V>
@@ -1949,4 +1961,55 @@ public class MapTask extends Task {
     }
   }
 
+  private <INKEY,INVALUE,OUTKEY,OUTVALUE>
+  void closeQuietly(RecordReader<INKEY, INVALUE> c) {
+    if (c != null) {
+      try {
+        c.close();
+      } catch (IOException ie) {
+        // Ignore
+        LOG.info("Ignoring exception during close for " + c, ie);
+      }
+    }
+  }
+
+  private <OUTKEY, OUTVALUE>
+  void closeQuietly(MapOutputCollector<OUTKEY, OUTVALUE> c) {
+    if (c != null) {
+      try {
+        c.close();
+      } catch (Exception ie) {
+        // Ignore
+        LOG.info("Ignoring exception during close for " + c, ie);
+      }
+    }
+  }
+
+  private <INKEY, INVALUE, OUTKEY, OUTVALUE>
+  void closeQuietly(
+      org.apache.hadoop.mapreduce.RecordReader<INKEY, INVALUE> c) {
+    if (c != null) {
+      try {
+        c.close();
+      } catch (Exception ie) {
+        // Ignore
+        LOG.info("Ignoring exception during close for " + c, ie);
+      }
+    }
+  }
+
+  private <INKEY, INVALUE, OUTKEY, OUTVALUE>
+  void closeQuietly(
+      org.apache.hadoop.mapreduce.RecordWriter<OUTKEY, OUTVALUE> c,
+      org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context
+          mapperContext) {
+    if (c != null) {
+      try {
+        c.close(mapperContext);
+      } catch (Exception ie) {
+        // Ignore
+        LOG.info("Ignoring exception during close for " + c, ie);
+      }
+    }
+  }
 }

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ReduceTask.java?rev=1471557&r1=1471556&r2=1471557&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ReduceTask.java Wed Apr 24 17:39:28 2013
@@ -37,6 +37,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileSystem.Statistics;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Writable;
@@ -428,14 +429,15 @@ public class ReduceTask extends Task {
 
     // make output collector
     String finalName = getOutputName(getPartition());
-    final RecordWriter<OUTKEY, OUTVALUE> out = new OldTrackingRecordWriter<OUTKEY, OUTVALUE>(
+    RecordWriter<OUTKEY, OUTVALUE> out = new OldTrackingRecordWriter<OUTKEY, OUTVALUE>(
         this, job, reporter, finalName);
-
+    final RecordWriter<OUTKEY, OUTVALUE> finalOut = out;
+
     OutputCollector<OUTKEY,OUTVALUE> collector =
       new OutputCollector<OUTKEY,OUTVALUE>() {
         public void collect(OUTKEY key, OUTVALUE value)
           throws IOException {
-          out.write(key, value);
+          finalOut.write(key, value);
           // indicate that progress update needs to be sent
           reporter.progress();
         }
@@ -466,20 +468,14 @@ public class ReduceTask extends Task {
         values.informReduceProgress();
       }
 
-      //Clean up: repeated in catch block below
       reducer.close();
-      out.close(reporter);
-      //End of clean up.
-    } catch (IOException ioe) {
-      try {
-        reducer.close();
-      } catch (IOException ignored) {}
-
-      try {
-        out.close(reporter);
-      } catch (IOException ignored) {}
+      reducer = null;
 
-      throw ioe;
+      out.close(reporter);
+      out = null;
+    } finally {
+      IOUtils.cleanup(LOG, reducer);
+      closeQuietly(out, reporter);
     }
   }
 
@@ -645,7 +641,21 @@ public class ReduceTask extends Task {
                                                committer,
                                                reporter, comparator, keyClass,
                                                valueClass);
-    reducer.run(reducerContext);
-    trackedRW.close(reducerContext);
+    try {
+      reducer.run(reducerContext);
+    } finally {
+      trackedRW.close(reducerContext);
+    }
+  }
+
+  private <OUTKEY, OUTVALUE>
+  void closeQuietly(RecordWriter<OUTKEY, OUTVALUE> c, Reporter r) {
+    if (c != null) {
+      try {
+        c.close(r);
+      } catch (Exception e) {
+        LOG.info("Exception in closing " + c, e);
+      }
+    }
   }
 }

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Mapper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Mapper.java?rev=1471557&r1=1471556&r2=1471557&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Mapper.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Mapper.java Wed Apr 24 17:39:28 2013
@@ -140,9 +140,12 @@ public class Mapper<KEYIN, VALUEIN, KEYO
    */
   public void run(Context context) throws IOException, InterruptedException {
     setup(context);
-    while (context.nextKeyValue()) {
-      map(context.getCurrentKey(), context.getCurrentValue(), context);
+    try {
+      while (context.nextKeyValue()) {
+        map(context.getCurrentKey(), context.getCurrentValue(), context);
+      }
+    } finally {
+      cleanup(context);
     }
-    cleanup(context);
   }
 }

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Reducer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Reducer.java?rev=1471557&r1=1471556&r2=1471557&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Reducer.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Reducer.java Wed Apr 24 17:39:28 2013
@@ -166,14 +166,17 @@ public class Reducer<KEYIN,VALUEIN,KEYOU
    */
   public void run(Context context) throws IOException, InterruptedException {
     setup(context);
-    while (context.nextKey()) {
-      reduce(context.getCurrentKey(), context.getValues(), context);
-      // If a back up store is used, reset it
-      Iterator<VALUEIN> iter = context.getValues().iterator();
-      if(iter instanceof ReduceContext.ValueIterator) {
-        ((ReduceContext.ValueIterator<VALUEIN>)iter).resetBackupStore();
+    try {
+      while (context.nextKey()) {
+        reduce(context.getCurrentKey(), context.getValues(), context);
+        // If a back up store is used, reset it
+        Iterator<VALUEIN> iter = context.getValues().iterator();
+        if(iter instanceof ReduceContext.ValueIterator) {
+          ((ReduceContext.ValueIterator<VALUEIN>)iter).resetBackupStore();
+        }
       }
+    } finally {
+      cleanup(context);
     }
-    cleanup(context);
   }
 }
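To illustrate the new contract from a user's perspective, a rough sketch
(hypothetical code, not the committed TestMapperReducerCleanup test; the
class name FailingMapper is invented): with the patched Mapper.run above,
cleanup() is reached even when map() throws, and the original exception
still propagates and fails the task.

    import java.io.IOException;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    // Hypothetical mapper whose map() always fails.
    public class FailingMapper
        extends Mapper<LongWritable, Text, Text, Text> {

      @Override
      protected void map(LongWritable key, Text value, Context context)
          throws IOException {
        throw new IOException("simulated map failure");
      }

      @Override
      protected void cleanup(Context context) {
        // With the patched run(), this executes despite the failure above;
        // before this change it was silently skipped.
        System.err.println("cleanup() still invoked");
      }
    }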