Author: pradeepkth
Date: Tue Oct  6 17:58:32 2009
New Revision: 822382

URL: http://svn.apache.org/viewvc?rev=822382&view=rev
Log:
PERFORMANCE: multi-query optimization on multiple group bys following a join or 
cogroup (rding via pradeepkth)

Modified:
    hadoop/pig/trunk/CHANGES.txt
    
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=822382&r1=822381&r2=822382&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Tue Oct  6 17:58:32 2009
@@ -26,6 +26,9 @@
 
 IMPROVEMENTS
 
+PIG-983:  PERFORMANCE: multi-query optimization on multiple group bys
+following a join or cogroup (rding via pradeepkth)
+
 PIG-975: Need a databag that does not register with SpillableMemoryManager and
 spill data pro-actively (yinghe via olgan)
 

Modified: 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java?rev=822382&r1=822381&r2=822382&view=diff
==============================================================================
--- 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java
 (original)
+++ 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java
 Tue Oct  6 17:58:32 2009
@@ -30,6 +30,7 @@
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODemux;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMultiQueryPackage;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
@@ -157,23 +158,38 @@
             
             numMerges += n;   
         }            
-              
-        // case 4: multiple splittees and at least one of them has reducer  
-        if (isMapOnly(mr) && mapReducers.size() > 0) {
+        
+        if (mapReducers.size() > 0) {
+            
+            boolean isMapOnly = isMapOnly(mr);
+            int merged = 0;
+            
+            // case 4: multiple splittees and at least one of them has reducer 
 
+            //         and the splitter is map-only   
+            if (isMapOnly) {
                          
-            PhysicalOperator leaf = splitterPl.getLeaves().get(0);
+                PhysicalOperator leaf = splitterPl.getLeaves().get(0);
                                                             
-            splitOp = (leaf instanceof POStore) ? getSplit() : (POSplit)leaf;
+                splitOp = (leaf instanceof POStore) ? getSplit() : 
(POSplit)leaf;
                     
-            int n = mergeMapReduceSplittees(mapReducers, mr, splitOp);  
+                merged = mergeMapReduceSplittees(mapReducers, mr, splitOp);  
+            }
             
-            log.info("Merged " + n + " map-reduce splittees.");
+            // case 5: multiple splittees and at least one of them has reducer
+            //         and splitter has reducer
+            else {
+                
+                merged = mergeMapReduceSplittees(mapReducers, mr);  
+                
+            }
+            
+            log.info("Merged " + merged + " map-reduce splittees.");
             
-            numMerges += n;      
+            numMerges += merged;      
         }
-       
-        // finally, add original store to the split operator 
-        // if there is splittee that hasn't been merged
+        
+        // Finally, add original store to the split operator 
+        // if there is a splittee that hasn't been merged into the splitter
         if (splitOp != null 
                 && (numMerges < numSplittees)) {
 
@@ -187,7 +203,7 @@
                 throw new OptimizerException(msg, errCode, PigException.BUG, 
e);
             }    
         }
-        
+
         log.info("Merged " + numMerges + " out of total " 
                 + numSplittees + " splittees.");
     }                
@@ -357,6 +373,53 @@
         
         return mergeList.size();
     }
+
+    private int mergeMapReduceSplittees(List<MapReduceOper> mapReducers, 
+            MapReduceOper splitter) throws VisitorException {
+
+        // In this case the splitter has non-empty reducer so we can't merge 
+        // MR splittees into the splitter. What we'll do is to merge multiple 
+        // splittees (if any exist) into a new MR operator and connect it to the 
splitter.
+        
+        List<MapReduceOper> mergeList = getMergeList(mapReducers);
+    
+        if (mergeList.size() <= 1) {
+            // nothing to merge, just return
+            return  0;
+        } 
+                         
+        MapReduceOper mrOper = getMROper();
+
+        MapReduceOper splittee = mergeList.get(0);
+        PhysicalPlan pl = splittee.mapPlan;
+        POLoad load = (POLoad)pl.getRoots().get(0);
+        
+        mrOper.mapPlan.add(load);
+       
+        // add a dummy store operator, it'll be replaced by the split operator 
later.
+        try {
+            mrOper.mapPlan.addAsLeaf(getStore());
+        } catch (PlanException e) {                   
+            int errCode = 2137;
+            String msg = "Internal Error. Unable to add store to the plan as 
leaf for optimization.";
+            throw new OptimizerException(msg, errCode, PigException.BUG, e);
+        }
+
+        // connect the new MR operator to the splitter
+        try {
+            getPlan().add(mrOper);
+            getPlan().connect(splitter, mrOper);
+        } catch (PlanException e) {
+            int errCode = 2133;
+            String msg = "Internal Error. Unable to connect splitter with 
successors for optimization.";
+            throw new OptimizerException(msg, errCode, PigException.BUG, e);
+        }
+
+        // merge the splittees into the new MR operator
+        mergeAllMapReduceSplittees(mergeList, mrOper, getSplit());
+        
+        return (mergeList.size() - 1);
+    }
     
     private boolean hasSameMapKeyType(List<MapReduceOper> splittees) {
         boolean sameKeyType = true;
@@ -1006,7 +1069,15 @@
     private POSplit getSplit(){
         return new POSplit(new OperatorKey(scope, nig.getNextNodeId(scope)));
     } 
-    
+ 
+    private MapReduceOper getMROper(){
+        return new MapReduceOper(new OperatorKey(scope, 
nig.getNextNodeId(scope)));
+    } 
+   
+    private POStore getStore(){
+        return new POStore(new OperatorKey(scope, nig.getNextNodeId(scope)));
+    } 
+     
     private PODemux getDemux(boolean sameMapKeyType, boolean inCombiner){
         PODemux demux = new PODemux(new OperatorKey(scope, 
nig.getNextNodeId(scope)));
         demux.setSameMapKeyType(sameMapKeyType);

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java?rev=822382&r1=822381&r2=822382&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java Tue Oct  6 
17:58:32 2009
@@ -255,6 +255,66 @@
             Assert.fail();
         } 
     }         
