Author: olga
Date: Thu Aug 14 16:20:00 2008
New Revision: 686085

URL: http://svn.apache.org/viewvc?rev=686085&view=rev
Log:
PIG-301: order by descending

Added:
    
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBytesRawComparator.java
    
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigDoubleRawComparator.java
    
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigFloatRawComparator.java
    
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigIntRawComparator.java
    
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigLongRawComparator.java
    
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTextRawComparator.java
    
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTupleRawComparator.java
Modified:
    
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
    
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
    
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java
    
incubator/pig/branches/types/src/org/apache/pig/impl/io/NullableBytesWritable.java
    
incubator/pig/branches/types/src/org/apache/pig/impl/io/NullableDoubleWritable.java
    
incubator/pig/branches/types/src/org/apache/pig/impl/io/NullableFloatWritable.java
    
incubator/pig/branches/types/src/org/apache/pig/impl/io/NullableIntWritable.java
    
incubator/pig/branches/types/src/org/apache/pig/impl/io/NullableLongWritable.java
    incubator/pig/branches/types/src/org/apache/pig/impl/io/NullableText.java

Modified: 
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=686085&r1=686084&r2=686085&view=diff
==============================================================================
--- 
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
 (original)
+++ 
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
 Thu Aug 14 16:20:00 2008
@@ -289,8 +289,12 @@
                 if(mro.UDFs.size()==1){
                     String compFuncSpec = mro.UDFs.get(0);
                     Class comparator = 
PigContext.resolveClassName(compFuncSpec);
-                    if(ComparisonFunc.class.isAssignableFrom(comparator))
+                    if(ComparisonFunc.class.isAssignableFrom(comparator)) {
                         jobConf.setOutputKeyComparatorClass(comparator);
+                    }
+                } else {
+                    jobConf.set("pig.sortOrder",
+                        ObjectSerializer.serialize(mro.getSortOrder()));
                 }
             }
     
@@ -385,7 +389,44 @@
                 if (succ.isGlobalSort()) involved = true;
             }
         }
