Author: pradeepkth
Date: Mon Dec 21 18:05:45 2009
New Revision: 892907

URL: http://svn.apache.org/viewvc?rev=892907&view=rev
Log:
PIG-1110: Handle compressed file formats (gzip, bzip2) with the new load/store proposal
(rding via pradeepkth)

Modified:
    hadoop/pig/branches/load-store-redesign/CHANGES.txt
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/PigStorage.java
    hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestBZip.java

Modified: hadoop/pig/branches/load-store-redesign/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/CHANGES.txt?rev=892907&r1=892906&r2=892907&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/CHANGES.txt (original)
+++ hadoop/pig/branches/load-store-redesign/CHANGES.txt Mon Dec 21 18:05:45 2009
@@ -22,6 +22,9 @@
 
 INCOMPATIBLE CHANGES
 
+PIG-1110: Handle compressed file formats -- Gz, BZip with the new proposal
+(rding via pradeepkth)
+
 PIG-1088: change merge join and merge join indexer to work with new LoadFunc
 interface (thejas via pradeepkth)
 

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/PigStorage.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/PigStorage.java?rev=892907&r1=892906&r2=892907&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/PigStorage.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/PigStorage.java Mon Dec 21 18:05:45 2009
@@ -31,6 +31,8 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.compress.BZip2Codec;
+import org.apache.hadoop.io.compress.GzipCodec;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.OutputFormat;
@@ -384,6 +386,13 @@
     public void setStoreLocation(String location, Job job) throws IOException {
         job.getConfiguration().set("mapred.textoutputformat.separator", "");
         FileOutputFormat.setOutputPath(job, new Path(location));
+        if (location.endsWith(".bz2")) {
+            FileOutputFormat.setCompressOutput(job, true);
+            FileOutputFormat.setOutputCompressorClass(job,  BZip2Codec.class);
+        }  else if (location.endsWith(".gz")) {
+            FileOutputFormat.setCompressOutput(job, true);
+            FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
+        }
     }
 
     @Override

Modified: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestBZip.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestBZip.java?rev=892907&r1=892906&r2=892907&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestBZip.java (original)
+++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestBZip.java Mon Dec 21 18:05:45 2009
@@ -17,115 +17,142 @@
  */
 package org.apache.pig.test;
 
-import static org.apache.pig.ExecType.MAPREDUCE;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotSame;
 
 import java.io.File;
 import java.io.FileOutputStream;
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.Iterator;
 
-import junit.framework.TestCase;
-
+import org.apache.pig.ExecType;
 import org.apache.pig.PigServer;
+import org.apache.pig.backend.datastorage.ElementDescriptor;
 import org.apache.pig.backend.local.datastorage.LocalSeekableInputStream;
-import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
 import org.apache.tools.bzip2r.CBZip2InputStream;
 import org.apache.tools.bzip2r.CBZip2OutputStream;
 import org.junit.Test;
 
