Author: yanz
Date: Mon Dec 7 20:44:57 2009
New Revision: 888127
URL: http://svn.apache.org/viewvc?rev=888127&view=rev
Log:
PIG-1104 Streaming Support (Chao Wang via yanz)
Added:
hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/types/CsvZebraTupleOutput.java
hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/types/ZebraTuple.java
hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/types/ZebraTupleFactory.java
hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/types/TestZebraTupleTostring.java
Modified:
hadoop/pig/branches/branch-0.6/contrib/zebra/CHANGES.txt
hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableInputFormat.java
hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableStorer.java
hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/types/TypesUtils.java
hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestSchema.java
hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestMultipleOutputs.java
hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestMultipleOutputs2.java
hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestMultipleOutputs3.java
hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestMultipleOutputs4.java
hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/types/TestCheckin.java
Modified: hadoop/pig/branches/branch-0.6/contrib/zebra/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/contrib/zebra/CHANGES.txt?rev=888127&r1=888126&r2=888127&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.6/contrib/zebra/CHANGES.txt (original)
+++ hadoop/pig/branches/branch-0.6/contrib/zebra/CHANGES.txt Mon Dec 7
20:44:57 2009
@@ -6,6 +6,8 @@
IMPROVEMENTS
+ PIG-1104 Streaming Support (Chao Wang via yanz)
+
PIG-1119 Support of "group" as a column name (Gaurav Jain via yanz)
PIG-1111 Multiple Outputs Support (Gaurav Jain via yanz)
Modified:
hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableInputFormat.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableInputFormat.java?rev=888127&r1=888126&r2=888127&view=diff
==============================================================================
---
hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableInputFormat.java
(original)
+++
hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableInputFormat.java
Mon Dec 7 20:44:57 2009
@@ -184,6 +184,15 @@
static TableExpr getInputExpr(JobConf conf) throws IOException {
String expr = conf.get(INPUT_EXPR);
if (expr == null) {
+ // try setting from input path
+ Path[] paths = FileInputFormat.getInputPaths(conf);
+ if (paths != null) {
+ setInputPaths(conf, paths);
+ }
+ expr = conf.get(INPUT_EXPR);
+ }
+
+ if (expr == null) {
throw new IllegalArgumentException("Input expression not defined.");
}
StringReader in = new StringReader(expr);
@@ -400,8 +409,8 @@
*/
@Override
public RecordReader<BytesWritable, Tuple> getRecordReader(InputSplit split,
- JobConf conf, Reporter reporter) throws IOException {
- TableExpr expr = getInputExpr(conf);
+ JobConf conf, Reporter reporter) throws IOException {
+ TableExpr expr = getInputExpr(conf);
if (expr == null) {
throw new IOException("Table expression not defined");
}
@@ -410,6 +419,7 @@
expr.setSortedSplit();
String strProj = conf.get(INPUT_PROJ);
+
String projection = null;
try {
if (strProj == null) {
Modified:
hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableStorer.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableStorer.java?rev=888127&r1=888126&r2=888127&view=diff
==============================================================================
---
hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableStorer.java
(original)
+++
hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableStorer.java
Mon Dec 7 20:44:57 2009
@@ -45,8 +45,6 @@
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
import org.apache.pig.data.Tuple;
-import org.apache.pig.data.DefaultTuple;
-import org.apache.hadoop.zebra.pig.comparator.*;
/**
* Pig CommittableStoreFunc for Zebra Table
Added:
hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/types/CsvZebraTupleOutput.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/types/CsvZebraTupleOutput.java?rev=888127&view=auto
==============================================================================
---
hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/types/CsvZebraTupleOutput.java
(added)
+++
hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/types/CsvZebraTupleOutput.java
Mon Dec 7 20:44:57 2009
@@ -0,0 +1,289 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.zebra.types;
+
+import java.util.Iterator;
+import java.util.Map;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+
+/**
+ * Builds CSV-format string representations of Zebra tuples; used by ZebraTuple.toString(). */
+class CsvZebraTupleOutput {
+ private StringBuilder sb;
+ private boolean isFirst = true;
+ protected static final Log LOG =
LogFactory.getLog(CsvZebraTupleOutput.class);
+ private static CsvZebraTupleOutput instance = null;
+
+ String toCSVString(String s) {
+ StringBuffer sb = new StringBuffer(s.length() + 1);
+ sb.append('\'');
+ int len = s.length();
+ for (int i = 0; i < len; i++) {
+ char c = s.charAt(i);
+ switch (c) {
+ case '\0':
+ sb.append("%00");
+ break;
+ case '\n':
+ sb.append("%0A");
+ break;
+ case '\r':
+ sb.append("%0D");
+ break;
+ case ',':
+ sb.append("%2C");
+ break;
+ case '}':
+ sb.append("%7D");
+ break;
+ case '%':
+ sb.append("%25");
+ break;
+ default:
+ sb.append(c);
+ }
+ }
+ return sb.toString();
+ }
+
+ String toCSVBuffer(DataByteArray buf) {
+ StringBuffer sb = new StringBuffer("#");
+ sb.append(buf.toString());
+ return sb.toString();
+ }
+
+ void printCommaUnlessFirst() {
+ if (!isFirst) {
+ sb.append(",");
+ }
+ isFirst = false;
+ }
+
+ /** Creates a new instance of CsvZebraTupleOutput */
+ private CsvZebraTupleOutput() {
+ sb = new StringBuilder();
+ }
+
+ void reset() {
+ sb.delete(0, sb.length());
+ isFirst = true;
+ }
+
+ static CsvZebraTupleOutput createCsvZebraTupleOutput() {
+ if (instance == null) {
+ instance = new CsvZebraTupleOutput();
+ } else {
+ instance.reset();
+ }
+
+ return instance;
+ }
+
+ @Override
+ public String toString() {
+ if (sb != null) {
+ return sb.toString();
+ }
+ return null;
+ }
+
+ void writeByte(byte b) {
+ writeLong((long) b);
+ }
+
+ void writeBool(boolean b) {
+ printCommaUnlessFirst();
+ String val = b ? "T" : "F";
+ sb.append(val);
+ }
+
+ void writeInt(int i) {
+ writeLong((long) i);
+ }
+
+ void writeLong(long l) {
+ printCommaUnlessFirst();
+ sb.append(l);
+ }
+
+ void writeFloat(float f) {
+ writeDouble((double) f);
+ }
+
+ void writeDouble(double d) {
+ printCommaUnlessFirst();
+ sb.append(d);
+ }
+
+ void writeString(String s) {
+ printCommaUnlessFirst();
+ sb.append(toCSVString(s));
+ }
+
+ void writeBuffer(DataByteArray buf) {
+ printCommaUnlessFirst();
+ sb.append(toCSVBuffer(buf));
+ }
+
+ void writeNull() {
+ printCommaUnlessFirst();
+ }
+
+ void startTuple(Tuple r) {
+ }
+
+ void endTuple(Tuple r) {
+ sb.append("\n");
+ isFirst = true;
+ }
+
+ /**
+ * Generate CSV-format string representations of Zebra tuples for Zebra
+ * streaming use.
+ *
+ * @param r the tuple whose fields are appended to the internal CSV
+ *          buffer; the accumulated result is retrieved via toString()
+ */
+ @SuppressWarnings("unchecked")
+ void writeTuple(Tuple r) {
+ for (int i = 0; i < r.size(); i++) {
+ try {
+ Object d = r.get(i);
+
+ if (d != null) {
+ if (d instanceof Map) {
+ Map<String, Object> map = (Map<String, Object>) d;
+ startMap(map);
+ writeMap(map);
+ endMap(map);
+ } else if (d instanceof Tuple) {
+ Tuple t = (Tuple) d;
+ writeTuple(t);
+ } else if (d instanceof DataBag) {
+ DataBag bag = (DataBag) d;
+ writeBag(bag);
+ } else if (d instanceof Boolean) {
+ writeBool((Boolean) d);
+ } else if (d instanceof Byte) {
+ writeByte((Byte) d);
+ } else if (d instanceof Integer) {
+ writeInt((Integer) d);
+ } else if (d instanceof Long) {
+ writeLong((Long) d);
+ } else if (d instanceof Float) {
+ writeFloat((Float) d);
+ } else if (d instanceof Double) {
+ writeDouble((Double) d);
+ } else if (d instanceof String) {
+ writeString((String) d);
+ } else if (d instanceof DataByteArray) {
+ writeBuffer((DataByteArray) d);
+ } else {
+ throw new ExecException("Unknown data type");
+ }
+ } else { // if d is null, write nothing except ','
+ writeNull();
+ }
+ } catch (ExecException e) {
+ e.printStackTrace();
+ LOG.warn("Exception when CSV format Zebra tuple", e);
+ }
+ }
+ }
+
+ void startBag(DataBag bag) {
+ printCommaUnlessFirst();
+ sb.append("v{");
+ isFirst = true;
+ }
+
+ void writeBag(DataBag bag) {
+ Iterator<Tuple> iter = bag.iterator();
+ while (iter.hasNext()) {
+ Tuple t = (Tuple) iter.next();
+ startTuple(t);
+ writeTuple(t);
+ endTuple(t);
+ }
+ }
+
+ void endBag(DataBag bag) {
+ sb.append("}");
+ isFirst = false;
+ }
+
+ void startMap(Map<String, Object> m) {
+ printCommaUnlessFirst();
+ sb.append("m{");
+ isFirst = true;
+ }
+
+ void endMap(Map<String, Object> m) {
+ sb.append("}");
+ isFirst = false;
+ }
+
+ @SuppressWarnings("unchecked")
+ void writeMap(Map<String, Object> m) throws ExecException {
+ for (Map.Entry<String, Object> e : m.entrySet()) {
+ writeString(e.getKey());
+
+ Object d = e.getValue();
+ if (d != null) {
+ if (d instanceof Map) {
+ Map<String, Object> map = (Map<String, Object>) d;
+ startMap(map);
+ writeMap(map);
+ endMap(map);
+ } else if (d instanceof Tuple) {
+ Tuple t = (Tuple) d;
+ writeTuple(t);
+ } else if (d instanceof DataBag) {
+ DataBag bag = (DataBag) d;
+ writeBag(bag);
+ } else if (d instanceof Boolean) {
+ writeBool((Boolean) d);
+ } else if (d instanceof Byte) {
+ writeByte((Byte) d);
+ } else if (d instanceof Integer) {
+ writeInt((Integer) d);
+ } else if (d instanceof Long) {
+ writeLong((Long) d);
+ } else if (d instanceof Float) {
+ writeFloat((Float) d);
+ } else if (d instanceof Double) {
+ writeDouble((Double) d);
+ } else if (d instanceof String) {
+ writeString((String) d);
+ } else if (d instanceof DataByteArray) {
+ writeBuffer((DataByteArray) d);
+ } else {
+ throw new ExecException("Unknown data type");
+ }
+ } else { // if d is null, write nothing except ','
+ writeNull();
+ }
+ }
+ }
+}
\ No newline at end of file
Modified:
hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/types/TypesUtils.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/types/TypesUtils.java?rev=888127&r1=888126&r2=888127&view=diff
==============================================================================
---
hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/types/TypesUtils.java
(original)
+++
hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/types/TypesUtils.java
Mon Dec 7 20:44:57 2009
@@ -23,8 +23,6 @@
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DefaultDataBag;
-import org.apache.pig.data.DefaultTuple;
-import org.apache.pig.data.DefaultTupleFactory;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.hadoop.zebra.schema.Schema;
@@ -34,7 +32,8 @@
* Utility methods manipulating Table types (specifically, Tuple objects).
*/
public class TypesUtils {
- static TupleFactory tf = DefaultTupleFactory.getInstance();
+ //static TupleFactory tf = ZebraTupleFactory.getInstance();
+ static TupleFactory tf = ZebraTupleFactory.getZebraTupleFactoryInstance();
/**
* Create a tuple based on a schema
@@ -72,9 +71,8 @@
return new DefaultDataBag();
}
- //TODO - sync up with yan about this change
public static DataBag createBag(Schema schema) {
- return new DefaultDataBag();
+ return new DefaultDataBag();
}
/**
@@ -118,7 +116,7 @@
*/
public static class TupleReader {
private Tuple tuple;
- @SuppressWarnings("unused")
+ //@SuppressWarnings("unused")
private Schema physical;
private Projection projection;
SubColumnExtraction.SubColumn subcolextractor = null;
Added:
hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/types/ZebraTuple.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/types/ZebraTuple.java?rev=888127&view=auto
==============================================================================
---
hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/types/ZebraTuple.java
(added)
+++
hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/types/ZebraTuple.java
Mon Dec 7 20:44:57 2009
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.zebra.types;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.pig.data.DefaultTuple;
+import org.apache.pig.impl.util.TupleFormat;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Zebra's implementation of Pig's Tuple.
+ * It's derived from Pig's DefaultTuple implementation.
+ *
+ */
+public class ZebraTuple extends DefaultTuple {
+ protected static final Log LOG = LogFactory.getLog(TupleFormat.class);
+
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Default constructor. This constructor is public so that hadoop can call
+ * it directly. However, inside pig you should never be calling this
+ * function. Use TupleFactory instead.
+ */
+ public ZebraTuple() {
+ super();
+ }
+
+ /**
+ * Construct a tuple with a known number of fields. Package level so
+ * that callers cannot directly invoke it.
+ * @param size Number of fields to allocate in the tuple.
+ */
+ ZebraTuple(int size) {
+ mFields = new ArrayList<Object>(size);
+ for (int i = 0; i < size; i++) mFields.add(null);
+ }
+
+ /**
+ * Construct a tuple from an existing list of objects. Package
+ * level so that callers cannot directly invoke it.
+ * @param c List of objects to turn into a tuple.
+ */
+ ZebraTuple(List<Object> c) {
+ mFields = new ArrayList<Object>(c.size());
+
+ Iterator<Object> i = c.iterator();
+ int field;
+ for (field = 0; i.hasNext(); field++) mFields.add(field, i.next());
+ }
+
+ /**
+ * Construct a tuple from an existing list of objects. Package
+ * level so that callers cannot directly invoke it.
+ * @param c List of objects to turn into a tuple. This list will be kept
+ * as part of the tuple.
+ * @param junk Just used to differentiate from the constructor above that
+ * copies the list.
+ */
+ ZebraTuple(List<Object> c, int junk) {
+ mFields = c;
+ }
+
+ @Override
+ public String toString() {
+ CsvZebraTupleOutput csvZebraTupleOutput =
CsvZebraTupleOutput.createCsvZebraTupleOutput();
+ csvZebraTupleOutput.writeTuple(this);
+
+ return csvZebraTupleOutput.toString();
+ }
+}
\ No newline at end of file
Added:
hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/types/ZebraTupleFactory.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/types/ZebraTupleFactory.java?rev=888127&view=auto
==============================================================================
---
hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/types/ZebraTupleFactory.java
(added)
+++
hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/types/ZebraTupleFactory.java
Mon Dec 7 20:44:57 2009
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.zebra.types;
+
+import java.util.List;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.backend.executionengine.ExecException;
+
+/**
+ * Factory that produces ZebraTuples.
+ */
+class ZebraTupleFactory extends TupleFactory {
+ private static ZebraTupleFactory self = null;
+
+ public Tuple newTuple() {
+ return new ZebraTuple();
+ }
+
+ public Tuple newTuple(int size) {
+ return new ZebraTuple(size);
+ }
+
+ @SuppressWarnings("unchecked")
+ public Tuple newTuple(List c) {
+ return new ZebraTuple(c);
+ }
+
+ @SuppressWarnings("unchecked")
+ public Tuple newTupleNoCopy(List list) {
+ return new ZebraTuple(list, 1);
+ }
+
+ public Tuple newTuple(Object datum) {
+ Tuple t = new ZebraTuple(1);
+ try {
+ t.set(0, datum);
+ } catch (ExecException e) {
+ // The world has come to an end, we just allocated a tuple with one
slot
+ // but we can't write to that slot.
+ throw new RuntimeException("Unable to write to field 0 in newly " +
+ "allocated tuple of size 1!", e);
+ }
+ return t;
+ }
+
+ public Class tupleClass() {
+ return ZebraTuple.class;
+ }
+
+ ZebraTupleFactory() {
+ }
+
+ public static ZebraTupleFactory getZebraTupleFactoryInstance() {
+ if (self == null) {
+ self = new ZebraTupleFactory();
+ }
+ return self;
+ }
+}
Modified:
hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestSchema.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestSchema.java?rev=888127&r1=888126&r2=888127&view=diff
==============================================================================
---
hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestSchema.java
(original)
+++
hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestSchema.java
Mon Dec 7 20:44:57 2009
@@ -199,7 +199,7 @@
}
@Test
- public void testRedord() throws IOException, ParseException {
+ public void testRecord() throws IOException, ParseException {
BasicTable.drop(path, conf);
String STR_SCHEMA = "r1:record(f1:int, f2:long),
r2:record(r3:record(f3:float, f4))";
String STR_STORAGE = "[r1.f1]; [r2.r3.f4]; [r1.f2, r2.r3.f3]";
@@ -369,8 +369,7 @@
Assert.assertEquals(1, ((Tuple) RowValue.get(0)).get(0));
Assert.assertEquals(1001L, ((Tuple) RowValue.get(0)).get(1));
- Assert.assertEquals("((1.3,r3 row1 byte array))", RowValue.get(1)
- .toString());
+ Assert.assertEquals("1.3,#r3 row1 byte array", RowValue.get(1).toString());
Assert.assertEquals(1.3, ((Tuple) ((Tuple)
RowValue.get(1)).get(0)).get(0));
Assert.assertEquals("r3 row1 byte array",
((Tuple) ((Tuple) RowValue.get(1)).get(0)).get(1).toString());
@@ -382,8 +381,7 @@
scanner.getValue(RowValue);
Assert.assertEquals(2, ((Tuple) RowValue.get(0)).get(0));
Assert.assertEquals(1002L, ((Tuple) RowValue.get(0)).get(1));
- Assert.assertEquals("((2.3,r3 row2 byte array))", RowValue.get(1)
- .toString());
+ Assert.assertEquals("2.3,#r3 row2 byte array", RowValue.get(1).toString());
Assert.assertEquals(2.3, ((Tuple) ((Tuple)
RowValue.get(1)).get(0)).get(0));
Assert.assertEquals("r3 row2 byte array",
((Tuple) ((Tuple) RowValue.get(1)).get(0)).get(1).toString());
Modified:
hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestMultipleOutputs.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestMultipleOutputs.java?rev=888127&r1=888126&r2=888127&view=diff
==============================================================================
---
hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestMultipleOutputs.java
(original)
+++
hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestMultipleOutputs.java
Mon Dec 7 20:44:57 2009
@@ -56,12 +56,12 @@
import org.apache.hadoop.zebra.parser.ParseException;
import org.apache.hadoop.zebra.schema.Schema;
import org.apache.hadoop.zebra.types.TypesUtils;
+import org.apache.hadoop.zebra.types.ZebraTuple;
import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.data.DataBag;
-import org.apache.pig.data.DefaultTuple;
import org.apache.pig.data.Tuple;
import org.apache.pig.test.MiniCluster;
import org.junit.AfterClass;
@@ -630,11 +630,11 @@
// This key has to be created by user
/*
- * Tuple userKey = new DefaultTuple(); userKey.append(new String(word));
+ * Tuple userKey = new ZebraTuple(); userKey.append(new String(word));
* userKey.append(Integer.parseInt(wdct[1]));
*/
System.out.println("in map, sortkey: " + sortKey);
- Tuple userKey = new DefaultTuple();
+ Tuple userKey = new ZebraTuple();
if (sortKey.equalsIgnoreCase("word,count")) {
userKey.append(new String(word));
userKey.append(Integer.parseInt(wdct[1]));
@@ -749,7 +749,7 @@
jobConf.setInputFormat(TextInputFormat.class);
jobConf.setMapperClass(TestMultipleOutputs.MapClass.class);
jobConf.setMapOutputKeyClass(BytesWritable.class);
- jobConf.setMapOutputValueClass(DefaultTuple.class);
+ jobConf.setMapOutputValueClass(ZebraTuple.class);
FileInputFormat.setInputPaths(jobConf, inputPath);
jobConf.setNumMapTasks(1);
Modified:
hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestMultipleOutputs2.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestMultipleOutputs2.java?rev=888127&r1=888126&r2=888127&view=diff
==============================================================================
---
hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestMultipleOutputs2.java
(original)
+++
hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestMultipleOutputs2.java
Mon Dec 7 20:44:57 2009
@@ -56,12 +56,12 @@
import org.apache.hadoop.zebra.parser.ParseException;
import org.apache.hadoop.zebra.schema.Schema;
import org.apache.hadoop.zebra.types.TypesUtils;
+import org.apache.hadoop.zebra.types.ZebraTuple;
import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.data.DataBag;
-import org.apache.pig.data.DefaultTuple;
import org.apache.pig.data.Tuple;
import org.apache.pig.test.MiniCluster;
import org.junit.BeforeClass;
@@ -627,7 +627,7 @@
* userKey.append(Integer.parseInt(wdct[1]));
*/
System.out.println("in map, sortkey: " + sortKey);
- Tuple userKey = new DefaultTuple();
+ Tuple userKey = new ZebraTuple();
if (sortKey.equalsIgnoreCase("word,count")) {
userKey.append(new String(word));
userKey.append(Integer.parseInt(wdct[1]));
@@ -749,7 +749,7 @@
jobConf.setInputFormat(TextInputFormat.class);
jobConf.setMapperClass(TestMultipleOutputs2.MapClass.class);
jobConf.setMapOutputKeyClass(BytesWritable.class);
- jobConf.setMapOutputValueClass(DefaultTuple.class);
+ jobConf.setMapOutputValueClass(ZebraTuple.class);
FileInputFormat.setInputPaths(jobConf, inputPath);
jobConf.setNumMapTasks(1);
Modified:
hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestMultipleOutputs3.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestMultipleOutputs3.java?rev=888127&r1=888126&r2=888127&view=diff
==============================================================================
---
hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestMultipleOutputs3.java
(original)
+++
hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestMultipleOutputs3.java
Mon Dec 7 20:44:57 2009
@@ -56,12 +56,12 @@
import org.apache.hadoop.zebra.parser.ParseException;
import org.apache.hadoop.zebra.schema.Schema;
import org.apache.hadoop.zebra.types.TypesUtils;
+import org.apache.hadoop.zebra.types.ZebraTuple;
import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.data.DataBag;
-import org.apache.pig.data.DefaultTuple;
import org.apache.pig.data.Tuple;
import org.apache.pig.test.MiniCluster;
import org.junit.BeforeClass;
@@ -528,7 +528,7 @@
* userKey.append(Integer.parseInt(wdct[1]));
*/
System.out.println("in map, sortkey: " + sortKey);
- Tuple userKey = new DefaultTuple();
+ Tuple userKey = new ZebraTuple();
if (sortKey.equalsIgnoreCase("word,count")) {
userKey.append(new String(word));
userKey.append(Integer.parseInt(wdct[1]));
@@ -630,7 +630,7 @@
jobConf.setInputFormat(TextInputFormat.class);
jobConf.setMapperClass(TestMultipleOutputs3.MapClass.class);
jobConf.setMapOutputKeyClass(BytesWritable.class);
- jobConf.setMapOutputValueClass(DefaultTuple.class);
+ jobConf.setMapOutputValueClass(ZebraTuple.class);
FileInputFormat.setInputPaths(jobConf, inputPath);
jobConf.setNumMapTasks(1);
Modified:
hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestMultipleOutputs4.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestMultipleOutputs4.java?rev=888127&r1=888126&r2=888127&view=diff
==============================================================================
---
hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestMultipleOutputs4.java
(original)
+++
hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestMultipleOutputs4.java
Mon Dec 7 20:44:57 2009
@@ -56,12 +56,12 @@
import org.apache.hadoop.zebra.parser.ParseException;
import org.apache.hadoop.zebra.schema.Schema;
import org.apache.hadoop.zebra.types.TypesUtils;
+import org.apache.hadoop.zebra.types.ZebraTuple;
import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.data.DataBag;
-import org.apache.pig.data.DefaultTuple;
import org.apache.pig.data.Tuple;
import org.apache.pig.test.MiniCluster;
import org.junit.BeforeClass;
@@ -558,7 +558,7 @@
* userKey.append(Integer.parseInt(wdct[1]));
*/
System.out.println("in map, sortkey: " + sortKey);
- Tuple userKey = new DefaultTuple();
+ Tuple userKey = new ZebraTuple();
if (sortKey.equalsIgnoreCase("word,count")) {
userKey.append(new String(word));
userKey.append(Integer.parseInt(wdct[1]));
@@ -657,7 +657,7 @@
jobConf.setInputFormat(TextInputFormat.class);
jobConf.setMapperClass(TestMultipleOutputs4.MapClass.class);
jobConf.setMapOutputKeyClass(BytesWritable.class);
- jobConf.setMapOutputValueClass(DefaultTuple.class);
+ jobConf.setMapOutputValueClass(ZebraTuple.class);
FileInputFormat.setInputPaths(jobConf, inputPath);
jobConf.setNumMapTasks(1);
Modified:
hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/types/TestCheckin.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/types/TestCheckin.java?rev=888127&r1=888126&r2=888127&view=diff
==============================================================================
---
hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/types/TestCheckin.java
(original)
+++
hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/types/TestCheckin.java
Mon Dec 7 20:44:57 2009
@@ -34,7 +34,8 @@
TestStorageMisc2.class,
TestStorageMisc3.class,
TestStorageRecord.class,
- TestStorePrimitive.class
+ TestStorePrimitive.class,
+ TestZebraTupleTostring.class
})
public class TestCheckin {
Added:
hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/types/TestZebraTupleTostring.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/types/TestZebraTupleTostring.java?rev=888127&view=auto
==============================================================================
---
hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/types/TestZebraTupleTostring.java
(added)
+++
hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/types/TestZebraTupleTostring.java
Mon Dec 7 20:44:57 2009
@@ -0,0 +1,256 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.zebra.types;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import junit.framework.Assert;
+import org.apache.hadoop.zebra.parser.ParseException;
+import org.apache.hadoop.zebra.schema.Schema;
+import org.apache.hadoop.zebra.types.TypesUtils;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+import org.junit.Test;
+
+/**
+ *
+ * Test toString() method of ZebraTuple.
+ * It should generate string representations of Zebra tuples of CSV format.
+ *
+ */
+public class TestZebraTupleTostring {
+ @Test
+ public void testSimple() throws IOException, ParseException {
+ String STR_SCHEMA = "s1:bool, s2:int, s3:long, s4:float, s5:string, s6:bytes";
+ Schema schema = new Schema(STR_SCHEMA);
+ Tuple tuple = TypesUtils.createTuple(schema);
+ Assert.assertTrue(tuple instanceof ZebraTuple);
+ TypesUtils.resetTuple(tuple);
+
+ tuple.set(0, true); // bool
+ tuple.set(1, 1); // int
+ tuple.set(2, 1001L); // long
+ tuple.set(3, 1.1); // float
+ tuple.set(4, "hello\r world, 1\n"); // string
+ tuple.set(5, new DataByteArray("hello byte, 1")); // bytes
+
+ String str = tuple.toString();
+ Assert.assertTrue(str.equals("T,1,1001,1.1,'hello%0D world%2C 1%0A,#hello byte, 1"));
+ }
+
+ @Test
+ public void testRecord() throws IOException, ParseException {
+ String STR_SCHEMA = "r1:record(f1:int, f2:long), r2:record(r3:record(f3:float, f4))";
+ Schema schema = new Schema(STR_SCHEMA);
+ Tuple tuple = TypesUtils.createTuple(schema);
+ TypesUtils.resetTuple(tuple);
+
+ Tuple tupRecord1;
+ Schema r1Schema = new Schema("f1:int, f2:long");
+ tupRecord1 = TypesUtils.createTuple(r1Schema);
+ TypesUtils.resetTuple(tupRecord1);
+
+ Tuple tupRecord2;
+ Schema r2Schema = new Schema("r3:record(f3:float, f4)");
+ tupRecord2 = TypesUtils.createTuple(r2Schema);
+ TypesUtils.resetTuple(tupRecord2);
+
+ Tuple tupRecord3;
+ Schema r3Schema = new Schema("f3:float, f4");
+ tupRecord3 = TypesUtils.createTuple(r3Schema);
+ TypesUtils.resetTuple(tupRecord3);
+
+ // r1:record(f1:int, f2:long)
+ tupRecord1.set(0, 1);
+ tupRecord1.set(1, 1001L);
+ Assert.assertTrue(tupRecord1.toString().equals("1,1001"));
+ tuple.set(0, tupRecord1);
+
+ // r2:record(r3:record(f3:float, f4))
+ tupRecord2.set(0, tupRecord3);
+ tupRecord3.set(0, 1.3);
+ tupRecord3.set(1, new DataByteArray("r3 row1 byte array"));
+ Assert.assertTrue(tupRecord3.toString().equals("1.3,#r3 row1 byte array"));
+ Assert.assertTrue(tupRecord2.toString().equals("1.3,#r3 row1 byte array"));
+ tuple.set(1, tupRecord2);
+ Assert.assertTrue(tuple.toString().equals("1,1001,1.3,#r3 row1 byte array"));
+ }
+
+ @Test
+ public void testMap() throws IOException, ParseException {
+ String STR_SCHEMA = "m1:map(string),m2:map(map(int))";
+ Schema schema = new Schema(STR_SCHEMA);
+ Tuple tuple = TypesUtils.createTuple(schema);
+ TypesUtils.resetTuple(tuple);
+
+ // m1:map(string)
+ Map<String, String> m1 = new HashMap<String, String>();
+ m1.put("a", "'A");
+ m1.put("b", "B,B");
+ m1.put("c", "C\n");
+ tuple.set(0, m1);
+
+ // m2:map(map(int))
+ HashMap<String, Map<String, Integer>> m2 = new HashMap<String, Map<String, Integer>>();
+ Map<String, Integer> m3 = new HashMap<String, Integer>();
+ m3.put("m311", 311);
+ m3.put("m321", 321);
+ m3.put("m331", 331);
+ Map<String, Integer> m4 = new HashMap<String, Integer>();
+ m4.put("m411", 411);
+ m4.put("m421", 421);
+ m4.put("m431", 431);
+ m2.put("x", m3);
+ m2.put("y", m4);
+ tuple.set(1, m2);
+
+ Assert.assertTrue(tuple.toString().equals(
+ "m{'b,'B%2CB,'c,'C%0A,'a,''A},m{'y,m{'m421,421,'m411,411,'m431,431},'x,m{'m311,311,'m321,321,'m331,331}}"));
+ }
+
+ @Test
+ public void testCollection() throws IOException, ParseException {
+ String STR_SCHEMA =
+ "c1:collection(a:double, b:float, c:bytes),c2:collection(r1:record(f1:int, f2:string), d:string),c3:collection(c3_1:collection(e:int,f:bool))";
+ Schema schema = new Schema(STR_SCHEMA);
+ Tuple tuple = TypesUtils.createTuple(schema);
+ TypesUtils.resetTuple(tuple);
+
+ DataBag bag1 = TypesUtils.createBag();
+ Schema schColl = new Schema("a:double, b:float, c:bytes");
+ Tuple tupColl1 = TypesUtils.createTuple(schColl);
+ Tuple tupColl2 = TypesUtils.createTuple(schColl);
+
+ DataBag bag2 = TypesUtils.createBag();
+ Schema schColl2 = new Schema("r1:record(f1:int, f2:string), d:string");
+ Tuple tupColl2_1 = TypesUtils.createTuple(schColl2);
+ Tuple tupColl2_2 = TypesUtils.createTuple(schColl2);
+
+ Tuple collRecord1;
+ try {
+ collRecord1 = TypesUtils.createTuple(new Schema("f1:int, f2:string"));
+ } catch (ParseException e) {
+ e.printStackTrace();
+ throw new IOException(e);
+ }
+ Tuple collRecord2;
+ try {
+ collRecord2 = TypesUtils.createTuple(new Schema("f1:int, f2:string"));
+ } catch (ParseException e) {
+ e.printStackTrace();
+ throw new IOException(e);
+ }
+
+ // c3:collection(c3_1:collection(e:int,f:bool))
+ DataBag bag3 = TypesUtils.createBag();
+ DataBag bag3_1 = TypesUtils.createBag();
+ DataBag bag3_2 = TypesUtils.createBag();
+
+ Tuple tupColl3_1 = null;
+ try {
+ tupColl3_1 = TypesUtils.createTuple(new Schema("e:int,f:bool"));
+ } catch (ParseException e) {
+ e.printStackTrace();
+ throw new IOException(e);
+ }
+ Tuple tupColl3_2;
+ try {
+ tupColl3_2 = TypesUtils.createTuple(new Schema("e:int,f:bool"));
+ } catch (ParseException e) {
+ e.printStackTrace();
+ throw new IOException(e);
+ }
+
+ Tuple tupColl3_3 = null;
+ try {
+ tupColl3_3 = TypesUtils.createTuple(new Schema("e:int,f:bool"));
+ } catch (ParseException e) {
+ e.printStackTrace();
+ throw new IOException(e);
+ }
+ Tuple tupColl3_4;
+ try {
+ tupColl3_4 = TypesUtils.createTuple(new Schema("e:int,f:bool"));
+ } catch (ParseException e) {
+ e.printStackTrace();
+ throw new IOException(e);
+ }
+
+ byte[] abs1 = new byte[3];
+ byte[] abs2 = new byte[4];
+ tupColl1.set(0, 3.1415926);
+ tupColl1.set(1, 1.6);
+ abs1[0] = 'a';
+ abs1[1] = 'a';
+ abs1[2] = 'a';
+ tupColl1.set(2, new DataByteArray(abs1));
+ bag1.add(tupColl1);
+ tupColl2.set(0, 123.456789);
+ tupColl2.set(1, 100);
+ abs2[0] = 'b';
+ abs2[1] = 'c';
+ abs2[2] = 'd';
+ abs2[3] = 'e';
+ tupColl2.set(2, new DataByteArray(abs2));
+ bag1.add(tupColl2);
+ tuple.set(0, bag1);
+
+ collRecord1.set(0, 1);
+ collRecord1.set(1, "record1_string1");
+ tupColl2_1.set(0, collRecord1);
+ tupColl2_1.set(1, "hello1");
+ bag2.add(tupColl2_1);
+
+ collRecord2.set(0, 2);
+ collRecord2.set(1, "record2_string1");
+ tupColl2_2.set(0, collRecord2);
+ tupColl2_2.set(1, "hello2");
+ bag2.add(tupColl2_2);
+ tuple.set(1, bag2);
+
+ TypesUtils.resetTuple(tupColl3_1);
+ TypesUtils.resetTuple(tupColl3_2);
+ tupColl3_1.set(0, 1);
+ tupColl3_1.set(1, true);
+ tupColl3_2.set(0, 2);
+ tupColl3_2.set(1, false);
+ bag3_1.add(tupColl3_1);
+ bag3_1.add(tupColl3_2);
+ bag3.addAll(bag3_1);
+
+ tupColl3_3.set(0, 3);
+ tupColl3_3.set(1, true);
+ tupColl3_4.set(0, 4);
+ tupColl3_4.set(1, false);
+ bag3_2.add(tupColl3_3);
+ bag3_2.add(tupColl3_4);
+ bag3.addAll(bag3_2);
+ tuple.set(2, bag3);
+
+ Assert.assertTrue(tuple.toString().equals("3.1415926,1.6,#aaa\n" +
+ "123.456789,100,#bcde\n" +
+ "1,'record1_string1,'hello1\n"+
+ "2,'record2_string1,'hello2\n"+
+ "1,T\n"+
+ "2,F\n"+
+ "3,T\n"+
+ "4,F\n"));
+ }
+}
+