Author: pradeepkth
Date: Wed Aug  5 23:18:30 2009
New Revision: 801460

URL: http://svn.apache.org/viewvc?rev=801460&view=rev
Log:
PIG-901: InputSplit (SliceWrapper) created by Pig is big in size due to serialized PigContext (pradeepkth)

Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/src/org/apache/pig/ExecType.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SliceWrapper.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestPigContext.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=801460&r1=801459&r2=801460&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Wed Aug  5 23:18:30 2009
@@ -43,6 +43,9 @@
 PIG-792: skew join implementation (sriranjan via olgan)
 
 BUG FIXES
+    
+    PIG-901: InputSplit (SliceWrapper) created by Pig is big in size due to
+    serialized PigContext (pradeepkth)
 
     PIG-882: log level not propogated to loggers (daijy)
 

Modified: hadoop/pig/trunk/src/org/apache/pig/ExecType.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/ExecType.java?rev=801460&r1=801459&r2=801460&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/ExecType.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/ExecType.java Wed Aug  5 23:18:30 2009
@@ -18,10 +18,12 @@
 
 package org.apache.pig;
 
+import java.io.Serializable;
+
 /**
  * The type of query execution
  */
-public enum ExecType {
+public enum ExecType implements Serializable {
     /**
      * Run everything on the local machine
      */

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java?rev=801460&r1=801459&r2=801460&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java Wed Aug  5 23:18:30 2009
@@ -234,7 +234,7 @@
                                }
                                Slice[] pigs = spec.getSlicer().slice(store, spec.getFileName());
                                for (Slice split : pigs) {
-                                   splits.add(new SliceWrapper(split, pigContext, i, fs, inpTargets.get(i)));
+                                   splits.add(new SliceWrapper(split, pigContext.getExecType(), i, fs, inpTargets.get(i)));
                                }
             } catch (ExecException ee) {
                throw ee;

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SliceWrapper.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SliceWrapper.java?rev=801460&r1=801459&r2=801460&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SliceWrapper.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SliceWrapper.java Wed Aug  5 23:18:30 2009
@@ -60,7 +60,7 @@
 public class SliceWrapper implements InputSplit {
 
     private int index;
-    private PigContext pigContext;
+    private ExecType execType;
     private Slice wrapped;
     private transient FileSystem fs;// transient so it isn't serialized
     private transient JobConf lastConf;
@@ -70,9 +70,9 @@
         // for deserialization
     }
 
-    public SliceWrapper(Slice slice, PigContext context, int index, FileSystem fs, ArrayList<OperatorKey> targetOps) {
+    public SliceWrapper(Slice slice, ExecType execType, int index, FileSystem fs, ArrayList<OperatorKey> targetOps) {
         this.wrapped = slice;
-        this.pigContext = context;
+        this.execType = execType;
         this.index = index;
         this.fs = fs;
         this.targetOps = targetOps;
@@ -122,7 +122,7 @@
         DataStorage store = new HDataStorage(ConfigurationUtil.toProperties(job));
         // if the execution is against Mapred DFS, set
         // working dir to /user/<userid>
-        if(pigContext.getExecType() == ExecType.MAPREDUCE)
+        if(execType == ExecType.MAPREDUCE)
             store.setActiveContainer(store.asContainer("/user/" + job.getUser()));
         PigContext.setPackageImportList((ArrayList<String>)ObjectSerializer.deserialize(job.get("udf.import.list")));
         wrapped.init(store);
@@ -166,7 +166,7 @@
     }
 
     public void readFields(DataInput is) throws IOException {
-        pigContext = (PigContext) readObject(is);
+        execType = (ExecType) readObject(is);
         targetOps = (ArrayList<OperatorKey>) readObject(is);
         index = is.readInt();
         wrapped = (Slice) readObject(is);
@@ -193,7 +193,7 @@
     }
 
     public void write(DataOutput os) throws IOException {
-        writeObject(pigContext, os);
+        writeObject(execType, os);
         writeObject(targetOps, os);
         os.writeInt(index);
         writeObject(wrapped, os);

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestPigContext.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestPigContext.java?rev=801460&r1=801459&r2=801460&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestPigContext.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestPigContext.java Wed Aug  5 23:18:30 2009
@@ -103,6 +103,7 @@
     @Test
     // See PIG-832
     public void testImportList() throws Exception {
+        
         String FILE_SEPARATOR = System.getProperty("file.separator");
         File tmpDir = File.createTempFile("test", "");
         tmpDir.delete();
@@ -112,37 +113,51 @@
         Util.deleteDirectory(tempDir);
         File udf1Dir = new File(tmpDir.getAbsolutePath()+FILE_SEPARATOR+"com"+FILE_SEPARATOR+"xxx"+FILE_SEPARATOR+"udf1");
         udf1Dir.mkdirs();
-        File udf1JavaSrc = new File(udf1Dir.getAbsolutePath()+FILE_SEPARATOR+"TestUDF.java");
+        File udf2Dir = new File(tmpDir.getAbsolutePath()+FILE_SEPARATOR+"com"+FILE_SEPARATOR+"xxx"+FILE_SEPARATOR+"udf2");
+        udf2Dir.mkdirs();
+        File udf1JavaSrc = new File(udf1Dir.getAbsolutePath()+FILE_SEPARATOR+"TestUDF1.java");
+        File udf2JavaSrc = new File(udf2Dir.getAbsolutePath()+FILE_SEPARATOR+"TestUDF2.java");
+        
         String udf1Src = new String("package com.xxx.udf1;\n"+
                 "import java.io.IOException;\n"+
                 "import org.apache.pig.EvalFunc;\n"+
                 "import org.apache.pig.data.Tuple;\n"+
-                "public class TestUDF extends EvalFunc<Integer>{\n"+
+                "public class TestUDF1 extends EvalFunc<Integer>{\n"+
                 "public Integer exec(Tuple input) throws IOException {\n"+
                 "return 1;}\n"+
                 "}");
         
+        String udf2Src = new String("package com.xxx.udf2;\n"+
+                "import org.apache.pig.builtin.PigStorage;\n" +
+                "public class TestUDF2 extends PigStorage { }\n");
+
         // generate java file
-        FileOutputStream outStream = 
+        FileOutputStream outStream1 = 
             new FileOutputStream(udf1JavaSrc);
-        
-        OutputStreamWriter outWriter = new OutputStreamWriter(outStream);
-        outWriter.write(udf1Src);
-        outWriter.close();
+        OutputStreamWriter outWriter1 = new OutputStreamWriter(outStream1);
+        outWriter1.write(udf1Src);
+        outWriter1.close();
+        
+        FileOutputStream outStream2 = 
+            new FileOutputStream(udf2JavaSrc);
+        OutputStreamWriter outWriter2 = new OutputStreamWriter(outStream2);
+        outWriter2.write(udf2Src);
+        outWriter2.close();
         
         // compile
         int status;
         status = Util.executeShellCommand("javac -cp "+System.getProperty("java.class.path") + " " + udf1JavaSrc);
-        
+        status = Util.executeShellCommand("javac -cp "+System.getProperty("java.class.path") + " " + udf2JavaSrc);
+                
         // generate jar file
-        String jarName = "TestUDFJar1.jar";
+        String jarName = "TestUDFJar.jar";
         status = Util.executeShellCommand("jar -cf " + tmpDir.getAbsolutePath() + FILE_SEPARATOR + jarName + 
                               " -C " + tmpDir.getAbsolutePath() + " " + "com");
         assertTrue(status==0);
-
+        
         PigServer pig = new PigServer(pigContext);
         pig.registerJar(tmpDir.getAbsolutePath() + FILE_SEPARATOR + jarName);
-        
+
         PigContext.initializeImportList("com.xxx.udf1:com.xxx.udf2.");
         ArrayList<String> importList = PigContext.getPackageImportList();
         assertTrue(importList.size()==5);
@@ -152,9 +167,9 @@
         assertTrue(importList.get(3).equals("org.apache.pig.builtin."));
         assertTrue(importList.get(4).equals("org.apache.pig.impl.builtin."));
         
-        Object udf = PigContext.instantiateFuncFromSpec("TestUDF");
-        assertTrue(udf.getClass().toString().endsWith("com.xxx.udf1.TestUDF"));
-        
+        Object udf = PigContext.instantiateFuncFromSpec("TestUDF1");
+        assertTrue(udf.getClass().toString().endsWith("com.xxx.udf1.TestUDF1"));
+
         int LOOP_COUNT = 40;
         File tmpFile = File.createTempFile("test", "txt");
         PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
@@ -168,11 +183,10 @@
         
         FileLocalizer.deleteTempFiles();
         PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
-        pigServer.registerQuery("A = LOAD '" + Util.generateURI(tmpFile.toString()) + "' AS (num:chararray);");
-        pigServer.registerQuery("B = foreach A generate TestUDF(num);");
+        pigServer.registerQuery("A = LOAD '" + Util.generateURI(tmpFile.toString()) + "' using TestUDF2() AS (num:chararray);");
+        pigServer.registerQuery("B = foreach A generate TestUDF1(num);");
         Iterator<Tuple> iter = pigServer.openIterator("B");
         if(!iter.hasNext()) fail("No output found");
-        int numIdentity = 0;
         while(iter.hasNext()){
             Tuple t = iter.next();
             assertTrue(t.get(0) instanceof Integer);


Reply via email to