-public class TestBZip extends TestCase {
+public class TestBZip {
     MiniCluster cluster = MiniCluster.buildCluster();
-
-    /**
-     * Tests the end-to-end writing and reading of a BZip file.
-     */
+    
+   /**
+    * Tests the end-to-end writing and reading of a BZip file.
+    */
     @Test
     public void testBzipInPig() throws Exception {
-        PigServer pig = new PigServer(MAPREDUCE);
-        try {
-            pig.deleteFile("junit-out.bz");
-        } catch (Exception e) {
-        }
-        File in = File.createTempFile("junit", ".bz");
+        PigServer pig = new PigServer(ExecType.MAPREDUCE, 
cluster.getProperties());
+       
+        File in = File.createTempFile("junit", ".bz2");
         in.deleteOnExit();
-        File out = File.createTempFile("junit", ".bz");
+        
+        File out = File.createTempFile("junit", ".bz2");
         out.deleteOnExit();
         out.delete();
-        CBZip2OutputStream cos = new CBZip2OutputStream(
-                new FileOutputStream(in));
+               
+        CBZip2OutputStream cos = 
+            new CBZip2OutputStream(new FileOutputStream(in));
         for (int i = 1; i < 100; i++) {
-            cos.write((i + "\n").getBytes());
-            cos.write((-i + "\n").getBytes());
+            StringBuffer sb = new StringBuffer();
+            sb.append(i).append("\n").append(-i).append("\n");
+            byte bytes[] = sb.toString().getBytes();
+            cos.write(bytes);
         }
         cos.close();
-        pig.registerQuery("AA=load '"
+                       
+        pig.registerQuery("AA = load '"
                 + Util.generateURI(in.getAbsolutePath(), pig.getPigContext())
                 + "';");
-        pig.registerQuery("A=foreach (group (filter AA by $0 > 0) all) 
generate flatten($1);");
-        pig.store("A", Util.generateURI(out.getAbsolutePath(), pig
-                .getPigContext()));
-        CBZip2InputStream cis = new CBZip2InputStream(
-                new LocalSeekableInputStream(new File(out, "part-00000.bz")));
+        pig.registerQuery("A = foreach (group (filter AA by $0 > 0) all) 
generate flatten($1);");
+        pig.registerQuery("store A into '" + out.getAbsolutePath() + "';");
+        
+        File dir = new File("testbzip");     
+        deleteFiles(dir);
+        
+        processCopyToLocal(pig, out.getAbsolutePath(), dir.getAbsolutePath());
+        
+        LocalSeekableInputStream is = new LocalSeekableInputStream(
+                new File(dir.getAbsolutePath() + "/part-r-00000.bz2")); 
+        
+        CBZip2InputStream cis = new CBZip2InputStream(is);
+        
         // Just a sanity check, to make sure it was a bzip file; we
         // will do the value verification later
         assertEquals(100, cis.read(new byte[100]));
         cis.close();
-        pig.registerQuery("B=load '"
-                + Util.generateURI(out.getAbsolutePath(), pig.getPigContext())
-                + "';");
+        
+        pig.registerQuery("B = load '" + out.getAbsolutePath() + "';");
+        
         Iterator<Tuple> i = pig.openIterator("B");
         HashMap<Integer, Integer> map = new HashMap<Integer, Integer>();
         while (i.hasNext()) {
             Integer val = DataType.toInteger(i.next().get(0));
-            map.put(val, val);
-            
+            map.put(val, val);            
         }
+        
         assertEquals(new Integer(99), new Integer(map.keySet().size()));
-        for(int j = 1; j < 100; j++) {
-          assertEquals(new Integer(j), map.get(j));
+        
+        for (int j = 1; j < 100; j++) {
+            assertEquals(new Integer(j), map.get(j));
         }
+        
         in.delete();
         out.delete();
+        
+        deleteFiles(dir);
     }
-
+    
     /**
      * Tests the end-to-end writing and reading of an empty BZip file.
      */
-    @Test
-    public void testEmptyBzipInPig() throws Exception {
-        PigServer pig = new PigServer(MAPREDUCE);
-        try {
-            pig.deleteFile("junit-out.bz");
-        } catch (Exception e) {
-        }
+     @Test
+     public void testEmptyBzipInPig() throws Exception {
+        PigServer pig = new PigServer(ExecType.MAPREDUCE, cluster
+                .getProperties());
+ 
         File in = File.createTempFile("junit", ".tmp");
         in.deleteOnExit();
-        File out = File.createTempFile("junit", ".bz");
+
+        File out = File.createTempFile("junit", ".bz2");
         out.deleteOnExit();
         out.delete();
+        
         FileOutputStream fos = new FileOutputStream(in);
         fos.write("55\n".getBytes());
         fos.close();
         System.out.println(in.getAbsolutePath());
-        pig.registerQuery("AA=load '"
+        
+        pig.registerQuery("AA = load '"
                 + Util.generateURI(in.getAbsolutePath(), pig.getPigContext())
                 + "';");
-        pig
-                .registerQuery("A=foreach (group (filter AA by $0 < '0') all) 
generate flatten($1);");
-        pig.store("A", Util.generateURI(out.getAbsolutePath(), pig
-                .getPigContext()));
-        CBZip2InputStream cis = new CBZip2InputStream(
-                new LocalSeekableInputStream(new File(out, "part-00000.bz")));
+        pig.registerQuery("A=foreach (group (filter AA by $0 < '0') all) 
generate flatten($1);");
+        pig.registerQuery("store A into '" + out.getAbsolutePath() + "';");
+            
+        File dir = new File("testbzip2");     
+        deleteFiles(dir);
+        
+        processCopyToLocal(pig, out.getAbsolutePath(), dir.getAbsolutePath());
+        
+        LocalSeekableInputStream is = new LocalSeekableInputStream(
+                new File(dir.getAbsolutePath() + "/part-r-00000.bz2")); 
+        
+        CBZip2InputStream cis = new CBZip2InputStream(is);
+        
+        // Just a sanity check, to make sure it was a bzip file; we
+        // will do the value verification later
         assertEquals(-1, cis.read(new byte[100]));
         cis.close();
-        pig.registerQuery("B=load '"
-                + Util.generateURI(out.getAbsolutePath(), pig.getPigContext())
-                + "';");
+        
+        pig.registerQuery("B = load '" + out.getAbsolutePath() + "';");
         pig.openIterator("B");
+        
         in.delete();
         out.delete();
+        
+        deleteFiles(dir);
     }
 
     /**
@@ -144,6 +171,27 @@
         assertEquals(-1, cis.read(new byte[100]));
         cis.close();
         tmp.delete();
+    }
+    
+    private void processCopyToLocal(PigServer pig, String src, String dst) 
+            throws IOException {
 
+        ElementDescriptor srcPath = 
pig.getPigContext().getDfs().asElement(src);
+        ElementDescriptor dstPath = 
pig.getPigContext().getLfs().asElement(dst);
+            
+        srcPath.copy(dstPath, false);
+    }
+    
+    private void deleteFiles(File file) {
+        if (!file.exists()) return;
+            
+        if (file.isDirectory()) {
+            File[] files = file.listFiles();
+            for (File f : files) {
+                deleteFiles(f);
+            }
+        }
+        System.out.println("delete file: " + file.getAbsolutePath() 
+                + " : " + file.delete());
     }
 }


Reply via email to