Author: pradeepkth
Date: Thu Dec  3 18:11:17 2009
New Revision: 886870

URL: http://svn.apache.org/viewvc?rev=886870&view=rev
Log:
PIG-1116: Remove redundant map-reduce job for merge join (pradeepkth)

Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestMRCompiler.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=886870&r1=886869&r2=886870&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Thu Dec  3 18:11:17 2009
@@ -45,6 +45,8 @@
 
 BUG FIXES
 
+PIG-1116: Remove redundant map-reduce job for merge join (pradeepkth)
+
 PIG-1114: MultiQuery optimization throws error when merging 2 level spl (rding
 via olgan)
 

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=886870&r1=886869&r2=886870&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Thu Dec  3 18:11:17 2009
@@ -57,7 +57,6 @@
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.UDFFinder;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POCast;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
@@ -216,6 +215,7 @@
      * Used to get the plan that was compiled
      * @return physical plan
      */
+    @Override
     public PhysicalPlan getPlan() {
         return plan;
     }
@@ -902,6 +902,7 @@
         }
     }
 
+    @Override
     public void visitCollectedGroup(POCollectedGroup op) throws VisitorException {
         try{
             nonBlocking(op);
@@ -1157,9 +1158,18 @@
             if(rightLoadFunc instanceof IndexableLoadFunc) {
                 joinOp.setRightLoaderFuncSpec(rightLoader.getLFile().getFuncSpec());
                 joinOp.setRightInputFileName(rightLoader.getLFile().getFileName());
-                rightMROpr = null; // we don't need the right MROper since
+                
+                // we don't need the right MROper since
                 // the right loader is an IndexableLoadFunc which can handle the index
-                // itself 
+                // itself
+                MRPlan.remove(rightMROpr);
+                if(rightMROpr == compiledInputs[0]) {
+                    compiledInputs[0] = null;
+                } else if(rightMROpr == compiledInputs[1]) {
+                    compiledInputs[1] = null;
+                } 
+                rightMROpr = null;
+                
                 // validate that the join keys in merge join are only
                 // simple column projections or '*' and not expression - expressions
                 // cannot be handled when the index is built by the storage layer on the sorted

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestMRCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestMRCompiler.java?rev=886870&r1=886869&r2=886870&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestMRCompiler.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestMRCompiler.java Thu Dec  3 18:11:17 2009
@@ -100,6 +100,7 @@
     // and are sure of
     private boolean generate = false;
 
+    @Override
     @Before
     public void setUp() throws ExecException {
         GenPhyOp.setR(r);
@@ -107,6 +108,7 @@
         GenPhyOp.setPc(pc);
     }
 
+    @Override
     @After
     public void tearDown() throws Exception {
     }
@@ -947,7 +949,24 @@
         }
 
     }
+    
+    @Test
+    public void testMergeJoinWithIndexableLoadFunc() throws Exception{
 
+        //generate = true;
+        planTester.buildPlan("a = load '/tmp/input1';");
+        planTester.buildPlan("b = load '/tmp/input2' using " +
+            TestMergeJoin.DummyIndexableLoader.class.getName() + ";");
+        planTester.buildPlan("c = join a by $0, b by $0 using \"merge\";");
+        LogicalPlan lp = planTester.buildPlan("store c into '/tmp';");
+        
+        PhysicalPlan pp = Util.buildPhysicalPlan(lp, pc);
+        MROperPlan mp = Util.buildMRPlan(pp, pc);
+        assertEquals("Checking number of MR Jobs for merge join with " +
+                       "IndexableLoadFunc:", 1, mp.size());
+        
+    }
+    
     private void run(PhysicalPlan pp, String expectedFile) throws Exception {
         String compiledPlan, goldenPlan = null;
         int MAX_SIZE = 100000;


Reply via email to