Author: rding
Date: Fri Apr 16 17:21:26 2010
New Revision: 935002

URL: http://svn.apache.org/viewvc?rev=935002&view=rev
Log:
PIG-1348: PigStorage making unnecessary byte array copy when storing data

Added:
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTextOutputFormat.java
Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/src/org/apache/pig/builtin/PigStorage.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=935002&r1=935001&r2=935002&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Fri Apr 16 17:21:26 2010
@@ -43,6 +43,9 @@ PIG-1309: Map-side Cogroup (ashutoshc)
 
 BUG FIXES
 
+PIG-1348: PigStorage making unnecessary byte array copy when storing data
+(rding)
+
 PIG-1372: Restore PigInputFormat.sJob for backward compatibility (pradeepkth)
 
 PIG-1369: POProject does not handle null tuples and non existent fields in

Added: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTextOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTextOutputFormat.java?rev=935002&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTextOutputFormat.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTextOutputFormat.java Fri Apr 16 17:21:26 2010
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.util.StorageUtil;
+
+/**
+ * Text output format that stores tuples as delimited lines of text.
+ * <p>
+ * The record writer returned by {@link #getRecordWriter} accepts a
+ * {@link Tuple} directly and serializes each field straight into the
+ * output stream, so no intermediate byte array or {@code Text} copy of
+ * the record is built.
+ */
+@SuppressWarnings("unchecked")
+public class PigTextOutputFormat extends
+        TextOutputFormat<WritableComparable, Tuple> {
+
+    /** Byte written between consecutive fields of a tuple. */
+    private final byte fieldDel;
+
+    /**
+     * Line writer that renders a tuple as its fields separated by a
+     * single delimiter byte and terminated by a newline.
+     */
+    protected static class PigLineRecordWriter extends
+            TextOutputFormat.LineRecordWriter<WritableComparable, Tuple> {
+        private static final String utf8 = "UTF-8";
+        private static final byte[] newline;
+        static {
+            try {
+                newline = "\n".getBytes(utf8);
+            } catch (UnsupportedEncodingException uee) {
+                throw new IllegalArgumentException("can't find " + utf8
+                        + " encoding");
+            }
+        }
+
+        /** Byte written between consecutive fields. */
+        private final byte fieldDel;
+
+        /**
+         * @param out stream that records are written to
+         * @param fieldDel byte written between consecutive fields
+         */
+        public PigLineRecordWriter(DataOutputStream out, byte fieldDel) {
+            super(out);
+            this.fieldDel = fieldDel;
+        }
+
+        /**
+         * Writes {@code value} as a single line: every field is rendered
+         * with {@link StorageUtil#putField}, consecutive fields are
+         * separated by the delimiter byte, and the line ends with a newline.
+         *
+         * @param key ignored; only the tuple is written
+         * @param value tuple to write
+         * @throws IOException if reading a field or writing to the stream
+         *         fails
+         */
+        @Override
+        public synchronized void write(WritableComparable key, Tuple value)
+                throws IOException {
+            int sz = value.size();
+            for (int i = 0; i < sz; i++) {
+                StorageUtil.putField(out, value.get(i));
+                if (i != sz - 1) {
+                    out.writeByte(fieldDel);
+                }
+            }
+            out.write(newline);
+        }
+    }
+
+    /**
+     * @param delimiter byte written between consecutive fields of each
+     *        stored tuple
+     */
+    public PigTextOutputFormat(byte delimiter) {
+        super();
+        fieldDel = delimiter;
+    }
+
+    /**
+     * Creates the record writer for this task's output file. When output
+     * compression is enabled for the job, the configured codec (gzip by
+     * default) wraps the file stream and the codec's default extension
+     * is used for the work file name.
+     */
+    @Override
+    public RecordWriter<WritableComparable, Tuple> getRecordWriter(
+            TaskAttemptContext job) throws IOException, InterruptedException {
+        Configuration conf = job.getConfiguration();
+        boolean isCompressed = getCompressOutput(job);
+        CompressionCodec codec = null;
+        String extension = "";
+        if (isCompressed) {
+            Class<? extends CompressionCodec> codecClass =
+                getOutputCompressorClass(job, GzipCodec.class);
+            codec = ReflectionUtils.newInstance(codecClass, conf);
+            extension = codec.getDefaultExtension();
+        }
+        Path file = getDefaultWorkFile(job, extension);
+        FileSystem fs = file.getFileSystem(conf);
+        // overwrite == false: fail instead of clobbering an existing file.
+        FSDataOutputStream fileOut = fs.create(file, false);
+        if (!isCompressed) {
+            return new PigLineRecordWriter(fileOut, fieldDel);
+        }
+        return new PigLineRecordWriter(
+                new DataOutputStream(codec.createOutputStream(fileOut)),
+                fieldDel);
+    }
+}

Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/PigStorage.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/PigStorage.java?rev=935002&r1=935001&r2=935002&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/PigStorage.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/PigStorage.java Fri Apr 16 17:21:26 2010
@@ -17,7 +17,6 @@
  */
 package org.apache.pig.builtin;
 
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -28,10 +27,8 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.compress.BZip2Codec;
 import org.apache.hadoop.io.compress.GzipCodec;
