Author: olga
Date: Tue Nov 18 09:13:24 2008
New Revision: 718654

URL: http://svn.apache.org/viewvc?rev=718654&view=rev
Log:
PIG-527: allow PigStorage to write complex data

Modified:
    hadoop/pig/branches/types/CHANGES.txt
    hadoop/pig/branches/types/src/org/apache/pig/builtin/PigStorage.java
    hadoop/pig/branches/types/src/org/apache/pig/data/DefaultTuple.java
    hadoop/pig/branches/types/test/org/apache/pig/test/TestDataModel.java
    hadoop/pig/branches/types/test/org/apache/pig/test/TestStore.java
    hadoop/pig/branches/types/test/org/apache/pig/test/utils/GenRandomData.java

Modified: hadoop/pig/branches/types/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/types/CHANGES.txt?rev=718654&r1=718653&r2=718654&view=diff
==============================================================================
--- hadoop/pig/branches/types/CHANGES.txt (original)
+++ hadoop/pig/branches/types/CHANGES.txt Tue Nov 18 09:13:24 2008
@@ -316,3 +316,5 @@
     PIG-512: Expressions in foreach lead to errors (sms via olgan)
 
     PIG-528: use UDF return in schema computation (sms via olgan)
+
+    PIG-527: allow PigStorage to write out complex output (sms via olgan)

Modified: hadoop/pig/branches/types/src/org/apache/pig/builtin/PigStorage.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/builtin/PigStorage.java?rev=718654&r1=718653&r2=718654&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/builtin/PigStorage.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/builtin/PigStorage.java Tue Nov 18 09:13:24 2008
@@ -24,6 +24,7 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Map;
+import java.util.Iterator;
 
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.Log;
@@ -35,6 +36,7 @@
 import org.apache.pig.backend.datastorage.DataStorage;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
@@ -59,6 +61,7 @@
     private int os;
     private static final int OS_UNIX = 0;
     private static final int OS_WINDOWS = 1;
+    private static final String UTF8 = "UTF-8";
     
     public PigStorage() {
         os = OS_UNIX;
@@ -143,6 +146,109 @@
         mOut = os;
     }
 
+    private void putField(Object field) throws IOException {
+        //string constants for each delimiter
+        String tupleBeginDelim = "(";
+        String tupleEndDelim = ")";
+        String bagBeginDelim = "{";
+        String bagEndDelim = "}";
+        String mapBeginDelim = "[";
+        String mapEndDelim = "]";
+        String fieldDelim = ",";
+        String mapKeyValueDelim = "#";
+
+        switch (DataType.findType(field)) {
+        case DataType.NULL:
+            break; // just leave it empty
+
+        case DataType.BOOLEAN:
+            mOut.write(((Boolean)field).toString().getBytes());
+            break;
+
+        case DataType.INTEGER:
+            mOut.write(((Integer)field).toString().getBytes());
+            break;
+
+        case DataType.LONG:
+            mOut.write(((Long)field).toString().getBytes());
+            break;
+
+        case DataType.FLOAT:
+            mOut.write(((Float)field).toString().getBytes());
+            break;
+
+        case DataType.DOUBLE:
+            mOut.write(((Double)field).toString().getBytes());
+            break;
+
+        case DataType.BYTEARRAY: {
+            byte[] b = ((DataByteArray)field).get();
+            mOut.write(b, 0, b.length);
+            break;
+                                 }
+
+        case DataType.CHARARRAY:
+            // oddly enough, writeBytes writes a string
+            mOut.write(((String)field).getBytes(UTF8));
+            break;
+
+        case DataType.MAP:
+            boolean mapHasNext = false;
+            Map<Object, Object> m = (Map<Object, Object>)field;
+            mOut.write(mapBeginDelim.getBytes(UTF8));
+            for(Object o: m.keySet()) {
+                if(mapHasNext) {
+                    mOut.write(fieldDelim.getBytes(UTF8));
+                } else {
+                    mapHasNext = true;
+                }
+                putField(o);
+                mOut.write(mapKeyValueDelim.getBytes(UTF8));
+                putField(m.get(o));
+            }
+            mOut.write(mapEndDelim.getBytes(UTF8));
+            break;
+
+        case DataType.TUPLE:
+            boolean tupleHasNext = false;
+            Tuple t = (Tuple)field;
+            mOut.write(tupleBeginDelim.getBytes(UTF8));
+            for(int i = 0; i < t.size(); ++i) {
+                if(tupleHasNext) {
+                    mOut.write(fieldDelim.getBytes(UTF8));
+                } else {
+                    tupleHasNext = true;
+                }
+                try {
+                    putField(t.get(i));
+                } catch (ExecException ee) {
+                    throw new RuntimeException(ee);
+                }
+            }
+            mOut.write(tupleEndDelim.getBytes(UTF8));
+            break;
+
+        case DataType.BAG:
+            boolean bagHasNext = false;
+            mOut.write(bagBeginDelim.getBytes(UTF8));
+            Iterator<Tuple> tupleIter = ((DataBag)field).iterator();
+            while(tupleIter.hasNext()) {
+                if(bagHasNext) {
+                    mOut.write(fieldDelim.getBytes(UTF8));
+                } else {
+                    bagHasNext = true;
+                }
+                putField((Object)tupleIter.next());
+            }
+            mOut.write(bagEndDelim.getBytes(UTF8));
+            break;
+            
+        default:
+            throw new RuntimeException("Unknown datatype " + 
+                DataType.findType(field));
+        }
+    }
+
     public void putNext(Tuple f) throws IOException {
         // I have to convert integer fields to string, and then to bytes.
         // If I use a DataOutputStream to convert directly from integer to
@@ -155,51 +261,8 @@
             } catch (ExecException ee) {
                 throw new RuntimeException(ee);
             }
