Author: gates Date: Mon Feb 25 15:56:02 2008 New Revision: 631045 URL: http://svn.apache.org/viewvc?rev=631045&view=rev Log: Files that should have been added in previous checkins.
Added:
    incubator/pig/branches/types/src/org/apache/pig/data/DataByteArray.java
    incubator/pig/branches/types/src/org/apache/pig/data/DataReaderWriter.java
    incubator/pig/branches/types/src/org/apache/pig/data/DefaultTuple.java

Added: incubator/pig/branches/types/src/org/apache/pig/data/DataByteArray.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/data/DataByteArray.java?rev=631045&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/data/DataByteArray.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/data/DataByteArray.java Mon Feb 25 15:56:02 2008
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.data;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.lang.StringBuilder;
+import java.util.ArrayList;
+import java.util.Collection;
+
+/**
+ * An implementation of byte array.  This is done as an object because we
+ * need to be able to implement compareTo, toString, hashCode, and some
+ * other methods.
+ */
+public class DataByteArray implements Comparable {
+    /** Charset used to convert between Strings and bytes. */
+    private static final String UTF8 = "UTF-8";
+
+    byte[] mData = null;
+
+    /**
+     * Default constructor.  The data array will not be allocated when this
+     * constructor is called, so one of the set() methods must be called
+     * before any method that reads the array.
+     */
+    public DataByteArray() {
+    }
+
+    /**
+     * Construct a byte array using the provided bytes as the content.
+     * @param b byte array to use as content.  A reference to the bytes
+     * will be taken, the underlying bytes will not be copied.
+     */
+    public DataByteArray(byte[] b) {
+        mData = b;
+    }
+
+    /**
+     * Construct a byte array from a String.  The contents of the string
+     * are copied, encoded as UTF-8.
+     * @param s String to make a byte array out of.
+     */
+    public DataByteArray(String s) {
+        mData = encode(s);
+    }
+
+    /**
+     * Find the size of the byte array.
+     * @return number of bytes in the array.
+     */
+    public int size() {
+        return mData.length;
+    }
+
+    /**
+     * Get the underlying byte array.  This is the real thing, not a copy,
+     * so don't mess with it!
+     * @return underlying byte[]
+     */
+    public byte[] get() {
+        return mData;
+    }
+
+    /**
+     * Set the internal byte array.  This should not be called unless the
+     * default constructor was used.
+     * @param b byte array to store.  The contents of the byte array are
+     * not copied.
+     */
+    public void set(byte[] b) {
+        mData = b;
+    }
+
+    /**
+     * Set the internal byte array.  This should not be called unless the
+     * default constructor was used.
+     * @param s String to copy.  The contents of the string are copied,
+     * encoded as UTF-8.
+     */
+    public void set(String s) {
+        mData = encode(s);
+    }
+
+    /**
+     * Decode the contents of the array as UTF-8 text.
+     * @return the bytes as a String.
+     */
+    @Override
+    public String toString() {
+        try {
+            return new String(mData, UTF8);
+        } catch (UnsupportedEncodingException e) {
+            // Every JVM is required to support UTF-8.
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Compare this byte array to another object.  Byte arrays are ordered
+     * first by length (shorter sorts first) and then byte by byte using
+     * signed byte values.  Comparison with any other type is delegated to
+     * DataType.compare.
+     * @param other object to compare to.
+     * @return negative, zero, or positive as this object is less than,
+     * equal to, or greater than other.
+     */
+    public int compareTo(Object other) {
+        if (!(other instanceof DataByteArray)) {
+            return DataType.compare(this, other);
+        }
+        byte[] theirs = ((DataByteArray)other).mData;
+        if (mData.length != theirs.length) {
+            return (mData.length < theirs.length) ? -1 : 1;
+        }
+        for (int i = 0; i < mData.length; i++) {
+            if (mData[i] != theirs[i]) {
+                return (mData[i] < theirs[i]) ? -1 : 1;
+            }
+        }
+        return 0;
+    }
+
+    /**
+     * Two byte arrays are equal when compareTo reports zero.
+     */
+    @Override
+    public boolean equals(Object other) {
+        return (compareTo(other) == 0);
+    }
+
+    /**
+     * Hash the contents of the array, consistent with equals.
+     */
+    @Override
+    public int hashCode() {
+        int hash = 1;
+        for (int i = 0; i < mData.length; i++) {
+            // 29 is used because, per the original comment, other hashes
+            // use 31 and bags 37, and a distinct prime is wanted.
+            hash = 29 * hash + mData[i];
+        }
+        return hash;
+    }
+
+    /**
+     * Encode a String as UTF-8.  An explicit charset is used rather than
+     * the platform default so the same string yields the same bytes (and
+     * thus the same compareTo/hashCode results) on every machine.
+     * @param s String to encode.
+     * @return the UTF-8 bytes of s.
+     */
+    private static byte[] encode(String s) {
+        try {
+            return s.getBytes(UTF8);
+        } catch (UnsupportedEncodingException e) {
+            // Every JVM is required to support UTF-8.
+            throw new RuntimeException(e);
+        }
+    }
+}

Added: incubator/pig/branches/types/src/org/apache/pig/data/DataReaderWriter.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/data/DataReaderWriter.java?rev=631045&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/data/DataReaderWriter.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/data/DataReaderWriter.java Mon Feb 25 15:56:02 2008
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.data;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * A class to handle reading and writing of intermediate results of data
+ * types.  This class could also be used for storing permanent results.
+ * Every datum is written as a one byte type indicator (a DataType
+ * constant) followed by a type-specific payload.
+ */
+public class DataReaderWriter {
+    /** Charset used to serialize chararrays. */
+    private static final String UTF8 = "UTF-8";
+
+    private static final TupleFactory mTupleFactory = TupleFactory.getInstance();
+    private static final BagFactory mBagFactory = BagFactory.getInstance();
+
+    /**
+     * Read one datum, in the format produced by writeDatum.
+     * @param in stream to read from.
+     * @return the datum read; null for a DataType.NULL indicator.
+     * @throws IOException if the stream cannot be read or holds a
+     * negative size.
+     * @throws RuntimeException if an unknown type indicator is found.
+     */
+    public static Object readDatum(DataInput in) throws IOException {
+        // Read the data type
+        byte b = in.readByte();
+        switch (b) {
+            case DataType.TUPLE: {
+                // Don't use Tuple.readFields, because it requires you to
+                // create a tuple with no size and then append fields.
+                // That's less efficient than allocating the tuple size up
+                // front and then filling in the spaces.
+                // Read the size.  Zero is valid: DefaultTuple.write emits
+                // a zero size for an empty tuple.
+                int sz = readSize(in, "tuple");
+                Tuple t = mTupleFactory.newTuple(sz);
+                for (int i = 0; i < sz; i++) {
+                    t.set(i, readDatum(in));
+                }
+                return t;
+            }
+
+            case DataType.BAG: {
+                DataBag bag = mBagFactory.newDefaultBag();
+                bag.readFields(in);
+                return bag;
+            }
+
+            case DataType.MAP: {
+                int size = readSize(in, "map");
+                Map<Object, Object> m = new HashMap<Object, Object>(size);
+                for (int i = 0; i < size; i++) {
+                    Object key = readDatum(in);
+                    m.put(key, readDatum(in));
+                }
+                return m;
+            }
+
+            case DataType.INTEGER:
+                return Integer.valueOf(in.readInt());
+
+            case DataType.LONG:
+                return Long.valueOf(in.readLong());
+
+            case DataType.FLOAT:
+                return Float.valueOf(in.readFloat());
+
+            case DataType.DOUBLE:
+                return Double.valueOf(in.readDouble());
+
+            case DataType.BOOLEAN:
+                return Boolean.valueOf(in.readBoolean());
+
+            case DataType.BYTEARRAY: {
+                byte[] ba = new byte[readSize(in, "bytearray")];
+                in.readFully(ba);
+                return new DataByteArray(ba);
+            }
+
+            case DataType.CHARARRAY: {
+                // Chararrays are stored as UTF-8 bytes (see writeDatum).
+                byte[] ba = new byte[readSize(in, "chararray")];
+                in.readFully(ba);
+                return new String(ba, UTF8);
+            }
+
+            case DataType.NULL:
+                return null;
+
+            default:
+                throw new RuntimeException("Unexpected data type " + b +
+                    " found in stream.");
+        }
+    }
+
+    /**
+     * Write one datum, preceded by its type indicator, in the format
+     * readDatum expects.
+     * @param out stream to write to.
+     * @param val datum to write.
+     * @throws IOException if the stream cannot be written.
+     * @throws RuntimeException if DataType.findType does not return a
+     * type handled here.
+     */
+    public static void writeDatum(
+            DataOutput out,
+            Object val) throws IOException {
+        // Find the data type
+        byte type = DataType.findType(val);
+        switch (type) {
+            case DataType.TUPLE:
+                // Because tuples are written directly by hadoop, the
+                // tuple's write method needs to write the indicator byte.
+                // So don't write the indicator byte here as it is for
+                // everyone else.
+                ((Tuple)val).write(out);
+                break;
+
+            case DataType.BAG:
+                out.writeByte(DataType.BAG);
+                ((DataBag)val).write(out);
+                break;
+
+            case DataType.MAP: {
+                out.writeByte(DataType.MAP);
+                @SuppressWarnings("unchecked")
+                Map<Object, Object> m = (Map<Object, Object>)val;
+                out.writeInt(m.size());
+                Iterator<Map.Entry<Object, Object> > i =
+                    m.entrySet().iterator();
+                while (i.hasNext()) {
+                    Map.Entry<Object, Object> entry = i.next();
+                    writeDatum(out, entry.getKey());
+                    writeDatum(out, entry.getValue());
+                }
+                break;
+            }
+
+            case DataType.INTEGER:
+                out.writeByte(DataType.INTEGER);
+                out.writeInt((Integer)val);
+                break;
+
+            case DataType.LONG:
+                out.writeByte(DataType.LONG);
+                out.writeLong((Long)val);
+                break;
+
+            case DataType.FLOAT:
+                out.writeByte(DataType.FLOAT);
+                out.writeFloat((Float)val);
+                break;
+
+            case DataType.DOUBLE:
+                out.writeByte(DataType.DOUBLE);
+                out.writeDouble((Double)val);
+                break;
+
+            case DataType.BOOLEAN:
+                out.writeByte(DataType.BOOLEAN);
+                out.writeBoolean((Boolean)val);
+                break;
+
+            case DataType.BYTEARRAY: {
+                out.writeByte(DataType.BYTEARRAY);
+                DataByteArray bytes = (DataByteArray)val;
+                out.writeInt(bytes.size());
+                out.write(bytes.mData);
+                break;
+            }
+
+            case DataType.CHARARRAY: {
+                out.writeByte(DataType.CHARARRAY);
+                // Write the UTF-8 byte length (not the char count) and
+                // then the UTF-8 bytes.  DataOutput.writeBytes would drop
+                // the high byte of every char, corrupting non-ASCII text.
+                byte[] utf8 = ((String)val).getBytes(UTF8);
+                out.writeInt(utf8.length);
+                out.write(utf8);
+                break;
+            }
+
+            case DataType.NULL:
+                out.writeByte(DataType.NULL);
+                break;
+
+            default:
+                throw new RuntimeException("Unexpected data type " + type +
+                    " found in stream.");
+        }
+    }
+
+    /**
+     * Read a length prefix, rejecting negative values, which can only come
+     * from a corrupt or misaligned stream.
+     * @param in stream to read from.
+     * @param what name of the type being read, for the error message.
+     * @return the non-negative size.
+     * @throws IOException if the stream cannot be read or the size is
+     * negative.
+     */
+    private static int readSize(DataInput in, String what)
+            throws IOException {
+        int sz = in.readInt();
+        if (sz < 0) {
+            throw new IOException("Invalid size " + sz + " for a " + what);
+        }
+        return sz;
+    }
+}
+

Added: incubator/pig/branches/types/src/org/apache/pig/data/DefaultTuple.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/data/DefaultTuple.java?rev=631045&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/data/DefaultTuple.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/data/DefaultTuple.java Mon Feb 25 15:56:02 2008
@@ -0,0 +1,408 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.data;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.lang.StringBuilder;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * A default implementation of Tuple.  This class will be created by the
+ * DefaultTupleFactory.  Tuples are ordered first by number of fields and
+ * then field by field; see compareTo.
+ */
+public class DefaultTuple implements Tuple {
+    protected List<Object> mFields;
+
+    /**
+     * Default constructor.  This constructor is public so that hadoop can call
+     * it directly.  However, inside pig you should never be calling this
+     * function.  Use TupleFactory instead.
+     */
+    public DefaultTuple() {
+        mFields = new ArrayList<Object>();
+    }
+
+    /**
+     * Construct a tuple with a known number of fields.  Package level so
+     * that callers cannot directly invoke it.
+     * @param size Number of fields to allocate in the tuple.  Every field
+     * starts out null.
+     */
+    DefaultTuple(int size) {
+        mFields = new ArrayList<Object>(size);
+        for (int i = 0; i < size; i++) mFields.add(null);
+    }
+
+    /**
+     * Construct a tuple from an existing list of objects.  Package
+     * level so that callers cannot directly invoke it.
+     * @param c List of objects to turn into a tuple.  The list itself is
+     * copied; the objects in it are not.
+     */
+    DefaultTuple(List<Object> c) {
+        mFields = new ArrayList<Object>(c);
+    }
+
+    /**
+     * Make this tuple reference the contents of another.  This method does not copy
+     * the underlying data.  It maintains references to the data from the original
+     * tuple (and possibly even to the data structure holding the data).
+     * @param t Tuple to reference.
+     */
+    public void reference(Tuple t) {
+        mFields = t.getAll();
+    }
+
+    /**
+     * Find the size of the tuple.  Used to be called arity().
+     * @return number of fields in the tuple.
+     */
+    public int size() {
+        return mFields.size();
+    }
+
+    /**
+     * Find out if a given field is null.
+     * @param fieldNum Number of field to check for null.
+     * @return true if the field is null, false otherwise.
+     * @throws IOException if the field number given is negative or greater
+     * than or equal to the number of fields in the tuple.
+     */
+    public boolean isNull(int fieldNum) throws IOException {
+        checkBounds(fieldNum);
+        return (mFields.get(fieldNum) == null);
+    }
+
+    /**
+     * Find the type of a given field.
+     * @param fieldNum Number of field to get the type for.
+     * @return type, encoded as a byte value.  The values are taken from
+     * the class DataType.  NOTE(review): this was documented as returning
+     * DataType.UNKNOWN for a null field, but DataReaderWriter also handles
+     * DataType.NULL; confirm what DataType.findType(null) returns.
+     * @throws IOException if the field number is negative or greater than
+     * or equal to the number of fields in the tuple.
+     */
+    public byte getType(int fieldNum) throws IOException {
+        checkBounds(fieldNum);
+        return DataType.findType(mFields.get(fieldNum));
+    }
+
+    /**
+     * Get the value in a given field.
+     * @param fieldNum Number of the field to get the value for.
+     * @return value, as an Object.
+     * @throws IOException if the field number is negative or greater than
+     * or equal to the number of fields in the tuple.
+     */
+    public Object get(int fieldNum) throws IOException {
+        checkBounds(fieldNum);
+        return mFields.get(fieldNum);
+    }
+
+    /**
+     * Get all of the fields in the tuple as a list.  This is the internal
+     * list, not a copy.
+     * @return List<Object> containing the fields of the tuple
+     * in order.
+     */
+    public List<Object> getAll() {
+        return mFields;
+    }
+
+    /**
+     * Set the value in a given field.
+     * @param fieldNum Number of the field to set the value for.
+     * @param val Object to put in the indicated field.
+     * @throws IOException if the field number is negative or greater than
+     * or equal to the number of fields in the tuple.
+     */
+    public void set(int fieldNum, Object val) throws IOException {
+        checkBounds(fieldNum);
+        mFields.set(fieldNum, val);
+    }
+
+    /**
+     * Append a field to a tuple.  This method is not efficient as it may
+     * force copying of existing data in order to grow the data structure.
+     * Whenever possible you should construct your Tuple with the
+     * newTuple(int) method and then fill in the values with set(), rather
+     * than construct it with newTuple() and append values.
+     * @param val Object to append to the tuple.
+     */
+    public void append(Object val) {
+        mFields.add(val);
+    }
+
+    /**
+     * Determine the size of tuple in memory.  This is used by data bags
+     * to determine their memory size.  This need not be exact, but it
+     * should be a decent estimation.
+     * @return estimated memory size, the sum of the per-field estimates.
+     */
+    public long getMemorySize() {
+        long sum = 0;
+        for (Object field : mFields) {
+            sum += getFieldMemorySize(field);
+        }
+        return sum;
+    }
+
+    /**
+     * Write a tuple of atomic values into a string.  All values in the
+     * tuple must be atomic (no bags, tuples, or maps).  A null field is
+     * written as an empty string.
+     * @param delim Delimiter to use in the string.
+     * @return A string containing the tuple.
+     * @throws IOException if a non-atomic value is found.
+     */
+    public String toDelimitedString(String delim) throws IOException {
+        StringBuilder buf = new StringBuilder();
+        for (Iterator<Object> it = mFields.iterator(); it.hasNext();) {
+            Object field = it.next();
+            // A null field contributes no text between its delimiters.
+            if (field != null) {
+                if (DataType.isComplex(field)) {
+                    throw new IOException("Unable to convert non-flat tuple to string.");
+                }
+                buf.append(field.toString());
+            }
+            if (it.hasNext()) {
+                buf.append(delim);
+            }
+        }
+        return buf.toString();
+    }
+
+    /**
+     * Render the tuple for display as (f1, f2, ...), showing null
+     * fields as NULL.
+     */
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        sb.append('(');
+        for (Iterator<Object> it = mFields.iterator(); it.hasNext();) {
+            Object d = it.next();
+            if (d != null) {
+                sb.append(d.toString());
+            } else {
+                sb.append("NULL");
+            }
+            if (it.hasNext()) {
+                sb.append(", ");
+            }
+        }
+        sb.append(')');
+        return sb.toString();
+    }
+
+    /**
+     * Compare this tuple to another object.  Tuples are ordered first by
+     * number of fields (fewer sorts first) and then field by field using
+     * DataType.compare.  Comparison with any other type is delegated to
+     * DataType.compare.
+     * @param other object to compare to.
+     * @return negative, zero, or positive as this tuple is less than,
+     * equal to, or greater than other.
+     */
+    public int compareTo(Object other) {
+        if (other instanceof Tuple) {
+            Tuple t = (Tuple)other;
+            int mySz = mFields.size();
+            int tSz = t.size();
+            if (tSz < mySz) {
+                return 1;
+            } else if (tSz > mySz) {
+                return -1;
+            } else {
+                for (int i = 0; i < mySz; i++) {
+                    try {
+                        int c = DataType.compare(mFields.get(i), t.get(i));
+                        if (c != 0) {
+                            return c;
+                        }
+                    } catch (IOException e) {
+                        // compareTo cannot declare checked exceptions.
+                        throw new RuntimeException("Unable to compare tuples", e);
+                    }
+                }
+                return 0;
+            }
+        } else {
+            return DataType.compare(this, other);
+        }
+    }
+
+    /**
+     * Two tuples are equal when compareTo reports zero.
+     */
+    @Override
+    public boolean equals(Object other) {
+        return (compareTo(other) == 0);
+    }
+
+    /**
+     * Hash the fields in order; null fields are skipped.
+     */
+    @Override
+    public int hashCode() {
+        int hash = 1;
+        for (Iterator<Object> it = mFields.iterator(); it.hasNext();) {
+            Object o = it.next();
+            if (o != null) {
+                hash = 31 * hash + o.hashCode();
+            }
+        }
+        return hash;
+    }
+
+    /**
+     * Serialize the tuple: the DataType.TUPLE indicator byte, the number
+     * of fields, then each field as written by DataReaderWriter.writeDatum.
+     * The indicator is written here rather than by writeDatum because
+     * hadoop calls this method directly.
+     * @param out stream to write to.
+     * @throws IOException if the stream cannot be written.
+     */
+    public void write(DataOutput out) throws IOException {
+        out.writeByte(DataType.TUPLE);
+        int sz = size();
+        out.writeInt(sz);
+        for (int i = 0; i < sz; i++) {
+            DataReaderWriter.writeDatum(out, mFields.get(i));
+        }
+    }
+
+    /**
+     * Replace the contents of this tuple with one read from a stream in
+     * the format produced by write().
+     * @param in stream to read from.
+     * @throws IOException if the stream cannot be read or does not start
+     * with a tuple indicator.
+     */
+    public void readFields(DataInput in) throws IOException {
+        // Clear our fields, in case we're being reused.
+        mFields.clear();
+
+        // Make sure it's a tuple.
+        byte b = in.readByte();
+        if (b != DataType.TUPLE) {
+            throw new IOException("Unexpected data while reading tuple " +
+                "from binary file");
+        }
+
+        // Read the number of fields
+        int sz = in.readInt();
+        for (int i = 0; i < sz; i++) {
+            append(DataReaderWriter.readDatum(in));
+        }
+    }
+
+    /**
+     * Estimate the memory used by one field value.
+     * @param o field value; may be null.
+     * @return rough size estimate in bytes.
+     */
+    private long getFieldMemorySize(Object o) {
+        // 12 is added to each to account for the object overhead and the
+        // pointer in the tuple.
+        switch (DataType.findType(o)) {
+            case DataType.BYTEARRAY: {
+                // Bytearray fields hold DataByteArray objects (see
+                // DataReaderWriter), not raw byte[]s.
+                DataByteArray bytes = (DataByteArray)o;
+                return bytes.size() + 12;
+            }
+
+            case DataType.CHARARRAY: {
+                String s = (String)o;
+                return s.length() * 2 + 12;
+            }
+
+            case DataType.TUPLE: {
+                Tuple t = (Tuple)o;
+                return t.getMemorySize() + 12;
+            }
+
+            case DataType.BAG: {
+                DataBag b = (DataBag)o;
+                return b.getMemorySize() + 12;
+            }
+
+            case DataType.INTEGER:
+                return 4 + 12;
+
+            case DataType.LONG:
+                return 8 + 12;
+
+            case DataType.MAP: {
+                @SuppressWarnings("unchecked")
+                Map<Object, Object> m = (Map<Object, Object>)o;
+                Iterator<Map.Entry<Object, Object> > i =
+                    m.entrySet().iterator();
+                long sum = 0;
+                while (i.hasNext()) {
+                    Map.Entry<Object, Object> entry = i.next();
+                    sum += getFieldMemorySize(entry.getKey());
+                    sum += getFieldMemorySize(entry.getValue());
+                }
+                return sum + 12;
+            }
+
+            case DataType.FLOAT:
+                return 8 + 12;
+
+            case DataType.DOUBLE:
+                return 16 + 12;
+
+            case DataType.BOOLEAN:
+                return 4 + 12;
+
+            default:
+                // Other types (presumably including null): overhead only.
+                return 12;
+        }
+    }
+
+    /**
+     * Verify that fieldNum indexes an existing field.
+     * @param fieldNum field number to check.
+     * @throws IOException if fieldNum is negative or not less than the
+     * number of fields.
+     */
+    private void checkBounds(int fieldNum) throws IOException {
+        if (fieldNum < 0) {
+            throw new IOException("Request for field number " + fieldNum +
+                " is negative");
+        }
+        if (fieldNum >= mFields.size()) {
+            throw new IOException("Request for field number " + fieldNum +
+                " exceeds tuple size of " + mFields.size());
+        }
+    }
+}