Author: rding
Date: Wed Mar 10 17:54:50 2010
New Revision: 921483

URL: http://svn.apache.org/viewvc?rev=921483&view=rev
Log:
PIG-1238: Dump does not respect the schema
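
For reference, the query shape exercised by the new TestLimitAdjuster test (and the pattern behind PIG-1238) is sketched below in Pig Latin; the input path and data are illustrative and simply mirror the test added in this revision:

    -- minimal sketch of the affected pattern (input path/data taken from the new test)
    a = load 'input' as (x:int, y:chararray);  -- declared two-column schema
    b = order a by x parallel 2;               -- global sort triggers the limit-adjust MR job
    c = limit b 1;                             -- limit applied after the sort
    d = foreach c generate y;                  -- project away the sort key x
    dump d;                                    -- should print ('orange') for the test data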
Added:
    hadoop/pig/trunk/test/org/apache/pig/test/TestLimitAdjuster.java
Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=921483&r1=921482&r2=921483&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Wed Mar 10 17:54:50 2010
@@ -145,6 +145,8 @@ OPTIMIZATIONS
 
 BUG FIXES
 
+PIG-1238: Dump does not respect the schema (rding)
+
 PIG-1261: PigStorageSchema broke after changes to ResourceSchema (dvryaboy via daijy)

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=921483&r1=921482&r2=921483&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Wed Mar 10 17:54:50 2010
@@ -38,22 +38,11 @@ import org.apache.pig.LoadFunc;
 import org.apache.pig.OrderedLoadFunc;
 import org.apache.pig.PigException;
 import org.apache.pig.PigWarning;
-import org.apache.pig.builtin.BinStorage;
-import org.apache.pig.data.DataType;
-import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.builtin.DefaultIndexableLoader;
-import org.apache.pig.impl.builtin.FindQuantiles;
-import org.apache.pig.impl.builtin.PoissonSampleLoader;
-import org.apache.pig.impl.builtin.GetMemNumRows;
-import org.apache.pig.impl.builtin.PartitionSkewedKeys;
-import org.apache.pig.impl.builtin.RandomSampleLoader;
-import org.apache.pig.impl.io.FileLocalizer;
-import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.executionengine.ExecutionEngine;
 import org.apache.pig.backend.hadoop.executionengine.HExecutionEngine;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROpPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.UDFFinder;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression;
@@ -61,26 +50,38 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODistinct;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POGlobalRearrange;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POJoinPackage;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackageLite;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPartitionRearrange;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSkewedJoin;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
+import org.apache.pig.builtin.BinStorage;
+import org.apache.pig.data.DataType;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.builtin.DefaultIndexableLoader;
+import org.apache.pig.impl.builtin.FindQuantiles;
+import org.apache.pig.impl.builtin.GetMemNumRows;
+import org.apache.pig.impl.builtin.PartitionSkewedKeys;
+import org.apache.pig.impl.builtin.PoissonSampleLoader;
+import org.apache.pig.impl.builtin.RandomSampleLoader;
+import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.plan.CompilationMessageCollector;
 import org.apache.pig.impl.plan.DepthFirstWalker;
 import org.apache.pig.impl.plan.NodeIdGenerator;
@@ -94,7 +95,6 @@ import org.apache.pig.impl.util.Compiler
 import org.apache.pig.impl.util.MultiMap;
 import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.pig.impl.util.Pair;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
 
 /**
  * The compiler that compiles a given physical plan
@@ -2414,7 +2414,7 @@ public class MRCompiler extends PhyPlanV
         {
             for (MapReduceOper mr:opsToAdjust) {
-                if (mr.reducePlan.isEmpty()) return;
+                if (mr.reducePlan.isEmpty()) continue;
                 List<PhysicalOperator> mpLeaves = mr.reducePlan.getLeaves();
                 if (mpLeaves.size() != 1) {
                     int errCode = 2024;
@@ -2442,28 +2442,33 @@ public class MRCompiler extends PhyPlanV
                 POLimit pLimit = new POLimit(new OperatorKey(scope,nig.getNextNodeId(scope)));
                 pLimit.setLimit(mr.limit);
                 limitAdjustMROp.mapPlan.addAsLeaf(pLimit);
-                if (mr.isGlobalSort())
+                if (mr.isGlobalSort()) {
                     connectMapToReduceLimitedSort(limitAdjustMROp, mr);
-                else
+                } else {
                     simpleConnectMapToReduce(limitAdjustMROp);
+                }
                 POLimit pLimit2 = new POLimit(new OperatorKey(scope,nig.getNextNodeId(scope)));
                 pLimit2.setLimit(mr.limit);
                 limitAdjustMROp.reducePlan.addAsLeaf(pLimit2);
-                POStore st = getStore();
-                st.setSFile(oldSpec);
-                st.setIsTmpStore(oldIsTmpStore);
-                limitAdjustMROp.reducePlan.addAsLeaf(st);
-                limitAdjustMROp.requestedParallelism = 1;
-                limitAdjustMROp.setLimitOnly(true);
+
                 // If the operator we're following has global sort set, we
                 // need to indicate that this is a limit after a sort.
                 // This will assure that we get the right sort comparator
                 // set. Otherwise our order gets wacked (PIG-461).
                 if (mr.isGlobalSort()) {
+                    fixProjectionAfterLimit(limitAdjustMROp, mr);
                     limitAdjustMROp.setLimitAfterSort(true);
                     limitAdjustMROp.setSortOrder(mr.getSortOrder());
                 }
+
+                POStore st = getStore();
+                st.setSFile(oldSpec);
+                st.setIsTmpStore(oldIsTmpStore);
+                limitAdjustMROp.reducePlan.addAsLeaf(st);
+                limitAdjustMROp.requestedParallelism = 1;
+                limitAdjustMROp.setLimitOnly(true);
+
                 List<MapReduceOper> successorList = MRPlan.getSuccessors(mr);
                 MapReduceOper successors[] = null;
@@ -2494,6 +2499,32 @@ public class MRCompiler extends PhyPlanV
                 }
             }
         }
+
+        // Move all operators between POLimit and POStore in reducer plan
+        // from sortMROp to the new MROp so that the sort keys aren't lost by
+        // projection in sortMROp.
+        private void fixProjectionAfterLimit(MapReduceOper mro,
+                MapReduceOper sortMROp) throws PlanException, VisitorException {
+
+            PhysicalOperator op = sortMROp.reducePlan.getLeaves().get(0);
+
+            while (true) {
+                List<PhysicalOperator> preds = sortMROp.reducePlan
+                        .getPredecessors(op);
+                op = preds.get(0);
+                if (op instanceof POLimit) break;
+            }
+
+            while (true) {
+                List<PhysicalOperator> succes = sortMROp.reducePlan
+                        .getSuccessors(op);
+                PhysicalOperator succ = succes.get(0);
+                if (succ instanceof POStore) break;
+
+                sortMROp.reducePlan.removeAndReconnect(succ);
+                mro.reducePlan.addAsLeaf(succ);
+            }
+        }
     }
 
     private static class FindKeyTypeVisitor extends PhyPlanVisitor {

Added: hadoop/pig/trunk/test/org/apache/pig/test/TestLimitAdjuster.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestLimitAdjuster.java?rev=921483&view=auto
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestLimitAdjuster.java (added)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestLimitAdjuster.java Wed Mar 10 17:54:50 2010
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import static org.junit.Assert.*;
+
+import java.io.FileWriter;
+import java.io.PrintWriter;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.data.Tuple;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestLimitAdjuster {
+
+    private static final MiniCluster cluster = MiniCluster.buildCluster();
+
+    private PigServer pig;
+
+    @Before
+    public void setUp() throws Exception {
+        pig = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+    }
+
+    @After
+    public void tearDown() throws Exception {
+    }
+
+    @Test
+    public void simpleTest() throws Exception {
+        String INPUT_FILE = "input";
+
+        PrintWriter w = new PrintWriter(new FileWriter(INPUT_FILE));
+        w.println("1\torange");
+        w.println("2\tapple");
+        w.println("3\tcoconut");
+        w.println("4\tmango");
+        w.println("5\tgrape");
+        w.println("6\tpear");
+        w.close();
+
+        Util.copyFromLocalToCluster(cluster, INPUT_FILE, INPUT_FILE);
+
+        pig.registerQuery("a = load '" + INPUT_FILE + "' as (x:int, y:chararray);");
+        pig.registerQuery("b = order a by x parallel 2;");
+        pig.registerQuery("c = limit b 1;");
+        pig.registerQuery("d = foreach c generate y;");
+
+        List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
+                new String[] { "('orange')" });
+
+        Iterator<Tuple> iter = pig.openIterator("d");
+        int counter = 0;
+        while (iter.hasNext()) {
+            assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());
+        }
+        assertEquals(expectedResults.size(), counter);
+    }
+
+}