Author: pradeepkth
Date: Fri Oct 30 20:53:46 2009
New Revision: 831452

URL: http://svn.apache.org/viewvc?rev=831452&view=rev
Log:
PIG-920:  optimizing diamond queries (rding via pradeepkth)

Modified:
    hadoop/pig/trunk/CHANGES.txt
    
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=831452&r1=831451&r2=831452&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Fri Oct 30 20:53:46 2009
@@ -26,6 +26,8 @@
 
 IMPROVEMENTS
 
+PIG-920:  optimizing diamond queries (rding via pradeepkth)
+
 PIG-1040: FINDBUGS: MS_SHOULD_BE_FINAL: Field isn't final but should be  
(olgan)
 
 PIG-1059: FINDBUGS: remaining Bad practice + Multithreaded correctness Warning 
(olgan)

Modified: 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java?rev=831452&r1=831451&r2=831452&view=diff
==============================================================================
--- 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java
 (original)
+++ 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java
 Fri Oct 30 20:53:46 2009
@@ -24,12 +24,14 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.pig.PigException;
 import org.apache.pig.backend.executionengine.ExecException;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROpPlanVisitor;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODemux;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMultiQueryPackage;
@@ -43,9 +45,8 @@
 import org.apache.pig.impl.plan.PlanException;
 import org.apache.pig.impl.plan.ReverseDependencyOrderWalker;
 import org.apache.pig.impl.plan.VisitorException;
-import org.apache.pig.impl.util.Pair;
 import org.apache.pig.impl.plan.optimizer.OptimizerException;
