Author: pradeepkth
Date: Thu Dec 3 18:11:17 2009
New Revision: 886870

URL: http://svn.apache.org/viewvc?rev=886870&view=rev
Log:
PIG-1116: Remove redundant map-reduce job for merge join (pradeepkth)

Modified: hadoop/pig/trunk/CHANGES.txt hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java hadoop/pig/trunk/test/org/apache/pig/test/TestMRCompiler.java Modified: hadoop/pig/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=886870&r1=886869&r2=886870&view=diff ============================================================================== --- hadoop/pig/trunk/CHANGES.txt (original) +++ hadoop/pig/trunk/CHANGES.txt Thu Dec 3 18:11:17 2009 @@ -45,6 +45,8 @@ BUG FIXES +PIG-1116: Remove redundant map-reduce job for merge join (pradeepkth) + PIG-1114: MultiQuery optimization throws error when merging 2 level spl (rding via olgan) Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=886870&r1=886869&r2=886870&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Thu Dec 3 18:11:17 2009 @@ -57,7 +57,6 @@ import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.UDFFinder; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression; -import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POCast; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor; @@ -216,6 
+215,7 @@ * Used to get the plan that was compiled * @return physical plan */ + @Override public PhysicalPlan getPlan() { return plan; } @@ -902,6 +902,7 @@ } } + @Override public void visitCollectedGroup(POCollectedGroup op) throws VisitorException { try{ nonBlocking(op); @@ -1157,9 +1158,18 @@ if(rightLoadFunc instanceof IndexableLoadFunc) { joinOp.setRightLoaderFuncSpec(rightLoader.getLFile().getFuncSpec()); joinOp.setRightInputFileName(rightLoader.getLFile().getFileName()); - rightMROpr = null; // we don't need the right MROper since + + // we don't need the right MROper since // the right loader is an IndexableLoadFunc which can handle the index - // itself + // itself + MRPlan.remove(rightMROpr); + if(rightMROpr == compiledInputs[0]) { + compiledInputs[0] = null; + } else if(rightMROpr == compiledInputs[1]) { + compiledInputs[1] = null; + } + rightMROpr = null; + // validate that the join keys in merge join are only // simple column projections or '*' and not expression - expressions // cannot be handled when the index is built by the storage layer on the sorted Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestMRCompiler.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestMRCompiler.java?rev=886870&r1=886869&r2=886870&view=diff ============================================================================== --- hadoop/pig/trunk/test/org/apache/pig/test/TestMRCompiler.java (original) +++ hadoop/pig/trunk/test/org/apache/pig/test/TestMRCompiler.java Thu Dec 3 18:11:17 2009 @@ -100,6 +100,7 @@ // and are sure of private boolean generate = false; + @Override @Before public void setUp() throws ExecException { GenPhyOp.setR(r); @@ -107,6 +108,7 @@ GenPhyOp.setPc(pc); } + @Override @After public void tearDown() throws Exception { } @@ -947,7 +949,24 @@ } } + + @Test + public void testMergeJoinWithIndexableLoadFunc() throws Exception{ + //generate = true; + planTester.buildPlan("a = load '/tmp/input1';"); + 
planTester.buildPlan("b = load '/tmp/input2' using " + + TestMergeJoin.DummyIndexableLoader.class.getName() + ";"); + planTester.buildPlan("c = join a by $0, b by $0 using \"merge\";"); + LogicalPlan lp = planTester.buildPlan("store c into '/tmp';"); + + PhysicalPlan pp = Util.buildPhysicalPlan(lp, pc); + MROperPlan mp = Util.buildMRPlan(pp, pc); + assertEquals("Checking number of MR Jobs for merge join with " + + "IndexableLoadFunc:", 1, mp.size()); + + } + private void run(PhysicalPlan pp, String expectedFile) throws Exception { String compiledPlan, goldenPlan = null; int MAX_SIZE = 100000;