Author: olga
Date: Tue Jan 6 17:38:43 2009
New Revision: 732193
URL: http://svn.apache.org/viewvc?rev=732193&view=rev
Log:
PIG-570: problems with processing bzip data
Added:
hadoop/pig/branches/types/test/org/apache/pig/test/data/bzipTest.bz2
(with props)
Modified:
hadoop/pig/branches/types/CHANGES.txt
hadoop/pig/branches/types/lib-src/bzip2/org/apache/tools/bzip2r/CBZip2InputStream.java
hadoop/pig/branches/types/src/org/apache/pig/backend/executionengine/PigSlicer.java
hadoop/pig/branches/types/test/org/apache/pig/test/TestMapReduce.java
Modified: hadoop/pig/branches/types/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/pig/branches/types/CHANGES.txt?rev=732193&r1=732192&r2=732193&view=diff
==============================================================================
--- hadoop/pig/branches/types/CHANGES.txt (original)
+++ hadoop/pig/branches/types/CHANGES.txt Tue Jan 6 17:38:43 2009
@@ -353,3 +353,5 @@
PIG-572 A PigServer.registerScript() method, which lets a client
programmatically register a Pig Script. (shubhamc via gates)
+
+ PIG-570: problems with handling bzip data (breed via olgan)
Modified:
hadoop/pig/branches/types/lib-src/bzip2/org/apache/tools/bzip2r/CBZip2InputStream.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/branches/types/lib-src/bzip2/org/apache/tools/bzip2r/CBZip2InputStream.java?rev=732193&r1=732192&r2=732193&view=diff
==============================================================================
---
hadoop/pig/branches/types/lib-src/bzip2/org/apache/tools/bzip2r/CBZip2InputStream.java
(original)
+++
hadoop/pig/branches/types/lib-src/bzip2/org/apache/tools/bzip2r/CBZip2InputStream.java
Tue Jan 6 17:38:43 2009
@@ -192,10 +192,13 @@
int j2;
char z;
- private long retPos, oldPos;
+ // The positioning is a bit tricky. We set newPos when we start reading a
new block
+ // and we set retPos to newPos once we have read a character from that
block.
+ // see getPos() for more detail
+ private long retPos, newPos = -1;
public CBZip2InputStream(SeekableInputStream zStream, int blockSize)
throws IOException {
- retPos = oldPos = zStream.tell();
+ retPos = newPos = zStream.tell();
ll8 = null;
tt = null;
checkComputedCombinedCRC = blockSize == -1;
@@ -213,6 +216,11 @@
if (streamEnd) {
return -1;
} else {
+ if (retPos < newPos) {
+ retPos = newPos;
+ } else {
+ retPos = newPos+1;
+ }
int retChar = currentChar;
switch(currentState) {
case START_BLOCK_STATE:
@@ -240,15 +248,15 @@
}
}
+ /**
+ * This is supposed to approximate the position in the underlying stream.
However,
+ * with compression, the underlying stream position is very vague. One
position may
+ * have multiple positions and vice versa. So we do something very subtle:
+ * The position of the first byte of a compressed block will have the
position of
+ * the block header at the start of the block. Every byte after the first
byte will
+ * be one plus the position of the block header.
+ */
public long getPos() throws IOException{
- if (innerBsStream == null)
- return retPos;
- long newPos = innerBsStream.tell();
-
- if (newPos != oldPos){
- retPos = oldPos;
- oldPos = newPos;
- }
return retPos;
}
@@ -273,7 +281,7 @@
computedCombinedCRC = 0;
}
- private final static long mask = 0x1ffffffffffL;
+ private final static long mask = 0xffffffffffffL;
private final static long eob = 0x314159265359L & mask;
private final static long eos = 0x177245385090L & mask;
@@ -284,6 +292,7 @@
return;
}
+ newPos = innerBsStream.tell();
if (!searchForMagic) {
char magic1, magic2, magic3, magic4;
char magic5, magic6;
@@ -306,7 +315,11 @@
return;
}
} else {
- long magic = bsR(41);
+ long magic = 0;
+ for(int i = 0; i < 6; i++) {
+ magic <<= 8;
+ magic |= bsGetUChar();
+ }
while(magic != eos && magic != eob) {
magic <<= 1;
magic &= mask;
Modified:
hadoop/pig/branches/types/src/org/apache/pig/backend/executionengine/PigSlicer.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/backend/executionengine/PigSlicer.java?rev=732193&r1=732192&r2=732193&view=diff
==============================================================================
---
hadoop/pig/branches/types/src/org/apache/pig/backend/executionengine/PigSlicer.java
(original)
+++
hadoop/pig/branches/types/src/org/apache/pig/backend/executionengine/PigSlicer.java
Tue Jan 6 17:38:43 2009
@@ -85,6 +85,12 @@
Map<String, Object> stats = fullPath.getStatistics();
long bs = (Long) (stats.get(ElementDescriptor.BLOCK_SIZE_KEY));
long size = (Long) (stats.get(ElementDescriptor.LENGTH_KEY));
+ // this hook is mainly for testing, but i'm sure someone can find
+ // something fun to do with it
+ String bsString = System.getProperty("pig.overrideBlockSize");
+ if (bsString != null) {
+ bs = Integer.parseInt(bsString);
+ }
long pos = 0;
String name = fullPath.toString();
if (name.endsWith(".gz") || !splittable) {
Modified: hadoop/pig/branches/types/test/org/apache/pig/test/TestMapReduce.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/branches/types/test/org/apache/pig/test/TestMapReduce.java?rev=732193&r1=732192&r2=732193&view=diff
==============================================================================
--- hadoop/pig/branches/types/test/org/apache/pig/test/TestMapReduce.java
(original)
+++ hadoop/pig/branches/types/test/org/apache/pig/test/TestMapReduce.java Tue
Jan 6 17:38:43 2009
@@ -110,6 +110,32 @@
}
+ /**
+ * This test checks records that align perfectly on
+ * bzip block boundaries and hdfs block boundaries
+ */
+ @Test
+ public void testBZip2Aligned() throws Throwable {
+ int offsets[] = { 219642, 219643, 219644, 552019, 552020 };
+ for(int i = 1; i < offsets.length; i ++) {
+ System.setProperty("pig.overrideBlockSize",
Integer.toString(offsets[i]));
+ PigContext pigContext = new PigContext(ExecType.MAPREDUCE,
cluster.getProperties());
+ PigServer pig = new PigServer(pigContext);
+ pig.registerQuery("a = load
'file:test/org/apache/pig/test/data/bzipTest.bz2';");
+ //pig.registerQuery("a = foreach (group (load
'file:test/org/apache/pig/test/data/bzipTest.bz2') all) generate COUNT($1);");
+ Iterator<Tuple> it = pig.openIterator("a");
+ int count = 0;
+ while(it.hasNext()) {
+ Tuple t = it.next();
+ String s = t.get(0).toString();
+ s = s.substring(0, 7);
+ assertEquals("Using blocksize " + offsets[i] + " problem with
" + t, count, Integer.parseInt(s, 16));
+ count++;
+ }
+ //assertEquals("1000000", it.next().getField(0));
+ }
+ }
+
@Test
public Double bigGroupAll( File tmpFile ) throws Throwable {
Added: hadoop/pig/branches/types/test/org/apache/pig/test/data/bzipTest.bz2
URL:
http://svn.apache.org/viewvc/hadoop/pig/branches/types/test/org/apache/pig/test/data/bzipTest.bz2?rev=732193&view=auto
==============================================================================
Binary file - no diff available.
Propchange: hadoop/pig/branches/types/test/org/apache/pig/test/data/bzipTest.bz2
------------------------------------------------------------------------------
svn:mime-type = application/octet-stream