-            switch (DataType.findType(field)) {
-            case DataType.NULL:
-                break; // just leave it empty
-
-            case DataType.BOOLEAN:
-                mOut.write(((Boolean)field).toString().getBytes());
-                break;
 
-            case DataType.INTEGER:
-                mOut.write(((Integer)field).toString().getBytes());
-                break;
-
-            case DataType.LONG:
-                mOut.write(((Long)field).toString().getBytes());
-                break;
-
-            case DataType.FLOAT:
-                mOut.write(((Float)field).toString().getBytes());
-                break;
-
-            case DataType.DOUBLE:
-                mOut.write(((Double)field).toString().getBytes());
-                break;
-
-            case DataType.BYTEARRAY: {
-                byte[] b = ((DataByteArray)field).get();
-                mOut.write(b, 0, b.length);
-                break;
-                                     }
-
-            case DataType.CHARARRAY:
-                // oddly enough, writeBytes writes a string
-                mOut.write(((String)field).getBytes("UTF-8"));
-                break;
-
-            case DataType.MAP:
-            case DataType.TUPLE:
-            case DataType.BAG:
-                throw new IOException("Cannot store a non-flat tuple " +
-                    "using PigStorage");
-                
-            default:
-                throw new RuntimeException("Unknown datatype " + 
-                    DataType.findType(field));
-            }
+            putField(field);
 
             if (i == sz - 1) {
                 // last field in tuple.

Modified: hadoop/pig/branches/types/src/org/apache/pig/data/DefaultTuple.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/data/DefaultTuple.java?rev=718654&r1=718653&r2=718654&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/data/DefaultTuple.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/data/DefaultTuple.java Tue Nov 18 09:13:24 2008
@@ -190,9 +190,6 @@
         StringBuilder buf = new StringBuilder();
         for (Iterator<Object> it = mFields.iterator(); it.hasNext();) {
             Object field = it.next();
-            if (DataType.isComplex(field)) {
-                throw new ExecException("Unable to convert non-flat tuple to string.");
-            }
             buf.append(field == null ? "" : field.toString());
             if (it.hasNext())
                 buf.append(delim);
@@ -219,7 +216,7 @@
                     }
                 }
             } else {
-                sb.append("NULL");
+                sb.append("");
             }
             if (it.hasNext())
                 sb.append(",");

Modified: hadoop/pig/branches/types/test/org/apache/pig/test/TestDataModel.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/types/test/org/apache/pig/test/TestDataModel.java?rev=718654&r1=718653&r2=718654&view=diff
==============================================================================
--- hadoop/pig/branches/types/test/org/apache/pig/test/TestDataModel.java (original)
+++ hadoop/pig/branches/types/test/org/apache/pig/test/TestDataModel.java Tue Nov 18 09:13:24 2008
@@ -227,7 +227,7 @@
     public void testTupleToString() throws Exception {
         Tuple t = giveMeOneOfEach();
 
-        assertEquals("toString", "((3,3.0F),{(4),(mary had a little lamb)},[hello#world,goodbye#all],42,5000000000L,3.1415927F,2.99792458E8,true,hello,goodbye,NULL)", t.toString());
+        assertEquals("toString", "((3,3.0F),{(4),(mary had a little lamb)},[hello#world,goodbye#all],42,5000000000L,3.1415927F,2.99792458E8,true,hello,goodbye,)", t.toString());
     }
 
     @Test