-        if (!involved) {
+        if (involved) {
+            switch (keyType) {
+            case DataType.INTEGER:
+                jobConf.setOutputKeyComparatorClass(PigIntRawComparator.class);
+                break;
+
+            case DataType.LONG:
+                
jobConf.setOutputKeyComparatorClass(PigLongRawComparator.class);
+                break;
+
+            case DataType.FLOAT:
+                
jobConf.setOutputKeyComparatorClass(PigFloatRawComparator.class);
+                break;
+
+            case DataType.DOUBLE:
+                
jobConf.setOutputKeyComparatorClass(PigDoubleRawComparator.class);
+                break;
+
+            case DataType.CHARARRAY:
+                
jobConf.setOutputKeyComparatorClass(PigTextRawComparator.class);
+                break;
+
+            case DataType.BYTEARRAY:
+                
jobConf.setOutputKeyComparatorClass(PigBytesRawComparator.class);
+                break;
+
+            case DataType.MAP:
+                log.error("Using Map as key not supported.");
+                throw new JobCreationException("Using Map as key not 
supported");
+
+            case DataType.TUPLE:
+                
jobConf.setOutputKeyComparatorClass(PigTupleRawComparator.class);
+                break;
+
+            default:
+                break;
+            }
+        } else {
             switch (keyType) {
             case DataType.INTEGER:
                 
jobConf.setOutputKeyComparatorClass(PigIntWritableComparator.class);

Modified: 
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=686085&r1=686084&r2=686085&view=diff
==============================================================================
--- 
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
 (original)
+++ 
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
 Thu Aug 14 16:20:00 2008
@@ -825,8 +825,7 @@
                 curMROp.UDFs.add(op.getMSortFunc().getFuncSpec().toString());
             }
         }catch(Exception e){
-            VisitorException pe = new VisitorException(e.getMessage());
-            pe.initCause(e);
+            VisitorException pe = new VisitorException(e.getMessage(), e);
             throw pe;
         }
     }
@@ -864,6 +863,17 @@
 
         byte keyType = DataType.UNKNOWN;
         
+        boolean[] sortOrder;
+
+        List<Boolean> sortOrderList = sort.getMAscCols();
+        if(sortOrderList != null) {
+            sortOrder = new boolean[sortOrderList.size()];
+            for(int i = 0; i < sortOrderList.size(); ++i) {
+                sortOrder[i] = sortOrderList.get(i);
+            }
+            mro.setSortOrder(sortOrder);
+        }
+
         if (fields == null) {
             // This is project *
             PhysicalPlan ep = new PhysicalPlan();
@@ -1004,8 +1014,9 @@
         POSort sort = new POSort(inpSort.getOperatorKey(), inpSort
                 .getRequestedParallelism(), null, inpSort.getSortPlans(),
                 inpSort.getMAscCols(), inpSort.getMSortFunc());
-        if(sort.isUDFComparatorUsed)
+        if(sort.isUDFComparatorUsed) {
             mro.UDFs.add(sort.getMSortFunc().getFuncSpec().toString());
+        }
         
         List<PhysicalPlan> eps1 = new ArrayList<PhysicalPlan>();
         List<Boolean> flat1 = new ArrayList<Boolean>();

Modified: 
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java?rev=686085&r1=686084&r2=686085&view=diff
==============================================================================
--- 
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java
 (original)
+++ 
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java
 Thu Aug 14 16:20:00 2008
@@ -72,6 +72,10 @@
     //The quantiles file name if globalSort is true
     String quantFile;
     
+    //The sort order of the columns;
+    //asc is true and desc is false
+    boolean[] sortOrder;
+
     public List<String> UDFs;
     
     NodeIdGenerator nig;
@@ -214,4 +218,16 @@
     public void setQuantFile(String quantFile) {
         this.quantFile = quantFile;
     }
+
+    /**
+     * Stores a private copy of the given per-column sort order
+     * (asc is true, desc is false).  A null argument is ignored and
+     * leaves the currently stored order untouched.
+     *
+     * @param sortOrder the sort order to copy; may be null
+     */
+    public void setSortOrder(boolean[] sortOrder) {
+        if (sortOrder != null) {
+            this.sortOrder = sortOrder.clone();
+        }
+    }
+             
+    /**
+     * Returns the stored sort order array itself (not a copy).
+     *
+     * @return the array held in the sortOrder field
+     */
+    public boolean[] getSortOrder() {
+        return this.sortOrder;
+    }
 }

Added: 
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBytesRawComparator.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBytesRawComparator.java?rev=686085&view=auto
==============================================================================
--- 
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBytesRawComparator.java
 (added)
+++ 
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBytesRawComparator.java
 Thu Aug 14 16:20:00 2008
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.mapred.JobConf;
+
+import org.apache.pig.impl.io.NullableBytesWritable;
+import org.apache.pig.impl.util.ObjectSerializer;
+
+public class PigBytesRawComparator extends BytesWritable.Comparator implements 
Configurable {
+
+    private final Log mLog = LogFactory.getLog(getClass());
+    private boolean[] mAsc;
+
+    public void setConf(Configuration conf) {
+        if (!(conf instanceof JobConf)) {
+            mLog.warn("Expected jobconf in setConf, got " +
+                conf.getClass().getName());
+            return;
+        }
+        JobConf jconf = (JobConf)conf;
+        try {
+            mAsc = (boolean[])ObjectSerializer.deserialize(jconf.get(
+                "pig.sortOrder"));
+        } catch (IOException ioe) {
+            mLog.error("Unable to deserialize pig.sortOrder " +
+                ioe.getMessage());
+            throw new RuntimeException(ioe);
+        }
+        if (mAsc == null) {
+            mAsc = new boolean[1];
+            mAsc[0] = true;
+        }
+    }
+
+    public Configuration getConf() {
+        return null;
+    }
+
+    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+        int rc = 0;
+        // If either are null, handle differently.
+        if (b1[s1] == NullableBytesWritable.NOTNULL &&
+                b2[s2] == NullableBytesWritable.NOTNULL) {
+            rc = super.compare(b1, s1 + 1, l1, b2, s2 + 1, l2);
+        } else {
+            // For sorting purposes two nulls are equal.
+            if (b1[s1] == NullableBytesWritable.NULL &&
+                    b2[s2] == NullableBytesWritable.NULL) rc = 0;
+            else if (b1[s1] == NullableBytesWritable.NULL) rc = -1;
+            else rc = 1;
+        }
+        if (!mAsc[0]) rc *= -1;
+        return rc;
+    }
+
+
+}

Added: 
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigDoubleRawComparator.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigDoubleRawComparator.java?rev=686085&view=auto
==============================================================================
--- 
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigDoubleRawComparator.java
 (added)
