Author: pradeepkth
Date: Tue Oct 6 17:58:32 2009
New Revision: 822382
URL: http://svn.apache.org/viewvc?rev=822382&view=rev
Log:
PERFORMANCE: multi-query optimization on multiple group bys following a join or
cogroup (rding via pradeepkth)
Modified:
hadoop/pig/trunk/CHANGES.txt
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java
hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java
Modified: hadoop/pig/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=822382&r1=822381&r2=822382&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Tue Oct 6 17:58:32 2009
@@ -26,6 +26,9 @@
IMPROVEMENTS
+PIG-983: PERFORMANCE: multi-query optimization on multiple group bys
+following a join or cogroup (rding via pradeepkth)
+
PIG-975: Need a databag that does not register with SpillableMemoryManager and
spill data pro-actively (yinghe via olgan)
Modified:
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java?rev=822382&r1=822381&r2=822382&view=diff
==============================================================================
---
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java
(original)
+++
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java
Tue Oct 6 17:58:32 2009
@@ -30,6 +30,7 @@
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODemux;
+import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMultiQueryPackage;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
@@ -157,23 +158,38 @@
numMerges += n;
}
-
- // case 4: multiple splittees and at least one of them has reducer
- if (isMapOnly(mr) && mapReducers.size() > 0) {
+
+ if (mapReducers.size() > 0) {
+
+ boolean isMapOnly = isMapOnly(mr);
+ int merged = 0;
+
+ // case 4: multiple splittees and at least one of them has reducer
+ // and the splitter is map-only
+ if (isMapOnly) {
- PhysicalOperator leaf = splitterPl.getLeaves().get(0);
+ PhysicalOperator leaf = splitterPl.getLeaves().get(0);
- splitOp = (leaf instanceof POStore) ? getSplit() : (POSplit)leaf;
+ splitOp = (leaf instanceof POStore) ? getSplit() :
(POSplit)leaf;
- int n = mergeMapReduceSplittees(mapReducers, mr, splitOp);
+ merged = mergeMapReduceSplittees(mapReducers, mr, splitOp);
+ }
- log.info("Merged " + n + " map-reduce splittees.");
+ // case 5: multiple splittees and at least one of them has reducer
+ // and splitter has reducer
+ else {
+
+ merged = mergeMapReduceSplittees(mapReducers, mr);
+
+ }
+
+ log.info("Merged " + merged + " map-reduce splittees.");
- numMerges += n;
+ numMerges += merged;
}
-
- // finally, add original store to the split operator
- // if there is splittee that hasn't been merged
+
+    // Finally, add the original store to the split operator
+    // if there is a splittee that hasn't been merged into the splitter
if (splitOp != null
&& (numMerges < numSplittees)) {
@@ -187,7 +203,7 @@
throw new OptimizerException(msg, errCode, PigException.BUG,
e);
}
}
-
+
log.info("Merged " + numMerges + " out of total "
+ numSplittees + " splittees.");
}
@@ -357,6 +373,53 @@
return mergeList.size();
}
+
+ private int mergeMapReduceSplittees(List<MapReduceOper> mapReducers,
+ MapReduceOper splitter) throws VisitorException {
+
+        // In this case the splitter has a non-empty reducer, so we can't merge
+        // MR splittees into the splitter. What we'll do instead is merge multiple
+        // splittees (if any exist) into a new MR operator and connect it to the
splitter.
+
+ List<MapReduceOper> mergeList = getMergeList(mapReducers);
+
+ if (mergeList.size() <= 1) {
+ // nothing to merge, just return
+ return 0;
+ }
+
+ MapReduceOper mrOper = getMROper();
+
+ MapReduceOper splittee = mergeList.get(0);
+ PhysicalPlan pl = splittee.mapPlan;
+ POLoad load = (POLoad)pl.getRoots().get(0);
+
+ mrOper.mapPlan.add(load);
+
+        // add a dummy store operator; it'll be replaced by the split operator
later.
+ try {
+ mrOper.mapPlan.addAsLeaf(getStore());
+ } catch (PlanException e) {
+ int errCode = 2137;
+ String msg = "Internal Error. Unable to add store to the plan as
leaf for optimization.";
+ throw new OptimizerException(msg, errCode, PigException.BUG, e);
+ }
+
+ // connect the new MR operator to the splitter
+ try {
+ getPlan().add(mrOper);
+ getPlan().connect(splitter, mrOper);
+ } catch (PlanException e) {
+ int errCode = 2133;
+ String msg = "Internal Error. Unable to connect splitter with
successors for optimization.";
+ throw new OptimizerException(msg, errCode, PigException.BUG, e);
+ }
+
+        // merge the splittees into the new MR operator
+ mergeAllMapReduceSplittees(mergeList, mrOper, getSplit());
+
+ return (mergeList.size() - 1);
+ }
private boolean hasSameMapKeyType(List<MapReduceOper> splittees) {
boolean sameKeyType = true;
@@ -1006,7 +1069,15 @@
private POSplit getSplit(){
return new POSplit(new OperatorKey(scope, nig.getNextNodeId(scope)));
}
-
+
+ private MapReduceOper getMROper(){
+ return new MapReduceOper(new OperatorKey(scope,
nig.getNextNodeId(scope)));
+ }
+
+ private POStore getStore(){
+ return new POStore(new OperatorKey(scope, nig.getNextNodeId(scope)));
+ }
+
private PODemux getDemux(boolean sameMapKeyType, boolean inCombiner){
PODemux demux = new PODemux(new OperatorKey(scope,
nig.getNextNodeId(scope)));
demux.setSameMapKeyType(sameMapKeyType);
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java?rev=822382&r1=822381&r2=822382&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java Tue Oct 6
17:58:32 2009
@@ -255,6 +255,66 @@
Assert.fail();
}
}
+
+ @Test
+ public void testMultiQueryJiraPig983() {
+
+ System.out.println("===== multi-query Jira Pig-983 =====");
+
+ try {
+ myPig.setBatchOn();
+
+ myPig.registerQuery("a = load
'file:test/org/apache/pig/test/data/passwd' " +
+ "using PigStorage(':') as (uname:chararray,
passwd:chararray, uid:int, gid:int);");
+ myPig.registerQuery("b = filter a by uid < 5;");
+ myPig.registerQuery("c = filter a by uid >= 5;");
+ myPig.registerQuery("d = join b by uname, c by uname;");
+ myPig.registerQuery("e = group d by b::gid;");
+ myPig.registerQuery("e1 = foreach e generate group,
COUNT(d.b::uid);");
+ myPig.registerQuery("store e1 into '/tmp/output1';");
+ myPig.registerQuery("f = group d by c::gid;");
+ myPig.registerQuery("f1 = foreach f generate group,
SUM(d.c::uid);");
+ myPig.registerQuery("store f1 into '/tmp/output2';");
+
+ LogicalPlan lp = checkLogicalPlan(1, 2, 17);
+
+ PhysicalPlan pp = checkPhysicalPlan(lp, 1, 2, 25);
+
+ checkMRPlan(pp, 1, 1, 3);
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail();
+ }
+ }
+
+ @Test
+ public void testMultiQueryJiraPig983_2() {
+
+ System.out.println("===== multi-query Jira Pig-983_2 =====");
+
+ try {
+ myPig.setBatchOn();
+
+ myPig.registerQuery("a = load
'file:test/org/apache/pig/test/data/passwd' " +
+ "using PigStorage(':') as (uname:chararray,
passwd:chararray, uid:int, gid:int);");
+ myPig.registerQuery("b = filter a by uid < 5;");
+ myPig.registerQuery("c = filter a by uid >= 5;");
+ myPig.registerQuery("d = join b by uname, c by uname;");
+ myPig.registerQuery("e = group d by b::gid;");
+ myPig.registerQuery("e1 = foreach e generate group,
COUNT(d.b::uid);");
+ myPig.registerQuery("store e1 into '/tmp/output1';");
+ myPig.registerQuery("f = group d by c::gid;");
+ myPig.registerQuery("f1 = foreach f generate group,
SUM(d.c::uid);");
+ myPig.registerQuery("store f1 into '/tmp/output2';");
+
+ myPig.executeBatch();
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail();
+ }
+ }
@Test
public void testMultiQueryPhase3WithoutCombiner() {
@@ -1012,7 +1072,7 @@
PhysicalPlan pp = checkPhysicalPlan(lp, 1, 2, 22);
- checkMRPlan(pp, 1, 2, 3);
+ checkMRPlan(pp, 1, 1, 2);
} catch (Exception e) {
e.printStackTrace();
@@ -1052,7 +1112,7 @@
PhysicalPlan pp = checkPhysicalPlan(lp, 1, 2, 32);
- checkMRPlan(pp, 1, 2, 5);
+ checkMRPlan(pp, 1, 2, 4);
} catch (Exception e) {
e.printStackTrace();