Author: rding
Date: Fri Apr 16 17:21:26 2010
New Revision: 935002
URL: http://svn.apache.org/viewvc?rev=935002&view=rev
Log:
PIG-1348: PigStorage making unnecessary byte array copy when storing data
Added:
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTextOutputFormat.java
Modified:
hadoop/pig/trunk/CHANGES.txt
hadoop/pig/trunk/src/org/apache/pig/builtin/PigStorage.java
Modified: hadoop/pig/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=935002&r1=935001&r2=935002&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Fri Apr 16 17:21:26 2010
@@ -43,6 +43,9 @@ PIG-1309: Map-side Cogroup (ashutoshc)
BUG FIXES
+PIG-1348: PigStorage making unnecessary byte array copy when storing data
+(rding)
+
PIG-1372: Restore PigInputFormat.sJob for backward compatibility (pradeepkth)
PIG-1369: POProject does not handle null tuples and non existent fields in
Added:
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTextOutputFormat.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTextOutputFormat.java?rev=935002&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTextOutputFormat.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTextOutputFormat.java Fri Apr 16 17:21:26 2010
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.util.StorageUtil;
+
+@SuppressWarnings("unchecked")
+public class PigTextOutputFormat extends
+ TextOutputFormat<WritableComparable, Tuple> {
+
+ private final byte fieldDel;
+
+ protected static class PigLineRecordWriter extends
+ TextOutputFormat.LineRecordWriter<WritableComparable, Tuple> {
+ private static final String utf8 = "UTF-8";
+ private static final byte[] newline;
+ static {
+ try {
+ newline = "\n".getBytes(utf8);
+ } catch (UnsupportedEncodingException uee) {
+ throw new IllegalArgumentException("can't find " + utf8
+ + " encoding");
+ }
+ }
+
+ private final byte fieldDel;
+
+ public PigLineRecordWriter(DataOutputStream out, byte fieldDel) {
+ super(out);
+ this.fieldDel = fieldDel;
+ }
+
+ public synchronized void write(WritableComparable key, Tuple value)
+ throws IOException {
+ int sz = value.size();
+ for (int i = 0; i < sz; i++) {
+ StorageUtil.putField(out, value.get(i));
+ if (i != sz - 1) {
+ out.writeByte(fieldDel);
+ }
+ }
+ out.write(newline);
+ }
+ }
+
+ public PigTextOutputFormat(byte delimiter) {
+ super();
+ fieldDel = delimiter;
+ }
+
+ @Override
+ public RecordWriter<WritableComparable, Tuple> getRecordWriter(
+ TaskAttemptContext job) throws IOException, InterruptedException {
+ Configuration conf = job.getConfiguration();
+ boolean isCompressed = getCompressOutput(job);
+ CompressionCodec codec = null;
+ String extension = "";
+ if (isCompressed) {
+ Class<? extends CompressionCodec> codecClass =
+ getOutputCompressorClass(job, GzipCodec.class);
+ codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
+ extension = codec.getDefaultExtension();
+ }
+ Path file = getDefaultWorkFile(job, extension);
+ FileSystem fs = file.getFileSystem(conf);
+ if (!isCompressed) {
+ FSDataOutputStream fileOut = fs.create(file, false);
+ return new PigLineRecordWriter(fileOut, fieldDel);
+ } else {
+ FSDataOutputStream fileOut = fs.create(file, false);
+ return new PigLineRecordWriter(new DataOutputStream
+ (codec.createOutputStream(fileOut)), fieldDel);
+ }
+ }
+}
Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/PigStorage.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/PigStorage.java?rev=935002&r1=935001&r2=935002&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/PigStorage.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/PigStorage.java Fri Apr 16 17:21:26 2010
@@ -17,7 +17,6 @@
*/
package org.apache.pig.builtin;
-import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -28,10 +27,8 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.GzipCodec;
-import org.apache.pig.bzip2r.Bzip2TextInputFormat;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputFormat;
@@ -39,7 +36,6 @@ import org.apache.hadoop.mapreduce.Recor
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.pig.FileInputLoadFunc;
import org.apache.pig.LoadFunc;
import org.apache.pig.LoadPushDown;
@@ -50,6 +46,8 @@ import org.apache.pig.StoreFuncInterface
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextInputFormat;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextOutputFormat;
+import org.apache.pig.bzip2r.Bzip2TextInputFormat;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
@@ -63,9 +61,10 @@ import org.apache.pig.impl.util.UDFConte
* delimiter is given as a regular expression. See String.split(delimiter) and
* http://java.sun.com/j2se/1.5.0/docs/api/java/util/regex/Pattern.html for more information.
*/
+@SuppressWarnings("unchecked")
public class PigStorage extends FileInputLoadFunc implements StoreFuncInterface,
LoadPushDown {
- protected RecordReader in = null;
+ protected RecordReader in = null;
protected RecordWriter writer = null;
protected final Log mLog = LogFactory.getLog(getClass());
private String signature;
@@ -73,7 +72,6 @@ LoadPushDown {
private byte fieldDel = '\t';
private ArrayList<Object> mProtoTuple = null;
private TupleFactory mTupleFactory = TupleFactory.getInstance();
- private static final int BUFFER_SIZE = 1024;
private String loadLocation;
public PigStorage() {
@@ -138,33 +136,10 @@ LoadPushDown {
}
- protected ByteArrayOutputStream mOut = new ByteArrayOutputStream(BUFFER_SIZE);
-
- @SuppressWarnings("unchecked")
@Override
public void putNext(Tuple f) throws IOException {
- // I have to convert integer fields to string, and then to bytes.
- // If I use a DataOutputStream to convert directly from integer to
- // bytes, I don't get a string representation.
- int sz = f.size();
- for (int i = 0; i < sz; i++) {
- Object field;
- try {
- field = f.get(i);
- } catch (ExecException ee) {
- throw ee;
- }
-
- StorageUtil.putField(mOut, field);
-
- if (i != sz - 1) {
- mOut.write(fieldDel);
- }
- }
- Text text = new Text(mOut.toByteArray());
try {
- writer.write(null, text);
- mOut.reset();
+ writer.write(null, f);
} catch (InterruptedException e) {
throw new IOException(e);
}
@@ -244,7 +219,7 @@ LoadPushDown {
@Override
public OutputFormat getOutputFormat() {
- return new TextOutputFormat<WritableComparable, Text>();
+ return new PigTextOutputFormat(fieldDel);
}
@Override