Author: pradeepkth
Date: Thu Dec 3 18:11:17 2009
New Revision: 886870
URL: http://svn.apache.org/viewvc?rev=886870&view=rev
Log:
PIG-1116: Remove redundant map-reduce job for merge join (pradeepkth)
Modified:
hadoop/pig/trunk/CHANGES.txt
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
hadoop/pig/trunk/test/org/apache/pig/test/TestMRCompiler.java
Modified: hadoop/pig/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=886870&r1=886869&r2=886870&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Thu Dec 3 18:11:17 2009
@@ -45,6 +45,8 @@
BUG FIXES
+PIG-1116: Remove redundant map-reduce job for merge join (pradeepkth)
+
PIG-1114: MultiQuery optimization throws error when merging 2 level spl (rding
via olgan)
Modified:
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=886870&r1=886869&r2=886870&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Thu Dec 3 18:11:17 2009
@@ -57,7 +57,6 @@
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.UDFFinder;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POCast;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
@@ -216,6 +215,7 @@
* Used to get the plan that was compiled
* @return physical plan
*/
+ @Override
public PhysicalPlan getPlan() {
return plan;
}
@@ -902,6 +902,7 @@
}
}
+ @Override
public void visitCollectedGroup(POCollectedGroup op) throws VisitorException {
try{
nonBlocking(op);
@@ -1157,9 +1158,18 @@
if(rightLoadFunc instanceof IndexableLoadFunc) {
joinOp.setRightLoaderFuncSpec(rightLoader.getLFile().getFuncSpec());
joinOp.setRightInputFileName(rightLoader.getLFile().getFileName());
- rightMROpr = null; // we don't need the right MROper since
+
+ // we don't need the right MROper since
// the right loader is an IndexableLoadFunc which can handle the index
- // itself
+ // itself
+ MRPlan.remove(rightMROpr);
+ if(rightMROpr == compiledInputs[0]) {
+ compiledInputs[0] = null;
+ } else if(rightMROpr == compiledInputs[1]) {
+ compiledInputs[1] = null;
+ }
+ rightMROpr = null;
+
// validate that the join keys in merge join are only
// simple column projections or '*' and not expression - expressions
// cannot be handled when the index is built by the storage layer on the sorted
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestMRCompiler.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestMRCompiler.java?rev=886870&r1=886869&r2=886870&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestMRCompiler.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestMRCompiler.java Thu Dec 3 18:11:17 2009
@@ -100,6 +100,7 @@
// and are sure of
private boolean generate = false;
+ @Override
@Before
public void setUp() throws ExecException {
GenPhyOp.setR(r);
@@ -107,6 +108,7 @@
GenPhyOp.setPc(pc);
}
+ @Override
@After
public void tearDown() throws Exception {
}
@@ -947,7 +949,24 @@
}
}
+
+ @Test
+ public void testMergeJoinWithIndexableLoadFunc() throws Exception{
+ //generate = true;
+ planTester.buildPlan("a = load '/tmp/input1';");
+ planTester.buildPlan("b = load '/tmp/input2' using " +
+ TestMergeJoin.DummyIndexableLoader.class.getName() + ";");
+ planTester.buildPlan("c = join a by $0, b by $0 using \"merge\";");
+ LogicalPlan lp = planTester.buildPlan("store c into '/tmp';");
+
+ PhysicalPlan pp = Util.buildPhysicalPlan(lp, pc);
+ MROperPlan mp = Util.buildMRPlan(pp, pc);
+ assertEquals("Checking number of MR Jobs for merge join with " +
+ "IndexableLoadFunc:", 1, mp.size());
+
+ }
+
private void run(PhysicalPlan pp, String expectedFile) throws Exception {
String compiledPlan, goldenPlan = null;
int MAX_SIZE = 100000;