Author: pradeepkth
Date: Mon Dec 21 18:05:45 2009
New Revision: 892907
URL: http://svn.apache.org/viewvc?rev=892907&view=rev
Log: PIG-1110: Handle compressed file formats -- Gz, BZip with the new proposal (rding via pradeepkth)
Modified: hadoop/pig/branches/load-store-redesign/CHANGES.txt hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/PigStorage.java hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestBZip.java Modified: hadoop/pig/branches/load-store-redesign/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/CHANGES.txt?rev=892907&r1=892906&r2=892907&view=diff ============================================================================== --- hadoop/pig/branches/load-store-redesign/CHANGES.txt (original) +++ hadoop/pig/branches/load-store-redesign/CHANGES.txt Mon Dec 21 18:05:45 2009 @@ -22,6 +22,9 @@ INCOMPATIBLE CHANGES +PIG-1110: Handle compressed file formats -- Gz, BZip with the new proposal +(rding via pradeepkth) + PIG-1088: change merge join and merge join indexer to work with new LoadFunc interface (thejas via pradeepkth) Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/PigStorage.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/PigStorage.java?rev=892907&r1=892906&r2=892907&view=diff ============================================================================== --- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/PigStorage.java (original) +++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/PigStorage.java Mon Dec 21 18:05:45 2009 @@ -31,6 +31,8 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.io.compress.BZip2Codec; +import org.apache.hadoop.io.compress.GzipCodec; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.OutputFormat; @@ -384,6 +386,13 @@ public void setStoreLocation(String location, Job job) throws IOException { job.getConfiguration().set("mapred.textoutputformat.separator", ""); 
FileOutputFormat.setOutputPath(job, new Path(location)); + if (location.endsWith(".bz2")) { + FileOutputFormat.setCompressOutput(job, true); + FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class); + } else if (location.endsWith(".gz")) { + FileOutputFormat.setCompressOutput(job, true); + FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class); + } } @Override Modified: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestBZip.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestBZip.java?rev=892907&r1=892906&r2=892907&view=diff ============================================================================== --- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestBZip.java (original) +++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestBZip.java Mon Dec 21 18:05:45 2009 @@ -17,115 +17,142 @@ */ package org.apache.pig.test; -import static org.apache.pig.ExecType.MAPREDUCE; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotSame; import java.io.File; import java.io.FileOutputStream; +import java.io.IOException; import java.util.HashMap; import java.util.Iterator; -import junit.framework.TestCase; - +import org.apache.pig.ExecType; import org.apache.pig.PigServer; +import org.apache.pig.backend.datastorage.ElementDescriptor; import org.apache.pig.backend.local.datastorage.LocalSeekableInputStream; -import org.apache.pig.data.DataByteArray; import org.apache.pig.data.DataType; import org.apache.pig.data.Tuple; import org.apache.tools.bzip2r.CBZip2InputStream; import org.apache.tools.bzip2r.CBZip2OutputStream; import org.junit.Test; -public class TestBZip extends TestCase { +public class TestBZip { MiniCluster cluster = MiniCluster.buildCluster(); - - /** - * Tests the end-to-end writing and reading of a BZip file. - */ + + /** + * Tests the end-to-end writing and reading of a BZip file. 
+ */ @Test public void testBzipInPig() throws Exception { - PigServer pig = new PigServer(MAPREDUCE); - try { - pig.deleteFile("junit-out.bz"); - } catch (Exception e) { - } - File in = File.createTempFile("junit", ".bz"); + PigServer pig = new PigServer(ExecType.MAPREDUCE, cluster.getProperties()); + + File in = File.createTempFile("junit", ".bz2"); in.deleteOnExit(); - File out = File.createTempFile("junit", ".bz"); + + File out = File.createTempFile("junit", ".bz2"); out.deleteOnExit(); out.delete(); - CBZip2OutputStream cos = new CBZip2OutputStream( - new FileOutputStream(in)); + + CBZip2OutputStream cos = + new CBZip2OutputStream(new FileOutputStream(in)); for (int i = 1; i < 100; i++) { - cos.write((i + "\n").getBytes()); - cos.write((-i + "\n").getBytes()); + StringBuffer sb = new StringBuffer(); + sb.append(i).append("\n").append(-i).append("\n"); + byte bytes[] = sb.toString().getBytes(); + cos.write(bytes); } cos.close(); - pig.registerQuery("AA=load '" + + pig.registerQuery("AA = load '" + Util.generateURI(in.getAbsolutePath(), pig.getPigContext()) + "';"); - pig.registerQuery("A=foreach (group (filter AA by $0 > 0) all) generate flatten($1);"); - pig.store("A", Util.generateURI(out.getAbsolutePath(), pig - .getPigContext())); - CBZip2InputStream cis = new CBZip2InputStream( - new LocalSeekableInputStream(new File(out, "part-00000.bz"))); + pig.registerQuery("A = foreach (group (filter AA by $0 > 0) all) generate flatten($1);"); + pig.registerQuery("store A into '" + out.getAbsolutePath() + "';"); + + File dir = new File("testbzip"); + deleteFiles(dir); + + processCopyToLocal(pig, out.getAbsolutePath(), dir.getAbsolutePath()); + + LocalSeekableInputStream is = new LocalSeekableInputStream( + new File(dir.getAbsolutePath() + "/part-r-00000.bz2")); + + CBZip2InputStream cis = new CBZip2InputStream(is); + // Just a sanity check, to make sure it was a bzip file; we // will do the value verification later assertEquals(100, cis.read(new byte[100])); 
cis.close(); - pig.registerQuery("B=load '" - + Util.generateURI(out.getAbsolutePath(), pig.getPigContext()) - + "';"); + + pig.registerQuery("B = load '" + out.getAbsolutePath() + "';"); + Iterator<Tuple> i = pig.openIterator("B"); HashMap<Integer, Integer> map = new HashMap<Integer, Integer>(); while (i.hasNext()) { Integer val = DataType.toInteger(i.next().get(0)); - map.put(val, val); - + map.put(val, val); } + assertEquals(new Integer(99), new Integer(map.keySet().size())); - for(int j = 1; j < 100; j++) { - assertEquals(new Integer(j), map.get(j)); + + for (int j = 1; j < 100; j++) { + assertEquals(new Integer(j), map.get(j)); } + in.delete(); out.delete(); + + deleteFiles(dir); } - + /** * Tests the end-to-end writing and reading of an empty BZip file. */ - @Test - public void testEmptyBzipInPig() throws Exception { - PigServer pig = new PigServer(MAPREDUCE); - try { - pig.deleteFile("junit-out.bz"); - } catch (Exception e) { - } + @Test + public void testEmptyBzipInPig() throws Exception { + PigServer pig = new PigServer(ExecType.MAPREDUCE, cluster + .getProperties()); + File in = File.createTempFile("junit", ".tmp"); in.deleteOnExit(); - File out = File.createTempFile("junit", ".bz"); + + File out = File.createTempFile("junit", ".bz2"); out.deleteOnExit(); out.delete(); + FileOutputStream fos = new FileOutputStream(in); fos.write("55\n".getBytes()); fos.close(); System.out.println(in.getAbsolutePath()); - pig.registerQuery("AA=load '" + + pig.registerQuery("AA = load '" + Util.generateURI(in.getAbsolutePath(), pig.getPigContext()) + "';"); - pig - .registerQuery("A=foreach (group (filter AA by $0 < '0') all) generate flatten($1);"); - pig.store("A", Util.generateURI(out.getAbsolutePath(), pig - .getPigContext())); - CBZip2InputStream cis = new CBZip2InputStream( - new LocalSeekableInputStream(new File(out, "part-00000.bz"))); + pig.registerQuery("A=foreach (group (filter AA by $0 < '0') all) generate flatten($1);"); + pig.registerQuery("store A into '" + 
out.getAbsolutePath() + "';"); + + File dir = new File("testbzip2"); + deleteFiles(dir); + + processCopyToLocal(pig, out.getAbsolutePath(), dir.getAbsolutePath()); + + LocalSeekableInputStream is = new LocalSeekableInputStream( + new File(dir.getAbsolutePath() + "/part-r-00000.bz2")); + + CBZip2InputStream cis = new CBZip2InputStream(is); + + // Just a sanity check, to make sure it was a bzip file; we + // will do the value verification later assertEquals(-1, cis.read(new byte[100])); cis.close(); - pig.registerQuery("B=load '" - + Util.generateURI(out.getAbsolutePath(), pig.getPigContext()) - + "';"); + + pig.registerQuery("B = load '" + out.getAbsolutePath() + "';"); pig.openIterator("B"); + in.delete(); out.delete(); + + deleteFiles(dir); } /** @@ -144,6 +171,27 @@ assertEquals(-1, cis.read(new byte[100])); cis.close(); tmp.delete(); + } + + private void processCopyToLocal(PigServer pig, String src, String dst) + throws IOException { + ElementDescriptor srcPath = pig.getPigContext().getDfs().asElement(src); + ElementDescriptor dstPath = pig.getPigContext().getLfs().asElement(dst); + + srcPath.copy(dstPath, false); + } + + private void deleteFiles(File file) { + if (!file.exists()) return; + + if (file.isDirectory()) { + File[] files = file.listFiles(); + for (File f : files) { + deleteFiles(f); + } + } + System.out.println("delete file: " + file.getAbsolutePath() + + " : " + file.delete()); } }