+ 
+    @Test
+    public void testMultiQueryJiraPig983() {
+
+        System.out.println("===== multi-query Jira Pig-983 =====");
+
+        try {
+            myPig.setBatchOn();
+
+            myPig.registerQuery("a = load 
'file:test/org/apache/pig/test/data/passwd' " +
+                                 "using PigStorage(':') as (uname:chararray, 
passwd:chararray, uid:int, gid:int);");
+            myPig.registerQuery("b = filter a by uid < 5;");
+            myPig.registerQuery("c = filter a by uid >= 5;");
+            myPig.registerQuery("d = join b by uname, c by uname;");
+            myPig.registerQuery("e = group d by b::gid;");
+            myPig.registerQuery("e1 = foreach e generate group, 
COUNT(d.b::uid);");
+            myPig.registerQuery("store e1 into '/tmp/output1';");
+            myPig.registerQuery("f = group d by c::gid;");
+            myPig.registerQuery("f1 = foreach f generate group, 
SUM(d.c::uid);");
+            myPig.registerQuery("store f1 into '/tmp/output2';");
+             
+            LogicalPlan lp = checkLogicalPlan(1, 2, 17);
+
+            PhysicalPlan pp = checkPhysicalPlan(lp, 1, 2, 25);
+
+            checkMRPlan(pp, 1, 1, 3);
+            
+        } catch (Exception e) {
+            e.printStackTrace();
+            Assert.fail();
+        } 
+    }       
+    
+    @Test
+    public void testMultiQueryJiraPig983_2() {
+
+        System.out.println("===== multi-query Jira Pig-983_2 =====");
+
+        try {
+            myPig.setBatchOn();
+
+            myPig.registerQuery("a = load 
'file:test/org/apache/pig/test/data/passwd' " +
+                                 "using PigStorage(':') as (uname:chararray, 
passwd:chararray, uid:int, gid:int);");
+            myPig.registerQuery("b = filter a by uid < 5;");
+            myPig.registerQuery("c = filter a by uid >= 5;");
+            myPig.registerQuery("d = join b by uname, c by uname;");
+            myPig.registerQuery("e = group d by b::gid;");
+            myPig.registerQuery("e1 = foreach e generate group, 
COUNT(d.b::uid);");
+            myPig.registerQuery("store e1 into '/tmp/output1';");
+            myPig.registerQuery("f = group d by c::gid;");
+            myPig.registerQuery("f1 = foreach f generate group, 
SUM(d.c::uid);");
+            myPig.registerQuery("store f1 into '/tmp/output2';");
+             
+            myPig.executeBatch();
+
+        } catch (Exception e) {
+            e.printStackTrace();
+            Assert.fail();
+        } 
+    }     
     
     @Test
     public void testMultiQueryPhase3WithoutCombiner() {
@@ -1012,7 +1072,7 @@
 
             PhysicalPlan pp = checkPhysicalPlan(lp, 1, 2, 22);
 
-            checkMRPlan(pp, 1, 2, 3); 
+            checkMRPlan(pp, 1, 1, 2); 
         
         } catch (Exception e) {
             e.printStackTrace();
@@ -1052,7 +1112,7 @@
 
             PhysicalPlan pp = checkPhysicalPlan(lp, 1, 2, 32);
 
-            checkMRPlan(pp, 1, 2, 5);            
+            checkMRPlan(pp, 1, 2, 4);            
 
         } catch (Exception e) {
             e.printStackTrace();


Reply via email to