Author: pradeepkth
Date: Mon Dec 21 23:23:51 2009
New Revision: 893026
URL: http://svn.apache.org/viewvc?rev=893026&view=rev
Log:
added some missing files from commit for PIG-1141
Added:
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/streaming/PigStreaming.java
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/streaming/PigToStream.java
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/streaming/StreamToPig.java
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/util/StorageUtil.java
Added:
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/streaming/PigStreaming.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/streaming/PigStreaming.java?rev=893026&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/streaming/PigStreaming.java (added)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/streaming/PigStreaming.java Mon Dec 21 23:23:51 2009
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.streaming;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.pig.LoadCaster;
+import org.apache.pig.builtin.Utf8StorageConverter;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.util.StorageUtil;
+
+/**
+ * The default implementation of the {@link PigToStream} and {@link StreamToPig}
+ * interfaces. It converts tuples into <code>fieldDel</code>-separated lines
+ * and <code>fieldDel</code>-separated lines back into tuples.
+ *
+ */
+public class PigStreaming implements PigToStream, StreamToPig {
+
+ private byte recordDel = '\n';
+
+ private byte fieldDel = '\t';
+
+ private ByteArrayOutputStream out;
+
+ /**
+ * The constructor that uses the default field delimiter.
+ */
+ public PigStreaming() {
+ out = new ByteArrayOutputStream();
+ }
+
+ /**
+ * The constructor that accepts a user-specified field
+ * delimiter.
+ *
+ * @param delimiter a <code>String</code> specifying the field
+ * delimiter.
+ */
+ public PigStreaming(String delimiter) {
+ this();
+ fieldDel = StorageUtil.parseFieldDel(delimiter);
+ }
+
+ @Override
+ public byte[] serialize(Tuple t) throws IOException {
+ out.reset();
+ int sz = t.size();
+ for (int i=0; i<sz; i++) {
+ Object field = t.get(i);
+
+ StorageUtil.putField(out,field);
+
+ if (i == sz - 1) {
+ // last field in tuple.
+ out.write(recordDel);
+ } else {
+ out.write(fieldDel);
+ }
+ }
+ return out.toByteArray();
+ }
+
+ @Override
+ public Tuple deserialize(byte[] bytes) throws IOException {
+ Text val = new Text(bytes);
+ return StorageUtil.textToTuple(val, fieldDel);
+ }
+
+ @Override
+ public LoadCaster getLoadCaster() throws IOException {
+ return new Utf8StorageConverter();
+ }
+
+}
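
For reference, a minimal usage sketch of the default implementation (not part of this commit; the example class name is hypothetical). serialize() renders a tuple as a tab-delimited, newline-terminated line, and deserialize() splits one record's bytes back into a tuple of untyped DataByteArray fields:

import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.streaming.PigStreaming;

public class PigStreamingSketch {
    public static void main(String[] args) throws Exception {
        // Build a two-field tuple with the standard TupleFactory.
        Tuple t = TupleFactory.getInstance().newTuple(2);
        t.set(0, 1);
        t.set(1, "hello");

        PigStreaming ps = new PigStreaming();

        // Produces the bytes of "1\thello\n" (tab field delimiter,
        // newline record delimiter).
        byte[] line = ps.serialize(t);
        System.out.print(new String(line, "UTF-8"));

        // Given one record's bytes (without the record delimiter), yields
        // a tuple of DataByteArray fields; typing happens later through
        // the LoadCaster returned by getLoadCaster().
        Tuple back = ps.deserialize("1\thello".getBytes("UTF-8"));
        System.out.println(back);   // prints (1,hello)
    }
}
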
Added:
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/streaming/PigToStream.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/streaming/PigToStream.java?rev=893026&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/streaming/PigToStream.java (added)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/streaming/PigToStream.java Mon Dec 21 23:23:51 2009
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.streaming;
+
+import java.io.IOException;
+
+import org.apache.pig.data.Tuple;
+
+/**
+ * The interface used for the custom mapping of a {@link Tuple} to a byte
+ * array. The byte array is fed to the stdin of the streaming process.
+ *
+ * This interface, together with {@link StreamToPig}, is designed to provide
+ * a common protocol for data exchange between the Pig runtime and streaming
+ * executables.
+ *
+ * Typically, a user implements this interface for a particular type of
+ * stream command and specifies the implementation class in the Pig DEFINE
+ * statement.
+ *
+ */
+public interface PigToStream {
+
+ /**
+ * Given a tuple, produce an array of bytes to be passed to the streaming
+ * executable.
+ */
+ public byte[] serialize(Tuple t) throws IOException;
+
+}
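
To illustrate the contract (illustrative only, not part of this commit; the class name and delimiter choice are made up), a custom serializer might feed the streaming executable pipe-separated lines instead of the default tab-separated ones, and would then typically be referenced from a Pig DEFINE statement:

import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.pig.data.Tuple;
import org.apache.pig.impl.streaming.PigToStream;

public class PipeDelimitedPigToStream implements PigToStream {

    private final ByteArrayOutputStream out = new ByteArrayOutputStream();

    @Override
    public byte[] serialize(Tuple t) throws IOException {
        out.reset();
        int sz = t.size();
        for (int i = 0; i < sz; i++) {
            Object field = t.get(i);
            if (field != null) {
                // Simplistic rendering; a real implementation would also
                // handle nested tuples, bags and maps (see StorageUtil).
                out.write(field.toString().getBytes("UTF-8"));
            }
            // '|' between fields, '\n' to terminate the record.
            out.write(i == sz - 1 ? (byte) '\n' : (byte) '|');
        }
        return out.toByteArray();
    }
}
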
Added:
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/streaming/StreamToPig.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/streaming/StreamToPig.java?rev=893026&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/streaming/StreamToPig.java (added)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/streaming/StreamToPig.java Mon Dec 21 23:23:51 2009
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.streaming;
+
+import java.io.IOException;
+
+import org.apache.pig.LoadCaster;
+import org.apache.pig.data.Tuple;
+
+/**
+ * The interface used for the custom mapping of a byte array, received from
+ * the stdout of the streaming process, to a {@link Tuple}.
+ *
+ * This interface, together with {@link PigToStream}, is designed to provide
+ * a common protocol for data exchange between the Pig runtime and streaming
+ * executables.
+ *
+ * The <code>getLoadCaster</code> method supplies the {@link LoadCaster}
+ * Pig uses to convert the untyped fields produced by
+ * <code>deserialize</code> into typed fields, based on a given schema.
+ *
+ * Typically, a user implements this interface for a particular type of
+ * stream command and specifies the implementation class in the Pig DEFINE
+ * statement.
+ */
+public interface StreamToPig {
+ /**
+ * Given a byte array from a streaming executable, produce a tuple.
+ */
+ public Tuple deserialize(byte[] bytes) throws IOException;
+
+ /**
+ * This will be called on the front end during planning and not on the
+ * back end during execution.
+ *
+ * @return the {@link LoadCaster} associated with this object.
+ * @throws IOException if there is an exception while creating the LoadCaster
+ */
+ public LoadCaster getLoadCaster() throws IOException;
+
+}
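
A matching deserializer sketch (again illustrative only; the class name is hypothetical) would split each pipe-separated line coming back from the executable into untyped DataByteArray fields and reuse Utf8StorageConverter as its LoadCaster, mirroring what PigStreaming does for tab-separated records:

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.pig.LoadCaster;
import org.apache.pig.builtin.Utf8StorageConverter;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.streaming.StreamToPig;

public class PipeDelimitedStreamToPig implements StreamToPig {

    @Override
    public Tuple deserialize(byte[] bytes) throws IOException {
        List<Object> fields = new ArrayList<Object>();
        int start = 0;
        for (int i = 0; i <= bytes.length; i++) {
            if (i == bytes.length || bytes[i] == '|') {
                // Empty fields become null, non-empty ones stay untyped.
                fields.add(i == start ? null
                                      : new DataByteArray(bytes, start, i));
                start = i + 1;
            }
        }
        return TupleFactory.getInstance().newTupleNoCopy(fields);
    }

    @Override
    public LoadCaster getLoadCaster() throws IOException {
        // Let Pig cast the untyped fields according to the declared schema.
        return new Utf8StorageConverter();
    }
}
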
Added:
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/util/StorageUtil.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/util/StorageUtil.java?rev=893026&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/util/StorageUtil.java (added)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/util/StorageUtil.java Mon Dec 21 23:23:51 2009
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.util;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.hadoop.io.Text;
+import org.apache.pig.PigException;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.builtin.PigStorage;
+import org.apache.pig.impl.streaming.PigStreaming;
+
+/**
+ * This utility class provides methods shared by the storage class
+ * {@link PigStorage} and the streaming class {@link PigStreaming}.
+ *
+ */
+public final class StorageUtil {
+
+ private static final String UTF8 = "UTF-8";
+
+ /**
+ * Transform a <code>String</code> into a byte representing the
+ * field delimiter.
+ *
+ * @param delimiter a string that may be in single-quoted form
+ * @return the field delimiter in byte form
+ */
+ public static byte parseFieldDel(String delimiter) {
+ if (delimiter == null) {
+ throw new IllegalArgumentException("Null delimiter");
+ }
+
+ delimiter = parseSingleQuotedString(delimiter);
+
+ if (delimiter.length() > 1 && delimiter.charAt(0) != '\\') {
+ throw new IllegalArgumentException("Delimeter must be a " +
+ "single character " + delimiter);
+ }
+
+ byte fieldDel = '\t';
+
+ if (delimiter.length() == 1) {
+ fieldDel = (byte)delimiter.charAt(0);
+ } else if (delimiter.charAt(0) == '\\') {
+ switch (delimiter.charAt(1)) {
+ case 't':
+ fieldDel = (byte)'\t';
+ break;
+
+ case 'x':
+ fieldDel =
+ Integer.valueOf(delimiter.substring(2), 16).byteValue();
+ break;
+ case 'u':
+ fieldDel =
+ Integer.valueOf(delimiter.substring(2)).byteValue();
+ break;
+
+ default:
+ throw new IllegalArgumentException("Unknown delimiter " +
+ delimiter);
+ }
+ }
+
+ return fieldDel;
+ }
+
+ /**
+ * Serialize an object to an {@link OutputStream} in
+ * field-delimited text form.
+ *
+ * @param out an OutputStream object
+ * @param field an object to be serialized
+ * @throws IOException if serialization fails.
+ */
+ @SuppressWarnings("unchecked")
+ public static void putField(OutputStream out, Object field)
+ throws IOException {
+ //string constants for each delimiter
+ String tupleBeginDelim = "(";
+ String tupleEndDelim = ")";
+ String bagBeginDelim = "{";
+ String bagEndDelim = "}";
+ String mapBeginDelim = "[";
+ String mapEndDelim = "]";
+ String fieldDelim = ",";
+ String mapKeyValueDelim = "#";
+
+ switch (DataType.findType(field)) {
+ case DataType.NULL:
+ break; // just leave it empty
+
+ case DataType.BOOLEAN:
+ out.write(((Boolean)field).toString().getBytes());
+ break;
+
+ case DataType.INTEGER:
+ out.write(((Integer)field).toString().getBytes());
+ break;
+
+ case DataType.LONG:
+ out.write(((Long)field).toString().getBytes());
+ break;
+
+ case DataType.FLOAT:
+ out.write(((Float)field).toString().getBytes());
+ break;
+
+ case DataType.DOUBLE:
+ out.write(((Double)field).toString().getBytes());
+ break;
+
+ case DataType.BYTEARRAY:
+ byte[] b = ((DataByteArray)field).get();
+ out.write(b, 0, b.length);
+ break;
+
+ case DataType.CHARARRAY:
+ // write the chararray out as UTF-8 bytes
+ out.write(((String)field).getBytes(UTF8));
+ break;
+
+ case DataType.MAP:
+ boolean mapHasNext = false;
+ Map<String, Object> m = (Map<String, Object>)field;
+ out.write(mapBeginDelim.getBytes(UTF8));
+ for(Map.Entry<String, Object> e: m.entrySet()) {
+ if(mapHasNext) {
+ out.write(fieldDelim.getBytes(UTF8));
+ } else {
+ mapHasNext = true;
+ }
+ putField(out, e.getKey());
+ out.write(mapKeyValueDelim.getBytes(UTF8));
+ putField(out, e.getValue());
+ }
+ out.write(mapEndDelim.getBytes(UTF8));
+ break;
+
+ case DataType.TUPLE:
+ boolean tupleHasNext = false;
+ Tuple t = (Tuple)field;
+ out.write(tupleBeginDelim.getBytes(UTF8));
+ for(int i = 0; i < t.size(); ++i) {
+ if(tupleHasNext) {
+ out.write(fieldDelim.getBytes(UTF8));
+ } else {
+ tupleHasNext = true;
+ }
+ try {
+ putField(out, t.get(i));
+ } catch (ExecException ee) {
+ throw ee;
+ }
+ }
+ out.write(tupleEndDelim.getBytes(UTF8));
+ break;
+
+ case DataType.BAG:
+ boolean bagHasNext = false;
+ out.write(bagBeginDelim.getBytes(UTF8));
+ Iterator<Tuple> tupleIter = ((DataBag)field).iterator();
+ while(tupleIter.hasNext()) {
+ if(bagHasNext) {
+ out.write(fieldDelim.getBytes(UTF8));
+ } else {
+ bagHasNext = true;
+ }
+ putField(out, (Object)tupleIter.next());
+ }
+ out.write(bagEndDelim.getBytes(UTF8));
+ break;
+
+ default: {
+ int errCode = 2108;
+ String msg = "Could not determine data type of field: " + field;
+ throw new ExecException(msg, errCode, PigException.BUG);
+ }
+
+ }
+ }
+
+ /**
+ * Transform a line of <code>Text</code> to a <code>Tuple</code>
+ *
+ * @param val a line of text
+ * @param fieldDel the field delimiter
+ * @return tuple constructed from the text
+ */
+ public static Tuple textToTuple(Text val, byte fieldDel) {
+
+ byte[] buf = val.getBytes();
+ int len = val.getLength();
+ int start = 0;
+
+ ArrayList<Object> protoTuple = new ArrayList<Object>();
+
+ for (int i = 0; i < len; i++) {
+ if (buf[i] == fieldDel) {
+ readField(protoTuple, buf, start, i);
+ start = i + 1;
+ }
+ }
+
+ // pick up the last field
+ if (start <= len) {
+ readField(protoTuple, buf, start, len);
+ }
+
+ return TupleFactory.getInstance().newTupleNoCopy(protoTuple);
+ }
+
+ //---------------------------------------------------------------
+ // private methods
+
+ private static void readField(ArrayList<Object> protoTuple,
+ byte[] buf, int start, int end) {
+ if (start == end) {
+ // NULL value
+ protoTuple.add(null);
+ } else {
+ protoTuple.add(new DataByteArray(buf, start, end));
+ }
+ }
+
+ private static String parseSingleQuotedString(String delimiter) {
+ int startIndex = 0;
+ int endIndex;
+ while (startIndex < delimiter.length()
+ && delimiter.charAt(startIndex++) != '\'')
+ ;
+ endIndex = startIndex;
+ while (endIndex < delimiter.length()
+ && delimiter.charAt(endIndex) != '\'') {
+ if (delimiter.charAt(endIndex) == '\\') {
+ endIndex++;
+ }
+ endIndex++;
+ }
+
+ return (endIndex < delimiter.length()) ?
+ delimiter.substring(startIndex, endIndex) : delimiter;
+ }
+}
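
Finally, a small sketch of how the shared helpers above fit together (not part of this commit; the class name is hypothetical):

import java.io.ByteArrayOutputStream;

import org.apache.hadoop.io.Text;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.util.StorageUtil;

public class StorageUtilSketch {
    public static void main(String[] args) throws Exception {
        // parseFieldDel accepts a single character or escaped forms such
        // as "\t" and "\x2C" (hex); values may also be single-quoted.
        byte comma = StorageUtil.parseFieldDel(",");      // ','
        byte tab   = StorageUtil.parseFieldDel("'\\t'");  // '\t'

        // putField renders a single field; complex types get the usual
        // (), {} and [] wrappers.
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        Tuple t = TupleFactory.getInstance().newTuple(2);
        t.set(0, 42);
        t.set(1, "foo");
        StorageUtil.putField(out, t);   // writes "(42,foo)"
        System.out.println(out.toString("UTF-8"));

        // textToTuple splits a delimited line into DataByteArray fields.
        Tuple parsed = StorageUtil.textToTuple(new Text("42,foo"), comma);
        System.out.println(parsed);     // prints (42,foo)
    }
}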