Author: pradeepkth
Date: Fri Mar 26 23:04:15 2010
New Revision: 928094

URL: http://svn.apache.org/viewvc?rev=928094&view=rev
Log:
PIG-1316: TextLoader should use Bzip2TextInputFormat for bzip files so that 
bzip files can be efficiently processed by splitting the files (pradeepkth)

Added:
    hadoop/pig/branches/branch-0.7/test/org/apache/pig/test/data/bzipdir1.bz2/
    
hadoop/pig/branches/branch-0.7/test/org/apache/pig/test/data/bzipdir1.bz2/bzipdir2.bz2/
    
hadoop/pig/branches/branch-0.7/test/org/apache/pig/test/data/bzipdir1.bz2/bzipdir2.bz2/recordLossblockHeaderEndsAt136500.txt.bz2
   (props changed)
      - copied unchanged from r928079, 
hadoop/pig/branches/branch-0.7/test/org/apache/pig/test/data/recordLossblockHeaderEndsAt136500.txt.bz2
Removed:
    
hadoop/pig/branches/branch-0.7/test/org/apache/pig/test/data/recordLossblockHeaderEndsAt136500.txt.bz2
Modified:
    hadoop/pig/branches/branch-0.7/CHANGES.txt
    
hadoop/pig/branches/branch-0.7/lib-src/bzip2/org/apache/pig/bzip2r/Bzip2TextInputFormat.java
    hadoop/pig/branches/branch-0.7/src/org/apache/pig/builtin/TextLoader.java
    hadoop/pig/branches/branch-0.7/test/org/apache/pig/test/TestBZip.java
    hadoop/pig/branches/branch-0.7/test/org/apache/pig/test/TestBuiltin.java

Modified: hadoop/pig/branches/branch-0.7/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/CHANGES.txt?rev=928094&r1=928093&r2=928094&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.7/CHANGES.txt (original)
+++ hadoop/pig/branches/branch-0.7/CHANGES.txt Fri Mar 26 23:04:15 2010
@@ -68,6 +68,9 @@ manner (rding via pradeepkth)
 
 IMPROVEMENTS
 
+PIG-1316: TextLoader should use Bzip2TextInputFormat for bzip files so that
+bzip files can be efficiently processed by splitting the files (pradeepkth)
+
 PIG-1317: LOLoad should cache results of LoadMetadata.getSchema() for use in
 subsequent calls to LOLoad.getSchema() or LOLoad.determineSchema()
 (pradeepkth)

Modified: 
hadoop/pig/branches/branch-0.7/lib-src/bzip2/org/apache/pig/bzip2r/Bzip2TextInputFormat.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/lib-src/bzip2/org/apache/pig/bzip2r/Bzip2TextInputFormat.java?rev=928094&r1=928093&r2=928094&view=diff
==============================================================================
--- 
hadoop/pig/branches/branch-0.7/lib-src/bzip2/org/apache/pig/bzip2r/Bzip2TextInputFormat.java
 (original)
+++ 
hadoop/pig/branches/branch-0.7/lib-src/bzip2/org/apache/pig/bzip2r/Bzip2TextInputFormat.java
 Fri Mar 26 23:04:15 2010
@@ -31,12 +31,12 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigFileInputFormat;
 import org.apache.tools.bzip2r.CBZip2InputStream;
 
 @SuppressWarnings("unchecked")
