Author: gates
Date: Fri Jan  9 08:56:25 2009
New Revision: 733083

URL: http://svn.apache.org/viewvc?rev=733083&view=rev
Log:
PIG-599  Added buffering to BufferedPositionedInputStream.


Modified:
    hadoop/pig/branches/types/CHANGES.txt
    hadoop/pig/branches/types/src/org/apache/pig/builtin/PigStorage.java
    hadoop/pig/branches/types/src/org/apache/pig/data/DefaultTuple.java
    hadoop/pig/branches/types/src/org/apache/pig/data/DefaultTupleFactory.java
    hadoop/pig/branches/types/src/org/apache/pig/data/TupleFactory.java
    hadoop/pig/branches/types/src/org/apache/pig/impl/io/BufferedPositionedInputStream.java
    hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt

Modified: hadoop/pig/branches/types/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/types/CHANGES.txt?rev=733083&r1=733082&r2=733083&view=diff
==============================================================================
--- hadoop/pig/branches/types/CHANGES.txt (original)
+++ hadoop/pig/branches/types/CHANGES.txt Fri Jan  9 08:56:25 2009
@@ -357,3 +357,5 @@
        programmatically register a Pig Script.  (shubhamc via gates)
 
     PIG-570:  problems with handling bzip data (breed via olgan)
+
+       PIG-599: Added buffering to BufferedPositionedInputStream (gates)

Modified: hadoop/pig/branches/types/src/org/apache/pig/builtin/PigStorage.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/builtin/PigStorage.java?rev=733083&r1=733082&r2=733083&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/builtin/PigStorage.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/builtin/PigStorage.java Fri Jan  9 08:56:25 2009
@@ -110,16 +110,17 @@
         if (mBuf == null) mBuf = new ByteArrayOutputStream(4096);
         mBuf.reset();
         while (true) {
-            // Hadoop's FSDataInputStream (which my input stream is based
-            // on at some point) is buffered, so I don't need to buffer.
+            // BufferedPositionedInputStream is buffered, so I don't need
+            // to buffer.
             int b = in.read();
 
             if (b == fieldDel) {
                 readField();
             } else if (b == recordDel) {
                 readField();
-                Tuple t =  mTupleFactory.newTuple(mProtoTuple);
-                mProtoTuple.clear();
+                //Tuple t =  mTupleFactory.newTuple(mProtoTuple);
+                Tuple t =  mTupleFactory.newTupleNoCopy(mProtoTuple);
+                mProtoTuple = null;
                 return t;
             } else if (b == -1) {
                 // hit end of file

Modified: hadoop/pig/branches/types/src/org/apache/pig/data/DefaultTuple.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/data/DefaultTuple.java?rev=733083&r1=733082&r2=733083&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/data/DefaultTuple.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/data/DefaultTuple.java Fri Jan  9 08:56:25 2009
@@ -76,6 +76,19 @@
     }
 
     /**
+     * Construct a tuple from an existing list of objects.  Package
+     * level so that callers cannot directly invoke it.
+     * @param c List of objects to turn into a tuple.  This list will be kept
+     * as part of the tuple.
+     * @param junk Just used to differentiate from the constructor above that
+     * copies the list.
+     */
+    DefaultTuple(List<Object> c, int junk) {
+        mFields = c;
+    }
+
+
+    /**
      * Make this tuple reference the contents of another.  This method does not copy
      * the underlying data.   It maintains references to the data from the original
      * tuple (and possibly even to the data structure holding the data).

Modified: 
hadoop/pig/branches/types/src/org/apache/pig/data/DefaultTupleFactory.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/data/DefaultTupleFactory.java?rev=733083&r1=733082&r2=733083&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/data/DefaultTupleFactory.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/data/DefaultTupleFactory.java Fri Jan  9 08:56:25 2009
@@ -39,6 +39,10 @@
         return new DefaultTuple(c);
     }
 
+    public Tuple newTupleNoCopy(List list) {
+        return new DefaultTuple(list, 1);
+    }
+
     public Tuple newTuple(Object datum) {
         Tuple t = new DefaultTuple(1);
         try {

Modified: hadoop/pig/branches/types/src/org/apache/pig/data/TupleFactory.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/data/TupleFactory.java?rev=733083&r1=733082&r2=733083&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/data/TupleFactory.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/data/TupleFactory.java Fri Jan  9 08:56:25 2009
@@ -90,12 +90,20 @@
     public abstract Tuple newTuple(int size);
     
     /**
-     * Create a tuple from the provided list of objects.
+     * Create a tuple from the provided list of objects.  The underlying list
+     * will be copied.
      * @param c List of objects to use as the fields of the tuple.
      */
     public abstract Tuple newTuple(List c);
 
     /**
+     * Create a tuple from a provided list of objects, keeping the provided
+     * list.  The new tuple will take over ownership of the provided list.
+     * @param list List of objects that will become the fields of the tuple.
+     */
+    public abstract Tuple newTupleNoCopy(List list);
+
+    /**
      * Create a tuple with a single element.  This is useful because of
      * the fact that bags (currently) only take tuples, we often end up
      * sticking a single element in a tuple in order to put it in a bag.

Modified: 
hadoop/pig/branches/types/src/org/apache/pig/impl/io/BufferedPositionedInputStream.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/impl/io/BufferedPositionedInputStream.java?rev=733083&r1=733082&r2=733083&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/impl/io/BufferedPositionedInputStream.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/impl/io/BufferedPositionedInputStream.java Fri Jan  9 08:56:25 2009
@@ -18,6 +18,7 @@
 
 package org.apache.pig.impl.io;
 
+import java.io.BufferedInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
@@ -32,9 +33,13 @@
 public class BufferedPositionedInputStream extends InputStream {
     long pos;
     InputStream in;
+    final int bufSize = 1024;
     
     public BufferedPositionedInputStream(InputStream in, long pos) {
-        this.in = in;
+               // Don't buffer a bzip stream as it will cause problems for split
+               // records.
+               if (in instanceof CBZip2InputStream) this.in = in;
+               else this.in = new BufferedInputStream(in, bufSize);
         this.pos = pos;
     }
     
@@ -55,7 +60,7 @@
         pos += read;
         return read;
     }
-    
+
     @Override
     public long skip(long n) throws IOException {
         long rc = in.skip(n);

Modified: 
hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=733083&r1=733082&r2=733083&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt Fri Jan  9 08:56:25 2009
@@ -2712,7 +2712,6 @@
             case FunctionType.LOADFUNC:
             case FunctionType.STOREFUNC:
                 //funcSpec = new FuncSpec(func.getClass().getName() + (functionArgs == null? "(" + ")" : "(" + functionArgs + ")"));
-                System.err.println("funcSpec: " + funcSpec);
                 func = pigContext.instantiateFuncFromSpec(funcSpec);
                        try{
                     FunctionType.tryCasting(func, funcType);


Reply via email to