HADOOP-13270. BZip2CompressionInputStream finds the same compression marker twice in a corner case, causing duplicate data blocks. Contributed by Kai Sasaki.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/1c600927 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/1c600927 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/1c600927 Branch: refs/heads/HDFS-1312 Commit: 1c600927a9caa9aa862514ae5066c48a1f7967a3 Parents: 982bee0 Author: Akira Ajisaka <aajis...@apache.org> Authored: Tue Jun 14 10:18:17 2016 +0900 Committer: Anu Engineer <aengin...@apache.org> Committed: Sat Jun 18 00:05:01 2016 -0700 ---------------------------------------------------------------------- .../apache/hadoop/io/compress/BZip2Codec.java | 7 +- .../hadoop/mapred/TestTextInputFormat.java | 108 ++++++++++--------- 2 files changed, 63 insertions(+), 52 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/1c600927/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/BZip2Codec.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/BZip2Codec.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/BZip2Codec.java index 2c5a7be..49dd9c1 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/BZip2Codec.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/BZip2Codec.java @@ -207,7 +207,12 @@ public class BZip2Codec implements Configurable, SplittableCompressionCodec { // time stream might start without a leading BZ. final long FIRST_BZIP2_BLOCK_MARKER_POSITION = CBZip2InputStream.numberOfBytesTillNextMarker(seekableIn); - long adjStart = Math.max(0L, start - FIRST_BZIP2_BLOCK_MARKER_POSITION); + long adjStart = 0L; + if (start != 0) { + // Other than the first of file, the marker size is 6 bytes. 
+ adjStart = Math.max(0L, start - (FIRST_BZIP2_BLOCK_MARKER_POSITION + - (HEADER_LEN + SUB_HEADER_LEN))); + } ((Seekable)seekableIn).seek(adjStart); SplitCompressionInputStream in = http://git-wip-us.apache.org/repos/asf/hadoop/blob/1c600927/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestTextInputFormat.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestTextInputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestTextInputFormat.java index b833b60..5106c38 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestTextInputFormat.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestTextInputFormat.java @@ -175,69 +175,75 @@ public class TestTextInputFormat { for (int length = MAX_LENGTH / 2; length < MAX_LENGTH; length += random.nextInt(MAX_LENGTH / 4)+1) { - LOG.info("creating; entries = " + length); - - - // create a file with length entries - Writer writer = - new OutputStreamWriter(codec.createOutputStream(localFs.create(file))); - try { - for (int i = 0; i < length; i++) { - writer.write(Integer.toString(i)); - writer.write("\n"); - } - } finally { - writer.close(); + for (int i = 0; i < 3; i++) { + int numSplits = random.nextInt(MAX_LENGTH / 2000) + 1; + verifyPartitions(length, numSplits, file, codec, conf); } + } - // try splitting the file in a variety of sizes - TextInputFormat format = new TextInputFormat(); - format.configure(conf); - LongWritable key = new LongWritable(); - Text value = new Text(); - for (int i = 0; i < 3; i++) { - int numSplits = 
random.nextInt(MAX_LENGTH/2000)+1; - LOG.info("splitting: requesting = " + numSplits); - InputSplit[] splits = format.getSplits(conf, numSplits); - LOG.info("splitting: got = " + splits.length); + // corner case when we have byte alignment and position of stream are same + verifyPartitions(471507, 218, file, codec, conf); + verifyPartitions(473608, 110, file, codec, conf); + } + private void verifyPartitions(int length, int numSplits, Path file, + CompressionCodec codec, JobConf conf) throws IOException { + LOG.info("creating; entries = " + length); - // check each split - BitSet bits = new BitSet(length); - for (int j = 0; j < splits.length; j++) { - LOG.debug("split["+j+"]= " + splits[j]); - RecordReader<LongWritable, Text> reader = - format.getRecordReader(splits[j], conf, reporter); - try { - int counter = 0; - while (reader.next(key, value)) { - int v = Integer.parseInt(value.toString()); - LOG.debug("read " + v); - if (bits.get(v)) { - LOG.warn("conflict with " + v + + // create a file with length entries + Writer writer = + new OutputStreamWriter(codec.createOutputStream(localFs.create(file))); + try { + for (int i = 0; i < length; i++) { + writer.write(Integer.toString(i)); + writer.write("\n"); + } + } finally { + writer.close(); + } + + // try splitting the file in a variety of sizes + TextInputFormat format = new TextInputFormat(); + format.configure(conf); + LongWritable key = new LongWritable(); + Text value = new Text(); + LOG.info("splitting: requesting = " + numSplits); + InputSplit[] splits = format.getSplits(conf, numSplits); + LOG.info("splitting: got = " + splits.length); + + + // check each split + BitSet bits = new BitSet(length); + for (int j = 0; j < splits.length; j++) { + LOG.debug("split["+j+"]= " + splits[j]); + RecordReader<LongWritable, Text> reader = + format.getRecordReader(splits[j], conf, Reporter.NULL); + try { + int counter = 0; + while (reader.next(key, value)) { + int v = Integer.parseInt(value.toString()); + LOG.debug("read " 
+ v); + if (bits.get(v)) { + LOG.warn("conflict with " + v + " in split " + j + " at position "+reader.getPos()); - } - assertFalse("Key in multiple partitions.", bits.get(v)); - bits.set(v); - counter++; - } - if (counter > 0) { - LOG.info("splits["+j+"]="+splits[j]+" count=" + counter); - } else { - LOG.debug("splits["+j+"]="+splits[j]+" count=" + counter); - } - } finally { - reader.close(); } + assertFalse("Key in multiple partitions.", bits.get(v)); + bits.set(v); + counter++; } - assertEquals("Some keys in no partition.", length, bits.cardinality()); + if (counter > 0) { + LOG.info("splits["+j+"]="+splits[j]+" count=" + counter); + } else { + LOG.debug("splits["+j+"]="+splits[j]+" count=" + counter); + } + } finally { + reader.close(); } - } - + assertEquals("Some keys in no partition.", length, bits.cardinality()); } private static LineReader makeStream(String str) throws IOException { --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org