Author: pradeepkth Date: Thu Feb 12 22:53:53 2009 New Revision: 743915 URL: http://svn.apache.org/viewvc?rev=743915&view=rev Log: PIG-665: Map key type not correctly set (for use when key is null) when map plan does not have localrearrange
Added: hadoop/pig/trunk/test/org/apache/pig/test/TestKeyTypeDiscoveryVisitor.java Modified: hadoop/pig/trunk/CHANGES.txt hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/KeyTypeDiscoveryVisitor.java hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCombinerPackage.java hadoop/pig/trunk/test/org/apache/pig/test/Util.java Modified: hadoop/pig/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=743915&r1=743914&r2=743915&view=diff ============================================================================== --- hadoop/pig/trunk/CHANGES.txt (original) +++ hadoop/pig/trunk/CHANGES.txt Thu Feb 12 22:53:53 2009 @@ -416,3 +416,6 @@ PIG-574: allowing to run scripts from within grunt shell (hagleitn via olgan) + + PIG-665: Map key type not correctly set (for use when key is null) when + map plan does not have localrearrange (pradeepkth) Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/KeyTypeDiscoveryVisitor.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/KeyTypeDiscoveryVisitor.java?rev=743915&r1=743914&r2=743915&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/KeyTypeDiscoveryVisitor.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/KeyTypeDiscoveryVisitor.java Thu Feb 12 22:53:53 2009 @@ -20,15 +20,18 @@ */ package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.pig.PigException; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROpPlanVisitor; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan; 
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange; -import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage; import org.apache.pig.impl.plan.DepthFirstWalker; -import org.apache.pig.impl.plan.PlanWalker; import org.apache.pig.impl.plan.VisitorException; /** @@ -51,32 +54,66 @@ @Override public void visitMROp(MapReduceOper mr) throws VisitorException { + boolean foundKeyType = false; PhyPlanKeyTypeVisitor kvisitor = new PhyPlanKeyTypeVisitor(mr.mapPlan, mr); kvisitor.visit(); + if(!kvisitor.foundKeyType) { + // look for the key type from a POLocalRearrange in the previous reduce + List<MapReduceOper> preds = mPlan.getPredecessors(mr); + // if there are no predecessors, then we probably are in a + // simple load-store script - there is no way to figure out + // the key type in this case which probably means we don't need + // to figure it out + if(preds != null) { + Map<Byte, Integer> seen = new HashMap<Byte, Integer>(); + for (MapReduceOper pred : preds) { + PhyPlanKeyTypeVisitor visitor = new PhyPlanKeyTypeVisitor(pred.reducePlan, mr); + visitor.visit(); + foundKeyType |= visitor.foundKeyType; + byte type = mr.mapKeyType; + seen.put(type, 1); + } + if(seen.size() > 1) { + // throw exception since we should get the same key type from all predecessors + int errorCode = 2119; + String message = "Internal Error: Found multiple data types for map key"; + throw new VisitorException(message, errorCode, PigException.BUG); + } + // if we were not able to find the key and + // if there is a map and reduce phase, then the + // map would need to send a valid key object and this + // can be an issue when the key is null - so 
error out here! + // if the reduce phase is empty, then this is a map only job + // which will not need a key object - + // IMPORTANT NOTE: THIS RELIES ON THE FACT THAT CURRENTLY + // IN PigMapOnly.collect(), null IS SENT IN THE collect() CALL + // FOR THE KEY - IF THAT CHANGES, THEN THIS CODE HERE MAY NEED TO + // CHANGE! + if(!foundKeyType && !mr.reducePlan.isEmpty()) { + // throw exception since we were not able to determine key type! + int errorCode = 2120; + String message = "Internal Error: Unable to determine data type for map key"; + throw new VisitorException(message, errorCode, PigException.BUG); + } + } + } } class PhyPlanKeyTypeVisitor extends PhyPlanVisitor { private MapReduceOper mro; + private boolean foundKeyType = false; public PhyPlanKeyTypeVisitor(PhysicalPlan plan, MapReduceOper mro) { super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(plan)); this.mro = mro; } - /* (non-Javadoc) - * @see org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor#visitPackage(org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage) - */ - @Override - public void visitPackage(POPackage pkg) throws VisitorException { - this.mro.mapKeyType = pkg.getKeyType(); - } - - @Override public void visitLocalRearrange(POLocalRearrange lr) throws VisitorException { - this.mro.mapKeyType = lr.getKeyType(); + this.mro.mapKeyType = lr.getKeyType(); + foundKeyType = true; } } Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCombinerPackage.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCombinerPackage.java?rev=743915&r1=743914&r2=743915&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCombinerPackage.java 
(original) +++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCombinerPackage.java Thu Feb 12 22:53:53 2009 @@ -85,7 +85,12 @@ @Override public String name() { - return "PostCombinerPackage" + "[" + DataType.findTypeName(resultType) + "]" + "{" + DataType.findTypeName(keyType) + "}" +" - " + mKey.toString(); + return "POCombinerPackage" + "[" + DataType.findTypeName(resultType) + "]" + "{" + DataType.findTypeName(keyType) + "}" +" - " + mKey.toString(); + } + + @Override + public void visit(PhyPlanVisitor v) throws VisitorException { + v.visitCombinerPackage(this); } /** Added: hadoop/pig/trunk/test/org/apache/pig/test/TestKeyTypeDiscoveryVisitor.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestKeyTypeDiscoveryVisitor.java?rev=743915&view=auto ============================================================================== --- hadoop/pig/trunk/test/org/apache/pig/test/TestKeyTypeDiscoveryVisitor.java (added) +++ hadoop/pig/trunk/test/org/apache/pig/test/TestKeyTypeDiscoveryVisitor.java Thu Feb 12 22:53:53 2009 @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.pig.test; + +import java.io.IOException; +import java.util.Iterator; +import java.util.Random; + +import org.apache.pig.ExecType; +import org.apache.pig.PigServer; +import org.apache.pig.data.BagFactory; +import org.apache.pig.data.Tuple; +import org.apache.pig.data.TupleFactory; +import org.apache.pig.impl.io.FileLocalizer; +import org.junit.Before; +import org.junit.Test; + +import junit.framework.TestCase; + +/** + * The testcases here test that the key type of the map key + * is correctly determined for use when the key is null. In + * particular it tests KeyTypeDiscoveryVisitor + */ +public class TestKeyTypeDiscoveryVisitor extends TestCase { + + MiniCluster cluster = MiniCluster.buildCluster(); + private PigServer pigServer; + + TupleFactory mTf = TupleFactory.getInstance(); + BagFactory mBf = BagFactory.getInstance(); + + @Before + public void setUp() throws Exception{ + pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties()); + } + + @Test + public void testNullJoin() throws Exception { + String[] inputData = new String[] { "\t7\t8", "\t8\t9", "1\t20\t30", "1\t20\t40" }; + Util.createInputFile(cluster, "a.txt", inputData); + + inputData = new String[] { "7\t2", "1\t5", "1\t10" }; + Util.createInputFile(cluster, "b.txt", inputData); + + String script = "a = load 'a.txt' as (x:int, y:int, z:int);" + + "b = load 'b.txt' as (x:int, y:int);" + + "b_group = group b by x;" + + "b_sum = foreach b_group generate flatten(group) as x, SUM(b.y) as clicks;" + + // b_sum will have {(7, 2L), (1, 15L)} + "a_group = group a by (x, y);" + + "a_aggs = foreach a_group generate flatten(group) as (x, y), SUM(a.z) as zs;" + + // a_aggs will have {(<null>, 7, 8L), (<null>, 8, 9L), (1, 20, 70L)} + // The join in the next statement is on "x" which is the first column + // The nulls in the first two records of a_aggs will test whether + // KeyTypeDiscoveryVisitor had set a valid keyType (in this case INTEGER) + // The null records will get discarded by 
the join and hence the join + // output would be {(1, 15L, 1, 20, 70L)} + "join_a_b = join b_sum by x, a_aggs by x;"; + Util.registerQuery(pigServer, script); + Iterator<Tuple> it = pigServer.openIterator("join_a_b"); + Object[] results = new Object[] { 1, 15L, 1, 20, 70L}; + Tuple output = it.next(); + assertFalse(it.hasNext()); + assertEquals(results.length, output.size()); + for (int i = 0; i < output.size(); i++) { + assertEquals(results[i], output.get(i)); + } + + } + +} Modified: hadoop/pig/trunk/test/org/apache/pig/test/Util.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/Util.java?rev=743915&r1=743914&r2=743915&view=diff ============================================================================== --- hadoop/pig/trunk/test/org/apache/pig/test/Util.java (original) +++ hadoop/pig/trunk/test/org/apache/pig/test/Util.java Thu Feb 12 22:53:53 2009 @@ -32,6 +32,7 @@ import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.pig.PigServer; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan; @@ -333,4 +334,11 @@ comp.compile(); return comp.getMRPlan(); } + + public static void registerQuery(PigServer pigServer, String query) throws IOException { + String[] queryLines = query.split(";"); + for (String line : queryLines) { + pigServer.registerQuery(line + ";"); + } + } }