Author: olga Date: Tue Jan 6 17:38:43 2009 New Revision: 732193 URL: http://svn.apache.org/viewvc?rev=732193&view=rev Log: PIG-570: problems with processing bzip data
Added: hadoop/pig/branches/types/test/org/apache/pig/test/data/bzipTest.bz2 (with props) Modified: hadoop/pig/branches/types/CHANGES.txt hadoop/pig/branches/types/lib-src/bzip2/org/apache/tools/bzip2r/CBZip2InputStream.java hadoop/pig/branches/types/src/org/apache/pig/backend/executionengine/PigSlicer.java hadoop/pig/branches/types/test/org/apache/pig/test/TestMapReduce.java Modified: hadoop/pig/branches/types/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/CHANGES.txt?rev=732193&r1=732192&r2=732193&view=diff ============================================================================== --- hadoop/pig/branches/types/CHANGES.txt (original) +++ hadoop/pig/branches/types/CHANGES.txt Tue Jan 6 17:38:43 2009 @@ -353,3 +353,5 @@ PIG-572 A PigServer.registerScript() method, which lets a client programmatically register a Pig Script. (shubhamc via gates) + + PIG-570: problems with handling bzip data (breed via olgan) Modified: hadoop/pig/branches/types/lib-src/bzip2/org/apache/tools/bzip2r/CBZip2InputStream.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/lib-src/bzip2/org/apache/tools/bzip2r/CBZip2InputStream.java?rev=732193&r1=732192&r2=732193&view=diff ============================================================================== --- hadoop/pig/branches/types/lib-src/bzip2/org/apache/tools/bzip2r/CBZip2InputStream.java (original) +++ hadoop/pig/branches/types/lib-src/bzip2/org/apache/tools/bzip2r/CBZip2InputStream.java Tue Jan 6 17:38:43 2009 @@ -192,10 +192,13 @@ int j2; char z; - private long retPos, oldPos; + // The positioning is a bit tricky. we set newPos when we start reading a new block + // and we set retPos to newPos once we have read a character from that block. 
+ // see getPos() for more detail + private long retPos, newPos = -1; public CBZip2InputStream(SeekableInputStream zStream, int blockSize) throws IOException { - retPos = oldPos = zStream.tell(); + retPos = newPos = zStream.tell(); ll8 = null; tt = null; checkComputedCombinedCRC = blockSize == -1; @@ -213,6 +216,11 @@ if (streamEnd) { return -1; } else { + if (retPos < newPos) { + retPos = newPos; + } else { + retPos = newPos+1; + } int retChar = currentChar; switch(currentState) { case START_BLOCK_STATE: @@ -240,15 +248,15 @@ } } + /** + * This is supposed to approximate the position in the underlying stream. However, + * with compression, the underlying stream position is very vague. One position may + * map to multiple positions and vice versa. So we do something very subtle: + * The position of the first byte of a compressed block will have the position of + * the block header at the start of the block. Every byte after the first byte will + * be one plus the position of the block header. 
+ */ public long getPos() throws IOException{ - if (innerBsStream == null) - return retPos; - long newPos = innerBsStream.tell(); - - if (newPos != oldPos){ - retPos = oldPos; - oldPos = newPos; - } return retPos; } @@ -273,7 +281,7 @@ computedCombinedCRC = 0; } - private final static long mask = 0x1ffffffffffL; + private final static long mask = 0xffffffffffffL; private final static long eob = 0x314159265359L & mask; private final static long eos = 0x177245385090L & mask; @@ -284,6 +292,7 @@ return; } + newPos = innerBsStream.tell(); if (!searchForMagic) { char magic1, magic2, magic3, magic4; char magic5, magic6; @@ -306,7 +315,11 @@ return; } } else { - long magic = bsR(41); + long magic = 0; + for(int i = 0; i < 6; i++) { + magic <<= 8; + magic |= bsGetUChar(); + } while(magic != eos && magic != eob) { magic <<= 1; magic &= mask; Modified: hadoop/pig/branches/types/src/org/apache/pig/backend/executionengine/PigSlicer.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/backend/executionengine/PigSlicer.java?rev=732193&r1=732192&r2=732193&view=diff ============================================================================== --- hadoop/pig/branches/types/src/org/apache/pig/backend/executionengine/PigSlicer.java (original) +++ hadoop/pig/branches/types/src/org/apache/pig/backend/executionengine/PigSlicer.java Tue Jan 6 17:38:43 2009 @@ -85,6 +85,12 @@ Map<String, Object> stats = fullPath.getStatistics(); long bs = (Long) (stats.get(ElementDescriptor.BLOCK_SIZE_KEY)); long size = (Long) (stats.get(ElementDescriptor.LENGTH_KEY)); + // this hook is mainly for testing, but i'm sure someone can find + // something fun to do with it + String bsString = System.getProperty("pig.overrideBlockSize"); + if (bsString != null) { + bs = Integer.parseInt(bsString); + } long pos = 0; String name = fullPath.toString(); if (name.endsWith(".gz") || !splittable) { Modified: hadoop/pig/branches/types/test/org/apache/pig/test/TestMapReduce.java URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/types/test/org/apache/pig/test/TestMapReduce.java?rev=732193&r1=732192&r2=732193&view=diff ============================================================================== --- hadoop/pig/branches/types/test/org/apache/pig/test/TestMapReduce.java (original) +++ hadoop/pig/branches/types/test/org/apache/pig/test/TestMapReduce.java Tue Jan 6 17:38:43 2009 @@ -110,6 +110,32 @@ } + /** + * This test checks records that align perfectly on + * bzip block boundaries and hdfs block boundaries + */ + @Test + public void testBZip2Aligned() throws Throwable { + int offsets[] = { 219642, 219643, 219644, 552019, 552020 }; + for(int i = 1; i < offsets.length; i ++) { + System.setProperty("pig.overrideBlockSize", Integer.toString(offsets[i])); + PigContext pigContext = new PigContext(ExecType.MAPREDUCE, cluster.getProperties()); + PigServer pig = new PigServer(pigContext); + pig.registerQuery("a = load 'file:test/org/apache/pig/test/data/bzipTest.bz2';"); + //pig.registerQuery("a = foreach (group (load 'file:test/org/apache/pig/test/data/bzipTest.bz2') all) generate COUNT($1);"); + Iterator<Tuple> it = pig.openIterator("a"); + int count = 0; + while(it.hasNext()) { + Tuple t = it.next(); + String s = t.get(0).toString(); + s = s.substring(0, 7); + assertEquals("Using blocksize " + offsets[i] + " problem with " + t, count, Integer.parseInt(s, 16)); + count++; + } + //assertEquals("1000000", it.next().getField(0)); + } + } + @Test public Double bigGroupAll( File tmpFile ) throws Throwable { Added: hadoop/pig/branches/types/test/org/apache/pig/test/data/bzipTest.bz2 URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/test/org/apache/pig/test/data/bzipTest.bz2?rev=732193&view=auto ============================================================================== Binary file - no diff available. 
Propchange: hadoop/pig/branches/types/test/org/apache/pig/test/data/bzipTest.bz2 ------------------------------------------------------------------------------ svn:mime-type = application/octet-stream