Author: gates
Date: Thu Sep 25 21:17:28 2008
New Revision: 699170
URL: http://svn.apache.org/viewvc?rev=699170view=rev
Log:
PIG-461 Changed additional last limit to use sort comparator when the limit
follows an order by.
Modified:
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java
Modified:
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL:
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=699170r1=699169r2=699170view=diff
==
---
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
(original)
+++
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
Thu Sep 25 21:17:28 2008
@@ -366,9 +366,13 @@
jobConf.setOutputValueClass(NullableTuple.class);
}
-if(mro.isGlobalSort()){
-jobConf.set(pig.quantilesFile, mro.getQuantFile());
-jobConf.setPartitionerClass(SortPartitioner.class);
+if(mro.isGlobalSort() || mro.isLimitAfterSort()){
+// Only set the quantiles file and sort partitioner if we're a
+// global sort, not for limit after sort.
+if (mro.isGlobalSort()) {
+jobConf.set(pig.quantilesFile, mro.getQuantFile());
+jobConf.setPartitionerClass(SortPartitioner.class);
+}
if(mro.UDFs.size()==1){
String compFuncSpec = mro.UDFs.get(0);
Class comparator =
PigContext.resolveClassName(compFuncSpec);
@@ -469,9 +473,10 @@
// raw comparator.
// An operator has an order by if global sort is set or if it's
successor has
-// global sort set (because in that case it's the sampling job).
+// global sort set (because in that case it's the sampling job) or if
+// it's a limit after a sort.
boolean hasOrderBy = false;
-if (mro.isGlobalSort()) {
+if (mro.isGlobalSort() || mro.isLimitAfterSort()) {
hasOrderBy = true;
} else {
ListMapReduceOper succs = plan.getSuccessors(mro);
Modified:
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL:
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=699170r1=699169r2=699170view=diff
==
---
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
(original)
+++
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
Thu Sep 25 21:17:28 2008
@@ -1302,6 +1302,11 @@
st.setSFile(oldSpec);
limitAdjustMROp.reducePlan.addAsLeaf(st);
limitAdjustMROp.requestedParallelism = -1;
+// If the operator we're following has global sort set, we
+// need to indicate that this is a limit after a sort.
+// This will assure that we get the right sort comparator
+// set. Otherwise our order gets wacked (PIG-461).
+if (mr.isGlobalSort()) limitAdjustMROp.setLimitAfterSort(true);
ListMapReduceOper successorList = MRPlan.getSuccessors(mr);
MapReduceOper successors[] = null;
Modified:
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java
URL:
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java?rev=699170r1=699169r2=699170view=diff
==
---
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java
(original)
+++
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java
Thu Sep 25 21:17:28 2008
@@ -77,6 +77,9 @@
//Indicates if this job is an order by job
boolean globalSort = false;
+// Indicates if this is a limit after a sort
+boolean limitAfterSort = false;
+
// If true, putting an identity combine