Modified: hadoop/pig/trunk/src/org/apache/pig/data/DataByteArray.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/DataByteArray.java?rev=985819&r1=985818&r2=985819&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/data/DataByteArray.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/data/DataByteArray.java Mon Aug 16 
07:57:44 2010
@@ -17,13 +17,8 @@
  */
 package org.apache.pig.data;
 
-import java.io.IOException;
 import java.io.Serializable;
 import java.io.UnsupportedEncodingException;
-import java.lang.StringBuilder;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.nio.MappedByteBuffer;
 
 import org.apache.pig.classification.InterfaceAudience;
 import org.apache.pig.classification.InterfaceStability;
@@ -191,33 +186,40 @@ public class DataByteArray implements Co
     }
 
     /**
-     * Compare two byte arrays.  Comparison is done first using byte values
-     * then length.  So "g" will be greater than "abcdefg", but "hello worlds"
-     * is greater than "hello world".  If the other object is not a
-     * DataByteArray, {@link DataType#compare} will be called.
+     * Compare two byte arrays. Comparison is done first using byte values then
+     * length. So "g" will be greater than "abcdefg", but "hello worlds" is
+     * greater than "hello world". If the other object is not a DataByteArray,
+     * {@link DataType#compare} will be called.
+     * 
      * @param other Other object to compare to.
      * @return -1 if less than, 1 if greater than, 0 if equal.
      */
     public int compareTo(Object other) {
         if (other instanceof DataByteArray) {
-            DataByteArray dba = (DataByteArray)other;
-            int mySz = mData.length;
-            int tSz = dba.mData.length;
-            int i;
-            for (i = 0; i < mySz; i++) {
-                // If the other has run out of characters, we're bigger.
-                if (i >= tSz) return 1;
-                if (mData[i] < dba.mData[i]) return -1;
-                else if (mData[i] > dba.mData[i]) return 1;
-            }
-            // If the other still has characters left, it's greater
-            if (i < tSz) return -1;
-            return 0;
+            DataByteArray dba = (DataByteArray) other;
+            return compare(mData, dba.mData);
         } else {
             return DataType.compare(this, other);
         }
     }
 
