Author: gates
Date: Thu Sep 25 21:17:28 2008
New Revision: 699170

URL: http://svn.apache.org/viewvc?rev=699170&view=rev
Log:
PIG-461 Changed additional last limit to use sort comparator when the limit follows an order by.
Modified:
    incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
    incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
    incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java

Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=699170&r1=699169&r2=699170&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Thu Sep 25 21:17:28 2008
@@ -366,9 +366,13 @@
                 jobConf.setOutputValueClass(NullableTuple.class);
             }
 
-            if(mro.isGlobalSort()){
-                jobConf.set("pig.quantilesFile", mro.getQuantFile());
-                jobConf.setPartitionerClass(SortPartitioner.class);
+            if(mro.isGlobalSort() || mro.isLimitAfterSort()){
+                // Only set the quantiles file and sort partitioner if we're a
+                // global sort, not for limit after sort.
+                if (mro.isGlobalSort()) {
+                    jobConf.set("pig.quantilesFile", mro.getQuantFile());
+                    jobConf.setPartitionerClass(SortPartitioner.class);
+                }
                 if(mro.UDFs.size()==1){
                     String compFuncSpec = mro.UDFs.get(0);
                     Class comparator = PigContext.resolveClassName(compFuncSpec);
@@ -469,9 +473,10 @@
         // raw comparator.
 
         // An operator has an order by if global sort is set or if it's successor has
-        // global sort set (because in that case it's the sampling job).
+        // global sort set (because in that case it's the sampling job) or if
+        // it's a limit after a sort.
         boolean hasOrderBy = false;
-        if (mro.isGlobalSort()) {
+        if (mro.isGlobalSort() || mro.isLimitAfterSort()) {
             hasOrderBy = true;
         } else {
             List<MapReduceOper> succs = plan.getSuccessors(mro);

Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=699170&r1=699169&r2=699170&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Thu Sep 25 21:17:28 2008
@@ -1302,6 +1302,11 @@
             st.setSFile(oldSpec);
             limitAdjustMROp.reducePlan.addAsLeaf(st);
             limitAdjustMROp.requestedParallelism = -1;
+            // If the operator we're following has global sort set, we
+            // need to indicate that this is a limit after a sort.
+            // This will assure that we get the right sort comparator
+            // set. Otherwise our order gets wacked (PIG-461).
+            if (mr.isGlobalSort()) limitAdjustMROp.setLimitAfterSort(true);
 
             List<MapReduceOper> successorList = MRPlan.getSuccessors(mr);
             MapReduceOper successors[] = null;

Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java?rev=699170&r1=699169&r2=699170&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java Thu Sep 25 21:17:28 2008
@@ -77,6 +77,9 @@
     //Indicates if this job is an order by job
     boolean globalSort = false;
 
+    // Indicates if this is a limit after a sort
+    boolean limitAfterSort = false;
+
     // If true, putting an identity combine in this
     // mapreduce job will speed things up.
     boolean needsDistinctCombiner = false;
@@ -227,6 +230,14 @@
         this.globalSort = globalSort;
     }
 
+    public boolean isLimitAfterSort() {
+        return limitAfterSort;
+    }
+
+    public void setLimitAfterSort(boolean las) {
+        limitAfterSort = las;
+    }
+
     public boolean needsDistinctCombiner() {
         return needsDistinctCombiner;
     }