Author: olga
Date: Tue Sep 15 23:36:21 2009
New Revision: 815565

URL: http://svn.apache.org/viewvc?rev=815565&view=rev
Log:
PIG-962: Skewed join creates 3 map reduce jobs (sriranjan via olgan)

Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SampleOptimizer.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestSampleOptimizer.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=815565&r1=815564&r2=815565&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Tue Sep 15 23:36:21 2009
@@ -28,6 +28,8 @@
 
 BUG FIXES
 
+PIG-962: Skewed join creates 3 map reduce jobs (sriranjan via olgan)
+
 Release 0.4.0 - Unreleased
 
 INCOMPATIBLE CHANGES

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SampleOptimizer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SampleOptimizer.java?rev=815565&r1=815564&r2=815565&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SampleOptimizer.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SampleOptimizer.java Tue Sep 15 23:36:21 2009
@@ -81,7 +81,7 @@
         POLoad load = (POLoad)po;
         String loadFunc = load.getLFile().getFuncName();
         String loadFile = load.getLFile().getFileName();
-        if 
(!("org.apache.pig.impl.builtin.RandomSampleLoader".equals(loadFunc)) && 
!("org.apache.pig.impl.builtin.SkewedJoinSampleLoader".equals(loadFunc))) {
+        if 
(!("org.apache.pig.impl.builtin.RandomSampleLoader".equals(loadFunc)) && 
!("org.apache.pig.impl.builtin.PoissonSampleLoader".equals(loadFunc))) {
             log.debug("Not a sampling job.");
             return;
         }

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestSampleOptimizer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestSampleOptimizer.java?rev=815565&r1=815564&r2=815565&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestSampleOptimizer.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestSampleOptimizer.java Tue Sep 15 23:36:21 2009
@@ -180,4 +180,36 @@
         }
         tmpFile.delete();
     }
+    
+    @Test
+    public void testPoissonSampleOptimizer() throws Exception { // PIG-962: skewed-join plan must shrink from 3 to 2 MR jobs after SampleOptimizer
+        LogicalPlanTester planTester = new LogicalPlanTester() ;
+        planTester.buildPlan(" A = load 'input' using PigStorage('\t');");
+        planTester.buildPlan("B = load 'input' using PigStorage('\t');");
+        planTester.buildPlan(" C = join A by $0, B by $0 using \"skewed\";");
+        LogicalPlan lp = planTester.buildPlan("store C into 'output';");
+        PhysicalPlan pp = Util.buildPhysicalPlan(lp, pc);
+        MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
+
+        int count = 1; // counts MR jobs by following only the first successor from the first root
+        MapReduceOper mrOper = mrPlan.getRoots().get(0);
+        while(mrPlan.getSuccessors(mrOper) != null) {
+            mrOper = mrPlan.getSuccessors(mrOper).get(0);
+            ++count;
+        }        
+        // Before SampleOptimizer visits, the skewed-join plan chain has 3 MR jobs.
+        assertEquals(3,count);
+
+        SampleOptimizer so = new SampleOptimizer(mrPlan);
+        so.visit();
+
+        count = 1; // recount along the same first-successor chain
+        mrOper = mrPlan.getRoots().get(0);
+        while(mrPlan.getSuccessors(mrOper) != null) {
+            mrOper = mrPlan.getSuccessors(mrOper).get(0);
+            ++count;
+        }        
+        // After SampleOptimizer visits, one job should be gone (presumably the PoissonSampleLoader sampling job), leaving 2.
+        assertEquals(2,count);
+    }
 }


Reply via email to