Author: gates
Date: Thu Dec  3 22:56:45 2009
New Revision: 886973

URL: http://svn.apache.org/viewvc?rev=886973&view=rev
Log:
PIG-1068:  COGROUP fails with 'Type mismatch in key from map: expected 
org.apache.pig.impl.io.NullableText, recieved 
org.apache.pig.impl.io.NullableTuple'

Modified:
    hadoop/pig/trunk/CHANGES.txt
    
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java
    
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODemux.java
    
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMultiQueryPackage.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=886973&r1=886972&r2=886973&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Thu Dec  3 22:56:45 2009
@@ -45,6 +45,10 @@
 
 BUG FIXES
 
+PIG-1068:  COGROUP fails with 'Type mismatch in key from map: expected
+                  org.apache.pig.impl.io.NullableText, recieved
+                  org.apache.pig.impl.io.NullableTuple' (rding via gates)
+
 PIG-1113: Diamond query optimization throws error in JOIN (rding via olgan)
 
 PIG-1116: Remove redundant map-reduce job for merge join (pradeepkth)

Modified: 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java?rev=886973&r1=886972&r2=886973&view=diff
==============================================================================
--- 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java
 (original)
+++ 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java
 Thu Dec  3 22:56:45 2009
@@ -662,9 +662,10 @@
                 pkg.addPackage(p);
                 pkCount++;
             }
+            
pkg.addIsKeyWrappedList(((POMultiQueryPackage)pk).getIsKeyWrappedList());
             addShiftedKeyInfoIndex(initial, current, (POMultiQueryPackage)pk);
         } else {
-            pkg.addPackage(pk);
+            pkg.addPackage(pk, mapKeyType);
             pkCount = 1;
         }
         
@@ -673,8 +674,6 @@
             String msg = "Internal Error. Inconsistency in key index found 
during optimization.";
             throw new OptimizerException(msg, errCode, PigException.BUG);
         }
-
-        boolean[] keyPos = pk.getKeyPositionsInTuple();
         
         PODemux demux = (PODemux)to.getLeaves().get(0);
         int plCount = 0;
@@ -685,12 +684,11 @@
             // operator, then it's the only operator in the plan.
             List<PhysicalPlan> pls = ((PODemux)root).getPlans();
             for (PhysicalPlan pl : pls) {
-                demux.addPlan(pl, keyPos);
+                demux.addPlan(pl);
                 plCount++;
             }
-            demux.addIsKeyWrappedList(((PODemux)root).getIsKeyWrappedList());
         } else {
-            demux.addPlan(from, mapKeyType, keyPos);
+            demux.addPlan(from);
             plCount = 1;
         }
         
@@ -700,11 +698,11 @@
             throw new OptimizerException(msg, errCode, PigException.BUG);
         }
 
-        if (demux.isSameMapKeyType()) {
+        if (pkg.isSameMapKeyType()) {
             pkg.setKeyType(pk.getKeyType());
         } else {
             pkg.setKeyType(DataType.TUPLE);
-        }                
+        }            
     }
     
     private void addShiftedKeyInfoIndex(int index, POPackage pkg) throws 
OptimizerException {
@@ -785,11 +783,11 @@
         from.remove(cpk);
         
         PODemux demux = (PODemux)to.getLeaves().get(0);
-        
-        boolean isSameKeyType = demux.isSameMapKeyType();
-        
+                
         POMultiQueryPackage pkg = (POMultiQueryPackage)to.getRoots().get(0);
         
+        boolean isSameKeyType = pkg.isSameMapKeyType();
+        
         // if current > initial + 1, it means we had
         // a split in the map of the MROper we are trying to
         // merge. In that case we would have changed the indices
@@ -818,6 +816,8 @@
             pkCount = 1;
         }
 