+++ 
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigDoubleRawComparator.java
 Thu Aug 14 16:20:00 2008
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
+
+import org.apache.pig.backend.hadoop.DoubleWritable;
+import org.apache.pig.impl.io.NullableDoubleWritable;
+import org.apache.pig.impl.util.ObjectSerializer;
+
+public class PigDoubleRawComparator extends DoubleWritable.Comparator 
implements Configurable {
+
+    private final Log mLog = LogFactory.getLog(getClass());
+    private boolean[] mAsc;
+
+    public void setConf(Configuration conf) {
+        if (!(conf instanceof JobConf)) {
+            mLog.warn("Expected jobconf in setConf, got " +
+                conf.getClass().getName());
+            return;
+        }
+        JobConf jconf = (JobConf)conf;
+        try {
+            mAsc = (boolean[])ObjectSerializer.deserialize(jconf.get(
+                "pig.sortOrder"));
+        } catch (IOException ioe) {
+            mLog.error("Unable to deserialize pig.sortOrder " +
+                ioe.getMessage());
+            throw new RuntimeException(ioe);
+        }
+        if (mAsc == null) {
+            mAsc = new boolean[1];
+            mAsc[0] = true;
+        }
+    }
+
+    public Configuration getConf() {
+        return null;
+    }
+
+    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+        int rc = 0;
+        // If either are null, handle differently.
+        if (b1[s1] == NullableDoubleWritable.NOTNULL &&
+                b2[s2] == NullableDoubleWritable.NOTNULL) {
+            rc = super.compare(b1, s1 + 1, l1, b2, s2 + 1, l2);
+        } else {
+            // For sorting purposes two nulls are equal.
+            if (b1[s1] == NullableDoubleWritable.NULL &&
+                    b2[s2] == NullableDoubleWritable.NULL) rc = 0;
+            else if (b1[s1] == NullableDoubleWritable.NULL) rc = -1;
+            else rc = 1;
+        }
+        if (!mAsc[0]) rc *= -1;
+        return rc;
+    }
+
+
+}

Added: 
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigFloatRawComparator.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigFloatRawComparator.java?rev=686085&view=auto
==============================================================================
--- 
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigFloatRawComparator.java
 (added)
+++ 
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigFloatRawComparator.java
 Thu Aug 14 16:20:00 2008
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.mapred.JobConf;
+
+import org.apache.pig.impl.io.NullableFloatWritable;
+import org.apache.pig.impl.util.ObjectSerializer;
+
+public class PigFloatRawComparator extends FloatWritable.Comparator implements 
Configurable {
+
+    private final Log mLog = LogFactory.getLog(getClass());
+    private boolean[] mAsc;
+
+    public void setConf(Configuration conf) {
+        if (!(conf instanceof JobConf)) {
+            mLog.warn("Expected jobconf in setConf, got " +
+                conf.getClass().getName());
+            return;
+        }
+        JobConf jconf = (JobConf)conf;
+        try {
+            mAsc = (boolean[])ObjectSerializer.deserialize(jconf.get(
+                "pig.sortOrder"));
+        } catch (IOException ioe) {
+            mLog.error("Unable to deserialize pig.sortOrder " +
+                ioe.getMessage());
+            throw new RuntimeException(ioe);
+        }
+        if (mAsc == null) {
+            mAsc = new boolean[1];
+            mAsc[0] = true;
+        }
+    }
+
+    public Configuration getConf() {
+        return null;
+    }
+
+    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+        int rc = 0;
+        // If either are null, handle differently.
+        if (b1[s1] == NullableFloatWritable.NOTNULL &&
+                b2[s2] == NullableFloatWritable.NOTNULL) {
+            rc = super.compare(b1, s1 + 1, l1, b2, s2 + 1, l2);
+        } else {
+            // For sorting purposes two nulls are equal.
+            if (b1[s1] == NullableFloatWritable.NULL &&
+                    b2[s2] == NullableFloatWritable.NULL) rc = 0;
+            else if (b1[s1] == NullableFloatWritable.NULL) rc = -1;
+            else rc = 1;
+        }
+        if (!mAsc[0]) rc *= -1;
+        return rc;
+    }
+
+
+}

Added: 
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigIntRawComparator.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigIntRawComparator.java?rev=686085&view=auto
==============================================================================
--- 
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigIntRawComparator.java
 (added)
