Author: gates
Date: Fri Jan 9 08:56:25 2009
New Revision: 733083
URL: http://svn.apache.org/viewvc?rev=733083&view=rev
Log:
PIG-599 Added buffering to BufferedPositionedInputStream.
Modified:
hadoop/pig/branches/types/CHANGES.txt
hadoop/pig/branches/types/src/org/apache/pig/builtin/PigStorage.java
hadoop/pig/branches/types/src/org/apache/pig/data/DefaultTuple.java
hadoop/pig/branches/types/src/org/apache/pig/data/DefaultTupleFactory.java
hadoop/pig/branches/types/src/org/apache/pig/data/TupleFactory.java
hadoop/pig/branches/types/src/org/apache/pig/impl/io/BufferedPositionedInputStream.java
hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
Modified: hadoop/pig/branches/types/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/pig/branches/types/CHANGES.txt?rev=733083&r1=733082&r2=733083&view=diff
==============================================================================
--- hadoop/pig/branches/types/CHANGES.txt (original)
+++ hadoop/pig/branches/types/CHANGES.txt Fri Jan 9 08:56:25 2009
@@ -357,3 +357,5 @@
programmatically register a Pig Script. (shubhamc via gates)
PIG-570: problems with handling bzip data (breed via olgan)
+
+ PIG-599: Added buffering to BufferedPositionedInputStream (gates)
Modified: hadoop/pig/branches/types/src/org/apache/pig/builtin/PigStorage.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/builtin/PigStorage.java?rev=733083&r1=733082&r2=733083&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/builtin/PigStorage.java
(original)
+++ hadoop/pig/branches/types/src/org/apache/pig/builtin/PigStorage.java Fri
Jan 9 08:56:25 2009
@@ -110,16 +110,17 @@
if (mBuf == null) mBuf = new ByteArrayOutputStream(4096);
mBuf.reset();
while (true) {
- // Hadoop's FSDataInputStream (which my input stream is based
- // on at some point) is buffered, so I don't need to buffer.
+ // BufferedPositionedInputStream is buffered, so I don't need
+ // to buffer.
int b = in.read();
if (b == fieldDel) {
readField();
} else if (b == recordDel) {
readField();
- Tuple t = mTupleFactory.newTuple(mProtoTuple);
- mProtoTuple.clear();
+ //Tuple t = mTupleFactory.newTuple(mProtoTuple);
+ Tuple t = mTupleFactory.newTupleNoCopy(mProtoTuple);
+ mProtoTuple = null;
return t;
} else if (b == -1) {
// hit end of file
Modified: hadoop/pig/branches/types/src/org/apache/pig/data/DefaultTuple.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/data/DefaultTuple.java?rev=733083&r1=733082&r2=733083&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/data/DefaultTuple.java
(original)
+++ hadoop/pig/branches/types/src/org/apache/pig/data/DefaultTuple.java Fri Jan
9 08:56:25 2009
@@ -76,6 +76,19 @@
}
/**
+ * Construct a tuple from an existing list of objects. Package
+ * level so that callers cannot directly invoke it.
+ * @param c List of objects to turn into a tuple. This list will be kept
+ * as part of the tuple.
+ * @param junk Just used to differentiate from the constructor above that
+ * copies the list.
+ */
+ DefaultTuple(List<Object> c, int junk) {
+ mFields = c;
+ }
+
+
+ /**
* Make this tuple reference the contents of another. This method does not copy
* the underlying data. It maintains references to the data from the original
* tuple (and possibly even to the data structure holding the data).
Modified:
hadoop/pig/branches/types/src/org/apache/pig/data/DefaultTupleFactory.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/data/DefaultTupleFactory.java?rev=733083&r1=733082&r2=733083&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/data/DefaultTupleFactory.java
(original)
+++ hadoop/pig/branches/types/src/org/apache/pig/data/DefaultTupleFactory.java
Fri Jan 9 08:56:25 2009
@@ -39,6 +39,10 @@
return new DefaultTuple(c);
}
+ public Tuple newTupleNoCopy(List list) {
+ return new DefaultTuple(list, 1);
+ }
+
public Tuple newTuple(Object datum) {
Tuple t = new DefaultTuple(1);
try {
Modified: hadoop/pig/branches/types/src/org/apache/pig/data/TupleFactory.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/data/TupleFactory.java?rev=733083&r1=733082&r2=733083&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/data/TupleFactory.java
(original)
+++ hadoop/pig/branches/types/src/org/apache/pig/data/TupleFactory.java Fri Jan
9 08:56:25 2009
@@ -90,12 +90,20 @@
public abstract Tuple newTuple(int size);
/**
- * Create a tuple from the provided list of objects.
+ * Create a tuple from the provided list of objects. The underlying list
+ * will be copied.
* @param c List of objects to use as the fields of the tuple.
*/
public abstract Tuple newTuple(List c);
/**
+ * Create a tuple from a provided list of objects, keeping the provided
+ * list. The new tuple will take over ownership of the provided list.
+ * @param list List of objects that will become the fields of the tuple.
+ */
+ public abstract Tuple newTupleNoCopy(List list);
+
+ /**
* Create a tuple with a single element. This is useful because of
* the fact that bags (currently) only take tuples, we often end up
* sticking a single element in a tuple in order to put it in a bag.
Modified:
hadoop/pig/branches/types/src/org/apache/pig/impl/io/BufferedPositionedInputStream.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/impl/io/BufferedPositionedInputStream.java?rev=733083&r1=733082&r2=733083&view=diff
==============================================================================
---
hadoop/pig/branches/types/src/org/apache/pig/impl/io/BufferedPositionedInputStream.java
(original)
+++
hadoop/pig/branches/types/src/org/apache/pig/impl/io/BufferedPositionedInputStream.java
Fri Jan 9 08:56:25 2009
@@ -18,6 +18,7 @@
package org.apache.pig.impl.io;
+import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
@@ -32,9 +33,13 @@
public class BufferedPositionedInputStream extends InputStream {
long pos;
InputStream in;
+ final int bufSize = 1024;
public BufferedPositionedInputStream(InputStream in, long pos) {
- this.in = in;
+ // Don't buffer a bzip stream as it will cause problems for split
+ // records.
+ if (in instanceof CBZip2InputStream) this.in = in;
+ else this.in = new BufferedInputStream(in, bufSize);
this.pos = pos;
}
@@ -55,7 +60,7 @@
pos += read;
return read;
}
-
+
@Override
public long skip(long n) throws IOException {
long rc = in.skip(n);
Modified:
hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
URL:
http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=733083&r1=733082&r2=733083&view=diff
==============================================================================
---
hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
(original)
+++
hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
Fri Jan 9 08:56:25 2009
@@ -2712,7 +2712,6 @@
case FunctionType.LOADFUNC:
case FunctionType.STOREFUNC:
//funcSpec = new FuncSpec(func.getClass().getName() + (functionArgs == null? "(" + ")" : "(" + functionArgs + ")"));
- System.err.println("funcSpec: " + funcSpec);
func = pigContext.instantiateFuncFromSpec(funcSpec);
try{
FunctionType.tryCasting(func, funcType);