Author: jlowe Date: Tue May 14 17:03:30 2013 New Revision: 1482463 URL: http://svn.apache.org/r1482463 Log: MAPREDUCE-5168. Reducer can OOM during shuffle because on-disk output stream not released. Contributed by Jason Lowe.
Added: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMapOutput.java Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MapOutput.java Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt?rev=1482463&r1=1482462&r2=1482463&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt (original) +++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt Tue May 14 17:03:30 2013 @@ -27,6 +27,9 @@ Release 0.23.8 - UNRELEASED MAPREDUCE-5211. Reducer intermediate files can collide during merge (jlowe) + MAPREDUCE-5168. 
Reducer can OOM during shuffle because on-disk output + stream not released (jlowe) + Release 0.23.7 - 2013-04-18 INCOMPATIBLE CHANGES Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MapOutput.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MapOutput.java?rev=1482463&r1=1482462&r2=1482463&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MapOutput.java (original) +++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MapOutput.java Tue May 14 17:03:30 2013 @@ -24,7 +24,6 @@ import java.util.concurrent.atomic.Atomi import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocalDirAllocator; import org.apache.hadoop.fs.Path; @@ -56,7 +55,7 @@ class MapOutput<K,V> { private final FileSystem localFS; private final Path tmpOutputPath; private final Path outputPath; - private final OutputStream disk; + private OutputStream disk; private final Type type; @@ -193,6 +192,7 @@ class MapOutput<K,V> { } else if (type == Type.DISK) { localFS.rename(tmpOutputPath, outputPath); merger.closeOnDiskFile(this); + disk = null; } else { throw new IOException("Cannot commit MapOutput of type WAIT!"); } @@ -207,6 +207,7 @@ class MapOutput<K,V> { } catch (IOException ie) { LOG.info("failure to clean up " + tmpOutputPath, ie); } + disk = null; } else 
{ throw new IllegalArgumentException ("Cannot commit MapOutput with of type WAIT!"); Added: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMapOutput.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMapOutput.java?rev=1482463&view=auto ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMapOutput.java (added) +++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMapOutput.java Tue May 14 17:03:30 2013 @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.mapreduce.task.reduce; + +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.anyLong; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.IOException; + +import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.UnsupportedFileSystemException; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.MapOutputFile; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.TaskID; +import org.apache.hadoop.mapreduce.TaskType; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +public class TestMapOutput { + + private static final FileContext lfs = getLfs(); + private static final FileContext getLfs() { + try { + return FileContext.getLocalFSFileContext(); + } catch (UnsupportedFileSystemException e) { + throw new RuntimeException(e); + } + } + + private static final Path base = + lfs.makeQualified(new Path("target", TestMapOutput.class.getName())); + + @BeforeClass + public static void createBase() throws IOException { + lfs.mkdir(base, null, true); + } + + @AfterClass + public static void removeBase() throws IOException { + lfs.delete(base, true); + } + + @SuppressWarnings("unchecked") + @Test + public void testDiskOutputBufferLeak() throws IOException { + MapOutputFile mof = mock(MapOutputFile.class); + when(mof.getInputFileForWrite(any(TaskID.class), anyLong())).thenReturn( + new Path(base, "mapoutputfile")); + TaskAttemptID mtid = new TaskAttemptID("0", 1, TaskType.MAP, 1, 1); + MergeManager<Text,Text> merger = mock(MergeManager.class); + MapOutput<Text,Text> odmo = new MapOutput<Text,Text>( + mtid, merger, 0, new JobConf(), null, 0, true, mof); + Assert.assertNotNull(odmo.getDisk()); + odmo.commit(); + Assert.assertNull(odmo.getDisk()); + + odmo = new MapOutput<Text,Text>( 
+ mtid, merger, 0, new JobConf(), null, 0, true, mof); + Assert.assertNotNull(odmo.getDisk()); + odmo.abort(); + Assert.assertNull(odmo.getDisk()); + } +}