+++ 
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigIntRawComparator.java
 Thu Aug 14 16:20:00 2008
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.mapred.JobConf;
+
+import org.apache.pig.impl.io.NullableIntWritable;
+import org.apache.pig.impl.util.ObjectSerializer;
+
+public class PigIntRawComparator extends IntWritable.Comparator implements 
Configurable {
+
+    private final Log mLog = LogFactory.getLog(getClass());
+    private boolean[] mAsc;
+
+    public void setConf(Configuration conf) {
+        if (!(conf instanceof JobConf)) {
+            mLog.warn("Expected jobconf in setConf, got " +
+                conf.getClass().getName());
+            return;
+        }
+        JobConf jconf = (JobConf)conf;
+        try {
+            mAsc = (boolean[])ObjectSerializer.deserialize(jconf.get(
+                "pig.sortOrder"));
+        } catch (IOException ioe) {
+            mLog.error("Unable to deserialize pig.sortOrder " +
+                ioe.getMessage());
+            throw new RuntimeException(ioe);
+        }
+        if (mAsc == null) {
+            mAsc = new boolean[1];
+            mAsc[0] = true;
+        }
+    }
+
+    public Configuration getConf() {
+        return null;
+    }
+
+    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+        int rc = 0;
+        // If either are null, handle differently.
+        if (b1[s1] == NullableIntWritable.NOTNULL &&
+                b2[s2] == NullableIntWritable.NOTNULL) {
+            rc = super.compare(b1, s1 + 1, l1, b2, s2 + 1, l2);
+        } else {
+            // For sorting purposes two nulls are equal.
+            if (b1[s1] == NullableIntWritable.NULL &&
+                    b2[s2] == NullableIntWritable.NULL) rc = 0;
+            else if (b1[s1] == NullableIntWritable.NULL) rc = -1;
+            else rc = 1;
+        }
+        if (!mAsc[0]) rc *= -1;
+        return rc;
+    }
+
+
+}

Added: 
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigLongRawComparator.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigLongRawComparator.java?rev=686085&view=auto
==============================================================================
--- 
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigLongRawComparator.java
 (added)
+++ 
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigLongRawComparator.java
 Thu Aug 14 16:20:00 2008
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.mapred.JobConf;
+
+import org.apache.pig.impl.io.NullableLongWritable;
+import org.apache.pig.impl.util.ObjectSerializer;
+
+public class PigLongRawComparator extends LongWritable.Comparator implements 
Configurable {
+
+    private final Log mLog = LogFactory.getLog(getClass());
+    private boolean[] mAsc;
+
+    public void setConf(Configuration conf) {
+        if (!(conf instanceof JobConf)) {
+            mLog.warn("Expected jobconf in setConf, got " +
+                conf.getClass().getName());
+            return;
+        }
+        JobConf jconf = (JobConf)conf;
+        try {
+            mAsc = (boolean[])ObjectSerializer.deserialize(jconf.get(
+                "pig.sortOrder"));
+        } catch (IOException ioe) {
+            mLog.error("Unable to deserialize pig.sortOrder " +
+                ioe.getMessage());
+            throw new RuntimeException(ioe);
+        }
+        if (mAsc == null) {
+            mAsc = new boolean[1];
+            mAsc[0] = true;
+        }
+    }
+
+    public Configuration getConf() {
+        return null;
+    }
+
+    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+        int rc = 0;
+        // If either are null, handle differently.
+        if (b1[s1] == NullableLongWritable.NOTNULL &&
+                b2[s2] == NullableLongWritable.NOTNULL) {
+            rc = super.compare(b1, s1 + 1, l1, b2, s2 + 1, l2);
+        } else {
+            // For sorting purposes two nulls are equal.
+            if (b1[s1] == NullableLongWritable.NULL &&
+                    b2[s2] == NullableLongWritable.NULL) rc = 0;
+            else if (b1[s1] == NullableLongWritable.NULL) rc = -1;
+            else rc = 1;
+        }
+        if (!mAsc[0]) rc *= -1;
+        return rc;
+    }
+
+
+}

Added: 
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTextRawComparator.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTextRawComparator.java?rev=686085&view=auto
==============================================================================
--- 
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTextRawComparator.java
 (added)
