Author: gates
Date: Tue Nov 27 16:17:31 2007
New Revision: 598827

URL: http://svn.apache.org/viewvc?rev=598827&view=rev
Log:
PIG-20 Added custom comparator functions for order by.  Provided by Patrick
Hunt.


Added:
    incubator/pig/trunk/src/org/apache/pig/ComparisonFunc.java
    incubator/pig/trunk/test/org/apache/pig/test/OrdAsc.java
    incubator/pig/trunk/test/org/apache/pig/test/OrdDesc.java
    incubator/pig/trunk/test/org/apache/pig/test/OrdDescNumeric.java
    incubator/pig/trunk/test/org/apache/pig/test/TestOrderBy.java
Modified:
    incubator/pig/trunk/CHANGES.txt
    incubator/pig/trunk/src/org/apache/pig/impl/eval/EvalSpec.java
    
incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
    
incubator/pig/trunk/src/org/apache/pig/impl/mapreduceExec/MapReduceLauncher.java
    
incubator/pig/trunk/src/org/apache/pig/impl/mapreduceExec/SortPartitioner.java
    
incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/MapreducePlanCompiler.java
    incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/POMapreduce.java

Modified: incubator/pig/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/incubator/pig/trunk/CHANGES.txt?rev=598827&r1=598826&r2=598827&view=diff
==============================================================================
--- incubator/pig/trunk/CHANGES.txt (original)
+++ incubator/pig/trunk/CHANGES.txt Tue Nov 27 16:17:31 2007
@@ -22,3 +22,5 @@
        PIG-8 added binary comparator (olgan)
 
        PIG-17 integrated with Hadoop 0.15 (olgan@)
+
+       PIG-20 Added custom comparator functions for order by (phunt via gates)

Added: incubator/pig/trunk/src/org/apache/pig/ComparisonFunc.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/ComparisonFunc.java?rev=598827&view=auto
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/ComparisonFunc.java (added)
+++ incubator/pig/trunk/src/org/apache/pig/ComparisonFunc.java Tue Nov 27 
16:17:31 2007
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.pig.data.Tuple;
+
+
+public abstract class ComparisonFunc extends WritableComparator {
+    public ComparisonFunc() {
+        super(Tuple.class);
+    }
+
+    public int compare(WritableComparable a, WritableComparable b) {
+        return compare((Tuple)a, (Tuple)b);
+    }
+
+    /**
+     * This callback method must be implemented by all subclasses. Compares 
+     * its two arguments for order. Returns a negative integer, zero, or a 
+     * positive integer as the first argument is less than, equal to, or 
+     * greater than the second. The order of elements of the tuples correspond 
+     * to the fields specified in the order by clause. 
+     * Same semantics as {@link java.util.Comparator}.
+     * 
+     * @param t1 the first Tuple to be compared.
+     * @param t2 the second Tuple to be compared.
+     * @throws IOException
+     * @see java.util.Comparator
+     */
+    abstract public int compare(Tuple t1, Tuple t2);
+}

Modified: incubator/pig/trunk/src/org/apache/pig/impl/eval/EvalSpec.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/eval/EvalSpec.java?rev=598827&r1=598826&r2=598827&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/eval/EvalSpec.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/eval/EvalSpec.java Tue Nov 27 
16:17:31 2007
@@ -22,6 +22,7 @@
 import java.util.Comparator;
 import java.util.List;
 
+import org.apache.pig.ComparisonFunc;
 import org.apache.pig.data.Datum;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.FunctionInstantiator;
@@ -41,6 +42,8 @@
        transient DataCollector simpleEvalInput; 
        protected boolean inner = false; //used only if this generate spec is 
used in a group by
 
