Author: pradeepkth
Date: Thu Feb 12 22:53:53 2009
New Revision: 743915

URL: http://svn.apache.org/viewvc?rev=743915&view=rev
Log:
PIG-665: Map key type not correctly set (for use when key is null) when map 
plan does not have localrearrange

Added:
    hadoop/pig/trunk/test/org/apache/pig/test/TestKeyTypeDiscoveryVisitor.java
Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/KeyTypeDiscoveryVisitor.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCombinerPackage.java
    hadoop/pig/trunk/test/org/apache/pig/test/Util.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=743915&r1=743914&r2=743915&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Thu Feb 12 22:53:53 2009
@@ -416,3 +416,6 @@
 
     PIG-574: allowing to run scripts from within grunt shell (hagleitn via
     olgan)
+
+    PIG-665: Map key type not correctly set (for use when key is null) when
+    map plan does not have localrearrange (pradeepkth)

Modified: 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/KeyTypeDiscoveryVisitor.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/KeyTypeDiscoveryVisitor.java?rev=743915&r1=743914&r2=743915&view=diff
==============================================================================
--- 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/KeyTypeDiscoveryVisitor.java
 (original)
+++ 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/KeyTypeDiscoveryVisitor.java
 Thu Feb 12 22:53:53 2009
@@ -20,15 +20,18 @@
  */
 package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
 
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.pig.PigException;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROpPlanVisitor;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
-import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
 import org.apache.pig.impl.plan.DepthFirstWalker;
-import org.apache.pig.impl.plan.PlanWalker;
 import org.apache.pig.impl.plan.VisitorException;
 
 /**
@@ -51,32 +54,66 @@
     
     @Override
     public void visitMROp(MapReduceOper mr) throws VisitorException {
+        boolean foundKeyType = false;
         PhyPlanKeyTypeVisitor kvisitor = new PhyPlanKeyTypeVisitor(mr.mapPlan, 
mr);
         kvisitor.visit();
+        if(!kvisitor.foundKeyType) {
+            // look for the key type from a POLocalRearrange in the previous 
reduce
+            List<MapReduceOper> preds = mPlan.getPredecessors(mr);
+            // if there are no predecessors, then we probably are in a 
+            // simple load-store script - there is no way to figure out
+            // the key type in this case which probably means we don't need
+            // to figure it out
+            if(preds != null) {
+                Map<Byte, Integer> seen = new HashMap<Byte, Integer>();
+                for (MapReduceOper pred : preds) {
+                    PhyPlanKeyTypeVisitor visitor = new 
PhyPlanKeyTypeVisitor(pred.reducePlan, mr);
+                    visitor.visit();
+                    foundKeyType |= visitor.foundKeyType;
+                    byte type = mr.mapKeyType;
+                    seen.put(type, 1);
+                }
+                if(seen.size() > 1) {
+                    // throw exception since we should get the same key type 
from all predecessors
+                    int errorCode = 2119;
+                    String message = "Internal Error: Found multiple data 
types for map key";
+                    throw new VisitorException(message, errorCode, 
PigException.BUG);
+                }
+                // if we were not able to find the key type and
+                // if there is a map and reduce phase, then the
+                // map would need to send a valid key object and this
+                // can be an issue when the key is null - so error out here!
+                // if the reduce phase is empty, then this is a map only job
+                // which will not need a key object -
+                // IMPORTANT NOTE: THIS RELIES ON THE FACT THAT CURRENTLY
+                // IN PigMapOnly.collect(), null IS SENT IN THE collect() CALL
+                // FOR THE KEY - IF THAT CHANGES, THEN THIS CODE HERE MAY NEED 
TO
+                // CHANGE!
+                if(!foundKeyType && !mr.reducePlan.isEmpty()) {
+                    // throw exception since we were not able to determine key 
type!
+                    int errorCode = 2120;
+                    String message = "Internal Error: Unable to determine data 
type for map key";
+                    throw new VisitorException(message, errorCode, 
PigException.BUG);
+                }
+            }
+        }
     }
     
     class PhyPlanKeyTypeVisitor extends PhyPlanVisitor {
         
         private MapReduceOper mro;
+        private boolean foundKeyType = false;
         
         public PhyPlanKeyTypeVisitor(PhysicalPlan plan, MapReduceOper mro) {
             super(plan, new DepthFirstWalker<PhysicalOperator, 
PhysicalPlan>(plan));
             this.mro = mro;
         }
         
-        /* (non-Javadoc)
-         * @see 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor#visitPackage(org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage)
-         */
-        @Override
-        public void visitPackage(POPackage pkg) throws VisitorException {
-            this.mro.mapKeyType = pkg.getKeyType();        
-        }
-    
-    
         @Override
         public void visitLocalRearrange(POLocalRearrange lr)
                 throws VisitorException {
-            this.mro.mapKeyType = lr.getKeyType();        
+            this.mro.mapKeyType = lr.getKeyType();
+            foundKeyType = true;
         }
     }
 