Modified: hadoop/pig/branches/types/test/org/apache/pig/test/TestStore.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/types/test/org/apache/pig/test/TestStore.java?rev=718654&r1=718653&r2=718654&view=diff
==============================================================================
--- hadoop/pig/branches/types/test/org/apache/pig/test/TestStore.java (original)
+++ hadoop/pig/branches/types/test/org/apache/pig/test/TestStore.java Tue Nov 18 09:13:24 2008
@@ -19,6 +19,7 @@
 
 import static org.junit.Assert.assertEquals;
 
+import java.io.File;
 import java.io.BufferedReader;
 import java.io.FileReader;
 import java.io.IOException;
@@ -29,9 +30,11 @@
 import org.apache.pig.FuncSpec;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.builtin.PigStorage;
+import org.apache.pig.data.DefaultBagFactory;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.DefaultTuple;
+import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileLocalizer;
@@ -54,6 +57,7 @@
     DataBag inpDB;
     static MiniCluster cluster = MiniCluster.buildCluster();
     PigContext pc;
+    POProject proj;
     
     @Before
     public void setUp() throws Exception {
@@ -65,7 +69,7 @@
         pc.connect();
         st.setPc(pc);
         
-        POProject proj = GenPhyOp.exprProject();
+        proj = GenPhyOp.exprProject();
         proj.setColumn(0);
         proj.setResultType(DataType.TUPLE);
         proj.setOverloaded(true);
@@ -73,10 +77,6 @@
         inps.add(proj);
         st.setInputs(inps);
         
-        inpDB = GenRandomData.genRandSmallTupDataBag(new Random(), 10, 100);
-        Tuple t = new DefaultTuple();
-        t.append(inpDB);
-        proj.attachInput(t);
     }
 
     @After