-import org.apache.pig.bzip2r.Bzip2TextInputFormat;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.OutputFormat;
@@ -39,7 +36,6 @@ import org.apache.hadoop.mapreduce.Recor
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 import org.apache.pig.FileInputLoadFunc;
 import org.apache.pig.LoadFunc;
 import org.apache.pig.LoadPushDown;
@@ -50,6 +46,8 @@ import org.apache.pig.StoreFuncInterface
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextInputFormat;
+import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextOutputFormat;
+import org.apache.pig.bzip2r.Bzip2TextInputFormat;
 import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
@@ -63,9 +61,10 @@ import org.apache.pig.impl.util.UDFConte
  * delimiter is given as a regular expression. See String.split(delimiter) and
  * http://java.sun.com/j2se/1.5.0/docs/api/java/util/regex/Pattern.html for 
more information.
  */
+...@suppresswarnings("unchecked")
 public class PigStorage extends FileInputLoadFunc implements 
StoreFuncInterface, 
 LoadPushDown {
-    protected RecordReader in = null;
+    protected RecordReader in = null;    
     protected RecordWriter writer = null;
     protected final Log mLog = LogFactory.getLog(getClass());
     private String signature;
@@ -73,7 +72,6 @@ LoadPushDown {
     private byte fieldDel = '\t';
     private ArrayList<Object> mProtoTuple = null;
     private TupleFactory mTupleFactory = TupleFactory.getInstance();
-    private static final int BUFFER_SIZE = 1024;
     private String loadLocation;
     
     public PigStorage() {
@@ -138,33 +136,10 @@ LoadPushDown {
       
     }
 
-    protected ByteArrayOutputStream mOut = new 
ByteArrayOutputStream(BUFFER_SIZE);
-
-    @SuppressWarnings("unchecked")
     @Override
     public void putNext(Tuple f) throws IOException {
-        // I have to convert integer fields to string, and then to bytes.
-        // If I use a DataOutputStream to convert directly from integer to
-        // bytes, I don't get a string representation.
-        int sz = f.size();
-        for (int i = 0; i < sz; i++) {
-            Object field;
-            try {
-                field = f.get(i);
-            } catch (ExecException ee) {
-                throw ee;
-            }
-
-            StorageUtil.putField(mOut, field);
-
-            if (i != sz - 1) {
-                mOut.write(fieldDel);
-            }
-        }
-        Text text = new Text(mOut.toByteArray());
         try {
-            writer.write(null, text);
-            mOut.reset();
+            writer.write(null, f);            
         } catch (InterruptedException e) {
             throw new IOException(e);
         }
@@ -244,7 +219,7 @@ LoadPushDown {
 
     @Override
     public OutputFormat getOutputFormat() {
-        return new TextOutputFormat<WritableComparable, Text>();
+        return new PigTextOutputFormat(fieldDel);
     }
 
     @Override


Reply via email to