Author: olga
Date: Thu Jan 24 14:02:19 2008
New Revision: 615045

URL: http://svn.apache.org/viewvc?rev=615045&view=rev
Log:
Fix for PIG-63

Modified:
    incubator/pig/trunk/CHANGES.txt
    incubator/pig/trunk/src/org/apache/pig/builtin/PigStorage.java
    incubator/pig/trunk/src/org/apache/pig/builtin/TextLoader.java
    incubator/pig/trunk/src/org/apache/pig/impl/io/BufferedPositionedInputStream.java
    incubator/pig/trunk/test/org/apache/pig/test/TestBuiltin.java

Modified: incubator/pig/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/incubator/pig/trunk/CHANGES.txt?rev=615045&r1=615044&r2=615045&view=diff
==============================================================================
--- incubator/pig/trunk/CHANGES.txt (original)
+++ incubator/pig/trunk/CHANGES.txt Thu Jan 24 14:02:19 2008
@@ -66,3 +66,5 @@
        comparator function instead of Class.forName.  (gates)
 
        PIG-56: Made DataBag implement Iterable. (groves via gates)
+
+       PIG-63: Fix for non-ascii UTF-8 data (breed@ and olgan@)

Modified: incubator/pig/trunk/src/org/apache/pig/builtin/PigStorage.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/builtin/PigStorage.java?rev=615045&r1=615044&r2=615045&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/builtin/PigStorage.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/builtin/PigStorage.java Thu Jan 24 14:02:19 2008
@@ -17,18 +17,12 @@
  */
 package org.apache.pig.builtin;
 
-import java.io.BufferedReader;
-import java.io.DataInputStream;
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
 import java.io.OutputStream;
-import java.text.SimpleDateFormat;
-import java.util.Iterator;
+import java.nio.charset.Charset;
 
 import org.apache.pig.LoadFunc;
 import org.apache.pig.StoreFunc;
-import org.apache.pig.data.TimestampedTuple;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.io.BufferedPositionedInputStream;
 
@@ -40,11 +34,10 @@
  */
 public class PigStorage implements LoadFunc, StoreFunc {
     protected BufferedPositionedInputStream in = null;
-    private DataInputStream inData = null;
-        
        long                end            = Long.MAX_VALUE;
-       private String recordDel = "\n";
+       private byte recordDel = (byte)'\n';
        private String fieldDel = "\t";
+       final private static Charset utf8 = Charset.forName("UTF8");
     
     public PigStorage() {
     }
@@ -66,7 +59,7 @@
             return null;
         }
         String line;