@@ -85,6 +85,10 @@
 
     @Test
     public void testStore() throws ExecException, IOException {
+        inpDB = GenRandomData.genRandSmallTupDataBag(new Random(), 10, 100);
+        Tuple t = new DefaultTuple();
+        t.append(inpDB);
+        proj.attachInput(t);
         Result res = st.store();
         assertEquals(POStatus.STATUS_EOP, res.returnStatus);
         
@@ -92,9 +96,45 @@
         BufferedReader br = new BufferedReader(new FileReader("/tmp/storeTest.txt"));
         for(String line=br.readLine();line!=null;line=br.readLine()){
             String[] flds = line.split(":",-1);
-            Tuple t = new DefaultTuple();
+            t = new DefaultTuple();
             t.append(flds[0].compareTo("")!=0 ? flds[0] : null);
-            t.append(Integer.parseInt(flds[1]));
+            t.append(flds[1].compareTo("")!=0 ? Integer.parseInt(flds[1]) : null);
+            
+            System.err.println("Simple data: ");
+            System.err.println(line);
+            System.err.println("t: ");
+            System.err.println(t);
+            assertEquals(true, TestHelper.bagContains(inpDB, t));
+            ++size;
+        }
+        assertEquals(true, size==inpDB.size());
+        FileLocalizer.delete(fSpec.getFileName(), pc);
+    }
+
+    @Test
+    public void testStoreComplexData() throws ExecException, IOException {
+        inpDB = GenRandomData.genRandFullTupTextDataBag(new Random(), 10, 100);
+        Tuple t = new DefaultTuple();
+        t.append(inpDB);
+        proj.attachInput(t);
+        Result res = st.store();
+        assertEquals(POStatus.STATUS_EOP, res.returnStatus);
+        PigStorage ps = new PigStorage(":");
+        
+        int size = 0;
+        BufferedReader br = new BufferedReader(new FileReader("/tmp/storeTest.txt"));
+        for(String line=br.readLine();line!=null;line=br.readLine()){
+            String[] flds = line.split(":",-1);
+            t = new DefaultTuple();
+            t.append(flds[0].compareTo("")!=0 ? ps.bytesToBag(flds[0].getBytes()) : null);
+            t.append(flds[1].compareTo("")!=0 ? ps.bytesToCharArray(flds[1].getBytes()) : null);
+            t.append(flds[2].compareTo("")!=0 ? ps.bytesToCharArray(flds[2].getBytes()) : null);
+            t.append(flds[3].compareTo("")!=0 ? ps.bytesToDouble(flds[3].getBytes()) : null);
+            t.append(flds[4].compareTo("")!=0 ? ps.bytesToFloat(flds[4].getBytes()) : null);
+            t.append(flds[5].compareTo("")!=0 ? ps.bytesToInteger(flds[5].getBytes()) : null);
+            t.append(flds[6].compareTo("")!=0 ? ps.bytesToLong(flds[6].getBytes()) : null);
+            t.append(flds[7].compareTo("")!=0 ? ps.bytesToMap(flds[7].getBytes()) : null);
+            t.append(flds[8].compareTo("")!=0 ? ps.bytesToTuple(flds[8].getBytes()) : null);
             
             assertEquals(true, TestHelper.bagContains(inpDB, t));
             ++size;
@@ -103,4 +143,40 @@
         FileLocalizer.delete(fSpec.getFileName(), pc);
     }
 
+    @Test
+    public void testStoreComplexDataWithNull() throws ExecException, IOException {
+        Tuple inputTuple = GenRandomData.genRandSmallBagTextTupleWithNulls(new Random(), 10, 100);
+        inpDB = DefaultBagFactory.getInstance().newDefaultBag();
+        inpDB.add(inputTuple);
+        Tuple t = new DefaultTuple();
+        t.append(inpDB);
+        proj.attachInput(t);
+        Result res = st.store();
+        assertEquals(POStatus.STATUS_EOP, res.returnStatus);
+        PigStorage ps = new PigStorage(":");
+        
+        int size = 0;
+        BufferedReader br = new BufferedReader(new FileReader("/tmp/storeTest.txt"));
+        for(String line=br.readLine();line!=null;line=br.readLine()){
+            System.err.println("Complex data: ");
+            System.err.println(line);
+            String[] flds = line.split(":",-1);
+            t = new DefaultTuple();
+            t.append(flds[0].compareTo("")!=0 ? ps.bytesToBag(flds[0].getBytes()) : null);
+            t.append(flds[1].compareTo("")!=0 ? ps.bytesToCharArray(flds[1].getBytes()) : null);
+            t.append(flds[2].compareTo("")!=0 ? ps.bytesToCharArray(flds[2].getBytes()) : null);
+            t.append(flds[3].compareTo("")!=0 ? ps.bytesToDouble(flds[3].getBytes()) : null);
+            t.append(flds[4].compareTo("")!=0 ? ps.bytesToFloat(flds[4].getBytes()) : null);
+            t.append(flds[5].compareTo("")!=0 ? ps.bytesToInteger(flds[5].getBytes()) : null);
+            t.append(flds[6].compareTo("")!=0 ? ps.bytesToLong(flds[6].getBytes()) : null);
+            t.append(flds[7].compareTo("")!=0 ? ps.bytesToMap(flds[7].getBytes()) : null);
+            t.append(flds[8].compareTo("")!=0 ? ps.bytesToTuple(flds[8].getBytes()) : null);
+            t.append(flds[9].compareTo("")!=0 ? ps.bytesToCharArray(flds[9].getBytes()) : null);
+            
+            assertTrue(inputTuple.equals(t));
+            ++size;
+        }
+        FileLocalizer.delete(fSpec.getFileName(), pc);
+    }
+
 }

Modified: hadoop/pig/branches/types/test/org/apache/pig/test/utils/GenRandomData.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/types/test/org/apache/pig/test/utils/GenRandomData.java?rev=718654&r1=718653&r2=718654&view=diff
==============================================================================
--- hadoop/pig/branches/types/test/org/apache/pig/test/utils/GenRandomData.java (original)
+++ hadoop/pig/branches/types/test/org/apache/pig/test/utils/GenRandomData.java Tue Nov 18 09:13:24 2008
@@ -236,4 +236,30 @@
         t.append(null);
         return t;
     }
+
+    public static Tuple genRandSmallBagTextTupleWithNulls(Random r, int num, int limit){
+        if(r==null){
+            Tuple t = new DefaultTuple();
+            t.append("RANDOM");
+            return t;
+        }
+        Tuple t = new DefaultTuple();
+        t.append(genRandSmallTupDataBag(r, num, limit));
+        t.append(new Boolean(r.nextBoolean()).toString());
+        //TODO Fix
+        //The text representation of byte array and char array
+        //cannot be disambiguated without annotation. For now,
+        //the tuples will not contain byte array
+        //t.append(genRandTextDBA(r));
+        t.append(genRandString(r));
+        t.append(r.nextDouble());
+        t.append(r.nextFloat());
+        t.append(r.nextInt());
+        t.append(r.nextLong());
+        t.append(genRandMap(r, num));
+        t.append(genRandSmallTuple(r, 100));
+        t.append(null);
+        return t;
+    }
+    
 }


Reply via email to