Author: jlowe
Date: Wed May 28 19:53:08 2014
New Revision: 1598115

URL: http://svn.apache.org/r1598115
Log:
svn merge -c 1598111 FIXES: MAPREDUCE-5862. Line records longer than 2x split size aren't handled correctly. Contributed by bc Wong
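The core of the patch, applied identically to the mapred and mapreduce.lib.input readers below, changes the uncompressed branch of maxBytesToConsume() so it never returns less than maxLineLength, and simplifies the readLine() call accordingly. As a rough illustration only (the class and method names below are hypothetical and not part of the patch), the before/after arithmetic can be sketched as:

// Illustrative sketch only -- not the patched Hadoop code. It contrasts the
// old and new maxBytesToConsume() arithmetic from the diffs below for
// uncompressed input, using a hypothetical 10-byte split.
public class MaxBytesToConsumeSketch {

  // Pre-patch behaviour: capped at the bytes remaining in the current split.
  static int oldMaxBytesToConsume(long pos, long end) {
    return (int) Math.min(Integer.MAX_VALUE, end - pos);
  }

  // Post-patch behaviour: never less than the configured maximum line length,
  // so a record that spans several splits can still be consumed whole.
  static int newMaxBytesToConsume(long pos, long end, int maxLineLength) {
    return (int) Math.max(Math.min(Integer.MAX_VALUE, end - pos), maxLineLength);
  }

  public static void main(String[] args) {
    long end = 10;                          // hypothetical 10-byte split
    long pos = 10;                          // reader has reached the split boundary
    int maxLineLength = Integer.MAX_VALUE;  // assuming the readers' default line-length limit
    System.out.println(oldMaxBytesToConsume(pos, end));                 // 0
    System.out.println(newMaxBytesToConsume(pos, end, maxLineLength));  // 2147483647
  }
}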
Added:
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/resources/recordSpanningMultipleSplits.txt
      - copied unchanged from r1598111, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/resources/recordSpanningMultipleSplits.txt
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/resources/recordSpanningMultipleSplits.txt.bz2
      - copied unchanged from r1598111, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/resources/recordSpanningMultipleSplits.txt.bz2
Modified:
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt?rev=1598115&r1=1598114&r2=1598115&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt Wed May 28 19:53:08 2014
@@ -99,6 +99,9 @@ Release 2.5.0 - UNRELEASED
     MAPREDUCE-5309. 2.0.4 JobHistoryParser can't parse certain failed job
     history files generated by 2.0.3 history server (Rushabh S Shah via jlowe)

+    MAPREDUCE-5862. Line records longer than 2x split size aren't handled
+    correctly (bc Wong via jlowe)
+
 Release 2.4.1 - UNRELEASED

   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml?rev=1598115&r1=1598114&r2=1598115&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml Wed May 28 19:53:08 2014
@@ -85,6 +85,15 @@
         </execution>
       </executions>
     </plugin>
+    <plugin>
+      <groupId>org.apache.rat</groupId>
+      <artifactId>apache-rat-plugin</artifactId>
+      <configuration>
+        <excludes>
+          <exclude>src/test/resources/recordSpanningMultipleSplits.txt</exclude>
+        </excludes>
+      </configuration>
+    </plugin>
   </plugins>
 </build>
</project>

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java?rev=1598115&r1=1598114&r2=1598115&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java Wed May 28 19:53:08 2014
@@ -184,7 +184,7 @@ public class LineRecordReader implements
   private int maxBytesToConsume(long pos) {
     return isCompressedInput()
       ? Integer.MAX_VALUE
-      : (int) Math.min(Integer.MAX_VALUE, end - pos);
+      : (int) Math.max(Math.min(Integer.MAX_VALUE, end - pos), maxLineLength);
   }

   private long getFilePosition() throws IOException {
@@ -206,8 +206,7 @@ public class LineRecordReader implements
     while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) {
       key.set(pos);

-      int newSize = in.readLine(value, maxLineLength,
-          Math.max(maxBytesToConsume(pos), maxLineLength));
+      int newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos));
       if (newSize == 0) {
         return false;
       }

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java?rev=1598115&r1=1598114&r2=1598115&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java Wed May 28 19:53:08 2014
@@ -121,7 +121,7 @@ public class LineRecordReader extends Re
   private int maxBytesToConsume(long pos) {
     return isCompressedInput
       ? Integer.MAX_VALUE
-      : (int) Math.min(Integer.MAX_VALUE, end - pos);
+      : (int) Math.max(Math.min(Integer.MAX_VALUE, end - pos), maxLineLength);
   }

   private long getFilePosition() throws IOException {
@@ -146,8 +146,7 @@ public class LineRecordReader extends Re
     // We always read one extra line, which lies outside the upper
     // split limit i.e. (end - 1)
     while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) {
-      newSize = in.readLine(value, maxLineLength,
-          Math.max(maxBytesToConsume(pos), maxLineLength));
+      newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos));
       pos += newSize;
       if (newSize < maxLineLength) {
         break;

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java?rev=1598115&r1=1598114&r2=1598115&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java Wed May 28 19:53:08 2014
@@ -23,9 +23,12 @@ import static org.junit.Assert.assertNot
 import static org.junit.Assert.assertTrue;

 import java.io.File;
+import java.io.FileInputStream;
 import java.io.IOException;
 import java.net.URL;
+import java.util.ArrayList;

+import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
@@ -97,4 +100,92 @@ public class TestLineRecordReader {
     // character is a linefeed
     testSplitRecords("blockEndingInCRThenLF.txt.bz2", 136498);
   }
+
+  // Use the LineRecordReader to read records from the file
+  public ArrayList<String> readRecords(URL testFileUrl, int splitSize)
+      throws IOException {
+
+    // Set up context
+    File testFile = new File(testFileUrl.getFile());
+    long testFileSize = testFile.length();
+    Path testFilePath = new Path(testFile.getAbsolutePath());
+    Configuration conf = new Configuration();
+    conf.setInt("io.file.buffer.size", 1);
+
+    // Gather the records returned by the record reader
+    ArrayList<String> records = new ArrayList<String>();
+
+    long offset = 0;
+    LongWritable key = new LongWritable();
+    Text value = new Text();
+    while (offset < testFileSize) {
+      FileSplit split =
+          new FileSplit(testFilePath, offset, splitSize, (String[]) null);
+      LineRecordReader reader = new LineRecordReader(conf, split);
+
+      while (reader.next(key, value)) {
+        records.add(value.toString());
+      }
+      offset += splitSize;
+    }
+    return records;
+  }
+
+  // Gather the records by just splitting on new lines
+  public String[] readRecordsDirectly(URL testFileUrl, boolean bzip)
+      throws IOException {
+    int MAX_DATA_SIZE = 1024 * 1024;
+    byte[] data = new byte[MAX_DATA_SIZE];
+    FileInputStream fis = new FileInputStream(testFileUrl.getFile());
+    int count;
+    if (bzip) {
+      BZip2CompressorInputStream bzIn = new BZip2CompressorInputStream(fis);
+      count = bzIn.read(data);
+      bzIn.close();
+    } else {
+      count = fis.read(data);
+    }
+    fis.close();
+    assertTrue("Test file data too big for buffer", count < data.length);
+    return new String(data, 0, count, "UTF-8").split("\n");
+  }
+
+  public void checkRecordSpanningMultipleSplits(String testFile,
+                                                int splitSize,
+                                                boolean bzip)
+      throws IOException {
+    URL testFileUrl = getClass().getClassLoader().getResource(testFile);
+    ArrayList<String> records = readRecords(testFileUrl, splitSize);
+    String[] actuals = readRecordsDirectly(testFileUrl, bzip);
+
+    assertEquals("Wrong number of records", actuals.length, records.size());
+
+    boolean hasLargeRecord = false;
+    for (int i = 0; i < actuals.length; ++i) {
+      assertEquals(actuals[i], records.get(i));
+      if (actuals[i].length() > 2 * splitSize) {
+        hasLargeRecord = true;
+      }
+    }
+
+    assertTrue("Invalid test data. Doesn't have a large enough record",
+               hasLargeRecord);
+  }
+
+  @Test
+  public void testRecordSpanningMultipleSplits()
+      throws IOException {
+    checkRecordSpanningMultipleSplits("recordSpanningMultipleSplits.txt",
+                                      10, false);
+  }
+
+  @Test
+  public void testRecordSpanningMultipleSplitsCompressed()
+      throws IOException {
+    // The file is generated with bz2 block size of 100k. The split size
+    // needs to be larger than that for the CompressedSplitLineReader to
+    // work.
+    checkRecordSpanningMultipleSplits("recordSpanningMultipleSplits.txt.bz2",
+                                      200 * 1000, true);
+  }
 }

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java?rev=1598115&r1=1598114&r2=1598115&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java Wed May 28 19:53:08 2014
@@ -23,9 +23,12 @@ import static org.junit.Assert.assertNot
 import static org.junit.Assert.assertTrue;

 import java.io.File;
+import java.io.FileInputStream;
 import java.io.IOException;
 import java.net.URL;
+import java.util.ArrayList;

+import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -101,4 +104,93 @@ public class TestLineRecordReader {
     // character is a linefeed
     testSplitRecords("blockEndingInCRThenLF.txt.bz2", 136498);
   }
+
+  // Use the LineRecordReader to read records from the file
+  public ArrayList<String> readRecords(URL testFileUrl, int splitSize)
+      throws IOException {
+
+    // Set up context
+    File testFile = new File(testFileUrl.getFile());
+    long testFileSize = testFile.length();
+    Path testFilePath = new Path(testFile.getAbsolutePath());
+    Configuration conf = new Configuration();
+    conf.setInt("io.file.buffer.size", 1);
+    TaskAttemptContext context = new TaskAttemptContextImpl(conf, new TaskAttemptID());
+
+    // Gather the records returned by the record reader
+    ArrayList<String> records = new ArrayList<String>();
+
+    long offset = 0;
+    while (offset < testFileSize) {
+      FileSplit split = new FileSplit(testFilePath, offset, splitSize, null);
+      LineRecordReader reader = new LineRecordReader();
+      reader.initialize(split, context);
+
+      while (reader.nextKeyValue()) {
+        records.add(reader.getCurrentValue().toString());
+      }
+      offset += splitSize;
+    }
+    return records;
+  }
+
+  // Gather the records by just splitting on new lines
+  public String[] readRecordsDirectly(URL testFileUrl, boolean bzip)
+      throws IOException {
+    int MAX_DATA_SIZE = 1024 * 1024;
+    byte[] data = new byte[MAX_DATA_SIZE];
+    FileInputStream fis = new FileInputStream(testFileUrl.getFile());
+    int count;
+    if (bzip) {
+      BZip2CompressorInputStream bzIn = new BZip2CompressorInputStream(fis);
+      count = bzIn.read(data);
+      bzIn.close();
+    } else {
+      count = fis.read(data);
+    }
+    fis.close();
+    assertTrue("Test file data too big for buffer", count < data.length);
+    return new String(data, 0, count, "UTF-8").split("\n");
+  }
+
+  public void checkRecordSpanningMultipleSplits(String testFile,
+                                                int splitSize,
+                                                boolean bzip)
+      throws IOException {
+    URL testFileUrl = getClass().getClassLoader().getResource(testFile);
+    ArrayList<String> records = readRecords(testFileUrl, splitSize);
+    String[] actuals = readRecordsDirectly(testFileUrl, bzip);
+
+    assertEquals("Wrong number of records", actuals.length, records.size());
+
+    boolean hasLargeRecord = false;
+    for (int i = 0; i < actuals.length; ++i) {
+      assertEquals(actuals[i], records.get(i));
+      if (actuals[i].length() > 2 * splitSize) {
+        hasLargeRecord = true;
+      }
+    }
+
+    assertTrue("Invalid test data. Doesn't have a large enough record",
+               hasLargeRecord);
+  }
+
+  @Test
+  public void testRecordSpanningMultipleSplits()
+      throws IOException {
+    checkRecordSpanningMultipleSplits("recordSpanningMultipleSplits.txt",
+                                      10,
+                                      false);
+  }
+
+  @Test
+  public void testRecordSpanningMultipleSplitsCompressed()
+      throws IOException {
+    // The file is generated with bz2 block size of 100k. The split size
+    // needs to be larger than that for the CompressedSplitLineReader to
+    // work.
+    checkRecordSpanningMultipleSplits("recordSpanningMultipleSplits.txt.bz2",
+                                      200 * 1000,
+                                      true);
+  }
 }
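The new tests depend on the added recordSpanningMultipleSplits.txt resource, whose contents are not reproduced in this diff; the tests only require that at least one record be longer than twice the split size (10 bytes in the uncompressed case) and that the whole file fit in the 1 MB read buffer. Purely as a hypothetical illustration (this generator is not part of the patch and does not reproduce the committed resource), a similar local file could be produced like this:

// Hypothetical helper, not part of the patch: writes a small file whose
// middle record is far longer than twice the 10-byte split size used by
// testRecordSpanningMultipleSplits().
import java.io.FileWriter;
import java.io.IOException;

public class MakeSpanningRecordFile {
  public static void main(String[] args) throws IOException {
    try (FileWriter writer = new FileWriter("recordSpanningMultipleSplits.local.txt")) {
      writer.write("short line 1\n");
      StringBuilder longRecord = new StringBuilder();
      for (int i = 0; i < 50; i++) {
        longRecord.append("0123456789");   // 500 characters in total
      }
      writer.write(longRecord.toString());
      writer.write("\n");
      writer.write("short line 2\n");
    }
  }
}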