+       private String comparatorFuncName;
+       private transient Comparator<Tuple> comparator;
        
        /*
         * Keep a precomputed pipeline ready if we do simple evals
@@ -51,7 +54,36 @@
                simpleEvalInput = setupPipe(simpleEvalOutput);
        }
        
-       public void instantiateFunc(FunctionInstantiator instantiaor) throws 
IOException{};
+       public class UserComparator implements Comparator<Tuple> {
+               Comparator<Tuple> nested;
+               
+               UserComparator(Comparator<Tuple> nested) {
+                       this.nested = nested;
+               }
+       public int compare(Tuple t1, Tuple t2) {
+               Datum d1 = simpleEval(t1);
+               Datum d2 = simpleEval(t2);
+               if (d1 instanceof Tuple) {
+                       return nested.compare((Tuple)d1, (Tuple)d2);
+               } else {
+                       return nested.compare(new Tuple(d1), new Tuple(d2));
+               }
+        }
+       }
+       
+       public void instantiateFunc(FunctionInstantiator instantiaor) throws 
IOException{
+               if (comparatorFuncName != null) {
+                       Comparator<Tuple> userComparator = 
+                               
(ComparisonFunc)instantiaor.instantiateFuncFromAlias(comparatorFuncName);
+                       comparator = new UserComparator(userComparator);
+               } else {
+                       comparator = new Comparator<Tuple>() {
+                       public int compare(Tuple t1, Tuple t2) {
+                                       return 
simpleEval(t1).compareTo(simpleEval(t2));
+                       }
+                   };
+               }
+       };
        
     /**
      * set up a default data processing pipe for processing by this spec
@@ -145,17 +177,30 @@
     public abstract boolean amenableToCombiner();
 
     
+    public void setComparatorName(String name) {
+        comparatorFuncName = name;
+    }
+    
+    public String getComparatorName() {
+        return comparatorFuncName;
+    }
+    
     /**
      * Compare 2 tuples according to this spec. This is used while sorting by 
arbitrary (even generated) fields.
      * @return
      */
     public Comparator<Tuple> getComparator() {
-        return new Comparator<Tuple>() {
-               
-               public int compare(Tuple t1, Tuple t2) {
-                       return (simpleEval(t1).compareTo(simpleEval(t2)));
-            }
-        };
+               if (comparator != null)
+            return comparator;
+               else
+        {
+            comparator = new Comparator<Tuple>() {
+                public int compare(Tuple t1, Tuple t2) {
+                    return simpleEval(t1).compareTo(simpleEval(t2));
+                }
+            };
+            return comparator;
+        }
     }
     
     public void setFlatten(boolean isFlattened){

Modified: 
incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
URL: 
http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=598827&r1=598826&r2=598827&view=diff
==============================================================================
--- 
incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt 
(original)
+++ 
incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt 
Tue Nov 27 16:17:31 2007
@@ -515,7 +515,7 @@
 }
 
 
-LogicalOperator OrderClause() : {LogicalOperator op; EvalSpec sortSpec = null; 
ProjectSpec projSpec;}
+LogicalOperator OrderClause() : {LogicalOperator op; EvalSpec sortSpec = null; 
ProjectSpec projSpec; String funcName;}
 {
        (
        op = NestedExpr() <BY> 
@@ -530,6 +530,17 @@
                )
        |       (sortSpec = Star() {sortSpec = new GenerateSpec(sortSpec);})
        )
+    (
+        <USING>  funcName = QualifiedFunction()
+        {
+            try {
+                sortSpec.setComparatorName(funcName);
+            } catch (Exception e){
+                throw new ParseException(e.getMessage());
+            }
+        }
+    )?
+
        )
        {
                return new LOSort(op, sortSpec);
@@ -607,13 +618,23 @@
 }
 
 EvalSpec NestedSortOrArrange(Schema over, Map<String, EvalSpec> specs):