+    public static int compare(byte[] b1, byte[] b2) {
+        int i;
+        for (i = 0; i < b1.length; i++) {
+            // If the other has run out of characters, we're bigger.
+            if (i >= b2.length)
+                return 1;
+            if (b1[i] < b2[i])
+                return -1;
+            else if (b1[i] > b2[i])
+                return 1;
+        }
+        // If the other still has characters left, it's greater
+        if (i < b2.length)
+            return -1;
+        return 0;
+    }
+
     @Override
     public boolean equals(Object other) {
         return (compareTo(other) == 0);

Modified: hadoop/pig/trunk/src/org/apache/pig/data/DataReaderWriter.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/DataReaderWriter.java?rev=985819&r1=985818&r2=985819&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/data/DataReaderWriter.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/data/DataReaderWriter.java Mon Aug 16 
07:57:44 2010
@@ -46,7 +46,7 @@ public class DataReaderWriter {
     private static TupleFactory mTupleFactory = TupleFactory.getInstance();
     private static BagFactory mBagFactory = BagFactory.getInstance();
     static final int UNSIGNED_SHORT_MAX = 65535;
-    static final String UTF8 = "UTF-8";
+    public static final String UTF8 = "UTF-8";
 
     public static Tuple bytesToTuple(DataInput in) throws IOException {
         // Don't use Tuple.readFields, because it requires you to

Modified: hadoop/pig/trunk/src/org/apache/pig/data/DefaultTuple.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/DefaultTuple.java?rev=985819&r1=985818&r2=985819&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/data/DefaultTuple.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/data/DefaultTuple.java Mon Aug 16 
07:57:44 2010
@@ -17,86 +17,101 @@
  */
 package org.apache.pig.data;
 
+import java.io.ByteArrayInputStream;
 import java.io.DataInput;
+import java.io.DataInputStream;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.lang.StringBuilder;
+import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.List;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.pig.PigException;
 import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.impl.io.NullableTuple;
+import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.pig.impl.util.TupleFormat;
 
 /**
- * A default implementation of Tuple.  This class will be created by the
- * DefaultTupleFactory.
+ * A default implementation of Tuple. This class will be created by the DefaultTupleFactory.
  */
 public class DefaultTuple implements Tuple {
-    
+
     protected boolean isNull = false;
     private static final long serialVersionUID = 2L;
     protected List<Object> mFields;
-    
+
     /**
-     * Default constructor.  This constructor is public so that hadoop can call
-     * it directly.  However, inside pig you should never be calling this
-     * function.  Use TupleFactory instead.
+     * Default constructor. This constructor is public so that hadoop can call it directly. However, inside pig you
+     * should never be calling this function. Use TupleFactory instead.
      */
     public DefaultTuple() {
         mFields = new ArrayList<Object>();
     }
 
     /**
-     * Construct a tuple with a known number of fields.  Package level so
-     * that callers cannot directly invoke it.
-     * @param size Number of fields to allocate in the tuple.
+     * Construct a tuple with a known number of fields. Package level so that callers cannot directly invoke it.
+     * 
+     * @param size
+     *            Number of fields to allocate in the tuple.
      */
     DefaultTuple(int size) {
         mFields = new ArrayList<Object>(size);
-        for (int i = 0; i < size; i++) mFields.add(null);
+        for (int i = 0; i < size; i++)
+            mFields.add(null);
     }
 
     /**
-     * Construct a tuple from an existing list of objects.  Package
-     * level so that callers cannot directly invoke it.
-     * @param c List of objects to turn into a tuple.
+     * Construct a tuple from an existing list of objects. Package level so that callers cannot directly invoke it.
+     * 
+     * @param c
+     *            List of objects to turn into a tuple.
      */
     DefaultTuple(List<Object> c) {
         mFields = new ArrayList<Object>(c.size());
 
         Iterator<Object> i = c.iterator();
         int field;
-        for (field = 0; i.hasNext(); field++) mFields.add(field, i.next());
+        for (field = 0; i.hasNext(); field++)
+            mFields.add(field, i.next());
     }
 
     /**
-     * Construct a tuple from an existing list of objects.  Package
-     * level so that callers cannot directly invoke it.
-     * @param c List of objects to turn into a tuple.  This list will be kept
-     * as part of the tuple.
-     * @param junk Just used to differentiate from the constructor above that
-     * copies the list.
+     * Construct a tuple from an existing list of objects. Package level so that callers cannot directly invoke it.
+     * 
+     * @param c
+     *            List of objects to turn into a tuple. This list will be kept as part of the tuple.
+     * @param junk
+     *            Just used to differentiate from the constructor above that copies the list.
      */
     DefaultTuple(List<Object> c, int junk) {
         mFields = c;
     }
 
-
     /**
-     * Make this tuple reference the contents of another.  This method does not copy
-     * the underlying data.   It maintains references to the data from the original
-     * tuple (and possibly even to the data structure holding the data).
-     * @param t Tuple to reference.
+     * Make this tuple reference the contents of another. This method does not copy the underlying data. It maintains
+     * references to the data from the original tuple (and possibly even to the data structure holding the data).
+     * 
+     * @param t
+     *            Tuple to reference.
      */
     public void reference(Tuple t) {
         mFields = t.getAll();
     }
 
     /**
-     * Find the size of the tuple.  Used to be called arity().
+     * Find the size of the tuple. Used to be called arity().
+     * 
      * @return number of fields in the tuple.
      */
     public int size() {
@@ -105,10 +120,12 @@ public class DefaultTuple implements Tup
 
     /**
      * Find out if a given field is null.
-     * @param fieldNum Number of field to check for null.
+     * 
+     * @param fieldNum
+     *            Number of field to check for null.
      * @return true if the field is null, false otherwise.
-     * @throws ExecException if the field number given is greater
-     * than or equal to the number of fields in the tuple.
+     * @throws ExecException
+     *             if the field number given is greater than or equal to the number of fields in the tuple.
      */
     public boolean isNull(int fieldNum) throws ExecException {
         return (mFields.get(fieldNum) == null);
@@ -116,12 +133,13 @@ public class DefaultTuple implements Tup
 
     /**
      * Find the type of a given field.
-     * @param fieldNum Number of field to get the type for.
-     * @return type, encoded as a byte value.  The values are taken from
-     * the class DataType.  If the field is null, then DataType.UNKNOWN
-     * will be returned.
-     * @throws ExecException if the field number is greater than or equal to
-     * the number of fields in the tuple.
+     * 
+     * @param fieldNum
+     *            Number of field to get the type for.
+     * @return type, encoded as a byte value. The values are taken from the class DataType. If the field is null, then
+     *         DataType.UNKNOWN will be returned.
+     * @throws ExecException
+     *             if the field number is greater than or equal to the number of fields in the tuple.
      */
     public byte getType(int fieldNum) throws ExecException {
         return DataType.findType(mFields.get(fieldNum));
@@ -129,10 +147,12 @@ public class DefaultTuple implements Tup
 
     /**
      * Get the value in a given field.
-     * @param fieldNum Number of the field to get the value for.
+     * 
+     * @param fieldNum
+     *            Number of the field to get the value for.
      * @return value, as an Object.
-     * @throws ExecException if the field number is greater than or equal to
-     * the number of fields in the tuple.
+     * @throws ExecException
+     *             if the field number is greater than or equal to the number of fields in the tuple.
      */
     public Object get(int fieldNum) throws ExecException {
         return mFields.get(fieldNum);
@@ -140,8 +160,8 @@ public class DefaultTuple implements Tup
 
     /**
      * Get all of the fields in the tuple as a list.
-     * @return List&lt;Object&gt; containing the fields of the tuple
-     * in order.
+     * 
+     * @return List&lt;Object&gt; containing the fields of the tuple in order.
      */
     public List<Object> getAll() {
         return mFields;
@@ -149,49 +169,50 @@ public class DefaultTuple implements Tup
 
     /**
      * Set the value in a given field.
-     * @param fieldNum Number of the field to set the value for.
-     * @param val Object to put in the indicated field.
-     * @throws ExecException if the field number is greater than or equal to
-     * the number of fields in the tuple.
+     * 
+     * @param fieldNum
+     *            Number of the field to set the value for.
+     * @param val
+     *            Object to put in the indicated field.
+     * @throws ExecException
+     *             if the field number is greater than or equal to the number of fields in the tuple.
      */
     public void set(int fieldNum, Object val) throws ExecException {
         mFields.set(fieldNum, val);
     }
 
     /**
-     * Append a field to a tuple.  This method is not efficient as it may
-     * force copying of existing data in order to grow the data structure.
-     * Whenever possible you should construct your Tuple with the
-     * newTuple(int) method and then fill in the values with set(), rather
-     * than construct it with newTuple() and append values.
-     * @param val Object to append to the tuple.
+     * Append a field to a tuple. This method is not efficient as it may force copying of existing data in order to grow
+     * the data structure. Whenever possible you should construct your Tuple with the newTuple(int) method and then fill
+     * in the values with set(), rather than construct it with newTuple() and append values.
+     * 
+     * @param val
+     *            Object to append to the tuple.
      */
     public void append(Object val) {
         mFields.add(val);
     }
 
     /**
-     * Determine the size of tuple in memory.  This is used by data bags
-     * to determine their memory size.  This need not be exact, but it
-     * should be a decent estimation.
+     * Determine the size of tuple in memory. This is used by data bags to determine their memory size. This need not be
+     * exact, but it should be a decent estimation.
+     * 
      * @return estimated memory size.
      */
     public long getMemorySize() {
         Iterator<Object> i = mFields.iterator();
-        //fixed overhead
-        long empty_tuple_size = 8 /* tuple object header*/ 
+        // fixed overhead
+        long empty_tuple_size = 8 /* tuple object header */
         + 8 /* isNull - but rounded to 8 bytes as total obj size needs to be 
multiple of 8 */
-        + 8 /* mFields reference*/
-        + 32 /* mFields array list fixed size*/;
+        + 8 /* mFields reference */
+        + 32 /* mFields array list fixed size */;
 
-        
-        //rest of the fixed portion of mfields size is accounted within 
empty_tuple_size
-        long mfields_var_size =  roundToEight(4 + 4*mFields.size());
+        // rest of the fixed portion of mfields size is accounted within 
empty_tuple_size
+        long mfields_var_size = roundToEight(4 + 4 * mFields.size());
         // in java hotspot 32bit vm, there seems to be a minimum tuple size of 
96
         // which is probably from the minimum size of this array list
-        mfields_var_size = Math.max(40, mfields_var_size); 
-        
-        
+        mfields_var_size = Math.max(40, mfields_var_size);
+
         long sum = empty_tuple_size + mfields_var_size;
         while (i.hasNext()) {
             sum += getFieldMemorySize(i.next());
@@ -199,23 +220,24 @@ public class DefaultTuple implements Tup
         return sum;
     }
 
-    
-    
     /**
      * Memory size of objects are rounded to multiple of 8 bytes
+     * 
      * @param i
-     * @return i rounded to a equal of higher multiple of 8 
+     * @return i rounded to a equal of higher multiple of 8
      */
     private long roundToEight(long i) {
-        return 8 * ((i+7)/8); // integer division rounds the result down
+        return 8 * ((i + 7) / 8); // integer division rounds the result down
     }
 
-    /** 
-     * Write a tuple of atomic values into a string.  All values in the
-     * tuple must be atomic (no bags, tuples, or maps).
-     * @param delim Delimiter to use in the string.
+    /**
+     * Write a tuple of atomic values into a string. All values in the tuple must be atomic (no bags, tuples, or maps).
+     * 
+     * @param delim
+     *            Delimiter to use in the string.
      * @return A string containing the tuple.
-     * @throws ExecException if a non-atomic value is found.
+     * @throws ExecException
+     *             if a non-atomic value is found.
      */
     public String toDelimitedString(String delim) throws ExecException {
         StringBuilder buf = new StringBuilder();
@@ -228,15 +250,14 @@ public class DefaultTuple implements Tup
         return buf.toString();
     }
 
-
     @Override
     public String toString() {
-       return TupleFormat.format(this);
+        return TupleFormat.format(this);
     }
 
     public int compareTo(Object other) {
         if (other instanceof Tuple) {
-            Tuple t = (Tuple)other;
+            Tuple t = (Tuple) other;
             int mySz = mFields.size();
             int tSz = t.size();
             if (tSz < mySz) {
@@ -261,6 +282,240 @@ public class DefaultTuple implements Tup
         }
     }
 
+    public static class DefaultTupleRawComparator extends WritableComparator 
implements TupleRawComparator {
+        private final Log mLog = LogFactory.getLog(getClass());
+        private boolean[] mAsc;
+        private boolean mWholeTuple;
+        private boolean mHasNullField;
+        private TupleFactory mFact;
+
+        public DefaultTupleRawComparator() {
+            super(DefaultTuple.class);
+        }
+
+        @Override
+        public Configuration getConf() {
+            return null;
+        }
+
+        @Override
+        public void setConf(Configuration conf) {
+            if (!(conf instanceof JobConf)) {
+                mLog.warn("Expected jobconf in setConf, got " + 
conf.getClass().getName());
+                return;
+            }
+            JobConf jconf = (JobConf) conf;
+            try {
+                mAsc = (boolean[]) 
ObjectSerializer.deserialize(jconf.get("pig.sortOrder"));
+            } catch (IOException ioe) {
+                mLog.error("Unable to deserialize pig.sortOrder " + 
ioe.getMessage());
+                throw new RuntimeException(ioe);
+            }
+            if (mAsc == null) {
+                mAsc = new boolean[1];
+                mAsc[0] = true;
+            }
+            // If there's only one entry in mAsc, it means it's for the whole
+            // tuple. So we can't be looking for each column.
+            mWholeTuple = (mAsc.length == 1);
+            mFact = TupleFactory.getInstance();
+        }
+
+        @Override
+        public boolean hasComparedTupleNull() {
+            return mHasNullField;
+        }
+
+        /**
+         * Compare two DefaultTuples as raw bytes. We assume the Tuples are NOT PigNullableWritable, so client classes
+         * need to deal with Null and Index.
+         */
+        @Override
+        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int 
l2) {
+            ByteBuffer bb1 = ByteBuffer.wrap(b1, s1, l1);
+            ByteBuffer bb2 = ByteBuffer.wrap(b2, s2, l2);
+            int rc = compareDefaultTuple(bb1, bb2, true); // FIXME adjust for 
secondary sort asc
+            return rc;
+        }
+
+        /**
+         * Compare two DefaultTuples as raw bytes.
+         */
+        private int compareDefaultTuple(ByteBuffer bb1, ByteBuffer bb2, 
boolean topLevel) {
+            mHasNullField = false;
+            // store the position in case of deserialization
+            int s1 = bb1.position();
+            int s2 = bb2.position();
+            int rc = 0;
+            byte tupleType1 = bb1.get();
+            byte tupleType2 = bb2.get();
+            assert (tupleType1 == tupleType2 && tupleType1 == DataType.TUPLE);
+            // first compare sizes
+            int sz1 = bb1.getInt();
+            int sz2 = bb2.getInt();
+            if (sz1 > sz2) {
+                return 1;
+            } else if (sz1 < sz2) {
+                return -1;
+            } else {
+                // if sizes are the same, compare field by field
+                for (int i = 0; i < sz1 && rc == 0; i++) {
+                    byte dt1 = bb1.get();
+                    byte dt2 = bb2.get();
+                    if (dt1 == dt2) {
+                        switch (dt1) {
+                        case DataType.NULL:
+                            if (topLevel) // we are scanning the top-level 
Tuple (original call)
+                                mHasNullField = true;
+                            rc = 0;
+                            break;
+                        case DataType.BOOLEAN:
+                        case DataType.BYTE:
+                            byte bv1 = bb1.get();
+                            byte bv2 = bb2.get();
+                            rc = (bv1 < bv2 ? -1 : (bv1 == bv2 ? 0 : 1));
+                            break;
+                        case DataType.INTEGER:
+                            int iv1 = bb1.getInt();
+                            int iv2 = bb2.getInt();
+                            rc = (iv1 < iv2 ? -1 : (iv1 == iv2 ? 0 : 1));
+                            break;
+                        case DataType.LONG:
+                            long lv1 = bb1.getLong();
+                            long lv2 = bb2.getLong();
+                            rc = (lv1 < lv2 ? -1 : (lv1 == lv2 ? 0 : 1));
+                            break;
+                        case DataType.FLOAT:
+                            float fv1 = bb1.getFloat();
+                            float fv2 = bb2.getFloat();
+                            rc = Float.compare(fv1, fv2);
+                            break;
+                        case DataType.DOUBLE:
+                            double dv1 = bb1.getDouble();
+                            double dv2 = bb2.getDouble();
+                            rc = Double.compare(dv1, dv2);
+                            break;
+                        case DataType.BYTEARRAY:
+                            int basz1 = bb1.getInt();
+                            int basz2 = bb2.getInt();
+                            byte[] ba1 = new byte[basz1];
+                            byte[] ba2 = new byte[basz2];
+                            bb1.get(ba1);
+                            bb2.get(ba2);
+                            rc = DataByteArray.compare(ba1, ba2);
+                            break;
+                        case DataType.CHARARRAY:
+                        case DataType.BIGCHARARRAY:
+                            int casz1 = (dt1 == DataType.CHARARRAY) ? 
bb1.getShort() : bb1.getInt();
+                            int casz2 = (dt1 == DataType.CHARARRAY) ? 
bb2.getShort() : bb2.getInt();
+                            byte[] ca1 = new byte[casz1];
+                            byte[] ca2 = new byte[casz2];
+                            bb1.get(ca1);
+                            bb2.get(ca2);
+                            String str1 = null,
+                            str2 = null;
+                            try {
+                                str1 = new String(ca1, DataReaderWriter.UTF8);
+                                str2 = new String(ca2, DataReaderWriter.UTF8);
+                            } catch (UnsupportedEncodingException uee) {
+                                mLog.warn("Unsupported string encoding", uee);
+                                uee.printStackTrace();
+                            }
+                            if (str1 != null && str2 != null)
+                                rc = str1.compareTo(str2);
+                            break;
+                        case DataType.TUPLE:
+                            // put back the cursor to before DataType.TUPLE
+                            bb1.position(bb1.position() - 1);
+                            bb2.position(bb2.position() - 1);
+                            rc = compareDefaultTuple(bb1, bb2, false);
+                            break;
+                        default:
+                            mLog.info("Unsupported DataType for binary 
comparison, switching to object deserialization: "
+                                    + DataType.genTypeToNameMap().get(dt1) + 
"(" + dt1 + ")");
+                            Tuple t1 = mFact.newTuple();
+                            Tuple t2 = mFact.newTuple();
+                            try {
+                                t1.readFields(new DataInputStream(
+                                        new ByteArrayInputStream(bb1.array(), 
s1, bb1.limit())));
+                                t2.readFields(new DataInputStream(
+                                        new ByteArrayInputStream(bb2.array(), 
s2, bb2.limit())));
+                            } catch (IOException ioe) {
+                                mLog.error("Unable to instantiate tuples for 
comparison: " + ioe.getMessage());
+                                throw new RuntimeException(ioe.getMessage(), 
ioe);
+                            }
+                            // delegate to compareTuple
+                            return compareTuple(t1, t2);
+                        }
+                    } else { // compare DataTypes
+                        if (dt1 < dt2)
+                            rc = -1;
+                        else
+                            rc = 1;
+                    }
+                    // flip if the order is descending
+                    if (rc != 0) {
+                        if (!mWholeTuple && !mAsc[i])
+                            rc *= -1;
+                        else if (mWholeTuple && !mAsc[0])
+                            rc *= -1;
+                    }
+                }
+            }
+            return rc;
+        }
+
+        @Override
+        public int compare(Object o1, Object o2) {
+            NullableTuple nt1 = (NullableTuple) o1;
+            NullableTuple nt2 = (NullableTuple) o2;
+            int rc = 0;
+
+            // if either are null, handle differently
+            if (!nt1.isNull() && !nt2.isNull()) {
+                rc = compareTuple((Tuple) nt1.getValueAsPigType(), (Tuple) 
nt2.getValueAsPigType());
+            } else {
+                // for sorting purposes two nulls are equal
+                if (nt1.isNull() && nt2.isNull())
+                    rc = 0;
+                else if (nt1.isNull())
+                    rc = -1;
+                else
+                    rc = 1;
+                if (mWholeTuple && !mAsc[0])
+                    rc *= -1;
+            }
+            return rc;
+        }
+
+        private int compareTuple(Tuple t1, Tuple t2) {
+            int sz1 = t1.size();
+            int sz2 = t2.size();
+            if (sz2 < sz1) {
+                return 1;
+            } else if (sz2 > sz1) {
+                return -1;
+            } else {
+                for (int i = 0; i < sz1; i++) {
+                    try {
+                        int c = DataType.compare(t1.get(i), t2.get(i));
+                        if (c != 0) {
+                            if (!mWholeTuple && !mAsc[i])
+                                c *= -1;
+                            else if (mWholeTuple && !mAsc[0])
+                                c *= -1;
+                            return c;
+                        }
+                    } catch (ExecException e) {
+                        throw new RuntimeException("Unable to compare tuples", 
e);
+                    }
+                }
+                return 0;
+            }
+        }
+
+    }
+
     @Override
     public boolean equals(Object other) {
         return (compareTo(other) == 0);
@@ -290,13 +545,12 @@ public class DefaultTuple implements Tup
     public void readFields(DataInput in) throws IOException {
         // Clear our fields, in case we're being reused.
         mFields.clear();
-    
+
         // Make sure it's a tuple.
         byte b = in.readByte();
         if (b != DataType.TUPLE) {
             int errCode = 2112;
-            String msg = "Unexpected data while reading tuple " +
-            "from binary file.";
+            String msg = "Unexpected data while reading tuple " + "from binary 
file.";
             throw new ExecException(msg, errCode, PigException.BUG);
         }
         // Read the number of fields
@@ -315,71 +569,70 @@ public class DefaultTuple implements Tup
         // 12 is added to each to account for the object overhead and the
         // pointer in the tuple.
         switch (DataType.findType(o)) {
-            case DataType.BYTEARRAY: {
-                byte[] bytes = ((DataByteArray)o).get();
-                // bytearray size including rounding to 8 bytes
-                long byte_array_sz = roundToEight(bytes.length + 12);
-                
-                return byte_array_sz + 16 /*16 is additional size of 
DataByteArray */;
-            }
+        case DataType.BYTEARRAY: {
+            byte[] bytes = ((DataByteArray) o).get();
+            // bytearray size including rounding to 8 bytes
+            long byte_array_sz = roundToEight(bytes.length + 12);
 
-            case DataType.CHARARRAY: {
-                String s = (String)o;
-                // See PIG-1443 for a reference for this formula
-                return roundToEight((s.length() * 2) + 38);
-            }
+            return byte_array_sz + 16 /* 16 is additional size of 
DataByteArray */;
+        }
 
-            case DataType.TUPLE: {
-                Tuple t = (Tuple)o;
-                return t.getMemorySize();
-            }
+        case DataType.CHARARRAY: {
+            String s = (String) o;
+            // See PIG-1443 for a reference for this formula
+            return roundToEight((s.length() * 2) + 38);
+        }
 
-            case DataType.BAG: {
-                DataBag b = (DataBag)o;
-                return b.getMemorySize();
-            }
+        case DataType.TUPLE: {
+            Tuple t = (Tuple) o;
+            return t.getMemorySize();
+        }
 
-            case DataType.INTEGER:
-                return 4 + 8 + 4/*+4 to round to 8 bytes*/;
+        case DataType.BAG: {
+            DataBag b = (DataBag) o;
+            return b.getMemorySize();
+        }
 
-            case DataType.LONG:
-                return 8 + 8;
+        case DataType.INTEGER:
+            return 4 + 8 + 4/* +4 to round to 8 bytes */;
 
-            case DataType.MAP: {
-                Map<String, Object> m = (Map<String, Object>)o;
-                Iterator<Map.Entry<String, Object> > i =
-                    m.entrySet().iterator();
-                long sum = 0;
-                while (i.hasNext()) {
-                    Map.Entry<String, Object> entry = i.next();
-                    sum += getFieldMemorySize(entry.getKey());
-                    sum += getFieldMemorySize(entry.getValue());
-                }
-                //based on experiments on 32 bit Java HotSpot VM
-                // size of map with 0 entries is 120 bytes
-                // each additional entry have around 24 bytes overhead at 
-                // small number of entries. At larger number of entries, the  
-                // overhead is around 32 bytes, probably because of the 
expanded
-                // data structures in anticapation of more entries being added
-                return sum + m.size()*32  + 120;
-            }
+        case DataType.LONG:
+            return 8 + 8;
 
-            case DataType.FLOAT:
-                return 4 + 8 + 4/*+4 to round to 8 bytes*/;
+        case DataType.MAP: {
+            Map<String, Object> m = (Map<String, Object>) o;
+            Iterator<Map.Entry<String, Object>> i = m.entrySet().iterator();
+            long sum = 0;
+            while (i.hasNext()) {
+                Map.Entry<String, Object> entry = i.next();
+                sum += getFieldMemorySize(entry.getKey());
+                sum += getFieldMemorySize(entry.getValue());
+            }
+            // based on experiments on 32 bit Java HotSpot VM
+            // size of map with 0 entries is 120 bytes
+            // each additional entry have around 24 bytes overhead at
+            // small number of entries. At larger number of entries, the
+            // overhead is around 32 bytes, probably because of the expanded
+            // data structures in anticapation of more entries being added
+            return sum + m.size() * 32 + 120;
+        }
 
-            case DataType.DOUBLE:
-                return 8 + 8;
+        case DataType.FLOAT:
+            return 4 + 8 + 4/* +4 to round to 8 bytes */;
 
-            case DataType.BOOLEAN:
-                //boolean takes 1 byte , +7 to round it to 8
-                return 1 + 8 + 7;
-                
-            case DataType.NULL:
-                return 0;
+        case DataType.DOUBLE:
+            return 8 + 8;
 
-            default:
-                // ??
-                return 12;
+        case DataType.BOOLEAN:
+            // boolean takes 1 byte , +7 to round it to 8
+            return 1 + 8 + 7;
+
+        case DataType.NULL:
+            return 0;
+
+        default:
+            // ??
+            return 12;
         }
     }
 
@@ -391,11 +644,14 @@ public class DefaultTuple implements Tup
     }
 
     /**
-     * @param isNull boolean indicating whether this tuple is null
+     * @param isNull
+     *            boolean indicating whether this tuple is null
      */
     public void setNull(boolean isNull) {
         this.isNull = isNull;
     }
-
+    
+    public static Class<? extends TupleRawComparator> getComparatorClass() {
+        return DefaultTupleRawComparator.class;
+    }
 }
-

Modified: hadoop/pig/trunk/src/org/apache/pig/data/DefaultTupleFactory.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/DefaultTupleFactory.java?rev=985819&r1=985818&r2=985819&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/data/DefaultTupleFactory.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/data/DefaultTupleFactory.java Mon Aug 16 07:57:44 2010
@@ -33,5 +33,8 @@ package org.apache.pig.data;
  */
 @Deprecated 
 public class DefaultTupleFactory extends BinSedesTupleFactory {
-
+    @Override
+    public Class<? extends TupleRawComparator> tupleRawComparatorClass() {
+        return DefaultTuple.getComparatorClass();
+    }
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/data/InterSedes.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/InterSedes.java?rev=985819&r1=985818&r2=985819&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/data/InterSedes.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/data/InterSedes.java Mon Aug 16 07:57:44 2010
@@ -24,6 +24,7 @@ import java.io.IOException;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.classification.InterfaceAudience;
 import org.apache.pig.classification.InterfaceStability;
+import org.apache.pig.data.DefaultTuple.DefaultTupleRawComparator;
 
 /**
  * A class to handle reading and writing of intermediate results of data
@@ -76,5 +77,7 @@ public interface InterSedes {
      */
     public void writeDatum(DataOutput out, Object val) 
     throws IOException;
+    
+    public Class<? extends TupleRawComparator> getTupleRawComparatorClass();
 }
 

Modified: hadoop/pig/trunk/src/org/apache/pig/data/TupleFactory.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/TupleFactory.java?rev=985819&r1=985818&r2=985819&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/data/TupleFactory.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/data/TupleFactory.java Mon Aug 16 07:57:44 2010
@@ -23,6 +23,8 @@ import java.net.URL;
 import java.net.URLClassLoader;
 import java.util.List;
 
+import org.apache.hadoop.io.RawComparator;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTupleDefaultRawComparator;
 import org.apache.pig.classification.InterfaceAudience;
 import org.apache.pig.classification.InterfaceStability;
 
@@ -124,13 +126,12 @@ public abstract class TupleFactory {
 
     /**
      * Return the actual class representing a tuple that the implementing
-     * factory will be returning.  This is needed because hadoop needs
+     * factory will be returning.  This is needed because Hadoop needs
      * to know the exact class we will be using for input and output.
      * @return Class that implements tuple.
      */
-    public abstract Class tupleClass();
- 
-
+    public abstract Class<? extends Tuple> tupleClass();
+    
     protected TupleFactory() {
     }
 
@@ -141,6 +142,18 @@ public abstract class TupleFactory {
     public static void resetSelf() {
         gSelf = null;
     }
+    
+    /**
+     * Return the actual class implementing the raw comparator for tuples
+     * that the factory will be returning. Ovverride this to allow Hadoop to
+     * speed up tuple sorting. The actual returned class should know the
+     * serialization details for the tuple. The default implementation 
+     * (PigTupleDefaultRawComparator) will serialize the data before comparison
+     * @return Class that implements tuple raw comparator.
+     */
+    public Class<? extends TupleRawComparator> tupleRawComparatorClass() {
+        return PigTupleDefaultRawComparator.class;
+    }
 
 }
 

Added: hadoop/pig/trunk/src/org/apache/pig/data/TupleRawComparator.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/TupleRawComparator.java?rev=985819&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/data/TupleRawComparator.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/data/TupleRawComparator.java Mon Aug 16 07:57:44 2010
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.data;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.io.RawComparator;
+
+/**
+ * This interface is intended to compare Tuples. The semantics of Tuple comparison must take into account null values in
+ * different ways. According to SQL semantics nulls are not equal. But for other Pig/Latin statements nulls must be
+ * grouped together. This interface allows to check if there are null fields in the tuples compared using this
+ * comparator. This method is meaningful only when the tuples are determined to be equal by the
+ * {@link #compare(byte[],int,int,byte[],int,int)} method.
+ * 
+ */
+@SuppressWarnings("rawtypes")
+public interface TupleRawComparator extends RawComparator, Configurable {
+    /**
+     * Checks if one of the compared tuples had a null field. This method is meaningful only when
+     * {@link #compare(byte[],int,int,byte[],int,int)} has returned a zero value (i.e. tuples are determined to be
+     * equal).
+     * 
+     * @return true if one of the compared tuples had a null field, false otherwise.
+     */
+    public boolean hasComparedTupleNull();
+}

Added: hadoop/pig/trunk/test/org/apache/pig/test/TestPigTupleRawComparator.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestPigTupleRawComparator.java?rev=985819&view=auto
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestPigTupleRawComparator.java (added)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestPigTupleRawComparator.java Mon Aug 16 07:57:44 2010
@@ -0,0 +1,408 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTupleDefaultRawComparator;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTupleSortComparator;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DefaultDataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.io.NullableTuple;
+import org.apache.pig.impl.util.ObjectSerializer;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestPigTupleRawComparator {
+
+    private TupleFactory tf = TupleFactory.getInstance();
+    private PigTupleSortComparator comparator = new PigTupleSortComparator();
+    private PigTupleDefaultRawComparator oldComparator = new PigTupleDefaultRawComparator();
+    private List<Object> list;
+    private NullableTuple prototype;
+    private ByteArrayOutputStream baos1 = new ByteArrayOutputStream();
+    private ByteArrayOutputStream baos2 = new ByteArrayOutputStream();
+    private DataOutputStream dos1 = new DataOutputStream(baos1);
+    private DataOutputStream dos2 = new DataOutputStream(baos2);
+    private final static int TUPLE_NUMBER = (int) 1e3;
+    private final static int TIMES = (int) 1e5;
+    private final static int SEED = 123456789;
+
+    @Before
+    public void setUp() {
+        JobConf jobConf = new JobConf();
+        comparator.setConf(jobConf);
+        oldComparator.setConf(jobConf);
+        list = Arrays.<Object> asList(1f, 2, 3.0, 4l, (byte) 5, true,
+                new DataByteArray(new byte[] { 0x10, 0x2a, 0x5e }), "hello world!",
+                tf.newTuple(Arrays.<Object> asList(8.0, 9f, 10l, 11)));
+        prototype = new NullableTuple(tf.newTuple(list));
+        baos1.reset();
+        baos2.reset();
+    }
+
+    @Test
+    public void testCompareEquals() throws IOException {
+        NullableTuple t = new NullableTuple(tf.newTuple(list));
+        int res = compareHelper(prototype, t, comparator);
+        assertEquals(Math.signum(prototype.compareTo(t)), Math.signum(res), 0);
+        assertTrue(res == 0);
+    }
+
+    @Test
+    public void testCompareFloat() throws IOException {
+        list.set(0, (Float) list.get(0) - 1);
+        NullableTuple t = new NullableTuple(tf.newTuple(list));
+        int res = compareHelper(prototype, t, comparator);
+        assertEquals(Math.signum(prototype.compareTo(t)), Math.signum(res), 0);
+        assertTrue(res > 0);
+    }
+
+    @Test
+    public void testCompareInt() throws IOException {
+        list.set(1, (Integer) list.get(1) + 1);
+        NullableTuple t = new NullableTuple(tf.newTuple(list));
+        int res = compareHelper(prototype, t, comparator);
+        assertEquals(Math.signum(prototype.compareTo(t)), Math.signum(res), 0);
+        assertTrue(res < 0);
+    }
+
+    @Test
+    public void testCompareDouble() throws IOException {
+        list.set(2, (Double) list.get(2) + 0.1);
+        NullableTuple t = new NullableTuple(tf.newTuple(list));
+        int res = compareHelper(prototype, t, comparator);
+        assertEquals(Math.signum(prototype.compareTo(t)), Math.signum(res), 0);
+        assertTrue(res < 0);
+    }
+
+    @Test
+    public void testCompareByte() throws IOException {
+        list.set(4, (Byte) list.get(4) + 1);
+        NullableTuple t = new NullableTuple(tf.newTuple(list));
+        int res = compareHelper(prototype, t, comparator);
+        assertEquals(Math.signum(prototype.compareTo(t)), Math.signum(res), 0);
+        assertTrue(res < 0);
+    }
+
+    @Test
+    public void testCompareBoolean() throws IOException {
+        list.set(5, false);
+        NullableTuple t = new NullableTuple(tf.newTuple(list));
+        int res = compareHelper(prototype, t, comparator);
+        assertEquals(Math.signum(prototype.compareTo(t)), Math.signum(res), 0);
+        assertTrue(res > 0);
+    }
+
+    @Test
+    public void testCompareByteArray() throws IOException {
+        list.set(6, new DataByteArray(new byte[] { 0x10, 0x1a }));
+        NullableTuple t = new NullableTuple(tf.newTuple(list));
+        int res = compareHelper(prototype, t, comparator);
+        assertEquals(Math.signum(prototype.compareTo(t)), Math.signum(res), 0);
+        list.set(6, new DataByteArray(new byte[] { 0x20 }));
+        t = new NullableTuple(tf.newTuple(list));
+        res = compareHelper(prototype, t, comparator);
+        assertEquals(Math.signum(prototype.compareTo(t)), Math.signum(res), 0);
+    }
+
+    @Test
+    public void testCompareCharArray() throws IOException {
+        list.set(7, "hello world!");
+        NullableTuple t = new NullableTuple(tf.newTuple(list));
+        int res = compareHelper(prototype, t, comparator);
+        assertEquals(Math.signum(prototype.compareTo(t)), Math.signum(res), 0);
+        assertTrue(res == 0);
+        list.set(7, "hello worlc!");
+        t = new NullableTuple(tf.newTuple(list));
+        res = compareHelper(prototype, t, comparator);
+        assertEquals(Math.signum(prototype.compareTo(t)), Math.signum(res), 0);
+        assertTrue(res > 0);
+        list.set(7, "hello worlz!");
+        t = new NullableTuple(tf.newTuple(list));
+        res = compareHelper(prototype, t, comparator);
+        assertEquals(Math.signum(prototype.compareTo(t)), Math.signum(res), 0);
+        assertTrue(res < 0);
+        list.set(7, "hello");
+        t = new NullableTuple(tf.newTuple(list));
+        res = compareHelper(prototype, t, comparator);
+        assertEquals(Math.signum(prototype.compareTo(t)), Math.signum(res), 0);
+        assertTrue(res > 0);
+        list.set(7, "hello world!?");
+        t = new NullableTuple(tf.newTuple(list));
+        res = compareHelper(prototype, t, comparator);
+        assertEquals(Math.signum(prototype.compareTo(t)), Math.signum(res), 0);
+        assertTrue(res < 0);
+    }
+
+    @Test
+    public void compareInnerTuples() throws IOException {
+        NullableTuple t = new NullableTuple(tf.newTuple(list));
+        int res = compareHelper(prototype, t, comparator);
+        assertEquals(Math.signum(prototype.compareTo(t)), Math.signum(res), 0);
+        assertTrue(res == 0);
+        list.set(8, tf.newTuple(Arrays.<Object> asList(8.0, 9f, 10l, 12)));
+        t = new NullableTuple(tf.newTuple(list));
+        res = compareHelper(prototype, t, comparator);
+        assertEquals(Math.signum(prototype.compareTo(t)), Math.signum(res), 0);
+        assertTrue(res < 0);
+        list.set(8, tf.newTuple(Arrays.<Object> asList(8.0, 9f, 9l, 12)));
+        t = new NullableTuple(tf.newTuple(list));
+        res = compareHelper(prototype, t, comparator);
+        assertEquals(Math.signum(prototype.compareTo(t)), Math.signum(res), 0);
+        assertTrue(res > 0);
+        list.set(8, tf.newTuple(Arrays.<Object> asList(7.0, 9f, 9l, 12)));
+        t = new NullableTuple(tf.newTuple(list));
+        res = compareHelper(prototype, t, comparator);
+        assertEquals(Math.signum(prototype.compareTo(t)), Math.signum(res), 0);
+        assertTrue(res > 0);
+        // DataType.LONG < DataType.DOUBLE
+        list.set(8, tf.newTuple(Arrays.<Object> asList(8l, 9f, 9l, 12)));
+        t = new NullableTuple(tf.newTuple(list));
+        res = compareHelper(prototype, t, comparator);
+        assertEquals(Math.signum(prototype.compareTo(t)), Math.signum(res), 0);
+        assertTrue(res > 0);
+        // object after tuple
+        list = new ArrayList<Object>(list);
+        list.add(10);
+        NullableTuple t1 = new NullableTuple(tf.newTuple(list));
+        list.set(list.size() - 1, 11);
+        NullableTuple t2 = new NullableTuple(tf.newTuple(list));
+        res = compareHelper(t1, t2, comparator);
+        assertEquals(Math.signum(t1.compareTo(t2)), Math.signum(res), 0);
+        assertTrue(res < 0);
+        // fancy tuple nesting
+        list.set(list.size() - 1, tf.newTuple(list));
+        t1 = new NullableTuple(tf.newTuple(list));
+        list.set(list.size() - 1, 10);
+        list.set(list.size() - 1, tf.newTuple(list));
+        t2 = new NullableTuple(tf.newTuple(list));
+        res = compareHelper(t1, t2, comparator);
+        assertEquals(Math.signum(t1.compareTo(t2)), Math.signum(res), 0);
+        assertTrue(res > 0);
+    }
+
+    @Test
+    public void testCompareDataBag() throws IOException {
+        list = new ArrayList<Object>(list);
+        list.add(new DefaultDataBag(Arrays.asList(tf.newTuple(Arrays.asList(0)))));
+        NullableTuple t1 = new NullableTuple(tf.newTuple(list));
+        list.set(list.size() - 1, new DefaultDataBag(Arrays.asList(tf.newTuple(Arrays.asList(1)))));
+        NullableTuple t2 = new NullableTuple(tf.newTuple(list));
+        int res = compareHelper(t1, t2, comparator);
+        assertEquals(Math.signum(t1.compareTo(t2)), Math.signum(res), 0);
+        assertTrue(res < 0);
+    }
+
+    @Test
+    public void testCompareMap() throws IOException {
+        list = new ArrayList<Object>(list);
+        list.add(Collections.singletonMap("pig", "scalability"));
+        NullableTuple t1 = new NullableTuple(tf.newTuple(list));
+        list.set(list.size() - 1, Collections.singletonMap("pig", "scalability"));
+        NullableTuple t2 = new NullableTuple(tf.newTuple(list));
+        int res = compareHelper(t1, t2, comparator);
+        assertEquals(Math.signum(t1.compareTo(t2)), Math.signum(res), 0);
+        assertTrue(res == 0);
+        list.set(list.size() - 1, Collections.singletonMap("pigg", "scalability"));
+        t2 = new NullableTuple(tf.newTuple(list));
+        res = compareHelper(t1, t2, comparator);
+        assertEquals(Math.signum(t1.compareTo(t2)), Math.signum(res), 0);
+        assertTrue(res < 0);
+        list.set(list.size() - 1, Collections.singletonMap("pig", "Scalability"));
+        t2 = new NullableTuple(tf.newTuple(list));
+        res = compareHelper(t1, t2, comparator);
+        assertEquals(Math.signum(t1.compareTo(t2)), Math.signum(res), 0);
+        assertTrue(res > 0);
+        list.set(list.size() - 1, Collections.singletonMap("pii", "scalability"));
+        t2 = new NullableTuple(tf.newTuple(list));
+        res = compareHelper(t1, t2, comparator);
+        assertEquals(Math.signum(t1.compareTo(t2)), Math.signum(res), 0);
+        assertTrue(res < 0);
+        // object after map
+        list.add(107);
+        t1 = new NullableTuple(tf.newTuple(list));
+        list.set(list.size() - 1, 108);
+        t2 = new NullableTuple(tf.newTuple(list));
+        res = compareHelper(t1, t2, comparator);
+        assertEquals(Math.signum(t1.compareTo(t2)), Math.signum(res), 0);
+        assertTrue(res < 0);
+    }
+
+    @Test
+    public void testCompareDiffertTypes() throws IOException {
+        // DataType.INTEGER < DataType.LONG
+        list.set(3, 4);
+        NullableTuple t = new NullableTuple(tf.newTuple(list));
+        int res = compareHelper(prototype, t, comparator);
+        assertEquals(Math.signum(prototype.compareTo(t)), Math.signum(res), 0);
+        assertTrue(res > 0);
+    }
+
+    @Test
+    public void testCompareDifferentSizes() throws IOException {
+        list = new ArrayList<Object>(list);
+        // this object should be never get into the comparison loop
+        list.add(new DefaultDataBag());
+        NullableTuple t = new NullableTuple(tf.newTuple(list));
+        int res = compareHelper(prototype, t, comparator);
+        assertEquals(Math.signum(prototype.compareTo(t)), Math.signum(res), 0);
+        assertTrue(res < 0);
+    }
+
+    @Test
+    public void testRandomTuples() throws IOException {
+        Random rand = new Random(SEED);
+        for (int i = 0; i < TUPLE_NUMBER; i++) {
+            NullableTuple t = new NullableTuple(getRandomTuple(rand));
+            int res = compareHelper(prototype, t, comparator);
+            assertEquals(Math.signum(prototype.compareTo(t)), Math.signum(res), 0);
+        }
+    }
+
+    @Test
+    public void testSortOrder() throws IOException {
+        // prototype < t but we use inverse sort order
+        list.set(2, (Double) list.get(2) + 0.1);
+        NullableTuple t = new NullableTuple(tf.newTuple(list));
+        JobConf jobConf = new JobConf();
+        jobConf.set("pig.sortOrder", ObjectSerializer.serialize(new boolean[] {false}));
+        comparator.setConf(jobConf);
+        int res = compareHelper(prototype, t, comparator);
+        assertEquals(-1 * Math.signum(prototype.compareTo(t)), Math.signum(res), 0);
+        assertTrue(res > 0);
+        jobConf.set("pig.sortOrder", ObjectSerializer.serialize(new boolean[] {true,true,false,true,true,true,true,true,true}));
+        comparator.setConf(jobConf);
+        res = compareHelper(prototype, t, comparator);
+        assertEquals(-1 * Math.signum(prototype.compareTo(t)), Math.signum(res), 0);
+        assertTrue(res > 0);
+    }
+
+    private Tuple getRandomTuple(Random rand) throws IOException {
+        int pos = rand.nextInt(list.size());
+        Tuple t = tf.newTuple(list);
+        switch (pos) {
+        case 0:
+            t.set(pos, rand.nextFloat());
+            break;
+        case 1:
+            t.set(pos, rand.nextInt());
+            break;
+        case 2:
+            t.set(pos, rand.nextDouble());
+            break;
+        case 3:
+            t.set(pos, rand.nextLong());
+            break;
+        case 4:
+            t.set(pos, (byte) rand.nextInt());
+            break;
+        case 5:
+            t.set(pos, rand.nextBoolean());
+            break;
+        case 6:
+            byte[] ba = new byte[3];
+            rand.nextBytes(ba);
+            t.set(pos, new DataByteArray(ba));
+            break;
+        case 7:
+            int length = rand.nextInt(15);
+            String s = randomString(length, rand);
+            t.set(pos, s);
+            break;
+        case 8:
+            length = rand.nextInt(6);
+            t.set(pos, getRandomTuple(rand));
+        default:
+        }
+        return t;
+    }
+
+    private int compareHelper(NullableTuple t1, NullableTuple t2, RawComparator comparator) throws IOException {
+        t1.write(dos1);
+        t2.write(dos2);
+        byte[] b1 = baos1.toByteArray();
+        byte[] b2 = baos2.toByteArray();
+        baos1.reset();
+        baos2.reset();
+        return comparator.compare(b1, 0, b1.length, b2, 0, b2.length);
+    }
+
+    private static final String AB = "0123456789abcdefghijklmnopqrstuwxyz!?-_ ";
+
+    private String randomString(int length, Random rand) {
+        StringBuilder sb = new StringBuilder(length);
+        for (int i = 0; i < length; i++)
+            sb.append(AB.charAt(rand.nextInt(AB.length())));
+        return sb.toString();
+    }
+
+    public static void main(String[] args) throws Exception {
+        long before, after;
+        Random rand = new Random(SEED);
+        TestPigTupleRawComparator test = new TestPigTupleRawComparator();
+        test.setUp();
+        byte[][] toCompare1 = new byte[TUPLE_NUMBER][];
+        byte[][] toCompare2 = new byte[TUPLE_NUMBER][];
+        NullableTuple t;
+        for (int i = 0; i < TUPLE_NUMBER; i++) {
+            t = new NullableTuple(test.getRandomTuple(rand));
+            t.write(test.dos1);
+            toCompare1[i] = test.baos1.toByteArray();
+        }
+        for (int i = 0; i < TUPLE_NUMBER; i++) {
+            t = new NullableTuple(test.getRandomTuple(rand));
+            t.write(test.dos2);
+            toCompare2[i] = test.baos2.toByteArray();
+        }
+
+        before = System.currentTimeMillis();
+        for (int loop = 0; loop < TIMES; loop++) {
+            for (int i = 0; i < TUPLE_NUMBER; i++) {
+                test.comparator.compare(toCompare1[i], 0, toCompare1[i].length, toCompare2[i], 0, toCompare2[i].length);
+            }
+        }
+        after = System.currentTimeMillis();
+        System.out.println("Raw Version - elapsed time: " + Long.toString(after - before) + " milliseconds");
+
+        before = System.currentTimeMillis();
+        for (int loop = 0; loop < TIMES; loop++) {
+            for (int i = 0; i < TUPLE_NUMBER; i++) {
+                test.oldComparator.compare(toCompare1[i], 0, toCompare1[i].length, toCompare2[i], 0,
+                        toCompare2[i].length);
+            }
+        }
+        after = System.currentTimeMillis();
+        System.out.println("Old Version - elapsed time: " + Long.toString(after - before) + " milliseconds");
+    }
+}


Reply via email to