Author: gates Date: Tue Nov 27 16:17:31 2007 New Revision: 598827 URL: http://svn.apache.org/viewvc?rev=598827&view=rev Log: PIG-20 Added custom comparator functions for order by. Provided by Patrick Hunt.
Added: incubator/pig/trunk/src/org/apache/pig/ComparisonFunc.java incubator/pig/trunk/test/org/apache/pig/test/OrdAsc.java incubator/pig/trunk/test/org/apache/pig/test/OrdDesc.java incubator/pig/trunk/test/org/apache/pig/test/OrdDescNumeric.java incubator/pig/trunk/test/org/apache/pig/test/TestOrderBy.java Modified: incubator/pig/trunk/CHANGES.txt incubator/pig/trunk/src/org/apache/pig/impl/eval/EvalSpec.java incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt incubator/pig/trunk/src/org/apache/pig/impl/mapreduceExec/MapReduceLauncher.java incubator/pig/trunk/src/org/apache/pig/impl/mapreduceExec/SortPartitioner.java incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/MapreducePlanCompiler.java incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/POMapreduce.java Modified: incubator/pig/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/incubator/pig/trunk/CHANGES.txt?rev=598827&r1=598826&r2=598827&view=diff ============================================================================== --- incubator/pig/trunk/CHANGES.txt (original) +++ incubator/pig/trunk/CHANGES.txt Tue Nov 27 16:17:31 2007 @@ -22,3 +22,5 @@ PIG-8 added binary comparator (olgan) PIG-17 integrated with Hadoop 0.15 (olgan@) + + PIG-20 Added custom comparator functions for order by (phunt via gates) Added: incubator/pig/trunk/src/org/apache/pig/ComparisonFunc.java URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/ComparisonFunc.java?rev=598827&view=auto ============================================================================== --- incubator/pig/trunk/src/org/apache/pig/ComparisonFunc.java (added) +++ incubator/pig/trunk/src/org/apache/pig/ComparisonFunc.java Tue Nov 27 16:17:31 2007 @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig; + +import java.io.IOException; + +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.io.WritableComparator; +import org.apache.pig.data.Tuple; + + +public abstract class ComparisonFunc extends WritableComparator { + public ComparisonFunc() { + super(Tuple.class); + } + + public int compare(WritableComparable a, WritableComparable b) { + return compare((Tuple)a, (Tuple)b); + } + + /** + * This callback method must be implemented by all subclasses. Compares + * its two arguments for order. Returns a negative integer, zero, or a + * positive integer as the first argument is less than, equal to, or + * greater than the second. The order of elements of the tuples corresponds + * to the fields specified in the order by clause. + * Same semantics as {@link java.util.Comparator}. + * + * @param t1 the first Tuple to be compared. + * @param t2 the second Tuple to be compared. 
+ * @throws IOException + * @see java.util.Comparator + */ + abstract public int compare(Tuple t1, Tuple t2); +} Modified: incubator/pig/trunk/src/org/apache/pig/impl/eval/EvalSpec.java URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/eval/EvalSpec.java?rev=598827&r1=598826&r2=598827&view=diff ============================================================================== --- incubator/pig/trunk/src/org/apache/pig/impl/eval/EvalSpec.java (original) +++ incubator/pig/trunk/src/org/apache/pig/impl/eval/EvalSpec.java Tue Nov 27 16:17:31 2007 @@ -22,6 +22,7 @@ import java.util.Comparator; import java.util.List; +import org.apache.pig.ComparisonFunc; import org.apache.pig.data.Datum; import org.apache.pig.data.Tuple; import org.apache.pig.impl.FunctionInstantiator; @@ -41,6 +42,8 @@ transient DataCollector simpleEvalInput; protected boolean inner = false; //used only if this generate spec is used in a group by + private String comparatorFuncName; + private transient Comparator<Tuple> comparator; /* * Keep a precomputed pipeline ready if we do simple evals @@ -51,7 +54,36 @@ simpleEvalInput = setupPipe(simpleEvalOutput); } - public void instantiateFunc(FunctionInstantiator instantiaor) throws IOException{}; + public class UserComparator implements Comparator<Tuple> { + Comparator<Tuple> nested; + + UserComparator(Comparator<Tuple> nested) { + this.nested = nested; + } + public int compare(Tuple t1, Tuple t2) { + Datum d1 = simpleEval(t1); + Datum d2 = simpleEval(t2); + if (d1 instanceof Tuple) { + return nested.compare((Tuple)d1, (Tuple)d2); + } else { + return nested.compare(new Tuple(d1), new Tuple(d2)); + } + } + } + + public void instantiateFunc(FunctionInstantiator instantiaor) throws IOException{ + if (comparatorFuncName != null) { + Comparator<Tuple> userComparator = + (ComparisonFunc)instantiaor.instantiateFuncFromAlias(comparatorFuncName); + comparator = new UserComparator(userComparator); + } else { + comparator = new 
Comparator<Tuple>() { + public int compare(Tuple t1, Tuple t2) { + return simpleEval(t1).compareTo(simpleEval(t2)); + } + }; + } + }; /** * set up a default data processing pipe for processing by this spec @@ -145,17 +177,30 @@ public abstract boolean amenableToCombiner(); + public void setComparatorName(String name) { + comparatorFuncName = name; + } + + public String getComparatorName() { + return comparatorFuncName; + } + /** * Compare 2 tuples according to this spec. This is used while sorting by arbitrary (even generated) fields. * @return */ public Comparator<Tuple> getComparator() { - return new Comparator<Tuple>() { - - public int compare(Tuple t1, Tuple t2) { - return (simpleEval(t1).compareTo(simpleEval(t2))); - } - }; + if (comparator != null) + return comparator; + else + { + comparator = new Comparator<Tuple>() { + public int compare(Tuple t1, Tuple t2) { + return simpleEval(t1).compareTo(simpleEval(t2)); + } + }; + return comparator; + } } public void setFlatten(boolean isFlattened){ Modified: incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=598827&r1=598826&r2=598827&view=diff ============================================================================== --- incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt (original) +++ incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt Tue Nov 27 16:17:31 2007 @@ -515,7 +515,7 @@ } -LogicalOperator OrderClause() : {LogicalOperator op; EvalSpec sortSpec = null; ProjectSpec projSpec;} +LogicalOperator OrderClause() : {LogicalOperator op; EvalSpec sortSpec = null; ProjectSpec projSpec; String funcName;} { ( op = NestedExpr() <BY> @@ -530,6 +530,17 @@ ) | (sortSpec = Star() {sortSpec = new GenerateSpec(sortSpec);}) ) + ( + <USING> funcName = QualifiedFunction() + { + try { + sortSpec.setComparatorName(funcName); + 
} catch (Exception e){ + throw new ParseException(e.getMessage()); + } + } + )? + ) { return new LOSort(op, sortSpec); @@ -607,13 +618,23 @@ } EvalSpec NestedSortOrArrange(Schema over, Map<String, EvalSpec> specs): -{EvalSpec sortSpec; ProjectSpec projSpec; EvalSpec item; Schema subSchema = null; Token t;} +{EvalSpec sortSpec; ProjectSpec projSpec; EvalSpec item; Schema subSchema = null; Token t; String funcName;} { ( ( t = <ORDER> | t = <ARRANGE> ) item = BaseEvalSpec(over,specs) { subSchema = item.getOutputSchemaForPipe(over); } - <BY> ( (projSpec = SimpleProj(subSchema) {sortSpec = projSpec;}) - | sortSpec = Star() ) + <BY> ( (projSpec = SimpleProj(subSchema) {sortSpec = projSpec;}) + | sortSpec = Star() ) + ( + <USING> funcName = QualifiedFunction() + { + try { + sortSpec.setComparatorName(funcName); + } catch (Exception e){ + throw new ParseException(e.getMessage()); + } + } + )? ) { return copyItemAndAddSpec(item,new SortDistinctSpec(false, sortSpec)); } } @@ -932,6 +953,7 @@ return funcName; } } + /** * Bug 831620 - '$' support Modified: incubator/pig/trunk/src/org/apache/pig/impl/mapreduceExec/MapReduceLauncher.java URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/mapreduceExec/MapReduceLauncher.java?rev=598827&r1=598826&r2=598827&view=diff ============================================================================== --- incubator/pig/trunk/src/org/apache/pig/impl/mapreduceExec/MapReduceLauncher.java (original) +++ incubator/pig/trunk/src/org/apache/pig/impl/mapreduceExec/MapReduceLauncher.java Tue Nov 27 16:17:31 2007 @@ -17,8 +17,6 @@ */ package org.apache.pig.impl.mapreduceExec; -import java.io.ByteArrayOutputStream; -import java.io.DataOutputStream; import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -31,7 +29,6 @@ import org.apache.pig.data.DataBag; import org.apache.pig.data.IndexedTuple; import org.apache.pig.data.Tuple; -import org.apache.pig.impl.PigContext; import 
org.apache.pig.impl.eval.EvalSpec; import org.apache.pig.impl.io.PigFile; import org.apache.pig.impl.physicalLayer.POMapreduce; @@ -74,7 +71,7 @@ return WritableComparator.compareBytes(b1, s1, l1, b2, s2, l2); } } - + static Random rand = new Random(); /** @@ -161,6 +158,8 @@ // not used starting with 0.15 conf.setInputKeyClass(Text.class); // not used starting with 0.15 conf.setInputValueClass(Tuple.class); conf.setOutputKeyClass(Tuple.class); + if (pom.userComparator != null) + conf.setOutputKeyComparatorClass(pom.userComparator); conf.setOutputValueClass(IndexedTuple.class); conf.set("pig.inputs", ObjectSerializer.serialize(pom.inputFileSpecs)); Modified: incubator/pig/trunk/src/org/apache/pig/impl/mapreduceExec/SortPartitioner.java URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/mapreduceExec/SortPartitioner.java?rev=598827&r1=598826&r2=598827&view=diff ============================================================================== --- incubator/pig/trunk/src/org/apache/pig/impl/mapreduceExec/SortPartitioner.java (original) +++ incubator/pig/trunk/src/org/apache/pig/impl/mapreduceExec/SortPartitioner.java Tue Nov 27 16:17:31 2007 @@ -24,6 +24,7 @@ import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.io.WritableComparator; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.Partitioner; import org.apache.pig.builtin.BinStorage; @@ -33,13 +34,14 @@ public class SortPartitioner implements Partitioner { - Tuple[] quantiles; + Tuple[] quantiles; + WritableComparator comparator; public int getPartition(WritableComparable key, Writable value, int numPartitions) { try{ Tuple keyTuple = (Tuple)key; - int index = Arrays.binarySearch(quantiles, keyTuple.getTupleField(0)); + int index = Arrays.binarySearch(quantiles, keyTuple.getTupleField(0), comparator); if (index < 0) index = -index-1; return Math.min(index, numPartitions - 1); @@ -48,7 +50,7 @@ } } - public void 
configure(JobConf job) { + public void configure(JobConf job) { String quantilesFile = job.get("pig.quantilesFile", ""); if (quantilesFile.length() == 0) throw new RuntimeException("Sort paritioner used but no quantiles found"); @@ -71,6 +73,8 @@ }catch (IOException e){ throw new RuntimeException(e); } + + comparator = job.getOutputKeyComparator(); } } Modified: incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/MapreducePlanCompiler.java URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/MapreducePlanCompiler.java?rev=598827&r1=598826&r2=598827&view=diff ============================================================================== --- incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/MapreducePlanCompiler.java (original) +++ incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/MapreducePlanCompiler.java Tue Nov 27 16:17:31 2007 @@ -19,9 +19,12 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Comparator; import java.util.Map; +import org.apache.hadoop.io.WritableComparator; import org.apache.pig.builtin.BinStorage; +import org.apache.pig.data.Tuple; import org.apache.pig.impl.PigContext; import org.apache.pig.impl.builtin.FindQuantiles; import org.apache.pig.impl.builtin.RandomSampleLoader; @@ -271,6 +274,17 @@ sortJob.addReduceSpec(new GenerateSpec(ps)); sortJob.reduceParallelism = loSort.getRequestedParallelism(); + + String comparatorFuncName = loSort.getSortSpec().getComparatorName(); + if (comparatorFuncName != null) { + try { + sortJob.userComparator = + (Class<WritableComparator>)Class.forName(comparatorFuncName); + } catch (ClassNotFoundException e) { + throw new RuntimeException("Unable to find user comparator " + comparatorFuncName, e); + } + } + return sortJob; } Modified: incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/POMapreduce.java URL: 
http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/POMapreduce.java?rev=598827&r1=598826&r2=598827&view=diff ============================================================================== --- incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/POMapreduce.java (original) +++ incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/POMapreduce.java Tue Nov 27 16:17:31 2007 @@ -19,7 +19,10 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Comparator; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.io.WritableComparator; import org.apache.pig.data.Tuple; import org.apache.pig.impl.PigContext; import org.apache.pig.impl.eval.EvalSpec; @@ -40,12 +43,14 @@ public ArrayList<FileSpec> inputFileSpecs = new ArrayList<FileSpec>(); public FileSpec outputFileSpec = null; public Class partitionFunction = null; + public Class<WritableComparator> userComparator = null; public String quantilesFile = null; public PigContext pigContext; public int mapParallelism = -1; // -1 means let hadoop decide public int reduceParallelism = -1; + static MapReduceLauncher mapReduceLauncher = new MapReduceLauncher(); Added: incubator/pig/trunk/test/org/apache/pig/test/OrdAsc.java URL: http://svn.apache.org/viewvc/incubator/pig/trunk/test/org/apache/pig/test/OrdAsc.java?rev=598827&view=auto ============================================================================== --- incubator/pig/trunk/test/org/apache/pig/test/OrdAsc.java (added) +++ incubator/pig/trunk/test/org/apache/pig/test/OrdAsc.java Tue Nov 27 16:17:31 2007 @@ -0,0 +1,13 @@ +package org.apache.pig.test; + +import org.apache.pig.data.Tuple; +import org.apache.pig.ComparisonFunc; + +public class OrdAsc extends ComparisonFunc { + // this is a simple example - more complex comparison will require + // breakout of the individual values. 
I suggest you'll have + // to convert "catch(IOException e) to RuntimeException('msg', e)" + public int compare(Tuple t1, Tuple t2) { + return t1.compareTo(t2); + } +} Added: incubator/pig/trunk/test/org/apache/pig/test/OrdDesc.java URL: http://svn.apache.org/viewvc/incubator/pig/trunk/test/org/apache/pig/test/OrdDesc.java?rev=598827&view=auto ============================================================================== --- incubator/pig/trunk/test/org/apache/pig/test/OrdDesc.java (added) +++ incubator/pig/trunk/test/org/apache/pig/test/OrdDesc.java Tue Nov 27 16:17:31 2007 @@ -0,0 +1,13 @@ +package org.apache.pig.test; + +import org.apache.pig.data.Tuple; +import org.apache.pig.ComparisonFunc; + +public class OrdDesc extends ComparisonFunc { + // this is a simple example - more complex comparison will require + // breakout of the individual values. I suggest you'll have + // to convert "catch(IOException e) to RuntimeException('msg', e)" + public int compare(Tuple t1, Tuple t2) { + return t2.compareTo(t1); + } +} Added: incubator/pig/trunk/test/org/apache/pig/test/OrdDescNumeric.java URL: http://svn.apache.org/viewvc/incubator/pig/trunk/test/org/apache/pig/test/OrdDescNumeric.java?rev=598827&view=auto ============================================================================== --- incubator/pig/trunk/test/org/apache/pig/test/OrdDescNumeric.java (added) +++ incubator/pig/trunk/test/org/apache/pig/test/OrdDescNumeric.java Tue Nov 27 16:17:31 2007 @@ -0,0 +1,42 @@ +package org.apache.pig.test; + +import java.io.IOException; + +import org.apache.pig.data.DataAtom; +import org.apache.pig.data.Datum; +import org.apache.pig.data.Tuple; +import org.apache.pig.ComparisonFunc; + +public class OrdDescNumeric extends ComparisonFunc { + public int compare(Tuple t1, Tuple t2) { + try { + for (int i = 0; i < t1.arity(); i++) { + Datum d1 = t1.getField(i); + Datum d2 = t2.getField(i); + int comp; + if (d1 instanceof DataAtom) { + comp = compare((DataAtom)d1, (DataAtom)d2); + } 
else { + comp = compare((Tuple)d1, (Tuple)d2); + } + if (comp != 0) { + return comp; + } + } + return 0; + } catch (IOException e) { + throw new RuntimeException("Error comparing keys in OrdDEscNumeric", e); + } + } + + private int compare(DataAtom a1, DataAtom a2) throws IOException { + double num1 = a1.numval(); + double num2 = a2.numval(); + if (num2 > num1) { + return 1; + } else if (num2 < num1) { + return -1; + } + return 0; + } +} Added: incubator/pig/trunk/test/org/apache/pig/test/TestOrderBy.java URL: http://svn.apache.org/viewvc/incubator/pig/trunk/test/org/apache/pig/test/TestOrderBy.java?rev=598827&view=auto ============================================================================== --- incubator/pig/trunk/test/org/apache/pig/test/TestOrderBy.java (added) +++ incubator/pig/trunk/test/org/apache/pig/test/TestOrderBy.java Tue Nov 27 16:17:31 2007 @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.pig.test; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.PrintStream; +import java.text.DecimalFormat; +import java.util.Iterator; + +import junit.framework.TestCase; + +import org.junit.Test; + +import org.apache.pig.PigServer; +import org.apache.pig.data.Tuple; + +public class TestOrderBy extends TestCase { + private String initString = "mapreduce"; + private static final int DATALEN = 1024; + private String[][] DATA = new String[2][DATALEN]; + + private PigServer pig; + private File tmpFile; + + public TestOrderBy() throws Exception { + DecimalFormat myFormatter = new DecimalFormat("0000000"); + for (int i = 0; i < DATALEN; i++) { + DATA[0][i] = myFormatter.format(i); + DATA[1][i] = myFormatter.format(DATALEN - i - 1); + } + pig = new PigServer(initString); + } + + protected void setUp() throws Exception { + tmpFile = File.createTempFile("test", "txt"); + PrintStream ps = new PrintStream(new FileOutputStream(tmpFile)); + for(int i = 0; i < DATALEN; i++) { + ps.println("1\t" + DATA[1][i] + "\t" + DATA[0][i]); + } + ps.close(); + } + + protected void tearDown() throws Exception { + tmpFile.delete(); + } + + private void verify(String query, boolean descending) throws Exception { + pig.registerQuery(query); + Iterator<Tuple> it = pig.openIterator("myid"); + int col = (descending ? 
1 : 0); + for(int i = 0; i < DATALEN; i++) { + Tuple t = (Tuple)it.next(); + int value = t.getAtomField(1).numval().intValue(); +// System.out.println("" + i + "," + DATA[0][i] + "," + DATA[1][i] + "," + value); + assertEquals(Integer.parseInt(DATA[col][i]), value); + } + assertFalse(it.hasNext()); + } + + @Test + public void testTopLevelOrderBy_Star_NoUsing() throws Exception { + verify("myid = order (load 'file:" + tmpFile + "') BY *;", false); + } + + @Test + public void testTopLevelOrderBy_Col1_NoUsing() throws Exception { + verify("myid = order (load 'file:" + tmpFile + "') BY $1;", false); + } + + @Test + public void testTopLevelOrderBy_Col2_NoUsing() throws Exception { + verify("myid = order (load 'file:" + tmpFile + "') BY $2;", true); + } + + @Test + public void testTopLevelOrderBy_Col21_NoUsing() throws Exception { + verify("myid = order (load 'file:" + tmpFile + "') BY $2, $1;", true); + } + + @Test + public void testTopLevelOrderBy_Star_Using() throws Exception { + verify("myid = order (load 'file:" + tmpFile + + "') BY * USING org.apache.pig.test.OrdAsc;", false); + verify("myid = order (load 'file:" + tmpFile + + "') BY * USING org.apache.pig.test.OrdDesc;", true); + verify("myid = order (load 'file:" + tmpFile + + "') BY * USING org.apache.pig.test.OrdDescNumeric;", true); + } + + @Test + public void testTopLevelOrderBy_Col1_Using() throws Exception { + verify("myid = order (load 'file:" + tmpFile + + "') BY $1 USING org.apache.pig.test.OrdAsc;", false); + verify("myid = order (load 'file:" + tmpFile + + "') BY $1 USING org.apache.pig.test.OrdDesc;", true); + verify("myid = order (load 'file:" + tmpFile + + "') BY $1 USING org.apache.pig.test.OrdDescNumeric;", true); + } + + @Test + public void testTopLevelOrderBy_Col2_Using() throws Exception { + verify("myid = order (load 'file:" + tmpFile + + "') BY $2 USING org.apache.pig.test.OrdAsc;", true); + verify("myid = order (load 'file:" + tmpFile + + "') BY $2 USING org.apache.pig.test.OrdDesc;", 
false); + verify("myid = order (load 'file:" + tmpFile + + "') BY $2 USING org.apache.pig.test.OrdDescNumeric;", false); + } + + @Test + public void testTopLevelOrderBy_Col21_Using() throws Exception { + // col2/col1 ascending - + verify("myid = order (load 'file:" + tmpFile + + "') BY $2, $1 USING org.apache.pig.test.OrdAsc;", true); + verify("myid = order (load 'file:" + tmpFile + + "') BY $2, $1 USING org.apache.pig.test.OrdDesc;", false); + verify("myid = order (load 'file:" + tmpFile + + "') BY $2, $1 USING org.apache.pig.test.OrdDescNumeric;", false); + } + + @Test + public void testNestedOrderBy_Star_NoUsing() throws Exception { + verify("myid = foreach (group (load 'file:" + tmpFile + + "') by $0) { D = ORDER $1 BY *; generate flatten(D); };", false); + } + + @Test + public void testNestedOrderBy_Col1_NoUsing() throws Exception { + verify("myid = foreach (group (load 'file:" + tmpFile + + "') by $0) { D = ORDER $1 BY $1; generate flatten(D); };", false); + } + + @Test + public void testNestedOrderBy_Col2_NoUsing() throws Exception { + verify("myid = foreach (group (load 'file:" + tmpFile + + "') by $0) { D = ORDER $1 BY $2; generate flatten(D); };", true); + } + + @Test + public void testNestedOrderBy_Col21_NoUsing() throws Exception { + verify("myid = foreach (group (load 'file:" + tmpFile + + "') by $0) { D = ORDER $1 BY $2, $1; generate flatten(D); };", true); + } + + @Test + public void testNestedOrderBy_Star_Using() throws Exception { + verify("myid = foreach (group (load 'file:" + tmpFile + + "') by $0) { D = ORDER $1 BY * USING " + + "org.apache.pig.test.OrdAsc; generate flatten(D); };", false); + verify("myid = foreach (group (load 'file:" + tmpFile + + "') by $0) { D = ORDER $1 BY * USING " + + "org.apache.pig.test.OrdDesc; generate flatten(D); };", true); + verify("myid = foreach (group (load 'file:" + tmpFile + + "') by $0) { D = ORDER $1 BY * USING " + + "org.apache.pig.test.OrdDescNumeric; generate flatten(D); };", true); + } + + @Test + public 
void testNestedOrderBy_Col1_Using() throws Exception { + verify("myid = foreach (group (load 'file:" + tmpFile + + "') by $0) { D = ORDER $1 BY $1 USING " + + "org.apache.pig.test.OrdAsc; generate flatten(D); };", false); + verify("myid = foreach (group (load 'file:" + tmpFile + + "') by $0) { D = ORDER $1 BY $1 USING " + + "org.apache.pig.test.OrdDesc; generate flatten(D); };", true); + verify("myid = foreach (group (load 'file:" + tmpFile + + "') by $0) { D = ORDER $1 BY $1 USING " + + "org.apache.pig.test.OrdDescNumeric; generate flatten(D); };", + true); + } + + @Test + public void testNestedOrderBy_Col2_Using() throws Exception { + verify("myid = foreach (group (load 'file:" + tmpFile + + "') by $0) { D = ORDER $1 BY $2 USING " + + "org.apache.pig.test.OrdAsc; generate flatten(D); };", true); + verify("myid = foreach (group (load 'file:" + tmpFile + + "') by $0) { D = ORDER $1 BY $2 USING " + + "org.apache.pig.test.OrdDesc; generate flatten(D); };", false); + verify("myid = foreach (group (load 'file:" + tmpFile + + "') by $0) { D = ORDER $1 BY $2 USING " + + "org.apache.pig.test.OrdDescNumeric; generate flatten(D); };", + false); + } + + @Test + public void testNestedOrderBy_Col21_Using() throws Exception { + // col2/col1 ascending - + verify("myid = foreach (group (load 'file:" + tmpFile + + "') by $0) { D = ORDER $1 BY $2, $1 USING " + + "org.apache.pig.test.OrdAsc; generate flatten(D); };", true); + verify("myid = foreach (group (load 'file:" + tmpFile + + "') by $0) { D = ORDER $1 BY $2, $1 USING " + + "org.apache.pig.test.OrdDesc; generate flatten(D); };", false); + verify("myid = foreach (group (load 'file:" + tmpFile + + "') by $0) { D = ORDER $1 BY $2, $1 USING " + + "org.apache.pig.test.OrdDescNumeric; generate flatten(D); };", + false); + } + +}