Author: gates
Date: Fri Sep 18 17:41:38 2009
New Revision: 816723

URL: http://svn.apache.org/viewvc?rev=816723&view=rev
Log:
PIG-951:  Set parallelism explicitly to 1 for indexing job in merge join


Modified:
    hadoop/pig/trunk/CHANGES.txt
    
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestMergeJoin.java
    hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC18.gld

Modified: hadoop/pig/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=816723&r1=816722&r2=816723&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Fri Sep 18 17:41:38 2009
@@ -30,6 +30,9 @@
 
 BUG FIXES
 
+PIG-951:  Set parallelism explicitly to 1 for indexing job in merge join
+          (ashutoc via gates).
+
 Release 0.5.0 - Unreleased
 
 INCOMPATIBLE CHANGES

Modified: 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=816723&r1=816722&r2=816723&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Fri Sep 18 17:41:38 2009
@@ -1115,7 +1115,8 @@
             }
             
             joinOp.setupRightPipeline(rightPipelinePlan);
-                        
+           rightMROpr.requestedParallelism = 1; // we need exactly one reducer for indexing job.
+            
             // At this point, we must be operating on map plan of right input and it would contain nothing else other then a POLoad.
             POLoad rightLoader = (POLoad)rightMROpr.mapPlan.getRoots().get(0);
             joinOp.setRightLoaderFuncSpec(rightLoader.getLFile().getFuncSpec());

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestMergeJoin.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestMergeJoin.java?rev=816723&r1=816722&r2=816723&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestMergeJoin.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestMergeJoin.java Fri Sep 18 17:41:38 2009
@@ -26,11 +26,15 @@
 import org.apache.pig.PigException;
 import org.apache.pig.PigServer;
 import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
 import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.logicalLayer.LogicalPlan;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.util.LogUtils;
+import org.apache.pig.test.utils.LogicalPlanTester;
 import org.apache.pig.test.utils.TestHelper;
 import org.junit.After;
 import org.junit.Before;
@@ -407,6 +411,20 @@
     }       
 
     @Test
+    public void testParallelism() throws Exception{
+
+        LogicalPlanTester tester = new LogicalPlanTester();
+        tester.buildPlan("A = LOAD '" + INPUT_FILE + "';");
+        tester.buildPlan("B = LOAD '" + INPUT_FILE + "';");
+        tester.buildPlan("C = join A by $0, B by $0 using \"merge\" parallel 50;");
+        LogicalPlan lp = tester.buildPlan("store C into 'out';");
+       PigContext pc = new PigContext(ExecType.MAPREDUCE,cluster.getProperties());
+        pc.connect();
+       MROperPlan mro = Util.buildMRPlan(Util.buildPhysicalPlan(lp, pc),pc);
+        Assert.assertEquals(1,mro.getRoots().get(0).getRequestedParallelism());
+    }
+
+    @Test
     public void testIndexer() throws IOException{
         Util.createInputFile(cluster, "temp_file1", new String[]{1+""});
         Util.createInputFile(cluster, "temp_file2", new String[]{2+""});

Modified: hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC18.gld
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC18.gld?rev=816723&r1=816722&r2=816723&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC18.gld (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC18.gld Fri Sep 18 17:41:38 2009
@@ -6,7 +6,7 @@
 |       |
 |       |---Load(file:/tmp/input1:org.apache.pig.builtin.PigStorage) - scope-117
 |
-|---MapReduce(-1,PigStorage) - scope-126:
+|---MapReduce(1,PigStorage) - scope-126:
     |   Store(file:/tmp/temp-1456742965/tmp-1456742965:org.apache.pig.builtin.BinStorage) - scope-133
     |   |
     |   |---POSort[tuple]() - scope-132


Reply via email to