Author: rding
Date: Thu Jun 10 17:24:33 2010
New Revision: 953404

URL: http://svn.apache.org/viewvc?rev=953404&view=rev
Log:
PIG-1438: [Performance] MultiQueryOptimizer should also merge DISTINCT jobs

Modified:
    
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java

Modified: 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java?rev=953404&r1=953403&r2=953404&view=diff
==============================================================================
--- 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java
 (original)
+++ 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java
 Thu Jun 10 17:24:33 2010
@@ -455,10 +455,12 @@ class MultiQueryOptimizer extends MROpPl
         return true;
     }
        
-    private List<MapReduceOper> getMergeList(List<MapReduceOper> mapReducers) {
+    private List<MapReduceOper> getMergeList(MapReduceOper splitter,
+            List<MapReduceOper> mapReducers) {
         List<MapReduceOper> mergeNoCmbList = new ArrayList<MapReduceOper>();
         List<MapReduceOper> mergeCmbList = new ArrayList<MapReduceOper>();
-
+        List<MapReduceOper> mergeDistList = new ArrayList<MapReduceOper>();
+       
         for (MapReduceOper mrOp : mapReducers) {
             if (isSplitteeMergeable(mrOp)) {
                 if (mrOp.combinePlan.isEmpty()) {
@@ -466,16 +468,26 @@ class MultiQueryOptimizer extends MROpPl
                 } else {
                     mergeCmbList.add(mrOp);
                 } 
-            }           
-        }     
-        return (mergeNoCmbList.size() > mergeCmbList.size()) ?
-                mergeNoCmbList : mergeCmbList;
+            } else if (splitter.reducePlan.isEmpty()
+                    || splitter.needsDistinctCombiner()) {                
+                if (mrOp.needsDistinctCombiner()) {                    
+                    mergeDistList.add(mrOp);
+                }
+            }
+        }    
+        
+        int max = Math.max(mergeNoCmbList.size(), mergeCmbList.size());
+        max = Math.max(max, mergeDistList.size());
+        
+        if (max == mergeDistList.size()) return mergeDistList;
+        else if (max == mergeNoCmbList.size()) return mergeNoCmbList;
+        else return mergeCmbList;                        
     }
     
     private int mergeMapReduceSplittees(List<MapReduceOper> mapReducers, 
             MapReduceOper splitter, POSplit splitOp) throws VisitorException {
                 
-        List<MapReduceOper> mergeList = getMergeList(mapReducers);
+        List<MapReduceOper> mergeList = getMergeList(splitter, mapReducers);
     
         if (mergeList.size() <= 1) {
 
@@ -507,7 +519,7 @@ class MultiQueryOptimizer extends MROpPl
         // MR splittees into the splitter. What we'll do is to merge multiple 
         // splittees (if exists) into a new MR operator and connect it to the 
splitter.
         
-        List<MapReduceOper> mergeList = getMergeList(mapReducers);
+        List<MapReduceOper> mergeList = getMergeList(splitter, mapReducers);
     
         if (mergeList.size() <= 1) {
             // nothing to merge, just return

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java?rev=953404&r1=953403&r2=953404&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java Thu Jun 10 
17:24:33 2010
@@ -112,6 +112,122 @@ public class TestMultiQuery {
     }
 
     @Test
+    public void testMultiQueryJiraPig1438() {
+
+        // test case: merge multiple distinct jobs
+        
+        String INPUT_FILE = "abc";
+        
+        try {
+    
+            PrintWriter w = new PrintWriter(new FileWriter(INPUT_FILE));
+            w.println("1\t2\t3");
+            w.println("2\t3\t4");
+            w.println("1\t2\t3");
+            w.println("2\t3\t4");
+            w.println("1\t2\t3");
+            w.close();
+    
+            Util.copyFromLocalToCluster(cluster, INPUT_FILE, INPUT_FILE);
+           
+            myPig.setBatchOn();
+    
+            myPig.registerQuery("A = load '" + INPUT_FILE + "' as (col1:int, 
col2:int, col3:int);");
+            myPig.registerQuery("B1 = foreach A generate col1, col2;");
+            myPig.registerQuery("B2 = foreach A generate col2, col3;");
+            myPig.registerQuery("C1 = distinct B1;");
+            myPig.registerQuery("C2 = distinct B2;");
+            myPig.registerQuery("D1 = foreach C1 generate col1, col2;");
+            myPig.registerQuery("D2 = foreach C2 generate col2, col3;");
+            myPig.registerQuery("store D1 into '/tmp/output1';");
+            myPig.registerQuery("store D2 into '/tmp/output2';");
+            
+            LogicalPlan lp = checkLogicalPlan(1, 2, 13);
+
+            PhysicalPlan pp = checkPhysicalPlan(lp, 1, 2, 13);
+
+            checkMRPlan(pp, 1, 1, 1); 
+            
+            myPig.executeBatch();
+            
+            myPig.registerQuery("E = load '/tmp/output1' as (a:int, b:int);"); 
           
+            Iterator<Tuple> iter = myPig.openIterator("E");
+
+            List<Tuple> expectedResults = 
Util.getTuplesFromConstantTupleStrings(
+                    new String[] { 
+                            "(1,2)",
+                            "(2,3)"
+                    });
+            
+            int counter = 0;
+            while (iter.hasNext()) {
+                assertEquals(expectedResults.get(counter++).toString(), 
iter.next().toString());      
+            }
+            assertEquals(expectedResults.size(), counter);
+                        
+            myPig.registerQuery("E = load '/tmp/output2' as (a:int, b:int);"); 
           
+            iter = myPig.openIterator("E");
+
+            expectedResults = Util.getTuplesFromConstantTupleStrings(
+                    new String[] { 
+                            "(2,3)",
+                            "(3,4)"
+                    });
+            
+            counter = 0;
+            while (iter.hasNext()) {
+                assertEquals(expectedResults.get(counter++).toString(), 
iter.next().toString());      
+            }
+
+            assertEquals(expectedResults.size(), counter);
+            
+        } catch (Exception e) {
+            e.printStackTrace();
+            Assert.fail();
+        } finally {
+            new File(INPUT_FILE).delete();
+            try {
+                Util.deleteFile(cluster, INPUT_FILE);
+            } catch (IOException e) {
+                e.printStackTrace();
+                Assert.fail();
+            }
+        }
+    }
+    
+    @Test
+    public void testMultiQueryJiraPig1438_2() {
+
+        // test case: merge multiple distinct jobs -- one group by job, one 
distinct job
+        
+        String INPUT_FILE = "abc";
+        
+        try {           
+            myPig.setBatchOn();
+    
+            myPig.registerQuery("A = load '" + INPUT_FILE + "' as (col1:int, 
col2:int, col3:int);");
+            myPig.registerQuery("B1 = foreach A generate col1, col2;");
+            myPig.registerQuery("B2 = foreach A generate col2, col3;");
+            myPig.registerQuery("C1 = distinct B1;");
+            myPig.registerQuery("C2 = group B2 by (col2, col3);");
+            myPig.registerQuery("D1 = foreach C1 generate col1, col2;");
+            myPig.registerQuery("D2 = foreach C2 generate B2.col2, B2.col3;");
+            myPig.registerQuery("store D1 into '/tmp/output1';");
+            myPig.registerQuery("store D2 into '/tmp/output2';");
+            
+            LogicalPlan lp = checkLogicalPlan(1, 2, 13);
+
+            PhysicalPlan pp = checkPhysicalPlan(lp, 1, 2, 15);
+
+            checkMRPlan(pp, 1, 1, 2); 
+            
+        } catch (Exception e) {
+            e.printStackTrace();
+            Assert.fail();
+        }
+    }
+    
+    @Test
     public void testMultiQueryJiraPig1252() {
 
         // test case: Problems with secondary key optimization and multiquery


Reply via email to