-import org.apache.pig.PigException;
+import org.apache.pig.impl.util.Pair;
 
 
 /** 
@@ -204,10 +205,121 @@
             }    
         }
 
+        // case 6: special diamond case with trivial MR operator at the head
+        if (numMerges == 0 && isDiamondMROper(mr)) {
+            int merged = mergeDiamondMROper(mr, getPlan().getSuccessors(mr));
+            log.info("Merged " + merged + " diamond splitter.");
+            numMerges += merged;    
+        }
+        
         log.info("Merged " + numMerges + " out of total " 
-                + numSplittees + " splittees.");
+                + (numSplittees +1) + " MR operators.");
     }                
     
+    private boolean isDiamondMROper(MapReduceOper mr) {
+        
+        // We'll remove this mr as part of diamond query optimization
+        // only if this mr is a trivial one, that is, its plan
+        // has either two operators (load followed by store) or three 
operators 
+        // (the operator between the load and store must be a foreach,
+        // introduced by casting operation).
+        // 
+        // We won't optimize in other cases where there're more operators
+        // in the plan. Otherwise those operators would run multiple times 
+        // in the successor MR operators which may not give better
+        // performance.
+        boolean rtn = false;
+        if (isMapOnly(mr)) {
+            PhysicalPlan pl = mr.mapPlan;
+            if (pl.size() == 2 || pl.size() == 3) {               
+                PhysicalOperator root = pl.getRoots().get(0);
+                PhysicalOperator leaf = pl.getLeaves().get(0);
+                if (root instanceof POLoad && leaf instanceof POStore) {
+                    if (pl.size() == 3) {
+                        PhysicalOperator mid = pl.getSuccessors(root).get(0);
+                        if (mid instanceof POForEach) {
+                            rtn = true;
+                        }                      
+                    } else {
+                        rtn = true;
+                    }
+                }
+            }
+        }
+        return rtn;
+    }
+    
+    private int mergeDiamondMROper(MapReduceOper mr, List<MapReduceOper> 
succs) 
+        throws VisitorException {
+       
+        // Only consider the cases where all inputs of the splittees are 
+        // from the splitter
+        for (MapReduceOper succ : succs) {
+            List<MapReduceOper> preds = getPlan().getPredecessors(succ);
+            if (preds.size() != 1) {
+                return 0;
+            }
+        }
+        
+        // first, remove the store operator from the splitter
+        PhysicalPlan pl = mr.mapPlan;
+        PhysicalOperator leaf = mr.mapPlan.getLeaves().get(0);
+        pl.remove(leaf);
+        
+        // then connect the remaining map plan to the successor of
+        // each root (load) operator of the splittee
+        for (MapReduceOper succ : succs) {
+            List<PhysicalOperator> roots = succ.mapPlan.getRoots();
+            ArrayList<PhysicalOperator> rootsCopy = 
+                new ArrayList<PhysicalOperator>(roots);
+            for (PhysicalOperator op : rootsCopy) {
+                PhysicalOperator opSucc = 
succ.mapPlan.getSuccessors(op).get(0);
+                PhysicalPlan clone = null;
+                try {
+                    clone = pl.clone();
+                } catch (CloneNotSupportedException e) {
+                    int errCode = 2127;
+                    String msg = "Internal Error: Cloning of plan failed for 
optimization.";
+                    throw new OptimizerException(msg, errCode, 
PigException.BUG, e);
+                }
+                succ.mapPlan.remove(op);
+                while (!clone.isEmpty()) {
+                    PhysicalOperator oper = clone.getLeaves().get(0);
+                    clone.remove(oper);
+                    succ.mapPlan.add(oper);
+                    try {
+                        succ.mapPlan.connect(oper, opSucc);
+                        opSucc = oper;
+                    } catch (PlanException e) {
+                        int errCode = 2131;
+                        String msg = "Internal Error. Unable to connect split 
plan for optimization.";
+                        throw new OptimizerException(msg, errCode, 
PigException.BUG, e);
+                    }                
+                }
+            }
+        }
+        
+        // finally, remove the splitter from the MR plan
+        List<MapReduceOper> mrPreds = getPlan().getPredecessors(mr);
+        if (mrPreds != null) {
+            for (MapReduceOper pred : mrPreds) {
+                for (MapReduceOper succ : succs) {
+                    try {
+                        getPlan().connect(pred, succ);
+                    } catch (PlanException e) {
+                        int errCode = 2131;
+                        String msg = "Internal Error. Unable to connect split 
plan for optimization.";
+                        throw new OptimizerException(msg, errCode, 
PigException.BUG, e);
+                    }
+                }
+            }
+        }
+        
+        getPlan().remove(mr);
+        
+        return 1;
+    }
+    
     private void mergeOneMapPart(MapReduceOper mapper, MapReduceOper splitter)
     throws VisitorException {
         PhysicalPlan splitterPl = isMapOnly(splitter) ? 
@@ -1091,5 +1203,5 @@
     
     private POMultiQueryPackage getMultiQueryPackage(){
         return new POMultiQueryPackage(new OperatorKey(scope, 
nig.getNextNodeId(scope)));
-    }  
+    }   
 }

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java?rev=831452&r1=831451&r2=831452&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java Fri Oct 30 
20:53:46 2009
@@ -89,6 +89,154 @@
     public void tearDown() throws Exception {
         myPig = null;
     }
+    
+    @Test
+    public void testMultiQueryJiraPig920() {
+
+        // test case: a simple diamond query
+        try {
+            myPig.setBatchOn();
+
+            myPig.registerQuery("a = load 
'file:test/org/apache/pig/test/data/passwd' " +
+                                 "using PigStorage(':') as (uname:chararray, 
passwd:chararray, uid:int, gid:int);");
+            myPig.registerQuery("b = filter a by uid < 5;");
+            myPig.registerQuery("c = filter a by gid >= 5;");
+            myPig.registerQuery("d = cogroup c by $0, b by $0;");
+            myPig.registerQuery("e = foreach d generate group, COUNT(c), 
COUNT(b);");
+            myPig.registerQuery("store e into '/tmp/output1';");
+             
+            LogicalPlan lp = checkLogicalPlan(1, 1, 10);
+
+            PhysicalPlan pp = checkPhysicalPlan(lp, 1, 1, 13);
+
+            checkMRPlan(pp, 1, 1, 1);
+            
+        } catch (Exception e) {
+            e.printStackTrace();
+            Assert.fail();
+        } 
+    }       
+ 
+    @Test
+    public void testMultiQueryJiraPig920_1() {
+
+        // test case: a query with two diamonds
+        try {
+            myPig.setBatchOn();
+
+            myPig.registerQuery("a = load 
'file:test/org/apache/pig/test/data/passwd' " +
+                                 "using PigStorage(':') as (uname:chararray, 
passwd:chararray, uid:int, gid:int);");
+            myPig.registerQuery("b = filter a by uid < 5;");
+            myPig.registerQuery("c = filter a by gid >= 5;");
+            myPig.registerQuery("d = filter a by uid >= 5;");
+            myPig.registerQuery("e = filter a by gid < 5;");
+            myPig.registerQuery("f = cogroup c by $0, b by $0;");
+            myPig.registerQuery("f1 = foreach f generate group, COUNT(c), 
COUNT(b);");
+            myPig.registerQuery("store f1 into '/tmp/output1';");
+            myPig.registerQuery("g = cogroup d by $0, e by $0;");
+            myPig.registerQuery("g1 = foreach g generate group, COUNT(d), 
COUNT(e);");
+            myPig.registerQuery("store g1 into '/tmp/output2';");
+             
+            LogicalPlan lp = checkLogicalPlan(1, 2, 17);
+
+            PhysicalPlan pp = checkPhysicalPlan(lp, 1, 2, 23);
+
+            checkMRPlan(pp, 2, 2, 2);
+            
+        } catch (Exception e) {
+            e.printStackTrace();
+            Assert.fail();
+        } 
+    }
+
+    @Test
+    public void testMultiQueryJiraPig920_2() {
+
+        // test case: execution of a query with two diamonds
+        try {
+            myPig.setBatchOn();
+
+            myPig.registerQuery("a = load 
'file:test/org/apache/pig/test/data/passwd' " +
+                                 "using PigStorage(':') as (uname:chararray, 
passwd:chararray, uid:int, gid:int);");
+            myPig.registerQuery("b = filter a by uid < 5;");
+            myPig.registerQuery("c = filter a by gid >= 5;");
+            myPig.registerQuery("d = filter a by uid >= 5;");
+            myPig.registerQuery("e = filter a by gid < 5;");
+            myPig.registerQuery("f = cogroup c by $0, b by $0;");
+            myPig.registerQuery("f1 = foreach f generate group, COUNT(c), 
COUNT(b);");
+            myPig.registerQuery("store f1 into '/tmp/output1';");
+            myPig.registerQuery("g = cogroup d by $0, e by $0;");
+            myPig.registerQuery("g1 = foreach g generate group, COUNT(d), 
COUNT(e);");
+            myPig.registerQuery("store g1 into '/tmp/output2';");
+             
+            List<ExecJob> jobs = myPig.executeBatch();
+            for (ExecJob job : jobs) {
+                assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
+            }
+
+        } catch (Exception e) {
+            e.printStackTrace();
+            Assert.fail();
+        } 
+    }            
+    
+    @Test
+    public void testMultiQueryJiraPig920_3() {
+
+        // test case: execution of a simple diamond query
+        
+        String INPUT_FILE = "pig-920.txt";
+        
+        try {
+            
+            PrintWriter w = new PrintWriter(new FileWriter(INPUT_FILE));
+            w.println("apple\tapple\t100\t10");
+            w.println("apple\tapple\t200\t20");
+            w.println("orange\torange\t100\t10");
+            w.println("orange\torange\t300\t20");
+   
+            w.close();
+            
+            Util.copyFromLocalToCluster(cluster, INPUT_FILE, INPUT_FILE);
+        
+            myPig.setBatchOn();
+
+            myPig.registerQuery("a = load '" + INPUT_FILE +
+                                "' as (uname:chararray, passwd:chararray, 
uid:int, gid:int);");
+            myPig.registerQuery("b = filter a by uid < 300;");
+            myPig.registerQuery("c = filter a by gid > 10;");
+            myPig.registerQuery("d = cogroup c by $0, b by $0;");
+            myPig.registerQuery("e = foreach d generate group, COUNT(c), 
COUNT(b);");
+                                   
+            Iterator<Tuple> iter = myPig.openIterator("e");
+
+            List<Tuple> expectedResults = 
Util.getTuplesFromConstantTupleStrings(
+                    new String[] { 
+                            "('apple',1L,2L)",
+                            "('orange',1L,1L)"
+                    });
+            
+            int counter = 0;
+            while (iter.hasNext()) {
+                assertEquals(expectedResults.get(counter++).toString(), 
iter.next().toString());
+            }
+
+            assertEquals(expectedResults.size(), counter);
+            
+        } catch (Exception e) {
+            e.printStackTrace();
+            Assert.fail();
+        } finally {
+            new File(INPUT_FILE).delete();
+            try {
+                Util.deleteFile(cluster, INPUT_FILE);
+            } catch (IOException e) {
+                e.printStackTrace();
+                Assert.fail();
+            }
+            
+        }
+    }        
 
     @Test
     public void testMultiQueryWithDemoCase() {
@@ -491,7 +639,7 @@
 
             PhysicalPlan pp = checkPhysicalPlan(lp, 1, 2, 25);
 
-            checkMRPlan(pp, 1, 1, 3);
+            checkMRPlan(pp, 1, 1, 2);
             
         } catch (Exception e) {
             e.printStackTrace();
@@ -519,8 +667,13 @@
             myPig.registerQuery("f1 = foreach f generate group, 
SUM(d.c::uid);");
             myPig.registerQuery("store f1 into '/tmp/output2';");
              
-            myPig.executeBatch();
+            List<ExecJob> jobs = myPig.executeBatch();
 
+            assertTrue(jobs.size() == 2);
+            
+            for (ExecJob job : jobs) {
+                assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
+            }
         } catch (Exception e) {
             e.printStackTrace();
             Assert.fail();


Reply via email to