-public class Bzip2TextInputFormat extends FileInputFormat {
+public class Bzip2TextInputFormat extends PigFileInputFormat {
 
     /**
      * Treats keys as offset in file and value as line. Since the input file is

Modified: 
hadoop/pig/branches/branch-0.7/src/org/apache/pig/builtin/TextLoader.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/src/org/apache/pig/builtin/TextLoader.java?rev=928094&r1=928093&r2=928094&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.7/src/org/apache/pig/builtin/TextLoader.java 
(original)
+++ hadoop/pig/branches/branch-0.7/src/org/apache/pig/builtin/TextLoader.java 
Fri Mar 26 23:04:15 2010
@@ -25,13 +25,14 @@ import org.apache.hadoop.mapreduce.Input
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 import org.apache.pig.LoadCaster;
 import org.apache.pig.LoadFunc;
 import org.apache.pig.PigException;
 import org.apache.pig.ResourceSchema.ResourceFieldSchema;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
+import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextInputFormat;
+import org.apache.pig.bzip2r.Bzip2TextInputFormat;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.Tuple;
@@ -45,6 +46,7 @@ import org.apache.pig.data.TupleFactory;
 public class TextLoader extends LoadFunc implements LoadCaster {
     protected RecordReader in = null;
     private TupleFactory mTupleFactory = TupleFactory.getInstance();
+    private String loadLocation;
 
     @Override
     public Tuple getNext() throws IOException {
@@ -198,7 +200,11 @@ public class TextLoader extends LoadFunc
 
     @Override
     public InputFormat getInputFormat() {
-        return new TextInputFormat();
+        if(loadLocation.endsWith("bz2") || loadLocation.endsWith("bz")) {
+            return new Bzip2TextInputFormat();
+        } else {
+            return new PigTextInputFormat();
+        }
     }
 
     @Override
@@ -213,6 +219,7 @@ public class TextLoader extends LoadFunc
 
     @Override
     public void setLocation(String location, Job job) throws IOException {
+        loadLocation = location;
         FileInputFormat.setInputPaths(job, location);
     }
 

Modified: hadoop/pig/branches/branch-0.7/test/org/apache/pig/test/TestBZip.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/test/org/apache/pig/test/TestBZip.java?rev=928094&r1=928093&r2=928094&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.7/test/org/apache/pig/test/TestBZip.java 
(original)
+++ hadoop/pig/branches/branch-0.7/test/org/apache/pig/test/TestBZip.java Fri 
Mar 26 23:04:15 2010
@@ -32,6 +32,7 @@ import junit.framework.Assert;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -235,17 +236,28 @@ public class TestBZip {
      * Tests the case where a bzip block ends exactly at the end of the 
{...@link InputSplit}
      * with the block header ending a few bits into the last byte of current
      * InputSplit. This case results in dropped records in Pig 0.6 release
+     * This test also tests that bzip files couple of dirs deep can be read by
+     * specifying the top level dir.
      */
     @Test
     public void testBlockHeaderEndingAtSplitNotByteAligned() throws 
IOException {
+        // the actual input file is at
+        // 
test/org/apache/pig/test/data/bzipdir1.bz2/bzipdir2.bz2/recordLossblockHeaderEndsAt136500.txt.bz2
+        // In this test we will load 
test/org/apache/pig/test/data/bzipdir1.bz2 to also
+        // test that the BZip2TextInputFormat can read subdirs recursively
         String inputFileName = 
-            
"test/org/apache/pig/test/data/recordLossblockHeaderEndsAt136500.txt.bz2";
+            "test/org/apache/pig/test/data/bzipdir1.bz2";
         Long expectedCount = 74999L; // number of lines in above file
         // the first block in the above file exactly ends a few bits into the 
         // byte at position 136500 
         int splitSize = 136500;
-        Util.copyFromLocalToCluster(cluster, inputFileName, inputFileName);
-        testCount(inputFileName, expectedCount, splitSize);
+        try {
+            Util.copyFromLocalToCluster(cluster, inputFileName, inputFileName);
+            testCount(inputFileName, expectedCount, splitSize, "PigStorage()");
+            testCount(inputFileName, expectedCount, splitSize, "TextLoader()");
+        } finally {
+            Util.deleteFile(cluster, inputFileName);
+        }
     }
     
     /**
@@ -262,47 +274,75 @@ public class TestBZip {
         Long expectedCount = 82094L; 
         // the first block in the above file exactly ends at the byte at 
         // position 136498 and the last byte is a carriage return ('\r') 
-        int splitSize = 136498;
-        Util.copyFromLocalToCluster(cluster, inputFileName, inputFileName);
-        testCount(inputFileName, expectedCount, splitSize);
+        try {
+            int splitSize = 136498;
+            Util.copyFromLocalToCluster(cluster, inputFileName, inputFileName);
+            testCount(inputFileName, expectedCount, splitSize, "PigStorage()");
+        } finally {
+            Util.deleteFile(cluster, inputFileName);
+        }
     }
     
     /**
      * Tests the case where a bzip block ends exactly at the end of the input
      * split and has more data which results in overcounting (record 
duplication)
      * in Pig 0.6
+     * 
      */
     @Test
     public void testBlockHeaderEndingAtSplitOverCounting() throws IOException {
+       
         String inputFileName = 
             "test/org/apache/pig/test/data/blockHeaderEndsAt136500.txt.bz2";
         Long expectedCount = 1041046L; // number of lines in above file
         // the first block in the above file exactly ends a few bits into the 
         // byte at position 136500 
         int splitSize = 136500;
-        Util.copyFromLocalToCluster(cluster, inputFileName, inputFileName);
-        testCount(inputFileName, expectedCount, splitSize);
+        try {
+            Util.copyFromLocalToCluster(cluster, inputFileName, inputFileName);
+            testCount(inputFileName, expectedCount, splitSize, "PigStorage()");
+        } finally {
+            Util.deleteFile(cluster, inputFileName);
+        }
     }
     
     private void testCount(String inputFileName, Long expectedCount, 
-            int splitSize) throws IOException {
-        try {
-            String script = "a = load '" + inputFileName + "';" +
-                       "b = group a all;" +
-                       "c = foreach b generate COUNT_STAR(a);";
-            Properties props = new Properties();
-            for (Entry<Object, Object> entry : 
cluster.getProperties().entrySet()) {
-                props.put(entry.getKey(), entry.getValue());
+            int splitSize, String loadFuncSpec) throws IOException {
+        String outputFile = "/tmp/bz-output";
+        // simple load-store script to verify that the bzip input is getting
+        // split
+        String scriptToTestSplitting = "a = load '" +inputFileName + "' using 
" +
+        loadFuncSpec + "; store a into '" + outputFile + "';";
+        
+        String script = "a = load '" + inputFileName + "';" +
+                       "b = group a all;" +
+                       "c = foreach b generate COUNT_STAR(a);";
+        Properties props = new Properties();
+        for (Entry<Object, Object> entry : cluster.getProperties().entrySet()) 
{
+            props.put(entry.getKey(), entry.getValue());
+        }
+        props.setProperty("mapred.max.split.size", 
Integer.toString(splitSize));
+        PigContext pigContext = new PigContext(ExecType.MAPREDUCE, props);
+        PigServer pig = new PigServer(pigContext);
+        FileSystem fs = 
FileSystem.get(ConfigurationUtil.toConfiguration(props));
+        fs.delete(new Path(outputFile), true);
+        Util.registerMultiLineQuery(pig, scriptToTestSplitting);
+        
+        // verify that > 1 maps were launched due to splitting of the bzip 
input
+        FileStatus[] files = fs.listStatus(new Path(outputFile));
+        int numPartFiles = 0;
+        for (FileStatus fileStatus : files) {
+            if(fileStatus.getPath().getName().startsWith("part")) {
+                numPartFiles++;
             }
-            props.setProperty("mapred.max.split.size", 
Integer.toString(splitSize));
-            PigContext pigContext = new PigContext(ExecType.MAPREDUCE, props);
-            PigServer pig = new PigServer(pigContext);
-            Util.registerMultiLineQuery(pig, script);
-            Iterator<Tuple> it = pig.openIterator("c");
-            Long result = (Long) it.next().get(0);
-            assertEquals(expectedCount, result);
-        } finally {
-            Util.deleteFile(cluster, inputFileName);
         }
+        assertEquals(true, numPartFiles > 0);
+        
+        // verify record count to verify we read bzip data correctly
+        Util.registerMultiLineQuery(pig, script);
+        Iterator<Tuple> it = pig.openIterator("c");
+        Long result = (Long) it.next().get(0);
+        assertEquals(expectedCount, result);
+        
     }
 }

Modified: 
hadoop/pig/branches/branch-0.7/test/org/apache/pig/test/TestBuiltin.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/test/org/apache/pig/test/TestBuiltin.java?rev=928094&r1=928093&r2=928094&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.7/test/org/apache/pig/test/TestBuiltin.java 
(original)
+++ hadoop/pig/branches/branch-0.7/test/org/apache/pig/test/TestBuiltin.java 
Fri Mar 26 23:04:15 2010
@@ -1377,15 +1377,23 @@ public class TestBuiltin extends TestCas
     }
     */
 
-    
+    /**
+     * test {...@link TextLoader} - this also tests that {...@link TextLoader} 
is capable
+     * of reading data a couple of dirs deep when the input specified is the 
top
+     * level directory
+     */
     @Test
     public void testLFText() throws Exception {
         String input1 = "This is some text.\nWith a newline in it.\n";
         String expected1 = "This is some text.";
         String expected2 = "With a newline in it.";
-        Util.createInputFile(cluster, "testLFTest-input1.txt", new String[] 
{input1});
+        Util.createInputFile(cluster, 
+                "/tmp/testLFTextdir1/testLFTextdir2/testLFTest-input1.txt", 
+                new String[] {input1});
+        // check that loading the top level dir still reading the file a couple
+        // of subdirs below
         LoadFunc text1 = new ReadToEndLoader(new TextLoader(), 
ConfigurationUtil.
-                toConfiguration(cluster.getProperties()), 
"testLFTest-input1.txt", 0);
+                toConfiguration(cluster.getProperties()), 
"/tmp/testLFTextdir1", 0);
         Tuple f1 = text1.getNext();
         Tuple f2 = text1.getNext();
         assertTrue(expected1.equals(f1.get(0).toString()) &&

Propchange: 
hadoop/pig/branches/branch-0.7/test/org/apache/pig/test/data/bzipdir1.bz2/bzipdir2.bz2/recordLossblockHeaderEndsAt136500.txt.bz2
------------------------------------------------------------------------------
--- svn:mergeinfo (added)
+++ svn:mergeinfo Fri Mar 26 23:04:15 2010
@@ -0,0 +1 @@
+/hadoop/pig/branches/multiquery/test/org/apache/pig/test/data/recordLossblockHeaderEndsAt136500.txt.bz2:741727-770826

Propchange: 
hadoop/pig/branches/branch-0.7/test/org/apache/pig/test/data/bzipdir1.bz2/bzipdir2.bz2/recordLossblockHeaderEndsAt136500.txt.bz2
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream


Reply via email to