-{EvalSpec sortSpec; ProjectSpec projSpec; EvalSpec item; Schema subSchema = 
null; Token t;}
+{EvalSpec sortSpec; ProjectSpec projSpec; EvalSpec item; Schema subSchema = 
null; Token t; String funcName;}
 {
        (
        ( t = <ORDER> | t = <ARRANGE> )
        item = BaseEvalSpec(over,specs) { subSchema = 
item.getOutputSchemaForPipe(over); }
-       <BY> ( (projSpec = SimpleProj(subSchema) {sortSpec = projSpec;}) 
-               | sortSpec = Star() )
+       <BY> ( (projSpec = SimpleProj(subSchema) {sortSpec = projSpec;})
+               | sortSpec = Star() )     
+    (
+        <USING>  funcName = QualifiedFunction()
+        {
+            try {
+                sortSpec.setComparatorName(funcName);
+            } catch (Exception e){
+                throw new ParseException(e.getMessage());
+            }
+        }
+    )?
        )
        { return copyItemAndAddSpec(item,new SortDistinctSpec(false, 
sortSpec)); }
 }
@@ -932,6 +953,7 @@
                return funcName;
        }       
 }
+
 
 /**
  * Bug 831620 - '$' support

Modified: 
incubator/pig/trunk/src/org/apache/pig/impl/mapreduceExec/MapReduceLauncher.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/mapreduceExec/MapReduceLauncher.java?rev=598827&r1=598826&r2=598827&view=diff
==============================================================================
--- 
incubator/pig/trunk/src/org/apache/pig/impl/mapreduceExec/MapReduceLauncher.java
 (original)
+++ 
incubator/pig/trunk/src/org/apache/pig/impl/mapreduceExec/MapReduceLauncher.java
 Tue Nov 27 16:17:31 2007
@@ -17,8 +17,6 @@
  */
 package org.apache.pig.impl.mapreduceExec;
 
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
@@ -31,7 +29,6 @@
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.IndexedTuple;
 import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.eval.EvalSpec;
 import org.apache.pig.impl.io.PigFile;
 import org.apache.pig.impl.physicalLayer.POMapreduce;
@@ -74,7 +71,7 @@
             return WritableComparator.compareBytes(b1, s1, l1, b2, s2, l2);
            }
     }