Modified: 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCombinerPackage.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCombinerPackage.java?rev=743915&r1=743914&r2=743915&view=diff
==============================================================================
--- 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCombinerPackage.java
 (original)
+++ 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCombinerPackage.java
 Thu Feb 12 22:53:53 2009
@@ -85,7 +85,12 @@
 
     @Override
     public String name() {
-        return "PostCombinerPackage" + "[" + DataType.findTypeName(resultType) 
+ "]" + "{" + DataType.findTypeName(keyType) + "}" +" - " + mKey.toString();
+        return "POCombinerPackage" + "[" + DataType.findTypeName(resultType) + 
"]" + "{" + DataType.findTypeName(keyType) + "}" +" - " + mKey.toString();
+    }
+    
+    @Override
+    public void visit(PhyPlanVisitor v) throws VisitorException {
+        v.visitCombinerPackage(this);
     }
     
     /**

Added: 
hadoop/pig/trunk/test/org/apache/pig/test/TestKeyTypeDiscoveryVisitor.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestKeyTypeDiscoveryVisitor.java?rev=743915&view=auto
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestKeyTypeDiscoveryVisitor.java 
(added)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestKeyTypeDiscoveryVisitor.java 
Thu Feb 12 22:53:53 2009
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Random;
+
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.io.FileLocalizer;
+import org.junit.Before;
+import org.junit.Test;
+
+import junit.framework.TestCase;
+
+/**
+ * The testcases here test that the key type of the map key
+ * is correctly determined for use when the key is null. In
+ * particular, they test KeyTypeDiscoveryVisitor.
+ */
+public class TestKeyTypeDiscoveryVisitor extends TestCase {
+
+    MiniCluster cluster = MiniCluster.buildCluster();
+    private PigServer pigServer;
+
+    TupleFactory mTf = TupleFactory.getInstance();
+    BagFactory mBf = BagFactory.getInstance();
+    
+    @Before
+    public void setUp() throws Exception{
+        pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+    }
+    
+    @Test
+    public void testNullJoin() throws Exception {
+        String[] inputData = new String[] { "\t7\t8", "\t8\t9", "1\t20\t30", 
"1\t20\t40" };
+        Util.createInputFile(cluster, "a.txt", inputData);
+        
+        inputData = new String[] { "7\t2", "1\t5", "1\t10" };
+        Util.createInputFile(cluster, "b.txt", inputData);
+        
+        String script = "a = load 'a.txt' as (x:int, y:int, z:int);" +
+                       "b = load 'b.txt' as (x:int, y:int);" +
+                       "b_group = group b by x;" +
+                       "b_sum = foreach b_group generate flatten(group) as x, 
SUM(b.y) as clicks;" +
+                       // b_sum will have {(1, 15L)}
+                       "a_group = group a by (x, y);" +
+                       "a_aggs = foreach a_group generate flatten(group) as 
(x, y), SUM(a.z) as zs;" +
+                       // a_aggs will have {(<null>, 7, 8L), (<null>, 8, 9L), 
(1, 20, 70L)}
+                       // The join in the next statement is on "x" which is 
the first column
+                       // The nulls in the first two records of a_aggs will 
test whether
+                       // KeyTypeDiscoveryVisitor had set a valid keyType (in 
this case INTEGER)
+                       // The null records will get discarded by the join and 
hence the join
+                       // output would be {(1, 15L, 1, 20, 70L)} 
+                       "join_a_b = join b_sum by x, a_aggs by x;";
+        Util.registerQuery(pigServer, script);
+        Iterator<Tuple> it = pigServer.openIterator("join_a_b");
+        Object[] results = new Object[] { 1, 15L, 1, 20, 70L};
+        Tuple output = it.next();
+        assertFalse(it.hasNext());
+        assertEquals(results.length, output.size());
+        for (int i = 0; i < output.size(); i++) {
+            assertEquals(results[i], output.get(i));
+        }
+            
+    }
+
+}

Modified: hadoop/pig/trunk/test/org/apache/pig/test/Util.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/Util.java?rev=743915&r1=743914&r2=743915&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/Util.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/Util.java Thu Feb 12 22:53:53 2009
@@ -32,6 +32,7 @@
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.pig.PigServer;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
@@ -333,4 +334,11 @@
         comp.compile();
         return comp.getMRPlan();       
     }
+    
+    public static void registerQuery(PigServer pigServer, String query) throws 
IOException {
+        String[] queryLines = query.split(";");
+        for (String line : queryLines) {
+            pigServer.registerQuery(line + ";");
+        }
+    }
 }


Reply via email to