+++ 
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTextRawComparator.java
 Thu Aug 14 16:20:00 2008
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.mapred.JobConf;
+
+import org.apache.pig.impl.io.NullableText;
+import org.apache.pig.impl.util.ObjectSerializer;
+
+public class PigTextRawComparator extends Text.Comparator implements 
Configurable {
+
+    private final Log mLog = LogFactory.getLog(getClass());
+    private boolean[] mAsc;
+
+    public void setConf(Configuration conf) {
+        if (!(conf instanceof JobConf)) {
+            mLog.warn("Expected jobconf in setConf, got " +
+                conf.getClass().getName());
+            return;
+        }
+        JobConf jconf = (JobConf)conf;
+        try {
+            mAsc = (boolean[])ObjectSerializer.deserialize(jconf.get(
+                "pig.sortOrder"));
+        } catch (IOException ioe) {
+            mLog.error("Unable to deserialize pig.sortOrder " +
+                ioe.getMessage());
+            throw new RuntimeException(ioe);
+        }
+        if (mAsc == null) {
+            mAsc = new boolean[1];
+            mAsc[0] = true;
+        }
+    }
+
+    public Configuration getConf() {
+        return null;
+    }
+
+    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+        int rc = 0;
+        // If either are null, handle differently.
+        if (b1[s1] == NullableText.NOTNULL &&
+                b2[s2] == NullableText.NOTNULL) {
+            rc = super.compare(b1, s1 + 1, l1, b2, s2 + 1, l2);
+        } else {
+            // For sorting purposes two nulls are equal.
+            if (b1[s1] == NullableText.NULL &&
+                    b2[s2] == NullableText.NULL) rc = 0;
+            else if (b1[s1] == NullableText.NULL) rc = -1;
+            else rc = 1;
+        }
+        if (!mAsc[0]) rc *= -1;
+        return rc;
+    }
+
+
+}

Added: 
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTupleRawComparator.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTupleRawComparator.java?rev=686085&view=auto
==============================================================================
--- 
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTupleRawComparator.java
 (added)
+++ 
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTupleRawComparator.java
 Thu Aug 14 16:20:00 2008
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.mapred.JobConf;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.util.ObjectSerializer;
+
+public class PigTupleRawComparator extends WritableComparator implements 
Configurable {
+
+    private final Log mLog = LogFactory.getLog(getClass());
+    private boolean[] mAsc;
+    private boolean mWholeTuple;
+    private TupleFactory mFact;
+
+    public PigTupleRawComparator() {
+        super(TupleFactory.getInstance().tupleClass());
+    }
+
+    public void setConf(Configuration conf) {
+        if (!(conf instanceof JobConf)) {
+            mLog.warn("Expected jobconf in setConf, got " +
+                conf.getClass().getName());
+            return;
+        }
+        JobConf jconf = (JobConf)conf;
+        try {
+            mAsc = (boolean[])ObjectSerializer.deserialize(jconf.get(
+                "pig.sortOrder"));
+        } catch (IOException ioe) {
+            mLog.error("Unable to deserialize pig.sortOrder " +
+                ioe.getMessage());
+            throw new RuntimeException(ioe);
+        }
+        if (mAsc == null) {
+            mAsc = new boolean[1];
+            mAsc[0] = true;
+        }
+        // If there's only one entry in mAsc, it means it's for the whole
+        // tuple.  So we can't be looking for each column.
+        mWholeTuple = (mAsc.length == 1);
+        mFact = TupleFactory.getInstance();
+    }
+
+    public Configuration getConf() {
+        return null;
+    }
+
+    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+        // This can't be done on the raw data.  Users are allowed to
+        // implement their own versions of tuples, which means we have no
+        // idea what the underlying representation is.  So step one is to
+        // instantiate each object as a tuple.
+        Tuple t1 = mFact.newTuple();
+        Tuple t2 = mFact.newTuple();
+        try {
+            t1.readFields(new DataInputStream(new ByteArrayInputStream(b1, s1, 
l1)));
+            t2.readFields(new DataInputStream(new ByteArrayInputStream(b2, s2, 
l2)));
+        } catch (IOException ioe) {
+            mLog.error("Unable to instantiate tuples for comparison: " +
+                ioe.getMessage());
+            throw new RuntimeException(ioe.getMessage(), ioe);
+        }
+
+        int rc;
+        if (t1.isNull() || t2.isNull()) {
+            // For sorting purposes two nulls are equal.
+            if (t1.isNull() && t2.isNull()) rc = 0;
+            else if (t1.isNull()) rc = -1;
+            else rc = 1;
+        } else {
+            int sz1 = t1.size();
+            int sz2 = t2.size();
+            if (sz2 < sz1) {
+                rc = 1;
+            } else if (sz2 > sz1) {
+                rc = -1;
+            } else {
+                for (int i = 0; i < sz1; i++) {
+                    try {
+                        int c = DataType.compare(t1.get(i), t2.get(i));
+                        if (c != 0) {
+                            if (!mWholeTuple && !mAsc[i]) c *= -1;
+                            else if (mWholeTuple && !mAsc[0]) c *= -1;
+                            return c;
+                        }
+                    } catch (ExecException e) {
+                        throw new RuntimeException("Unable to compare tuples", 
e);
+                    }
+                }
+                rc = 0;
+            }
+        }
+        if (mWholeTuple && !mAsc[0]) rc *= -1;
+        return rc;
+    }
+
+
+}

