Author: jlowe
Date: Mon Dec 9 23:42:36 2013
New Revision: 1549710

URL: http://svn.apache.org/r1549710
Log:
svn merge -c 1549705 FIXES: MAPREDUCE-5656. bzip2 codec can drop records when reading data in splits. Contributed by Jason Lowe

Added: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CompressedSplitLineReader.java - copied unchanged from r1549705, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CompressedSplitLineReader.java hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/SplitLineReader.java - copied unchanged from r1549705, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/SplitLineReader.java hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java - copied unchanged from r1549705, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java - copied unchanged from r1549705, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/resources/blockEndingInCR.txt.bz2 - copied unchanged from r1549705, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/resources/blockEndingInCR.txt.bz2 
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/resources/blockEndingInCRThenLF.txt.bz2 - copied unchanged from r1549705, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/resources/blockEndingInCRThenLF.txt.bz2 Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt?rev=1549710&r1=1549709&r2=1549710&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt (original) +++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt Mon Dec 9 23:42:36 2013 @@ -94,6 +94,9 @@ Release 2.4.0 - UNRELEASED MAPREDUCE-5632. TestRMContainerAllocator#testUpdatedNodes fails (jeagles) + MAPREDUCE-5656. 
bzip2 codec can drop records when reading data in splits + (jlowe) + Release 2.3.0 - UNRELEASED INCOMPATIBLE CHANGES Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java?rev=1549710&r1=1549709&r2=1549710&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java (original) +++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java Mon Dec 9 23:42:36 2013 @@ -36,6 +36,8 @@ import org.apache.hadoop.io.compress.Com import org.apache.hadoop.io.compress.Decompressor; import org.apache.hadoop.io.compress.SplitCompressionInputStream; import org.apache.hadoop.io.compress.SplittableCompressionCodec; +import org.apache.hadoop.mapreduce.lib.input.CompressedSplitLineReader; +import org.apache.hadoop.mapreduce.lib.input.SplitLineReader; import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.Log; @@ -52,7 +54,7 @@ public class LineRecordReader implements private long start; private long pos; private long end; - private LineReader in; + private SplitLineReader in; private FSDataInputStream fileIn; private final Seekable filePosition; int maxLineLength; @@ -111,17 +113,18 @@ public class LineRecordReader implements ((SplittableCompressionCodec)codec).createInputStream( fileIn, decompressor, start, end, SplittableCompressionCodec.READ_MODE.BYBLOCK); - in = new LineReader(cIn, job, recordDelimiter); + in = new 
CompressedSplitLineReader(cIn, job, recordDelimiter); start = cIn.getAdjustedStart(); end = cIn.getAdjustedEnd(); filePosition = cIn; // take pos from compressed stream } else { - in = new LineReader(codec.createInputStream(fileIn, decompressor), job, recordDelimiter); + in = new SplitLineReader(codec.createInputStream(fileIn, + decompressor), job, recordDelimiter); filePosition = fileIn; } } else { fileIn.seek(start); - in = new LineReader(fileIn, job, recordDelimiter); + in = new SplitLineReader(fileIn, job, recordDelimiter); filePosition = fileIn; } // If this is not the first split, we always throw away first record @@ -141,7 +144,7 @@ public class LineRecordReader implements public LineRecordReader(InputStream in, long offset, long endOffset, int maxLineLength, byte[] recordDelimiter) { this.maxLineLength = maxLineLength; - this.in = new LineReader(in, recordDelimiter); + this.in = new SplitLineReader(in, recordDelimiter); this.start = offset; this.pos = offset; this.end = endOffset; @@ -159,7 +162,7 @@ public class LineRecordReader implements throws IOException{ this.maxLineLength = job.getInt(org.apache.hadoop.mapreduce.lib.input. LineRecordReader.MAX_LINE_LENGTH, Integer.MAX_VALUE); - this.in = new LineReader(in, job, recordDelimiter); + this.in = new SplitLineReader(in, job, recordDelimiter); this.start = offset; this.pos = offset; this.end = endOffset; @@ -200,7 +203,7 @@ public class LineRecordReader implements // We always read one extra line, which lies outside the upper // split limit i.e. 
(end - 1) - while (getFilePosition() <= end) { + while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) { key.set(pos); int newSize = in.readLine(value, maxLineLength, Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java?rev=1549710&r1=1549709&r2=1549710&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java (original) +++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java Mon Dec 9 23:42:36 2013 @@ -38,7 +38,6 @@ import org.apache.hadoop.io.compress.Dec import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.util.LineReader; import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.Log; @@ -55,7 +54,7 @@ public class LineRecordReader extends Re private long start; private long pos; private long end; - private LineReader in; + private SplitLineReader in; private FSDataInputStream fileIn; private Seekable filePosition; private int maxLineLength; @@ -94,33 +93,19 @@ public class LineRecordReader extends Re ((SplittableCompressionCodec)codec).createInputStream( fileIn, decompressor, start, end, SplittableCompressionCodec.READ_MODE.BYBLOCK); - if (null == this.recordDelimiterBytes){ - in = new LineReader(cIn, 
job); - } else { - in = new LineReader(cIn, job, this.recordDelimiterBytes); - } - + in = new CompressedSplitLineReader(cIn, job, + this.recordDelimiterBytes); start = cIn.getAdjustedStart(); end = cIn.getAdjustedEnd(); filePosition = cIn; } else { - if (null == this.recordDelimiterBytes) { - in = new LineReader(codec.createInputStream(fileIn, decompressor), - job); - } else { - in = new LineReader(codec.createInputStream(fileIn, - decompressor), job, this.recordDelimiterBytes); - } + in = new SplitLineReader(codec.createInputStream(fileIn, + decompressor), job, this.recordDelimiterBytes); filePosition = fileIn; } } else { fileIn.seek(start); - if (null == this.recordDelimiterBytes){ - in = new LineReader(fileIn, job); - } else { - in = new LineReader(fileIn, job, this.recordDelimiterBytes); - } - + in = new SplitLineReader(fileIn, job, this.recordDelimiterBytes); filePosition = fileIn; } // If this is not the first split, we always throw away first record @@ -160,7 +145,7 @@ public class LineRecordReader extends Re int newSize = 0; // We always read one extra line, which lies outside the upper // split limit i.e. (end - 1) - while (getFilePosition() <= end) { + while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) { newSize = in.readLine(value, maxLineLength, Math.max(maxBytesToConsume(pos), maxLineLength)); pos += newSize;