Author: rding Date: Fri Apr 16 17:27:17 2010 New Revision: 935003 URL: http://svn.apache.org/viewvc?rev=935003&view=rev Log: PIG-1348: PigStorage making unnecessary byte array copy when storing data
Added: hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTextOutputFormat.java Modified: hadoop/pig/branches/branch-0.7/CHANGES.txt hadoop/pig/branches/branch-0.7/src/org/apache/pig/builtin/PigStorage.java Modified: hadoop/pig/branches/branch-0.7/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/CHANGES.txt?rev=935003&r1=935002&r2=935003&view=diff ============================================================================== --- hadoop/pig/branches/branch-0.7/CHANGES.txt (original) +++ hadoop/pig/branches/branch-0.7/CHANGES.txt Fri Apr 16 17:27:17 2010 @@ -183,6 +183,9 @@ OPTIMIZATIONS BUG FIXES +PIG-1348: PigStorage making unnecessary byte array copy when storing data +(rding) + PIG-1374: PushDownForeachFlatten shall not push ForEach below Join if the flattened fields is used in the next statement (daijy) PIG-1372: Restore PigInputFormat.sJob for backward compatibility (pradeepkth) Added: hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTextOutputFormat.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTextOutputFormat.java?rev=935003&view=auto ============================================================================== --- hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTextOutputFormat.java (added) +++ hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTextOutputFormat.java Fri Apr 16 17:27:17 2010 @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.UnsupportedEncodingException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.io.compress.GzipCodec; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.pig.data.Tuple; +import org.apache.pig.impl.util.StorageUtil; + +@SuppressWarnings("unchecked") +public class PigTextOutputFormat extends + TextOutputFormat<WritableComparable, Tuple> { + + private final byte fieldDel; + + protected static class PigLineRecordWriter extends + TextOutputFormat.LineRecordWriter<WritableComparable, Tuple> { + private static final String utf8 = "UTF-8"; + private static final byte[] newline; + static { + try { + newline = "\n".getBytes(utf8); + } catch (UnsupportedEncodingException uee) { + throw new IllegalArgumentException("can't find " + utf8 + + " encoding"); + } + } + + private final byte fieldDel; + + public 
PigLineRecordWriter(DataOutputStream out, byte fieldDel) { + super(out); + this.fieldDel = fieldDel; + } + + public synchronized void write(WritableComparable key, Tuple value) + throws IOException { + int sz = value.size(); + for (int i = 0; i < sz; i++) { + StorageUtil.putField(out, value.get(i)); + if (i != sz - 1) { + out.writeByte(fieldDel); + } + } + out.write(newline); + } + } + + public PigTextOutputFormat(byte delimiter) { + super(); + fieldDel = delimiter; + } + + @Override + public RecordWriter<WritableComparable, Tuple> getRecordWriter( + TaskAttemptContext job) throws IOException, InterruptedException { + Configuration conf = job.getConfiguration(); + boolean isCompressed = getCompressOutput(job); + CompressionCodec codec = null; + String extension = ""; + if (isCompressed) { + Class<? extends CompressionCodec> codecClass = + getOutputCompressorClass(job, GzipCodec.class); + codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf); + extension = codec.getDefaultExtension(); + } + Path file = getDefaultWorkFile(job, extension); + FileSystem fs = file.getFileSystem(conf); + if (!isCompressed) { + FSDataOutputStream fileOut = fs.create(file, false); + return new PigLineRecordWriter(fileOut, fieldDel); + } else { + FSDataOutputStream fileOut = fs.create(file, false); + return new PigLineRecordWriter(new DataOutputStream + (codec.createOutputStream(fileOut)), fieldDel); + } + } +} Modified: hadoop/pig/branches/branch-0.7/src/org/apache/pig/builtin/PigStorage.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/src/org/apache/pig/builtin/PigStorage.java?rev=935003&r1=935002&r2=935003&view=diff ============================================================================== --- hadoop/pig/branches/branch-0.7/src/org/apache/pig/builtin/PigStorage.java (original) +++ hadoop/pig/branches/branch-0.7/src/org/apache/pig/builtin/PigStorage.java Fri Apr 16 17:27:17 2010 @@ -17,7 +17,6 @@ */ package org.apache.pig.builtin; -import 
java.io.ByteArrayOutputStream; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; @@ -28,10 +27,8 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.compress.BZip2Codec; import org.apache.hadoop.io.compress.GzipCodec; -import org.apache.pig.bzip2r.Bzip2TextInputFormat; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.OutputFormat; @@ -39,7 +36,6 @@ import org.apache.hadoop.mapreduce.Recor import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; -import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.pig.FileInputLoadFunc; import org.apache.pig.LoadFunc; import org.apache.pig.LoadPushDown; @@ -50,6 +46,8 @@ import org.apache.pig.StoreFuncInterface import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextInputFormat; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextOutputFormat; +import org.apache.pig.bzip2r.Bzip2TextInputFormat; import org.apache.pig.data.DataByteArray; import org.apache.pig.data.Tuple; import org.apache.pig.data.TupleFactory; @@ -63,9 +61,10 @@ import org.apache.pig.impl.util.UDFConte * delimiter is given as a regular expression. See String.split(delimiter) and * http://java.sun.com/j2se/1.5.0/docs/api/java/util/regex/Pattern.html for more information. 
*/ +@SuppressWarnings("unchecked") public class PigStorage extends FileInputLoadFunc implements StoreFuncInterface, LoadPushDown { - protected RecordReader in = null; + protected RecordReader in = null; protected RecordWriter writer = null; protected final Log mLog = LogFactory.getLog(getClass()); private String signature; @@ -73,7 +72,6 @@ LoadPushDown { private byte fieldDel = '\t'; private ArrayList<Object> mProtoTuple = null; private TupleFactory mTupleFactory = TupleFactory.getInstance(); - private static final int BUFFER_SIZE = 1024; private String loadLocation; public PigStorage() { @@ -138,33 +136,10 @@ LoadPushDown { } - protected ByteArrayOutputStream mOut = new ByteArrayOutputStream(BUFFER_SIZE); - - @SuppressWarnings("unchecked") @Override public void putNext(Tuple f) throws IOException { - // I have to convert integer fields to string, and then to bytes. - // If I use a DataOutputStream to convert directly from integer to - // bytes, I don't get a string representation. - int sz = f.size(); - for (int i = 0; i < sz; i++) { - Object field; - try { - field = f.get(i); - } catch (ExecException ee) { - throw ee; - } - - StorageUtil.putField(mOut, field); - - if (i != sz - 1) { - mOut.write(fieldDel); - } - } - Text text = new Text(mOut.toByteArray()); try { - writer.write(null, text); - mOut.reset(); + writer.write(null, f); } catch (InterruptedException e) { throw new IOException(e); } @@ -244,7 +219,7 @@ LoadPushDown { @Override public OutputFormat getOutputFormat() { - return new TextOutputFormat<WritableComparable, Text>(); + return new PigTextOutputFormat(fieldDel); } @Override