Author: olga
Date: Tue Jan  6 17:38:43 2009
New Revision: 732193

URL: http://svn.apache.org/viewvc?rev=732193&view=rev
Log:
PIG-570: problems with processing bzip data

Added:
    hadoop/pig/branches/types/test/org/apache/pig/test/data/bzipTest.bz2   (with props)
Modified:
    hadoop/pig/branches/types/CHANGES.txt
    hadoop/pig/branches/types/lib-src/bzip2/org/apache/tools/bzip2r/CBZip2InputStream.java
    hadoop/pig/branches/types/src/org/apache/pig/backend/executionengine/PigSlicer.java
    hadoop/pig/branches/types/test/org/apache/pig/test/TestMapReduce.java

Modified: hadoop/pig/branches/types/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/CHANGES.txt?rev=732193&r1=732192&r2=732193&view=diff
==============================================================================
--- hadoop/pig/branches/types/CHANGES.txt (original)
+++ hadoop/pig/branches/types/CHANGES.txt Tue Jan  6 17:38:43 2009
@@ -353,3 +353,5 @@
 
        PIG-572 A PigServer.registerScript() method, which lets a client
        programmatically register a Pig Script.  (shubhamc via gates)
+
+    PIG-570:  problems with handling bzip data (breed via olgan)

Modified: hadoop/pig/branches/types/lib-src/bzip2/org/apache/tools/bzip2r/CBZip2InputStream.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/lib-src/bzip2/org/apache/tools/bzip2r/CBZip2InputStream.java?rev=732193&r1=732192&r2=732193&view=diff
==============================================================================
--- hadoop/pig/branches/types/lib-src/bzip2/org/apache/tools/bzip2r/CBZip2InputStream.java (original)
+++ hadoop/pig/branches/types/lib-src/bzip2/org/apache/tools/bzip2r/CBZip2InputStream.java Tue Jan  6 17:38:43 2009
@@ -192,10 +192,13 @@
     int j2;
     char z;
     
-    private long retPos, oldPos;
+    // The positioning is a bit tricky. we set newPos when we start reading a new block
+    // and we set retPos to newPos once we have read a character from that block.
+    // see getPos() for more detail
+    private long retPos, newPos = -1;
 
     public CBZip2InputStream(SeekableInputStream zStream, int blockSize) throws IOException {
-        retPos = oldPos = zStream.tell();
+        retPos = newPos = zStream.tell();
        ll8 = null;
         tt = null;
         checkComputedCombinedCRC = blockSize == -1;
@@ -213,6 +216,11 @@
         if (streamEnd) {
             return -1;
         } else {
+            if (retPos < newPos) {
+                retPos = newPos;
+            } else {
+                retPos = newPos+1;
+            }
             int retChar = currentChar;
             switch(currentState) {
             case START_BLOCK_STATE:
@@ -240,15 +248,15 @@
         }
     }
 
+    /**
+     * This is supposed to approximate the position in the underlying stream. However,
+     * with compression, the underlying stream position is very vague. One position may
+     * have multiple positions and vice versa. So we do something very subtle:
+     * The position of the first byte of a compressed block will have the position of
+     * the block header at the start of the block. Every byte after the first byte will
+     * be one plus the position of the block header.
+     */
     public long getPos() throws IOException{
-        if (innerBsStream == null)
-            return retPos;
-        long newPos = innerBsStream.tell();
-    
-        if (newPos != oldPos){
-            retPos = oldPos;
-            oldPos = newPos;
-        }
         return retPos;
     }
     
@@ -273,7 +281,7 @@
         computedCombinedCRC = 0;
     }
 
-    private final static long mask = 0x1ffffffffffL;
+    private final static long mask = 0xffffffffffffL;
     private final static long eob = 0x314159265359L & mask;
     private final static long eos = 0x177245385090L & mask;
     
@@ -284,6 +292,7 @@
             return;
         }
 
+        newPos = innerBsStream.tell();
         if (!searchForMagic) {
             char magic1, magic2, magic3, magic4;
             char magic5, magic6;
@@ -306,7 +315,11 @@
                 return;
             }
         } else {
-            long magic = bsR(41);
+            long magic = 0;
+            for(int i = 0; i < 6; i++) {
+                magic <<= 8;
+                magic |= bsGetUChar();
+            }
             while(magic != eos && magic != eob) {
                 magic <<= 1;
                 magic &= mask;

Modified: hadoop/pig/branches/types/src/org/apache/pig/backend/executionengine/PigSlicer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/backend/executionengine/PigSlicer.java?rev=732193&r1=732192&r2=732193&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/backend/executionengine/PigSlicer.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/backend/executionengine/PigSlicer.java Tue Jan  6 17:38:43 2009
@@ -85,6 +85,12 @@
             Map<String, Object> stats = fullPath.getStatistics();
             long bs = (Long) (stats.get(ElementDescriptor.BLOCK_SIZE_KEY));
             long size = (Long) (stats.get(ElementDescriptor.LENGTH_KEY));
+            // this hook is mainly for testing, but i'm sure someone can find
+            // something fun to do with it
+            String bsString = System.getProperty("pig.overrideBlockSize");
+            if (bsString != null) {
+                bs = Integer.parseInt(bsString);
+            }
             long pos = 0;
             String name = fullPath.toString();
             if (name.endsWith(".gz") || !splittable) {
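
Since the override is read from a standard Java system property, it can be set programmatically, as the new test below does, or on the JVM command line. A purely illustrative example (the value is one of the offsets the test uses, not a recommended setting):

    // In test or driver code, before slices are computed:
    System.setProperty("pig.overrideBlockSize", "219642");

    // Or as a JVM argument when launching Pig:
    //   java -Dpig.overrideBlockSize=219642 ...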

Modified: hadoop/pig/branches/types/test/org/apache/pig/test/TestMapReduce.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/test/org/apache/pig/test/TestMapReduce.java?rev=732193&r1=732192&r2=732193&view=diff
==============================================================================
--- hadoop/pig/branches/types/test/org/apache/pig/test/TestMapReduce.java (original)
+++ hadoop/pig/branches/types/test/org/apache/pig/test/TestMapReduce.java Tue Jan  6 17:38:43 2009
@@ -110,6 +110,32 @@
 
     }
 
+    /**
+     * This test checks records that align perfectly on
+     * bzip block boundaries and hdfs block boundaries 
+     */
+    @Test
+    public void testBZip2Aligned() throws Throwable {
+        int offsets[] = { 219642, 219643, 219644, 552019, 552020 };
+        for(int i = 1; i < offsets.length; i ++) {
+            System.setProperty("pig.overrideBlockSize", Integer.toString(offsets[i]));
+            PigContext pigContext = new PigContext(ExecType.MAPREDUCE, cluster.getProperties());
+            PigServer pig = new PigServer(pigContext);
+            pig.registerQuery("a = load 'file:test/org/apache/pig/test/data/bzipTest.bz2';");
+            //pig.registerQuery("a = foreach (group (load 'file:test/org/apache/pig/test/data/bzipTest.bz2') all) generate COUNT($1);");
+            Iterator<Tuple> it = pig.openIterator("a");
+            int count = 0;
+            while(it.hasNext()) {
+                Tuple t = it.next();
+                String s = t.get(0).toString();
+                s = s.substring(0, 7);
+                assertEquals("Using blocksize " + offsets[i] + " problem with " + t, count, Integer.parseInt(s, 16));
+                count++;
+            }
+            //assertEquals("1000000", it.next().getField(0));
+        }
+    }
+    
     @Test
     public Double bigGroupAll( File tmpFile ) throws Throwable {
 

Added: hadoop/pig/branches/types/test/org/apache/pig/test/data/bzipTest.bz2
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/test/org/apache/pig/test/data/bzipTest.bz2?rev=732193&view=auto
==============================================================================
Binary file - no diff available.

Propchange: hadoop/pig/branches/types/test/org/apache/pig/test/data/bzipTest.bz2
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

