Author: gates Date: Fri Jan 9 08:56:25 2009 New Revision: 733083 URL: http://svn.apache.org/viewvc?rev=733083&view=rev Log: PIG-599 Added buffering to BufferedPositionedInputStream.
Modified: hadoop/pig/branches/types/CHANGES.txt hadoop/pig/branches/types/src/org/apache/pig/builtin/PigStorage.java hadoop/pig/branches/types/src/org/apache/pig/data/DefaultTuple.java hadoop/pig/branches/types/src/org/apache/pig/data/DefaultTupleFactory.java hadoop/pig/branches/types/src/org/apache/pig/data/TupleFactory.java hadoop/pig/branches/types/src/org/apache/pig/impl/io/BufferedPositionedInputStream.java hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt Modified: hadoop/pig/branches/types/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/CHANGES.txt?rev=733083&r1=733082&r2=733083&view=diff ============================================================================== --- hadoop/pig/branches/types/CHANGES.txt (original) +++ hadoop/pig/branches/types/CHANGES.txt Fri Jan 9 08:56:25 2009 @@ -357,3 +357,5 @@ programmatically register a Pig Script. (shubhamc via gates) PIG-570: problems with handling bzip data (breed via olgan) + + PIG-599: Added buffering to BufferedPositionedInputStream (gates) Modified: hadoop/pig/branches/types/src/org/apache/pig/builtin/PigStorage.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/builtin/PigStorage.java?rev=733083&r1=733082&r2=733083&view=diff ============================================================================== --- hadoop/pig/branches/types/src/org/apache/pig/builtin/PigStorage.java (original) +++ hadoop/pig/branches/types/src/org/apache/pig/builtin/PigStorage.java Fri Jan 9 08:56:25 2009 @@ -110,16 +110,17 @@ if (mBuf == null) mBuf = new ByteArrayOutputStream(4096); mBuf.reset(); while (true) { - // Hadoop's FSDataInputStream (which my input stream is based - // on at some point) is buffered, so I don't need to buffer. + // BufferedPositionedInputStream is buffered, so I don't need + // to buffer. int b = in.read(); if (b == fieldDel) { readField(); } else if (b == recordDel) { readField(); - Tuple t = mTupleFactory.newTuple(mProtoTuple); - mProtoTuple.clear(); + //Tuple t = mTupleFactory.newTuple(mProtoTuple); + Tuple t = mTupleFactory.newTupleNoCopy(mProtoTuple); + mProtoTuple = null; return t; } else if (b == -1) { // hit end of file Modified: hadoop/pig/branches/types/src/org/apache/pig/data/DefaultTuple.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/data/DefaultTuple.java?rev=733083&r1=733082&r2=733083&view=diff ============================================================================== --- hadoop/pig/branches/types/src/org/apache/pig/data/DefaultTuple.java (original) +++ hadoop/pig/branches/types/src/org/apache/pig/data/DefaultTuple.java Fri Jan 9 08:56:25 2009 @@ -76,6 +76,19 @@ } /** + * Construct a tuple from an existing list of objects. Package + * level so that callers cannot directly invoke it. + * @param c List of objects to turn into a tuple. This list will be kept + * as part of the tuple. + * @param junk Just used to differentiate from the constructor above that + * copies the list. + */ + DefaultTuple(List<Object> c, int junk) { + mFields = c; + } + + + /** * Make this tuple reference the contents of another. This method does not copy * the underlying data. It maintains references to the data from the original * tuple (and possibly even to the data structure holding the data). Modified: hadoop/pig/branches/types/src/org/apache/pig/data/DefaultTupleFactory.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/data/DefaultTupleFactory.java?rev=733083&r1=733082&r2=733083&view=diff ============================================================================== --- hadoop/pig/branches/types/src/org/apache/pig/data/DefaultTupleFactory.java (original) +++ hadoop/pig/branches/types/src/org/apache/pig/data/DefaultTupleFactory.java Fri Jan 9 08:56:25 2009 @@ -39,6 +39,10 @@ return new DefaultTuple(c); } + public Tuple newTupleNoCopy(List list) { + return new DefaultTuple(list, 1); + } + public Tuple newTuple(Object datum) { Tuple t = new DefaultTuple(1); try { Modified: hadoop/pig/branches/types/src/org/apache/pig/data/TupleFactory.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/data/TupleFactory.java?rev=733083&r1=733082&r2=733083&view=diff ============================================================================== --- hadoop/pig/branches/types/src/org/apache/pig/data/TupleFactory.java (original) +++ hadoop/pig/branches/types/src/org/apache/pig/data/TupleFactory.java Fri Jan 9 08:56:25 2009 @@ -90,12 +90,20 @@ public abstract Tuple newTuple(int size); /** - * Create a tuple from the provided list of objects. + * Create a tuple from the provided list of objects. The underlying list + * will be copied. * @param c List of objects to use as the fields of the tuple. */ public abstract Tuple newTuple(List c); /** + * Create a tuple from a provided list of objects, keeping the provided + * list. The new tuple will take over ownership of the provided list. + * @param list List of objects that will become the fields of the tuple. + */ + public abstract Tuple newTupleNoCopy(List list); + + /** * Create a tuple with a single element. This is useful because of * the fact that bags (currently) only take tuples, we often end up * sticking a single element in a tuple in order to put it in a bag. Modified: hadoop/pig/branches/types/src/org/apache/pig/impl/io/BufferedPositionedInputStream.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/impl/io/BufferedPositionedInputStream.java?rev=733083&r1=733082&r2=733083&view=diff ============================================================================== --- hadoop/pig/branches/types/src/org/apache/pig/impl/io/BufferedPositionedInputStream.java (original) +++ hadoop/pig/branches/types/src/org/apache/pig/impl/io/BufferedPositionedInputStream.java Fri Jan 9 08:56:25 2009 @@ -18,6 +18,7 @@ package org.apache.pig.impl.io; +import java.io.BufferedInputStream; import java.io.IOException; import java.io.InputStream; import java.nio.ByteBuffer; @@ -32,9 +33,13 @@ public class BufferedPositionedInputStream extends InputStream { long pos; InputStream in; + final int bufSize = 1024; public BufferedPositionedInputStream(InputStream in, long pos) { - this.in = in; + // Don't buffer a bzip stream as it will cause problems for split + // records. + if (in instanceof CBZip2InputStream) this.in = in; + else this.in = new BufferedInputStream(in, bufSize); this.pos = pos; } @@ -55,7 +60,7 @@ pos += read; return read; } - + @Override public long skip(long n) throws IOException { long rc = in.skip(n); Modified: hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=733083&r1=733082&r2=733083&view=diff ============================================================================== --- hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt (original) +++ hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt Fri Jan 9 08:56:25 2009 @@ -2712,7 +2712,6 @@ case FunctionType.LOADFUNC: case FunctionType.STOREFUNC: //funcSpec = new FuncSpec(func.getClass().getName() + (functionArgs == null? "(" + ")" : "(" + functionArgs + ")")); - System.err.println("funcSpec: " + funcSpec); func = pigContext.instantiateFuncFromSpec(funcSpec); try{ FunctionType.tryCasting(func, funcType);