+        pkg.setSameMapKeyType(isSameKeyType);
+        
         if (pkCount != total) {
             int errCode = 2146;
             String msg = "Internal Error. Inconsistency in key index found 
during optimization.";
@@ -831,8 +831,6 @@
         
         pkg.setKeyType(cpk.getKeyType());
         
-        boolean[] keyPos = cpk.getKeyPositionsInTuple();
-        
         // See comment above for why we flatten the Packages
         // in the from plan - for the same reason, we flatten
         // the inner plans of Demux operator now.
@@ -841,7 +839,7 @@
         if (leaf instanceof PODemux) {
             List<PhysicalPlan> pls = ((PODemux)leaf).getPlans();
             for (PhysicalPlan pl : pls) {
-                demux.addPlan(pl, mapKeyType, keyPos);
+                demux.addPlan(pl);
                 POLocalRearrange lr = (POLocalRearrange)pl.getLeaves().get(0);
                 try {
                     lr.setMultiQueryIndex(initial + plCount++);            
@@ -858,7 +856,7 @@
                 }
             }
         } else {
-            demux.addPlan(from, mapKeyType, keyPos);
+            demux.addPlan(from);
             POLocalRearrange lr = (POLocalRearrange)from.getLeaves().get(0);
             try {
                 lr.setMultiQueryIndex(initial + plCount++);            
@@ -895,8 +893,8 @@
        
     private PhysicalPlan createDemuxPlan(boolean sameKeyType, boolean 
isCombiner) 
         throws VisitorException {
-        PODemux demux = getDemux(sameKeyType, isCombiner);
-        POMultiQueryPackage pkg= getMultiQueryPackage();
+        PODemux demux = getDemux(isCombiner);
+        POMultiQueryPackage pkg= getMultiQueryPackage(sameKeyType, isCombiner);
         
         PhysicalPlan pl = new PhysicalPlan();
         pl.add(pkg);
@@ -1135,14 +1133,17 @@
         return new POStore(new OperatorKey(scope, nig.getNextNodeId(scope)));
     } 
      
-    private PODemux getDemux(boolean sameMapKeyType, boolean inCombiner){
+    private PODemux getDemux(boolean inCombiner){
         PODemux demux = new PODemux(new OperatorKey(scope, 
nig.getNextNodeId(scope)));
-        demux.setSameMapKeyType(sameMapKeyType);
         demux.setInCombiner(inCombiner);
         return demux;
     } 
     
-    private POMultiQueryPackage getMultiQueryPackage(){
-        return new POMultiQueryPackage(new OperatorKey(scope, 
nig.getNextNodeId(scope)));
+    private POMultiQueryPackage getMultiQueryPackage(boolean sameMapKeyType, 
boolean inCombiner){
+        POMultiQueryPackage pkg =  
+            new POMultiQueryPackage(new OperatorKey(scope, 
nig.getNextNodeId(scope)));
+        pkg.setInCombiner(inCombiner);
+        pkg.setSameMapKeyType(sameMapKeyType);
+        return pkg;
     }   
 }

Modified: 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODemux.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODemux.java?rev=886973&r1=886972&r2=886973&view=diff
==============================================================================
--- 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODemux.java
 (original)
+++ 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODemux.java
 Thu Dec  3 22:56:45 2009
@@ -68,26 +68,6 @@
      */
     private ArrayList<PhysicalPlan> myPlans = new ArrayList<PhysicalPlan>();
     
-    /**
-     * If the POLocalRearranges corresponding to the reduce plans in 
-     * myPlans (the list of inner plans of the demux) have different key types
-     * then the MultiQueryOptimizer converts all the keys to be of type tuple
-     * by wrapping any non-tuple keys into Tuples (keys which are already 
tuples
-     * are left alone).
-     * The list below is a list of booleans indicating whether extra tuple 
wrapping
-     * was done for the key in the corresponding POLocalRearranges and if we 
need
-     * to "unwrap" the tuple to get to the key
-     */
-    private ArrayList<Boolean> isKeyWrapped = new ArrayList<Boolean>();
-    
-    /**
-     * The list tracks the field position of the key in the input tuple so that
-     * the right values are "unwrapped" to get the key. 
-     * The tuples emitted from POCombinerPackages always have keys in a fixed 
-     * position, but this position varies depending on the Pig Latin scripts.
-     */
-    private ArrayList<boolean[]> keyPositions = new ArrayList<boolean[]>();
-    
     /*
      * Flag indicating when a new pull should start 
      */
@@ -106,14 +86,6 @@
     private PhysicalOperator curLeaf = null;
     
     /*
-     * Indicating if all the inner plans have the same
-     * map key type. If not, the keys passed in are 
-     * wrapped inside tuples and need to be extracted
-     * out during the reduce phase 
-     */
-    private boolean sameMapKeyType = true;
-    
-    /*
      * Indicating if this operator is in a combiner. 
      * If not, this operator is in a reducer and the key
      * values must first be extracted from the tuple-wrap
@@ -172,7 +144,7 @@
 
     @Override
     public String name() {
-        return "Demux" + isKeyWrapped + " - " + mKey.toString();
+        return "Demux [" + myPlans.size() + "] "+ mKey.toString();
     }
 
     @Override
@@ -195,45 +167,14 @@
     }
     
     /**
-     * Returns the list of booleans that indicates if the 
-     * key needs to unwrapped for the corresponding plan.
-     * 
-     * @return the list of isKeyWrapped boolean values
-     */
-    public List<Boolean> getIsKeyWrappedList() {
-        return Collections.unmodifiableList(isKeyWrapped);
-    }
-    
-    /**
-     * Adds a list of IsKeyWrapped boolean values
-     * 
-     * @param lst the list of boolean values to add
-     */
-    public void addIsKeyWrappedList(List<Boolean> lst) {
-        for (Boolean b : lst) {
-            isKeyWrapped.add(b);
-        }
-    }
-    
-    /**
      * Appends the specified plan at the end of the list.
      * 
      * @param inPlan plan to be appended to the inner plan list
      */
-    public void addPlan(PhysicalPlan inPlan, byte mapKeyType, boolean[] 
keyPos) {  
-        myPlans.add(inPlan);
-        processedSet.set(myPlans.size()-1);
-        // if mapKeyType is already a tuple, we will NOT
-        // be wrapping it in an extra tuple. If it is not
-        // a tuple, we will wrap into in a tuple
-        isKeyWrapped.add(mapKeyType == DataType.TUPLE ? false : true);
-        keyPositions.add(keyPos);
-    }
     
-    public void addPlan(PhysicalPlan inPlan, boolean[] keyPos) {  
+    public void addPlan(PhysicalPlan inPlan) {  
         myPlans.add(inPlan);
         processedSet.set(myPlans.size()-1);
-        keyPositions.add(keyPos);
     }
    
     @Override
@@ -364,51 +305,12 @@
         
         PhysicalPlan pl = myPlans.get(index);
         if (!(pl.getRoots().get(0) instanceof PODemux)) {                      
       
-            if (!sameMapKeyType && !inCombiner && isKeyWrapped.get(index)) {   
                                    
-                
-                // unwrap the keys
-                boolean[] keys = keyPositions.get(index);
-                for (int pos = 0; pos < keys.length; pos++) {
-                    if (keys[pos]) {
-                        Tuple tup = (pos == 0) ? 
-                                (Tuple)fld.getValueAsPigType() : 
(Tuple)res.get(pos);
-                        res.set(pos, tup.get(0));
-                    } 
-                    else if (pos == 0) {                 
-                        res.set(0, fld.getValueAsPigType());
-                    }
-                }
-                
-            } else {
-                res.set(0, fld.getValueAsPigType());
-            }
+            res.set(0, fld.getValueAsPigType());
         }
     
         myPlans.get(index).attachInput(res);
         return myPlans.get(index).getLeaves().get(0);
     }
-    
-    /**
-     * Sets a flag indicating if all inner plans have 
-     * the same map key type. 
-     * 
-     * @param sameMapKeyType true if all inner plans have 
-     * the same map key type; otherwise false
-     */
-    public void setSameMapKeyType(boolean sameMapKeyType) {
-        this.sameMapKeyType = sameMapKeyType;
-    }
-
-    /**
-     * Returns a flag indicating if all inner plans 
-     * have the same map key type 
-     * 
-     * @return true if all inner plans have 
-     * the same map key type; otherwise false
-     */
-    public boolean isSameMapKeyType() {
-        return sameMapKeyType;
-    }
 
     /**
      * Sets a flag indicating if this operator is 

Modified: 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMultiQueryPackage.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMultiQueryPackage.java?rev=886973&r1=886972&r2=886973&view=diff
==============================================================================
--- 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMultiQueryPackage.java
 (original)
+++ 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMultiQueryPackage.java
 Thu Dec  3 22:56:45 2009
@@ -18,6 +18,7 @@
 package 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 
@@ -28,6 +29,7 @@
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.io.NullableTuple;
 import org.apache.pig.impl.io.NullableUnknownWritable;
@@ -63,6 +65,34 @@
     
     private List<POPackage> packages = new ArrayList<POPackage>();
 
+    /**
+     * If the POLocalRearranges corresponding to the reduce plans in 
+     * myPlans (the list of inner plans of the demux) have different key types
+     * then the MultiQueryOptimizer converts all the keys to be of type tuple
+     * by wrapping any non-tuple keys into Tuples (keys which are already tuples
+     * are left alone).
+     * The list below is a list of booleans indicating whether extra tuple wrapping
+     * was done for the key in the corresponding POLocalRearranges and if we need
+     * to "unwrap" the tuple to get to the key.
+     */
+    private ArrayList<Boolean> isKeyWrapped = new ArrayList<Boolean>();
+    
+    /*
+     * Indicating if all the inner plans have the same
+     * map key type. If not, the keys passed in are 
+     * wrapped inside tuples and need to be extracted
+     * out during the reduce phase 
+     */
+    private boolean sameMapKeyType = true;
+    
+    /*
+     * Indicating if this operator is in a combiner. 
+     * If not, this operator is in a reducer and the key
+     * values must first be extracted from the tuple-wrap
+     * before writing out to the disk
+     */
+    private boolean inCombiner = false;
+    
     transient private PigNullableWritable myKey;
 
     /**
@@ -109,7 +139,7 @@
 
     @Override
     public String name() {
-        return "MultiQuery Package  - " +  getOperatorKey().toString();
+        return "MultiQuery Package [" + isKeyWrapped + "] - " +  
getOperatorKey().toString();
     }
 
     @Override
@@ -148,6 +178,21 @@
     public void addPackage(POPackage pack) {
         packages.add(pack);        
     }
+    
+    /**
+     * Appends the specified package object to the end of 
+     * the package list.
+     * 
+     * @param pack package to be appended to the list
+     * @param mapKeyType the map key type associated with the package
+     */
+    public void addPackage(POPackage pack, byte mapKeyType) {
+        packages.add(pack);        
+        // if mapKeyType is already a tuple, we will NOT
+        // be wrapping it in an extra tuple. If it is not
+        // a tuple, we will wrap it in a tuple
+        isKeyWrapped.add(mapKeyType == DataType.TUPLE ? false : true);
+    }
 
     /**
      * Returns the list of packages.
@@ -179,10 +224,20 @@
                 + " should be in the range between 0 and " + packages.size();
             throw new ExecException(msg, errCode, PigException.BUG);
         }
-               
+                  
         POPackage pack = packages.get(index);
-
-        pack.attachInput(myKey, tupIter);
+        
+        // check to see if we need to unwrap the key. The keys may be
+        // wrapped inside a tuple by LocalRearrange operator when jobs  
+        // with different map key types are merged
+        PigNullableWritable curKey = myKey;
+        if (!sameMapKeyType && !inCombiner && isKeyWrapped.get(index)) {       
                                
+            Tuple tup = (Tuple)myKey.getValueAsPigType();
+            curKey = HDataType.getWritableComparableTypes(tup.get(0), 
pack.getKeyType());
+            curKey.setIndex(origIndex);
+        }
+            
+        pack.attachInput(curKey, tupIter);
 
         Result res = pack.getNext(t);
         
@@ -218,4 +273,41 @@
         return res;
     }
 
+    /**
+     * Returns the list of booleans that indicates if the 
+     * key needs to be unwrapped for the corresponding plan.
+     * 
+     * @return the list of isKeyWrapped boolean values
+     */
+    public List<Boolean> getIsKeyWrappedList() {
+        return Collections.unmodifiableList(isKeyWrapped);
+    }
+    
+    /**
+     * Adds a list of IsKeyWrapped boolean values
+     * 
+     * @param lst the list of boolean values to add
+     */
+    public void addIsKeyWrappedList(List<Boolean> lst) {
+        for (Boolean b : lst) {
+            isKeyWrapped.add(b);
+        }
+    }
+    
+    public void setInCombiner(boolean inCombiner) {
+        this.inCombiner = inCombiner;
+    }
+
+    public boolean isInCombiner() {
+        return inCombiner;
+    }
+
+    public void setSameMapKeyType(boolean sameMapKeyType) {
+        this.sameMapKeyType = sameMapKeyType;
+    }
+
+    public boolean isSameMapKeyType() {
+        return sameMapKeyType;
+    }
+
 }

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java?rev=886973&r1=886972&r2=886973&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java Thu Dec  3 
22:56:45 2009
@@ -88,6 +88,67 @@
         myPig = null;
     }
     
+    public void testMultiQueryJiraPig1068() {
+
+        // test case: COGROUP fails with 'Type mismatch in key from map: 
+        // expected org.apache.pig.impl.io.NullableText, recieved org.apache.pig.impl.io.NullableTuple'
+
+        String INPUT_FILE = "pig-1068.txt";
+
+        try {
+
+            PrintWriter w = new PrintWriter(new FileWriter(INPUT_FILE));
+            w.println("10\tapple\tlogin\tjar");
+            w.println("20\torange\tlogin\tbox");
+            w.println("30\tstrawberry\tquit\tbot");
+
+            w.close();
+
+            Util.copyFromLocalToCluster(cluster, INPUT_FILE, INPUT_FILE);
+
+            myPig.setBatchOn();
+
+            myPig.registerQuery("logs = load '" + INPUT_FILE 
+                    + "' as (ts:int, id:chararray, command:chararray, 
comments:chararray);");
+            myPig.registerQuery("SPLIT logs INTO logins IF command == 'login', 
all_quits IF command == 'quit';");
+            myPig.registerQuery("login_info = FOREACH logins { GENERATE id as 
id, comments AS client; };");  
+            myPig.registerQuery("logins_grouped = GROUP login_info BY (id, 
client);");
+            myPig.registerQuery("count_logins_by_client = FOREACH 
logins_grouped "
+                    + "{ generate group.id AS id, group.client AS client, 
COUNT($1) AS count; };");
+            myPig.registerQuery("all_quits_grouped = GROUP all_quits BY id; ");
+            myPig.registerQuery("quits = FOREACH all_quits_grouped { GENERATE 
FLATTEN(all_quits); };");
+            myPig.registerQuery("joined_session_info = COGROUP quits BY id, 
count_logins_by_client BY id;");
+            
+            Iterator<Tuple> iter = myPig.openIterator("joined_session_info");
+
+            List<Tuple> expectedResults = 
Util.getTuplesFromConstantTupleStrings(
+                    new String[] { 
+                            "('apple',{},{('apple','jar',1L)})",
+                            "('orange',{},{('orange','box',1L)})",
+                            
"('strawberry',{(30,'strawberry','quit','bot')},{})"
+                    });
+            
+            int counter = 0;
+            while (iter.hasNext()) {
+                assertEquals(expectedResults.get(counter++).toString(), 
iter.next().toString());                
+            }
+
+            assertEquals(expectedResults.size(), counter);
+
+        } catch (Exception e) {
+            e.printStackTrace();
+            Assert.fail();
+        } finally {
+            new File(INPUT_FILE).delete();
+            try {
+                Util.deleteFile(cluster, INPUT_FILE);
+            } catch (IOException e) {
+                e.printStackTrace();
+                Assert.fail();
+            }
+        }
+    }
+    
     @Test
     public void testMultiQueryJiraPig1108() {
         


Reply via email to