Author: olga
Date: Tue Nov 18 09:13:24 2008
New Revision: 718654
URL: http://svn.apache.org/viewvc?rev=718654&view=rev
Log:
PIG-527: allow PigStorage to write complex data
Modified:
hadoop/pig/branches/types/CHANGES.txt
hadoop/pig/branches/types/src/org/apache/pig/builtin/PigStorage.java
hadoop/pig/branches/types/src/org/apache/pig/data/DefaultTuple.java
hadoop/pig/branches/types/test/org/apache/pig/test/TestDataModel.java
hadoop/pig/branches/types/test/org/apache/pig/test/TestStore.java
hadoop/pig/branches/types/test/org/apache/pig/test/utils/GenRandomData.java
Modified: hadoop/pig/branches/types/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/pig/branches/types/CHANGES.txt?rev=718654&r1=718653&r2=718654&view=diff
==============================================================================
--- hadoop/pig/branches/types/CHANGES.txt (original)
+++ hadoop/pig/branches/types/CHANGES.txt Tue Nov 18 09:13:24 2008
@@ -316,3 +316,5 @@
PIG-512: Expressions in foreach lead to errors (sms via olgan)
PIG-528: use UDF return in schema computation (sms via olgan)
+
+ PIG-527: allow PigStorage to write out complex output (sms via olgan)
Modified: hadoop/pig/branches/types/src/org/apache/pig/builtin/PigStorage.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/builtin/PigStorage.java?rev=718654&r1=718653&r2=718654&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/builtin/PigStorage.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/builtin/PigStorage.java Tue Nov 18 09:13:24 2008
@@ -24,6 +24,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Map;
+import java.util.Iterator;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.Log;
@@ -35,6 +36,7 @@
import org.apache.pig.backend.datastorage.DataStorage;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
@@ -59,6 +61,7 @@
private int os;
private static final int OS_UNIX = 0;
private static final int OS_WINDOWS = 1;
+ private static final String UTF8 = "UTF-8";
public PigStorage() {
os = OS_UNIX;
@@ -143,6 +146,109 @@
mOut = os;
}
+ private void putField(Object field) throws IOException {
+ //string constants for each delimiter
+ String tupleBeginDelim = "(";
+ String tupleEndDelim = ")";
+ String bagBeginDelim = "{";
+ String bagEndDelim = "}";
+ String mapBeginDelim = "[";
+ String mapEndDelim = "]";
+ String fieldDelim = ",";
+ String mapKeyValueDelim = "#";
+
+ switch (DataType.findType(field)) {
+ case DataType.NULL:
+ break; // just leave it empty
+
+ case DataType.BOOLEAN:
+ mOut.write(((Boolean)field).toString().getBytes());
+ break;
+
+ case DataType.INTEGER:
+ mOut.write(((Integer)field).toString().getBytes());
+ break;
+
+ case DataType.LONG:
+ mOut.write(((Long)field).toString().getBytes());
+ break;
+
+ case DataType.FLOAT:
+ mOut.write(((Float)field).toString().getBytes());
+ break;
+
+ case DataType.DOUBLE:
+ mOut.write(((Double)field).toString().getBytes());
+ break;
+
+ case DataType.BYTEARRAY: {
+ byte[] b = ((DataByteArray)field).get();
+ mOut.write(b, 0, b.length);
+ break;
+ }
+
+ case DataType.CHARARRAY:
+ // oddly enough, writeBytes writes a string
+ mOut.write(((String)field).getBytes(UTF8));
+ break;
+
+ case DataType.MAP:
+ boolean mapHasNext = false;
+ Map<Object, Object> m = (Map<Object, Object>)field;
+ mOut.write(mapBeginDelim.getBytes(UTF8));
+ for(Object o: m.keySet()) {
+ if(mapHasNext) {
+ mOut.write(fieldDelim.getBytes(UTF8));
+ } else {
+ mapHasNext = true;
+ }
+ putField(o);
+ mOut.write(mapKeyValueDelim.getBytes(UTF8));
+ putField(m.get(o));
+ }
+ mOut.write(mapEndDelim.getBytes(UTF8));
+ break;
+
+ case DataType.TUPLE:
+ boolean tupleHasNext = false;
+ Tuple t = (Tuple)field;
+ mOut.write(tupleBeginDelim.getBytes(UTF8));
+ for(int i = 0; i < t.size(); ++i) {
+ if(tupleHasNext) {
+ mOut.write(fieldDelim.getBytes(UTF8));
+ } else {
+ tupleHasNext = true;
+ }
+ try {
+ putField(t.get(i));
+ } catch (ExecException ee) {
+ throw new RuntimeException(ee);
+ }
+ }
+ mOut.write(tupleEndDelim.getBytes(UTF8));
+ break;
+
+ case DataType.BAG:
+ boolean bagHasNext = false;
+ mOut.write(bagBeginDelim.getBytes(UTF8));
+ Iterator<Tuple> tupleIter = ((DataBag)field).iterator();
+ while(tupleIter.hasNext()) {
+ if(bagHasNext) {
+ mOut.write(fieldDelim.getBytes(UTF8));
+ } else {
+ bagHasNext = true;
+ }
+ putField((Object)tupleIter.next());
+ }
+ mOut.write(bagEndDelim.getBytes(UTF8));
+ break;
+
+ default:
+ throw new RuntimeException("Unknown datatype " +
+ DataType.findType(field));
+ }
+ }
+
public void putNext(Tuple f) throws IOException {
// I have to convert integer fields to string, and then to bytes.
// If I use a DataOutputStream to convert directly from integer to
@@ -155,51 +261,8 @@
} catch (ExecException ee) {
throw new RuntimeException(ee);
}
- switch (DataType.findType(field)) {
- case DataType.NULL:
- break; // just leave it empty
-
- case DataType.BOOLEAN:
- mOut.write(((Boolean)field).toString().getBytes());
- break;
- case DataType.INTEGER:
- mOut.write(((Integer)field).toString().getBytes());
- break;
-
- case DataType.LONG:
- mOut.write(((Long)field).toString().getBytes());
- break;
-
- case DataType.FLOAT:
- mOut.write(((Float)field).toString().getBytes());
- break;
-
- case DataType.DOUBLE:
- mOut.write(((Double)field).toString().getBytes());
- break;
-
- case DataType.BYTEARRAY: {
- byte[] b = ((DataByteArray)field).get();
- mOut.write(b, 0, b.length);
- break;
- }
-
- case DataType.CHARARRAY:
- // oddly enough, writeBytes writes a string
- mOut.write(((String)field).getBytes("UTF-8"));
- break;
-
- case DataType.MAP:
- case DataType.TUPLE:
- case DataType.BAG:
- throw new IOException("Cannot store a non-flat tuple " +
- "using PigStorage");
-
- default:
- throw new RuntimeException("Unknown datatype " +
- DataType.findType(field));
- }
+ putField(field);
if (i == sz - 1) {
// last field in tuple.
Modified: hadoop/pig/branches/types/src/org/apache/pig/data/DefaultTuple.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/data/DefaultTuple.java?rev=718654&r1=718653&r2=718654&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/data/DefaultTuple.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/data/DefaultTuple.java Tue Nov 18 09:13:24 2008
@@ -190,9 +190,6 @@
StringBuilder buf = new StringBuilder();
for (Iterator<Object> it = mFields.iterator(); it.hasNext();) {
Object field = it.next();
- if (DataType.isComplex(field)) {
- throw new ExecException("Unable to convert non-flat tuple to
string.");
- }
buf.append(field == null ? "" : field.toString());
if (it.hasNext())
buf.append(delim);
@@ -219,7 +216,7 @@
}
}
} else {
- sb.append("NULL");
+ sb.append("");
}
if (it.hasNext())
sb.append(",");
Modified: hadoop/pig/branches/types/test/org/apache/pig/test/TestDataModel.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/branches/types/test/org/apache/pig/test/TestDataModel.java?rev=718654&r1=718653&r2=718654&view=diff
==============================================================================
--- hadoop/pig/branches/types/test/org/apache/pig/test/TestDataModel.java (original)
+++ hadoop/pig/branches/types/test/org/apache/pig/test/TestDataModel.java Tue Nov 18 09:13:24 2008
@@ -227,7 +227,7 @@
public void testTupleToString() throws Exception {
Tuple t = giveMeOneOfEach();
- assertEquals("toString", "((3,3.0F),{(4),(mary had a little
lamb)},[hello#world,goodbye#all],42,5000000000L,3.1415927F,2.99792458E8,true,hello,goodbye,NULL)",
t.toString());
+ assertEquals("toString", "((3,3.0F),{(4),(mary had a little
lamb)},[hello#world,goodbye#all],42,5000000000L,3.1415927F,2.99792458E8,true,hello,goodbye,)",
t.toString());
}
@Test
Modified: hadoop/pig/branches/types/test/org/apache/pig/test/TestStore.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/branches/types/test/org/apache/pig/test/TestStore.java?rev=718654&r1=718653&r2=718654&view=diff
==============================================================================
--- hadoop/pig/branches/types/test/org/apache/pig/test/TestStore.java (original)
+++ hadoop/pig/branches/types/test/org/apache/pig/test/TestStore.java Tue Nov 18 09:13:24 2008
@@ -19,6 +19,7 @@
import static org.junit.Assert.assertEquals;
+import java.io.File;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
@@ -29,9 +30,11 @@
import org.apache.pig.FuncSpec;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.builtin.PigStorage;
+import org.apache.pig.data.DefaultBagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
import org.apache.pig.data.DefaultTuple;
+import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileLocalizer;
@@ -54,6 +57,7 @@
DataBag inpDB;
static MiniCluster cluster = MiniCluster.buildCluster();
PigContext pc;
+ POProject proj;
@Before
public void setUp() throws Exception {
@@ -65,7 +69,7 @@
pc.connect();
st.setPc(pc);
- POProject proj = GenPhyOp.exprProject();
+ proj = GenPhyOp.exprProject();
proj.setColumn(0);
proj.setResultType(DataType.TUPLE);
proj.setOverloaded(true);
@@ -73,10 +77,6 @@
inps.add(proj);
st.setInputs(inps);
- inpDB = GenRandomData.genRandSmallTupDataBag(new Random(), 10, 100);
- Tuple t = new DefaultTuple();
- t.append(inpDB);
- proj.attachInput(t);
}
@After
@@ -85,6 +85,10 @@
@Test
public void testStore() throws ExecException, IOException {
+ inpDB = GenRandomData.genRandSmallTupDataBag(new Random(), 10, 100);
+ Tuple t = new DefaultTuple();
+ t.append(inpDB);
+ proj.attachInput(t);
Result res = st.store();
assertEquals(POStatus.STATUS_EOP, res.returnStatus);
@@ -92,9 +96,45 @@
BufferedReader br = new BufferedReader(new
FileReader("/tmp/storeTest.txt"));
for(String line=br.readLine();line!=null;line=br.readLine()){
String[] flds = line.split(":",-1);
- Tuple t = new DefaultTuple();
+ t = new DefaultTuple();
t.append(flds[0].compareTo("")!=0 ? flds[0] : null);
- t.append(Integer.parseInt(flds[1]));
+ t.append(flds[1].compareTo("")!=0 ? Integer.parseInt(flds[1]) :
null);
+
+ System.err.println("Simple data: ");
+ System.err.println(line);
+ System.err.println("t: ");
+ System.err.println(t);
+ assertEquals(true, TestHelper.bagContains(inpDB, t));
+ ++size;
+ }
+ assertEquals(true, size==inpDB.size());
+ FileLocalizer.delete(fSpec.getFileName(), pc);
+ }
+
+ @Test
+ public void testStoreComplexData() throws ExecException, IOException {
+ inpDB = GenRandomData.genRandFullTupTextDataBag(new Random(), 10, 100);
+ Tuple t = new DefaultTuple();
+ t.append(inpDB);
+ proj.attachInput(t);
+ Result res = st.store();
+ assertEquals(POStatus.STATUS_EOP, res.returnStatus);
+ PigStorage ps = new PigStorage(":");
+
+ int size = 0;
+ BufferedReader br = new BufferedReader(new
FileReader("/tmp/storeTest.txt"));
+ for(String line=br.readLine();line!=null;line=br.readLine()){
+ String[] flds = line.split(":",-1);
+ t = new DefaultTuple();
+ t.append(flds[0].compareTo("")!=0 ?
ps.bytesToBag(flds[0].getBytes()) : null);
+ t.append(flds[1].compareTo("")!=0 ?
ps.bytesToCharArray(flds[1].getBytes()) : null);
+ t.append(flds[2].compareTo("")!=0 ?
ps.bytesToCharArray(flds[2].getBytes()) : null);
+ t.append(flds[3].compareTo("")!=0 ?
ps.bytesToDouble(flds[3].getBytes()) : null);
+ t.append(flds[4].compareTo("")!=0 ?
ps.bytesToFloat(flds[4].getBytes()) : null);
+ t.append(flds[5].compareTo("")!=0 ?
ps.bytesToInteger(flds[5].getBytes()) : null);
+ t.append(flds[6].compareTo("")!=0 ?
ps.bytesToLong(flds[6].getBytes()) : null);
+ t.append(flds[7].compareTo("")!=0 ?
ps.bytesToMap(flds[7].getBytes()) : null);
+ t.append(flds[8].compareTo("")!=0 ?
ps.bytesToTuple(flds[8].getBytes()) : null);
assertEquals(true, TestHelper.bagContains(inpDB, t));
++size;
@@ -103,4 +143,40 @@
FileLocalizer.delete(fSpec.getFileName(), pc);
}
+ @Test
+ public void testStoreComplexDataWithNull() throws ExecException,
IOException {
+ Tuple inputTuple = GenRandomData.genRandSmallBagTextTupleWithNulls(new
Random(), 10, 100);
+ inpDB = DefaultBagFactory.getInstance().newDefaultBag();
+ inpDB.add(inputTuple);
+ Tuple t = new DefaultTuple();
+ t.append(inpDB);
+ proj.attachInput(t);
+ Result res = st.store();
+ assertEquals(POStatus.STATUS_EOP, res.returnStatus);
+ PigStorage ps = new PigStorage(":");
+
+ int size = 0;
+ BufferedReader br = new BufferedReader(new
FileReader("/tmp/storeTest.txt"));
+ for(String line=br.readLine();line!=null;line=br.readLine()){
+ System.err.println("Complex data: ");
+ System.err.println(line);
+ String[] flds = line.split(":",-1);
+ t = new DefaultTuple();
+ t.append(flds[0].compareTo("")!=0 ?
ps.bytesToBag(flds[0].getBytes()) : null);
+ t.append(flds[1].compareTo("")!=0 ?
ps.bytesToCharArray(flds[1].getBytes()) : null);
+ t.append(flds[2].compareTo("")!=0 ?
ps.bytesToCharArray(flds[2].getBytes()) : null);
+ t.append(flds[3].compareTo("")!=0 ?
ps.bytesToDouble(flds[3].getBytes()) : null);
+ t.append(flds[4].compareTo("")!=0 ?
ps.bytesToFloat(flds[4].getBytes()) : null);
+ t.append(flds[5].compareTo("")!=0 ?
ps.bytesToInteger(flds[5].getBytes()) : null);
+ t.append(flds[6].compareTo("")!=0 ?
ps.bytesToLong(flds[6].getBytes()) : null);
+ t.append(flds[7].compareTo("")!=0 ?
ps.bytesToMap(flds[7].getBytes()) : null);
+ t.append(flds[8].compareTo("")!=0 ?
ps.bytesToTuple(flds[8].getBytes()) : null);
+ t.append(flds[9].compareTo("")!=0 ?
ps.bytesToCharArray(flds[9].getBytes()) : null);
+
+ assertTrue(inputTuple.equals(t));
+ ++size;
+ }
+ FileLocalizer.delete(fSpec.getFileName(), pc);
+ }
+
}
Modified: hadoop/pig/branches/types/test/org/apache/pig/test/utils/GenRandomData.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/branches/types/test/org/apache/pig/test/utils/GenRandomData.java?rev=718654&r1=718653&r2=718654&view=diff
==============================================================================
--- hadoop/pig/branches/types/test/org/apache/pig/test/utils/GenRandomData.java (original)
+++ hadoop/pig/branches/types/test/org/apache/pig/test/utils/GenRandomData.java Tue Nov 18 09:13:24 2008
@@ -236,4 +236,30 @@
t.append(null);
return t;
}
+
+ public static Tuple genRandSmallBagTextTupleWithNulls(Random r, int num,
int limit){
+ if(r==null){
+ Tuple t = new DefaultTuple();
+ t.append("RANDOM");
+ return t;
+ }
+ Tuple t = new DefaultTuple();
+ t.append(genRandSmallTupDataBag(r, num, limit));
+ t.append(new Boolean(r.nextBoolean()).toString());
+ //TODO Fix
+ //The text representation of byte array and char array
+ //cannot be disambiguated without annotation. For now,
+ //the tuples will not contain byte array
+ //t.append(genRandTextDBA(r));
+ t.append(genRandString(r));
+ t.append(r.nextDouble());
+ t.append(r.nextFloat());
+ t.append(r.nextInt());
+ t.append(r.nextLong());
+ t.append(genRandMap(r, num));
+ t.append(genRandSmallTuple(r, 100));
+ t.append(null);
+ return t;
+ }
+
}