Author: olga
Date: Thu Dec  3 17:30:06 2009
New Revision: 886858

URL: http://svn.apache.org/viewvc?rev=886858&view=rev
Log:
PIG-1114: MultiQuery optimization throws error when merging 2-level splits
(rding via olgan)

Modified:
    hadoop/pig/branches/branch-0.6/CHANGES.txt
    hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java
    hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/TestMultiQuery.java

Modified: hadoop/pig/branches/branch-0.6/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/CHANGES.txt?rev=886858&r1=886857&r2=886858&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.6/CHANGES.txt (original)
+++ hadoop/pig/branches/branch-0.6/CHANGES.txt Thu Dec  3 17:30:06 2009
@@ -127,6 +127,9 @@
 
 BUG FIXES
 
+PIG-1114: MultiQuery optimization throws error when merging 2 level spl (rding
+via olgan)
+
 PIG-1022: optimizer pushes filter before the foreach that generates column
 used by filter (daijy via gates)
 

Modified: hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java?rev=886858&r1=886857&r2=886858&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java (original)
+++ hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java Thu Dec  3 17:30:06 2009
@@ -552,7 +552,7 @@
         return sameKeyType;
     }
     
-    private int setIndexOnLRInSplit(int initial, POSplit splitOp)
+    private int setIndexOnLRInSplit(int initial, POSplit splitOp, boolean 
sameKeyType)
             throws VisitorException {
         int index = initial;
         
@@ -568,9 +568,15 @@
                     String msg = "Internal Error. Unable to set multi-query 
index for optimization.";
                     throw new OptimizerException(msg, errCode, 
PigException.BUG, e);                   
                 }
+                
+                // change the map key type to tuple when 
+                // multiple splittees have different map key types
+                if (!sameKeyType) {
+                    lr.setKeyType(DataType.TUPLE);
+                }
             } else if (leaf instanceof POSplit) {
                 POSplit spl = (POSplit)leaf;
-                index = setIndexOnLRInSplit(index, spl);
+                index = setIndexOnLRInSplit(index, spl, sameKeyType);
             }
         }
 
@@ -615,7 +621,7 @@
             // across all POLocalRearranges in all merged map plans
             // including nested ones in POSplit
             POSplit spl = (POSplit)leaf;
-            curIndex = setIndexOnLRInSplit(index, spl);
+            curIndex = setIndexOnLRInSplit(index, spl, sameKeyType);
         }
                     
         splitOp.addPlan(pl);

Modified: hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/TestMultiQuery.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/TestMultiQuery.java?rev=886858&r1=886857&r2=886858&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/TestMultiQuery.java (original)
+++ hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/TestMultiQuery.java Thu Dec  3 17:30:06 2009
@@ -116,6 +116,65 @@
     }    
     
     @Test
+    public void testMultiQueryJiraPig1114() {
+
+        // test case: MultiQuery optimization throws error when merging 2 
level splits
+
+        String INPUT_FILE = "data.txt";
+
+        try {
+
+            PrintWriter w = new PrintWriter(new FileWriter(INPUT_FILE));
+            w.println("10\tjar");
+            w.println("20\tbox");
+            w.println("30\tbot");
+            w.close();
+            Util.copyFromLocalToCluster(cluster, INPUT_FILE, INPUT_FILE);
+
+            myPig.setBatchOn();
+
+            myPig.registerQuery("data = load '" + INPUT_FILE
+                    + "' USING PigStorage as (id:int, name:chararray);");
+            myPig.registerQuery("ids = FOREACH data GENERATE id;");
+            myPig.registerQuery("allId = GROUP ids all;");
+            myPig.registerQuery("allIdCount = FOREACH allId GENERATE group as 
allId, COUNT(ids) as total;");
+            myPig.registerQuery("idGroup = GROUP ids by id;");
+            myPig.registerQuery("idGroupCount = FOREACH idGroup GENERATE group 
as id, COUNT(ids) as count;");
+            myPig.registerQuery("countTotal = cross idGroupCount, 
allIdCount;");
+            myPig.registerQuery("idCountTotal = foreach countTotal generate 
id, count, total, (double)count / (double)total as proportion;");
+            myPig.registerQuery("orderedCounts = order idCountTotal by count 
desc;");
+            myPig.registerQuery("STORE orderedCounts INTO '/tmp/output1';");
+
+            myPig.registerQuery("names = FOREACH data GENERATE name;");
+            myPig.registerQuery("allNames = GROUP names all;");
+            myPig.registerQuery("allNamesCount = FOREACH allNames GENERATE 
group as namesAll, COUNT(names) as total;");
+            myPig.registerQuery("nameGroup = GROUP names by name;");
+            myPig.registerQuery("nameGroupCount = FOREACH nameGroup GENERATE 
group as name, COUNT(names) as count;");
+            myPig.registerQuery("namesCrossed = cross nameGroupCount, 
allNamesCount;");
+            myPig.registerQuery("nameCountTotal = foreach namesCrossed 
generate name, count, total, (double)count / (double)total as proportion;");
+            myPig.registerQuery("nameCountsOrdered = order nameCountTotal by 
count desc;");
+            myPig.registerQuery("STORE nameCountsOrdered INTO 
'/tmp/output2';");
+
+            List<ExecJob> jobs = myPig.executeBatch();
+            for (ExecJob job : jobs) {
+                assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
+            }
+
+        } catch (Exception e) {
+            e.printStackTrace();
+            Assert.fail();
+        } finally {
+            new File(INPUT_FILE).delete();
+            try {
+                Util.deleteFile(cluster, INPUT_FILE);
+            } catch (IOException e) {
+                e.printStackTrace();
+                Assert.fail();
+            }
+        }
+    }
+    
+    @Test
     public void testMultiQueryJiraPig1060() {
 
         // test case: 


Reply via email to