-        if((line = inData.readLine()) != null) {            
+        if((line = in.readLine(utf8, recordDel)) != null) {            
             return new Tuple(line, fieldDel);
         }
         return null;
@@ -74,11 +67,10 @@
 
        public void bindTo(String fileName, BufferedPositionedInputStream in, long offset, long end) throws IOException {
         this.in = in;
-        inData = new DataInputStream(in);
         this.end = end;
         
         // Since we are not block aligned we throw away the first
-        // record and cound on a different instance to read it
+        // record and count on a different instance to read it
         if (offset != 0) {
             getNext();
         }
@@ -90,7 +82,7 @@
     }
 
     public void putNext(Tuple f) throws IOException {
-        os.write((f.toDelimitedString(this.fieldDel) + this.recordDel).getBytes());
+        os.write((f.toDelimitedString(this.fieldDel) + (char)this.recordDel).getBytes(utf8));
     }
 
     public void finish() throws IOException {

Modified: incubator/pig/trunk/src/org/apache/pig/builtin/TextLoader.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/builtin/TextLoader.java?rev=615045&r1=615044&r2=615045&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/builtin/TextLoader.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/builtin/TextLoader.java Thu Jan 24 14:02:19 2008
@@ -17,8 +17,8 @@
  */
 package org.apache.pig.builtin;
 
-import java.io.DataInputStream;
 import java.io.IOException;
+import java.nio.charset.Charset;
 
 import org.apache.pig.LoadFunc;
 import org.apache.pig.data.DataAtom;
@@ -32,16 +32,14 @@
  */
 public class TextLoader implements LoadFunc{
        BufferedPositionedInputStream in;
-       private DataInputStream inData = null;
-    
+       final private static Charset utf8 = Charset.forName("UTF8");
        long                end;
 
        public void bindTo(String fileName, BufferedPositionedInputStream in, long offset, long end) throws IOException {
         this.in = in;
-        inData = new DataInputStream(in);
         this.end = end;
         // Since we are not block aligned we throw away the first
-        // record and cound on a different instance to read it
+        // record and count on a different instance to read it
         if (offset != 0)
             getNext();
     }
@@ -50,7 +48,7 @@
         if (in == null || in.getPosition() > end)
             return null;
         String line;
-        if ((line = inData.readLine()) != null) {
+        if ((line = in.readLine(utf8, (byte)'\n')) != null) {
             Tuple t = new Tuple(1);
             t.setField(0, new DataAtom(line));
             return t;

Modified: 
incubator/pig/trunk/src/org/apache/pig/impl/io/BufferedPositionedInputStream.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/io/BufferedPositionedInputStream.java?rev=615045&r1=615044&r2=615045&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/io/BufferedPositionedInputStream.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/io/BufferedPositionedInputStream.java Thu Jan 24 14:02:19 2008
@@ -20,6 +20,12 @@
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.charset.Charset;
+import java.nio.charset.CharsetDecoder;
+import java.nio.charset.CoderResult;
+import java.nio.charset.CodingErrorAction;
 
 import org.apache.tools.bzip2r.CBZip2InputStream;
 
@@ -66,5 +72,92 @@
        return pos;
     }
 
+    /*
+     * Preallocated array for readline buffering
+     */
+    private byte barray[] = new byte[1024];
+
+    /*
+     * Preallocated ByteBuffer for readline buffering
+     */
+    private ByteBuffer bbuff = ByteBuffer.wrap(barray);
+
+    /*
+     * Preallocated char array for decoding bytes
+     */
+    private char carray[] = new char[1024];
+
+    /*
+     * Preallocated CharBuffer for decoding bytes
+     */
+    private CharBuffer cbuff = CharBuffer.wrap(carray);
 
+    public String readLine(Charset charset, byte delimiter) throws IOException {
+        CharsetDecoder decoder = charset.newDecoder();
+        decoder.onMalformedInput(CodingErrorAction.REPLACE);
+        decoder.onUnmappableCharacter(CodingErrorAction.REPLACE);
+        int delim = delimiter&0xff;
+        int rc;
+        int offset = 0;
+        StringBuilder sb = null;
+        CoderResult res;
+        while ((rc = read())!=-1) {
+            if (rc == delim) {
+                break;
+            }
+            barray[offset++] = (byte)rc;
+            if (barray.length == offset) {
+                bbuff.position(0);
+                bbuff.limit(barray.length);
+                cbuff.position(0);
+                cbuff.limit(carray.length);
+                res = decoder.decode(bbuff, cbuff, false);
+                if (res.isError()) {
+                    throw new IOException("Decoding error: " + res.toString());
+                }
+                offset = bbuff.remaining();
+                switch (offset) {
+                default:
+                    System.arraycopy(barray, bbuff.position(), barray, 0, bbuff
+                            .remaining());
+                    break;
+                case 2:
+                    barray[1] = barray[barray.length - 1];
+                    barray[0] = barray[barray.length - 2];
+                    break;
+                case 1:
+                    barray[0] = barray[barray.length - 1];
+                    break;
+                case 0:
+                }
+                if (sb == null) {
+                    sb = new StringBuilder(cbuff.position());
+                }
+                sb.append(carray, 0, cbuff.position());
+            }
+        }
+        if (sb == null) {
+            if (rc == -1 && offset == 0) {
+                // We are at EOF with nothing read
+                return null;
+            }
+            sb = new StringBuilder();
+        }
+        bbuff.position(0);
+        bbuff.limit(offset);
+        cbuff.position(0);
+        cbuff.limit(carray.length);
+        res = decoder.decode(bbuff, cbuff, true);
+        if (res.isError()) {
+            System.out.println("Error");
+        }
+        sb.append(carray, 0, cbuff.position());
+        cbuff.position(0);
+        res = decoder.flush(cbuff);
+        if (res.isError()) {
+            System.out.println("Error");
+        }
+        sb.append(carray, 0, cbuff.position());
+        return sb.toString();
+    }
 }

Modified: incubator/pig/trunk/test/org/apache/pig/test/TestBuiltin.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/trunk/test/org/apache/pig/test/TestBuiltin.java?rev=615045&r1=615044&r2=615045&view=diff
==============================================================================
--- incubator/pig/trunk/test/org/apache/pig/test/TestBuiltin.java (original)
+++ incubator/pig/trunk/test/org/apache/pig/test/TestBuiltin.java Thu Jan 24 14:02:19 2008
@@ -18,9 +18,6 @@
 package org.apache.pig.test;
 
 import java.io.File;
-import java.io.FileOutputStream;
-import java.io.InputStream;
-import java.io.OutputStream;
 import java.io.PrintWriter;
 import java.util.Iterator;
 
@@ -39,11 +36,8 @@
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataMap;
 import org.apache.pig.data.Tuple;
-import org.apache.pig.PigServer.ExecType;
 import org.apache.pig.impl.builtin.ShellBagEvalFunc;
-import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.io.BufferedPositionedInputStream;
-import org.apache.pig.impl.PigContext;
 
 public class TestBuiltin extends TestCase {
        
@@ -312,6 +306,28 @@
         p1.bindTo(null, new BufferedPositionedInputStream(ffis1), 0, input1.getBytes().length);
         Tuple f1 = p1.getNext();
         assertTrue(f1.arity() == arity1);
+
+        LoadFunc p15 = new PigStorage();
+        StringBuilder sb = new StringBuilder();
+        int LOOP_COUNT = 1024;
+        for (int i = 0; i < LOOP_COUNT; i++) {
+            for (int j = 0; j < LOOP_COUNT; j++) {
+                sb.append(i + "\t" + i + "\t" + j % 2 + "\n");
+            }
+        }
+        FakeFSInputStream ffis15 = new FakeFSInputStream(sb.toString()
+                .getBytes());
+        p15.bindTo(null, new BufferedPositionedInputStream(ffis15), 0, input1
+                .getBytes().length);
+        int count = 0;
+        while (true) {
+            Tuple f15 = p15.getNext();
+            if (f15 == null)
+                break;
+            count++;
+            assertEquals(3, f15.arity());
+        }
+        assertEquals(LOOP_COUNT * LOOP_COUNT, count);
 
         String input2 = ":this:has:a:leading:colon\n";
         int arity2 = 6;


Reply via email to