-        
+
     static Random rand = new Random();
 
     /**
@@ -161,6 +158,8 @@
                // not used starting with 0.15 
conf.setInputKeyClass(Text.class);
                // not used starting with 0.15 
conf.setInputValueClass(Tuple.class);
                conf.setOutputKeyClass(Tuple.class);
+               if (pom.userComparator != null)
+                       conf.setOutputKeyComparatorClass(pom.userComparator);
                conf.setOutputValueClass(IndexedTuple.class);
                conf.set("pig.inputs", 
ObjectSerializer.serialize(pom.inputFileSpecs));
             

Modified: 
incubator/pig/trunk/src/org/apache/pig/impl/mapreduceExec/SortPartitioner.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/mapreduceExec/SortPartitioner.java?rev=598827&r1=598826&r2=598827&view=diff
==============================================================================
--- 
incubator/pig/trunk/src/org/apache/pig/impl/mapreduceExec/SortPartitioner.java 
(original)
+++ 
incubator/pig/trunk/src/org/apache/pig/impl/mapreduceExec/SortPartitioner.java 
Tue Nov 27 16:17:31 2007
@@ -24,6 +24,7 @@
 
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Partitioner;
 import org.apache.pig.builtin.BinStorage;
@@ -33,13 +34,14 @@
 
 
 public class SortPartitioner implements Partitioner {
-       Tuple[] quantiles;
+       Tuple[] quantiles;
+       WritableComparator comparator;
        
        public int getPartition(WritableComparable key, Writable value,
                        int numPartitions) {
                try{
                        Tuple keyTuple = (Tuple)key;
-                       int index = Arrays.binarySearch(quantiles, 
keyTuple.getTupleField(0));
+                       int index = Arrays.binarySearch(quantiles, 
keyTuple.getTupleField(0), comparator);
                        if (index < 0)
                                index = -index-1;
                        return Math.min(index, numPartitions - 1);
@@ -48,7 +50,7 @@
                }
        }
 
-       public void configure(JobConf job) {
+       public void configure(JobConf job) {
                String quantilesFile = job.get("pig.quantilesFile", "");
                if (quantilesFile.length() == 0)
                        throw new RuntimeException("Sort paritioner used but no 
quantiles found");
@@ -71,6 +73,8 @@
                }catch (IOException e){
                        throw new RuntimeException(e);
                }
+
+               comparator = job.getOutputKeyComparator();
        }
 
 }

Modified: 
incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/MapreducePlanCompiler.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/MapreducePlanCompiler.java?rev=598827&r1=598826&r2=598827&view=diff
==============================================================================
--- 
incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/MapreducePlanCompiler.java
 (original)
+++ 
incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/MapreducePlanCompiler.java
 Tue Nov 27 16:17:31 2007
@@ -19,9 +19,12 @@
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Comparator;
 import java.util.Map;
 
+import org.apache.hadoop.io.WritableComparator;
 import org.apache.pig.builtin.BinStorage;
+import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.builtin.FindQuantiles;
 import org.apache.pig.impl.builtin.RandomSampleLoader;
@@ -271,6 +274,17 @@
                sortJob.addReduceSpec(new GenerateSpec(ps));
        
        sortJob.reduceParallelism = loSort.getRequestedParallelism();
+       
+       String comparatorFuncName = loSort.getSortSpec().getComparatorName();
+       if (comparatorFuncName != null) {
+               try {
+                       sortJob.userComparator = 
+                               
(Class<WritableComparator>)Class.forName(comparatorFuncName);
+               } catch (ClassNotFoundException e) {
+                       throw new RuntimeException("Unable to find user 
comparator " + comparatorFuncName, e);
+               }
+       }
+
        return sortJob;
     }
     

Modified: 
incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/POMapreduce.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/POMapreduce.java?rev=598827&r1=598826&r2=598827&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/POMapreduce.java 
(original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/POMapreduce.java 
Tue Nov 27 16:17:31 2007
@@ -19,7 +19,10 @@
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Comparator;
 
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.eval.EvalSpec;
@@ -40,12 +43,14 @@
     public ArrayList<FileSpec>           inputFileSpecs         = new 
ArrayList<FileSpec>();
     public FileSpec         outputFileSpec        = null;
     public Class           partitionFunction = null;
+    public Class<WritableComparator> userComparator = null;
     public String quantilesFile = null;
     public PigContext pigContext;
     
     public int                     mapParallelism       = -1;     // -1 means 
let hadoop decide
     public int                     reduceParallelism    = -1;
 
+    
     static MapReduceLauncher mapReduceLauncher = new MapReduceLauncher();
 
     

Added: incubator/pig/trunk/test/org/apache/pig/test/OrdAsc.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/trunk/test/org/apache/pig/test/OrdAsc.java?rev=598827&view=auto
==============================================================================
--- incubator/pig/trunk/test/org/apache/pig/test/OrdAsc.java (added)
+++ incubator/pig/trunk/test/org/apache/pig/test/OrdAsc.java Tue Nov 27 
16:17:31 2007
@@ -0,0 +1,13 @@
+package org.apache.pig.test;
+
+import org.apache.pig.data.Tuple;
+import org.apache.pig.ComparisonFunc;
+
+public class OrdAsc extends ComparisonFunc {
+    // this is a simple example - more complex comparison will require
+    //   breakout of the individual values. I suggest you'll have
+    //   to convert "catch(IOException e) to RuntimeException('msg', e)"
+    public int compare(Tuple t1, Tuple t2) {
+        return t1.compareTo(t2);
+    }
+}

Added: incubator/pig/trunk/test/org/apache/pig/test/OrdDesc.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/trunk/test/org/apache/pig/test/OrdDesc.java?rev=598827&view=auto
==============================================================================
--- incubator/pig/trunk/test/org/apache/pig/test/OrdDesc.java (added)
+++ incubator/pig/trunk/test/org/apache/pig/test/OrdDesc.java Tue Nov 27 
16:17:31 2007
@@ -0,0 +1,13 @@
+package org.apache.pig.test;
+
+import org.apache.pig.data.Tuple;
+import org.apache.pig.ComparisonFunc;
+
+public class OrdDesc extends ComparisonFunc {
+    // this is a simple example - more complex comparison will require
+    //   breakout of the individual values. I suggest you'll have
+    //   to convert "catch(IOException e) to RuntimeException('msg', e)"
+    public int compare(Tuple t1, Tuple t2) {
+        return t2.compareTo(t1);
+    }
+}

Added: incubator/pig/trunk/test/org/apache/pig/test/OrdDescNumeric.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/trunk/test/org/apache/pig/test/OrdDescNumeric.java?rev=598827&view=auto
==============================================================================
--- incubator/pig/trunk/test/org/apache/pig/test/OrdDescNumeric.java (added)
+++ incubator/pig/trunk/test/org/apache/pig/test/OrdDescNumeric.java Tue Nov 27 
16:17:31 2007
@@ -0,0 +1,42 @@
+package org.apache.pig.test;
+
+import java.io.IOException;
+
+import org.apache.pig.data.DataAtom;
+import org.apache.pig.data.Datum;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.ComparisonFunc;
+
+public class OrdDescNumeric extends ComparisonFunc {
+    public int compare(Tuple t1, Tuple t2) {
+        try {
+            for (int i = 0; i < t1.arity(); i++) {
+                Datum d1 = t1.getField(i);
+                Datum d2 = t2.getField(i);
+                int comp;
+                if (d1 instanceof DataAtom) {
+                    comp = compare((DataAtom)d1, (DataAtom)d2);
+                } else {
+                    comp = compare((Tuple)d1, (Tuple)d2);
+                }
+                if (comp != 0) {
+                    return comp;
+                }
+            }
+            return 0;
+        } catch (IOException e) {
+            throw new RuntimeException("Error comparing keys in 
OrdDEscNumeric", e);
+        }
+    }
+    
+    private int compare(DataAtom a1, DataAtom a2) throws IOException {
+        double num1 = a1.numval();
+        double num2 = a2.numval();
+        if (num2 > num1) {
+            return 1;
+        } else if (num2 < num1) {
+            return -1;
+        }
+        return 0;
+    }
+}

Added: incubator/pig/trunk/test/org/apache/pig/test/TestOrderBy.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/trunk/test/org/apache/pig/test/TestOrderBy.java?rev=598827&view=auto
==============================================================================
--- incubator/pig/trunk/test/org/apache/pig/test/TestOrderBy.java (added)
+++ incubator/pig/trunk/test/org/apache/pig/test/TestOrderBy.java Tue Nov 27 
16:17:31 2007
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.PrintStream;
+import java.text.DecimalFormat;
+import java.util.Iterator;
+
+import junit.framework.TestCase;
+
+import org.junit.Test;
+
+import org.apache.pig.PigServer;
+import org.apache.pig.data.Tuple;
+
+public class TestOrderBy extends TestCase {
+    private String initString = "mapreduce";
+    private static final int DATALEN = 1024;
+    private String[][] DATA = new String[2][DATALEN];
+    
+    private PigServer pig;
+    private File tmpFile;
+
+    public TestOrderBy() throws Exception {
+        DecimalFormat myFormatter = new DecimalFormat("0000000");
+        for (int i = 0; i < DATALEN; i++) {
+            DATA[0][i] = myFormatter.format(i);
+            DATA[1][i] = myFormatter.format(DATALEN - i - 1);
+        }
+        pig = new PigServer(initString);
+    }
+    
+    protected void setUp() throws Exception {
+        tmpFile = File.createTempFile("test", "txt");
+        PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
+        for(int i = 0; i < DATALEN; i++) {
+            ps.println("1\t" + DATA[1][i] + "\t" + DATA[0][i]);
+        }
+        ps.close();
+    }
+    
+    protected void tearDown() throws Exception {
+        tmpFile.delete();
+    }
+    
+    private void verify(String query, boolean descending) throws Exception {
+        pig.registerQuery(query);
+        Iterator<Tuple> it = pig.openIterator("myid");
+        int col = (descending ? 1 : 0);
+        for(int i = 0; i < DATALEN; i++) {
+            Tuple t = (Tuple)it.next();
+            int value = t.getAtomField(1).numval().intValue();
+//            System.out.println("" + i + "," + DATA[0][i] + "," + DATA[1][i] 
+ "," + value);
+            assertEquals(Integer.parseInt(DATA[col][i]), value);
+        }
+        assertFalse(it.hasNext());
+    }
+
+    @Test
+    public void testTopLevelOrderBy_Star_NoUsing() throws Exception {
+        verify("myid = order (load 'file:" + tmpFile + "') BY *;", false);
+    }
+
+    @Test
+    public void testTopLevelOrderBy_Col1_NoUsing() throws Exception {
+        verify("myid = order (load 'file:" + tmpFile + "') BY $1;", false);
+    }
+
+    @Test
+    public void testTopLevelOrderBy_Col2_NoUsing() throws Exception {
+        verify("myid = order (load 'file:" + tmpFile + "') BY $2;", true);
+    }
+
+    @Test
+    public void testTopLevelOrderBy_Col21_NoUsing() throws Exception {
+        verify("myid = order (load 'file:" + tmpFile + "') BY $2, $1;", true);
+    }
+
+    @Test
+    public void testTopLevelOrderBy_Star_Using() throws Exception {
+        verify("myid = order (load 'file:" + tmpFile +
+            "') BY * USING org.apache.pig.test.OrdAsc;", false);
+        verify("myid = order (load 'file:" + tmpFile +
+            "') BY * USING org.apache.pig.test.OrdDesc;", true);
+        verify("myid = order (load 'file:" + tmpFile +
+            "') BY * USING org.apache.pig.test.OrdDescNumeric;", true);
+    }
+
+    @Test
+    public void testTopLevelOrderBy_Col1_Using() throws Exception {
+        verify("myid = order (load 'file:" + tmpFile +
+            "') BY $1 USING org.apache.pig.test.OrdAsc;", false);
+        verify("myid = order (load 'file:" + tmpFile +
+            "') BY $1 USING org.apache.pig.test.OrdDesc;", true);
+        verify("myid = order (load 'file:" + tmpFile +
+            "') BY $1 USING org.apache.pig.test.OrdDescNumeric;", true);
+    }
+
+    @Test
+    public void testTopLevelOrderBy_Col2_Using() throws Exception {
+        verify("myid = order (load 'file:" + tmpFile +
+            "') BY $2 USING org.apache.pig.test.OrdAsc;", true);
+        verify("myid = order (load 'file:" + tmpFile +
+            "') BY $2 USING org.apache.pig.test.OrdDesc;", false);
+        verify("myid = order (load 'file:" + tmpFile +
+            "') BY $2 USING org.apache.pig.test.OrdDescNumeric;", false);
+    }
+
+    @Test
+    public void testTopLevelOrderBy_Col21_Using() throws Exception {
+        // col2/col1 ascending - 
+        verify("myid = order (load 'file:" + tmpFile +
+            "') BY $2, $1 USING org.apache.pig.test.OrdAsc;", true);
+        verify("myid = order (load 'file:" + tmpFile +
+            "') BY $2, $1 USING org.apache.pig.test.OrdDesc;", false);
+        verify("myid = order (load 'file:" + tmpFile +
+            "') BY $2, $1 USING org.apache.pig.test.OrdDescNumeric;", false);
+    }
+
+    @Test
+    public void testNestedOrderBy_Star_NoUsing() throws Exception {
+        verify("myid = foreach (group (load 'file:" + tmpFile +
+            "') by $0) { D = ORDER $1 BY *; generate flatten(D); };", false);
+    }
+
+    @Test
+    public void testNestedOrderBy_Col1_NoUsing() throws Exception {
+        verify("myid = foreach (group (load 'file:" + tmpFile +
+            "') by $0) { D = ORDER $1 BY $1; generate flatten(D); };", false);
+    }
+
+    @Test
+    public void testNestedOrderBy_Col2_NoUsing() throws Exception {
+        verify("myid = foreach (group (load 'file:" + tmpFile +
+            "') by $0) { D = ORDER $1 BY $2; generate flatten(D); };", true);
+    }
+
+    @Test
+    public void testNestedOrderBy_Col21_NoUsing() throws Exception {
+        verify("myid = foreach (group (load 'file:" + tmpFile +
+            "') by $0) { D = ORDER $1 BY $2, $1; generate flatten(D); };", 
true);
+    }
+
+    @Test
+    public void testNestedOrderBy_Star_Using() throws Exception {
+        verify("myid = foreach (group (load 'file:" + tmpFile +
+            "') by $0) { D = ORDER $1 BY * USING " + 
+            "org.apache.pig.test.OrdAsc; generate flatten(D); };", false);
+        verify("myid = foreach (group (load 'file:" + tmpFile +
+            "') by $0) { D = ORDER $1 BY * USING " + 
+            "org.apache.pig.test.OrdDesc; generate flatten(D); };", true);
+        verify("myid = foreach (group (load 'file:" + tmpFile +
+            "') by $0) { D = ORDER $1 BY * USING " + 
+            "org.apache.pig.test.OrdDescNumeric; generate flatten(D); };", 
true);
+    }
+
+    @Test
+    public void testNestedOrderBy_Col1_Using() throws Exception {
+        verify("myid = foreach (group (load 'file:" + tmpFile +
+            "') by $0) { D = ORDER $1 BY $1 USING " + 
+            "org.apache.pig.test.OrdAsc; generate flatten(D); };", false);
+        verify("myid = foreach (group (load 'file:" + tmpFile +
+            "') by $0) { D = ORDER $1 BY $1 USING " + 
+            "org.apache.pig.test.OrdDesc; generate flatten(D); };", true);
+        verify("myid = foreach (group (load 'file:" + tmpFile +
+                "') by $0) { D = ORDER $1 BY $1 USING " + 
+                "org.apache.pig.test.OrdDescNumeric; generate flatten(D); };",
+                true);
+    }
+
+    @Test
+    public void testNestedOrderBy_Col2_Using() throws Exception {
+        verify("myid = foreach (group (load 'file:" + tmpFile +
+                "') by $0) { D = ORDER $1 BY $2 USING " +
+                "org.apache.pig.test.OrdAsc; generate flatten(D); };", true);
+        verify("myid = foreach (group (load 'file:" + tmpFile +
+                "') by $0) { D = ORDER $1 BY $2 USING " +
+                "org.apache.pig.test.OrdDesc; generate flatten(D); };", false);
+        verify("myid = foreach (group (load 'file:" + tmpFile +
+                "') by $0) { D = ORDER $1 BY $2 USING " +
+                "org.apache.pig.test.OrdDescNumeric; generate flatten(D); };",
+                false);
+    }
+
+    @Test
+    public void testNestedOrderBy_Col21_Using() throws Exception {
+        // col2/col1 ascending - 
+        verify("myid = foreach (group (load 'file:" + tmpFile +
+                "') by $0) { D = ORDER $1 BY $2, $1 USING " +
+                "org.apache.pig.test.OrdAsc; generate flatten(D); };", true);
+        verify("myid = foreach (group (load 'file:" + tmpFile +
+                "') by $0) { D = ORDER $1 BY $2, $1 USING " +
+                "org.apache.pig.test.OrdDesc; generate flatten(D); };", false);
+        verify("myid = foreach (group (load 'file:" + tmpFile +
+                "') by $0) { D = ORDER $1 BY $2, $1 USING " +
+                "org.apache.pig.test.OrdDescNumeric; generate flatten(D); };",
+                false);
+    }
+
+}


Reply via email to