Author: gates
Date: Thu Sep 25 21:17:28 2008
New Revision: 699170

URL: http://svn.apache.org/viewvc?rev=699170&view=rev
Log:
PIG-461: Changed the additional final limit job to use the sort comparator when
the limit follows an order by.


Modified:
    
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
    
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
    
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java

Modified: 
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=699170&r1=699169&r2=699170&view=diff
==============================================================================
--- 
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
 (original)
+++ 
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
 Thu Sep 25 21:17:28 2008
@@ -366,9 +366,13 @@
                 jobConf.setOutputValueClass(NullableTuple.class);
             }
         
-            if(mro.isGlobalSort()){
-                jobConf.set("pig.quantilesFile", mro.getQuantFile());
-                jobConf.setPartitionerClass(SortPartitioner.class);
+            if(mro.isGlobalSort() || mro.isLimitAfterSort()){
+                // Only set the quantiles file and sort partitioner if we're a
+                // global sort, not for limit after sort.
+                if (mro.isGlobalSort()) {
+                    jobConf.set("pig.quantilesFile", mro.getQuantFile());
+                    jobConf.setPartitionerClass(SortPartitioner.class);
+                }
                 if(mro.UDFs.size()==1){
                     String compFuncSpec = mro.UDFs.get(0);
                     Class comparator = 
PigContext.resolveClassName(compFuncSpec);
@@ -469,9 +473,10 @@
         // raw comparator.
         
         // An operator has an order by if global sort is set or if it's 
successor has
-        // global sort set (because in that case it's the sampling job). 
+        // global sort set (because in that case it's the sampling job) or if
+        // it's a limit after a sort. 
         boolean hasOrderBy = false;
-        if (mro.isGlobalSort()) {
+        if (mro.isGlobalSort() || mro.isLimitAfterSort()) {
             hasOrderBy = true;
         } else {
             List<MapReduceOper> succs = plan.getSuccessors(mro);

Modified: 
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=699170&r1=699169&r2=699170&view=diff
==============================================================================
--- 
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
 (original)
+++ 
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
 Thu Sep 25 21:17:28 2008
@@ -1302,6 +1302,11 @@
                 st.setSFile(oldSpec);
                 limitAdjustMROp.reducePlan.addAsLeaf(st);
                 limitAdjustMROp.requestedParallelism = -1;
+                // If the operator we're following has global sort set, we
+                // need to indicate that this is a limit after a sort.
+                // This will ensure that we get the right sort comparator
+                // set.  Otherwise our order gets whacked (PIG-461).
+                if (mr.isGlobalSort()) limitAdjustMROp.setLimitAfterSort(true);
                 
                 List<MapReduceOper> successorList = MRPlan.getSuccessors(mr);
                 MapReduceOper successors[] = null;

Modified: 
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java?rev=699170&r1=699169&r2=699170&view=diff
==============================================================================
--- 
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java
 (original)
+++ 
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java
 Thu Sep 25 21:17:28 2008
@@ -77,6 +77,9 @@
     //Indicates if this job is an order by job
     boolean globalSort = false;
 
+    // Indicates if this is a limit after a sort
+    boolean limitAfterSort = false;
+
     // If true, putting an identity combine in this
     // mapreduce job will speed things up.
     boolean needsDistinctCombiner = false;
@@ -227,6 +230,14 @@
         this.globalSort = globalSort;
     }
 
+    public boolean isLimitAfterSort() {
+        return limitAfterSort;
+    }
+
+    public void setLimitAfterSort(boolean las) {
+        limitAfterSort = las;
+    }
+
     public boolean needsDistinctCombiner() { 
         return needsDistinctCombiner;
     }


Reply via email to