Modified: 
incubator/pig/branches/types/src/org/apache/pig/impl/io/NullableBytesWritable.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/io/NullableBytesWritable.java?rev=686085&r1=686084&r2=686085&view=diff
==============================================================================
--- 
incubator/pig/branches/types/src/org/apache/pig/impl/io/NullableBytesWritable.java
 (original)
+++ 
incubator/pig/branches/types/src/org/apache/pig/impl/io/NullableBytesWritable.java
 Thu Aug 14 16:20:00 2008
@@ -1,5 +1,19 @@
-/**
- * 
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 package org.apache.pig.impl.io;
 

Modified: 
incubator/pig/branches/types/src/org/apache/pig/impl/io/NullableDoubleWritable.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/io/NullableDoubleWritable.java?rev=686085&r1=686084&r2=686085&view=diff
==============================================================================
--- 
incubator/pig/branches/types/src/org/apache/pig/impl/io/NullableDoubleWritable.java
 (original)
+++ 
incubator/pig/branches/types/src/org/apache/pig/impl/io/NullableDoubleWritable.java
 Thu Aug 14 16:20:00 2008
@@ -1,5 +1,19 @@
-/**
- * 
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 package org.apache.pig.impl.io;
 

Modified: 
incubator/pig/branches/types/src/org/apache/pig/impl/io/NullableFloatWritable.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/io/NullableFloatWritable.java?rev=686085&r1=686084&r2=686085&view=diff
==============================================================================
--- 
incubator/pig/branches/types/src/org/apache/pig/impl/io/NullableFloatWritable.java
 (original)
+++ 
incubator/pig/branches/types/src/org/apache/pig/impl/io/NullableFloatWritable.java
 Thu Aug 14 16:20:00 2008
@@ -1,5 +1,19 @@
-/**
- * 
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 package org.apache.pig.impl.io;
 

Modified: 
incubator/pig/branches/types/src/org/apache/pig/impl/io/NullableIntWritable.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/io/NullableIntWritable.java?rev=686085&r1=686084&r2=686085&view=diff
==============================================================================
--- 
incubator/pig/branches/types/src/org/apache/pig/impl/io/NullableIntWritable.java
 (original)
+++ 
incubator/pig/branches/types/src/org/apache/pig/impl/io/NullableIntWritable.java
 Thu Aug 14 16:20:00 2008
@@ -1,5 +1,19 @@
-/**
- * 
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 package org.apache.pig.impl.io;
 

Modified: 
incubator/pig/branches/types/src/org/apache/pig/impl/io/NullableLongWritable.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/io/NullableLongWritable.java?rev=686085&r1=686084&r2=686085&view=diff
==============================================================================
--- 
incubator/pig/branches/types/src/org/apache/pig/impl/io/NullableLongWritable.java
 (original)
+++ 
incubator/pig/branches/types/src/org/apache/pig/impl/io/NullableLongWritable.java
 Thu Aug 14 16:20:00 2008
@@ -1,5 +1,19 @@
-/**
- * 
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 package org.apache.pig.impl.io;
 

Modified: 
incubator/pig/branches/types/src/org/apache/pig/impl/io/NullableText.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/io/NullableText.java?rev=686085&r1=686084&r2=686085&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/io/NullableText.java 
(original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/io/NullableText.java 
Thu Aug 14 16:20:00 2008
@@ -1,5 +1,19 @@
-/**
- * 
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 package org.apache.pig.impl.io;
 


Reply via email to