Author: pradeepkth
Date: Fri Mar 26 22:33:05 2010
New Revision: 928079
URL: http://svn.apache.org/viewvc?rev=928079&view=rev
Log:
PIG-1316: TextLoader should use Bzip2TextInputFormat for bzip files so that
bzip files can be efficiently processed by splitting the files (pradeepkth)
Added:
hadoop/pig/trunk/test/org/apache/pig/test/data/bzipdir1.bz2/
hadoop/pig/trunk/test/org/apache/pig/test/data/bzipdir1.bz2/bzipdir2.bz2/
hadoop/pig/trunk/test/org/apache/pig/test/data/bzipdir1.bz2/bzipdir2.bz2/recordLossblockHeaderEndsAt136500.txt.bz2
(props changed)
- copied unchanged from r927987,
hadoop/pig/trunk/test/org/apache/pig/test/data/recordLossblockHeaderEndsAt136500.txt.bz2
Removed:
hadoop/pig/trunk/test/org/apache/pig/test/data/recordLossblockHeaderEndsAt136500.txt.bz2
Modified:
hadoop/pig/trunk/lib-src/bzip2/org/apache/pig/bzip2r/Bzip2TextInputFormat.java
hadoop/pig/trunk/src/org/apache/pig/builtin/TextLoader.java
hadoop/pig/trunk/test/org/apache/pig/test/TestBZip.java
hadoop/pig/trunk/test/org/apache/pig/test/TestBuiltin.java
Modified:
hadoop/pig/trunk/lib-src/bzip2/org/apache/pig/bzip2r/Bzip2TextInputFormat.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/lib-src/bzip2/org/apache/pig/bzip2r/Bzip2TextInputFormat.java?rev=928079&r1=928078&r2=928079&view=diff
==============================================================================
---
hadoop/pig/trunk/lib-src/bzip2/org/apache/pig/bzip2r/Bzip2TextInputFormat.java
(original)
+++
hadoop/pig/trunk/lib-src/bzip2/org/apache/pig/bzip2r/Bzip2TextInputFormat.java
Fri Mar 26 22:33:05 2010
@@ -31,12 +31,12 @@ import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigFileInputFormat;
import org.apache.tools.bzip2r.CBZip2InputStream;
@SuppressWarnings("unchecked")
-public class Bzip2TextInputFormat extends FileInputFormat {
+public class Bzip2TextInputFormat extends PigFileInputFormat {
/**
* Treats keys as offset in file and value as line. Since the input file is
Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/TextLoader.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/TextLoader.java?rev=928079&r1=928078&r2=928079&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/TextLoader.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/TextLoader.java Fri Mar 26
22:33:05 2010
@@ -25,13 +25,14 @@ import org.apache.hadoop.mapreduce.Input
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.pig.LoadCaster;
import org.apache.pig.LoadFunc;
import org.apache.pig.PigException;
import org.apache.pig.ResourceSchema.ResourceFieldSchema;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextInputFormat;
+import org.apache.pig.bzip2r.Bzip2TextInputFormat;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.Tuple;
@@ -45,6 +46,7 @@ import org.apache.pig.data.TupleFactory;
public class TextLoader extends LoadFunc implements LoadCaster {
protected RecordReader in = null;
private TupleFactory mTupleFactory = TupleFactory.getInstance();
+ private String loadLocation;
@Override
public Tuple getNext() throws IOException {
@@ -198,7 +200,11 @@ public class TextLoader extends LoadFunc
@Override
public InputFormat getInputFormat() {
- return new TextInputFormat();
+ if(loadLocation.endsWith("bz2") || loadLocation.endsWith("bz")) {
+ return new Bzip2TextInputFormat();
+ } else {
+ return new PigTextInputFormat();
+ }
}
@Override
@@ -213,6 +219,7 @@ public class TextLoader extends LoadFunc
@Override
public void setLocation(String location, Job job) throws IOException {
+ loadLocation = location;
FileInputFormat.setInputPaths(job, location);
}
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestBZip.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestBZip.java?rev=928079&r1=928078&r2=928079&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestBZip.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestBZip.java Fri Mar 26 22:33:05
2010
@@ -32,6 +32,7 @@ import junit.framework.Assert;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputSplit;
@@ -235,17 +236,28 @@ public class TestBZip {
* Tests the case where a bzip block ends exactly at the end of the
{@link InputSplit}
* with the block header ending a few bits into the last byte of current
* InputSplit. This case results in dropped records in Pig 0.6 release
+ * This test also tests that bzip files couple of dirs deep can be read by
+ * specifying the top level dir.
*/
@Test
public void testBlockHeaderEndingAtSplitNotByteAligned() throws
IOException {
+ // the actual input file is at
+ //
test/org/apache/pig/test/data/bzipdir1.bz2/bzipdir2.bz2/recordLossblockHeaderEndsAt136500.txt.bz2
+ // In this test we will load
test/org/apache/pig/test/data/bzipdir1.bz2 to also
+ // test that the BZip2TextInputFormat can read subdirs recursively
String inputFileName =
-
"test/org/apache/pig/test/data/recordLossblockHeaderEndsAt136500.txt.bz2";
+ "test/org/apache/pig/test/data/bzipdir1.bz2";
Long expectedCount = 74999L; // number of lines in above file
// the first block in the above file exactly ends a few bits into the
// byte at position 136500
int splitSize = 136500;
- Util.copyFromLocalToCluster(cluster, inputFileName, inputFileName);
- testCount(inputFileName, expectedCount, splitSize);
+ try {
+ Util.copyFromLocalToCluster(cluster, inputFileName, inputFileName);
+ testCount(inputFileName, expectedCount, splitSize, "PigStorage()");
+ testCount(inputFileName, expectedCount, splitSize, "TextLoader()");
+ } finally {
+ Util.deleteFile(cluster, inputFileName);
+ }
}
/**
@@ -262,47 +274,75 @@ public class TestBZip {
Long expectedCount = 82094L;
// the first block in the above file exactly ends at the byte at
// position 136498 and the last byte is a carriage return ('\r')
- int splitSize = 136498;
- Util.copyFromLocalToCluster(cluster, inputFileName, inputFileName);
- testCount(inputFileName, expectedCount, splitSize);
+ try {
+ int splitSize = 136498;
+ Util.copyFromLocalToCluster(cluster, inputFileName, inputFileName);
+ testCount(inputFileName, expectedCount, splitSize, "PigStorage()");
+ } finally {
+ Util.deleteFile(cluster, inputFileName);
+ }
}
/**
* Tests the case where a bzip block ends exactly at the end of the input
* split and has more data which results in overcounting (record
duplication)
* in Pig 0.6
+ *
*/
@Test
public void testBlockHeaderEndingAtSplitOverCounting() throws IOException {
+
String inputFileName =
"test/org/apache/pig/test/data/blockHeaderEndsAt136500.txt.bz2";
Long expectedCount = 1041046L; // number of lines in above file
// the first block in the above file exactly ends a few bits into the
// byte at position 136500
int splitSize = 136500;
- Util.copyFromLocalToCluster(cluster, inputFileName, inputFileName);
- testCount(inputFileName, expectedCount, splitSize);
+ try {
+ Util.copyFromLocalToCluster(cluster, inputFileName, inputFileName);
+ testCount(inputFileName, expectedCount, splitSize, "PigStorage()");
+ } finally {
+ Util.deleteFile(cluster, inputFileName);
+ }
}
private void testCount(String inputFileName, Long expectedCount,
- int splitSize) throws IOException {
- try {
- String script = "a = load '" + inputFileName + "';" +
- "b = group a all;" +
- "c = foreach b generate COUNT_STAR(a);";
- Properties props = new Properties();
- for (Entry<Object, Object> entry :
cluster.getProperties().entrySet()) {
- props.put(entry.getKey(), entry.getValue());
+ int splitSize, String loadFuncSpec) throws IOException {
+ String outputFile = "/tmp/bz-output";
+ // simple load-store script to verify that the bzip input is getting
+ // split
+ String scriptToTestSplitting = "a = load '" +inputFileName + "' using
" +
+ loadFuncSpec + "; store a into '" + outputFile + "';";
+
+ String script = "a = load '" + inputFileName + "';" +
+ "b = group a all;" +
+ "c = foreach b generate COUNT_STAR(a);";
+ Properties props = new Properties();
+ for (Entry<Object, Object> entry : cluster.getProperties().entrySet())
{
+ props.put(entry.getKey(), entry.getValue());
+ }
+ props.setProperty("mapred.max.split.size",
Integer.toString(splitSize));
+ PigContext pigContext = new PigContext(ExecType.MAPREDUCE, props);
+ PigServer pig = new PigServer(pigContext);
+ FileSystem fs =
FileSystem.get(ConfigurationUtil.toConfiguration(props));
+ fs.delete(new Path(outputFile), true);
+ Util.registerMultiLineQuery(pig, scriptToTestSplitting);
+
+ // verify that > 1 maps were launched due to splitting of the bzip
input
+ FileStatus[] files = fs.listStatus(new Path(outputFile));
+ int numPartFiles = 0;
+ for (FileStatus fileStatus : files) {
+ if(fileStatus.getPath().getName().startsWith("part")) {
+ numPartFiles++;
}
- props.setProperty("mapred.max.split.size",
Integer.toString(splitSize));
- PigContext pigContext = new PigContext(ExecType.MAPREDUCE, props);
- PigServer pig = new PigServer(pigContext);
- Util.registerMultiLineQuery(pig, script);
- Iterator<Tuple> it = pig.openIterator("c");
- Long result = (Long) it.next().get(0);
- assertEquals(expectedCount, result);
- } finally {
- Util.deleteFile(cluster, inputFileName);
}
+ assertEquals(true, numPartFiles > 0);
+
+ // verify record count to verify we read bzip data correctly
+ Util.registerMultiLineQuery(pig, script);
+ Iterator<Tuple> it = pig.openIterator("c");
+ Long result = (Long) it.next().get(0);
+ assertEquals(expectedCount, result);
+
}
}
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestBuiltin.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestBuiltin.java?rev=928079&r1=928078&r2=928079&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestBuiltin.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestBuiltin.java Fri Mar 26
22:33:05 2010
@@ -1377,15 +1377,23 @@ public class TestBuiltin extends TestCas
}
*/
-
+ /**
+ * test {@link TextLoader} - this also tests that {@link TextLoader} is capable
+ * of reading data a couple of dirs deep when the input specified is the
top
+ * level directory
+ */
@Test
public void testLFText() throws Exception {
String input1 = "This is some text.\nWith a newline in it.\n";
String expected1 = "This is some text.";
String expected2 = "With a newline in it.";
- Util.createInputFile(cluster, "testLFTest-input1.txt", new String[]
{input1});
+ Util.createInputFile(cluster,
+ "/tmp/testLFTextdir1/testLFTextdir2/testLFTest-input1.txt",
+ new String[] {input1});
+ // check that loading the top level dir still reading the file a couple
+ // of subdirs below
LoadFunc text1 = new ReadToEndLoader(new TextLoader(),
ConfigurationUtil.
- toConfiguration(cluster.getProperties()),
"testLFTest-input1.txt", 0);
+ toConfiguration(cluster.getProperties()),
"/tmp/testLFTextdir1", 0);
Tuple f1 = text1.getNext();
Tuple f2 = text1.getNext();
assertTrue(expected1.equals(f1.get(0).toString()) &&
Propchange:
hadoop/pig/trunk/test/org/apache/pig/test/data/bzipdir1.bz2/bzipdir2.bz2/recordLossblockHeaderEndsAt136500.txt.bz2
------------------------------------------------------------------------------
--- svn:mergeinfo (added)
+++ svn:mergeinfo Fri Mar 26 22:33:05 2010
@@ -0,0 +1 @@
+/hadoop/pig/branches/multiquery/test/org/apache/pig/test/data/recordLossblockHeaderEndsAt136500.txt.bz2:741727-770826
Propchange:
hadoop/pig/trunk/test/org/apache/pig/test/data/bzipdir1.bz2/bzipdir2.bz2/recordLossblockHeaderEndsAt136500.txt.bz2
------------------------------------------------------------------------------
svn:mime-type = application/octet-stream