Author: pradeepkth Date: Tue Oct 6 17:58:32 2009 New Revision: 822382 URL: http://svn.apache.org/viewvc?rev=822382&view=rev Log: PERFORMANCE: multi-query optimization on multiple group bys following a join or cogroup (rding via pradeepkth)
Modified: hadoop/pig/trunk/CHANGES.txt hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java Modified: hadoop/pig/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=822382&r1=822381&r2=822382&view=diff ============================================================================== --- hadoop/pig/trunk/CHANGES.txt (original) +++ hadoop/pig/trunk/CHANGES.txt Tue Oct 6 17:58:32 2009 @@ -26,6 +26,9 @@ IMPROVEMENTS +PIG-983: PERFORMANCE: multi-query optimization on multiple group bys +following a join or cogroup (rding via pradeepkth) + PIG-975: Need a databag that does not register with SpillableMemoryManager and spill data pro-actively (yinghe via olgan) Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java?rev=822382&r1=822381&r2=822382&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java Tue Oct 6 17:58:32 2009 @@ -30,6 +30,7 @@ import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODemux; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange; import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMultiQueryPackage; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage; @@ -157,23 +158,38 @@ numMerges += n; } - - // case 4: multiple splittees and at least one of them has reducer - if (isMapOnly(mr) && mapReducers.size() > 0) { + + if (mapReducers.size() > 0) { + + boolean isMapOnly = isMapOnly(mr); + int merged = 0; + + // case 4: multiple splittees and at least one of them has reducer + // and the splitter is map-only + if (isMapOnly) { - PhysicalOperator leaf = splitterPl.getLeaves().get(0); + PhysicalOperator leaf = splitterPl.getLeaves().get(0); - splitOp = (leaf instanceof POStore) ? getSplit() : (POSplit)leaf; + splitOp = (leaf instanceof POStore) ? getSplit() : (POSplit)leaf; - int n = mergeMapReduceSplittees(mapReducers, mr, splitOp); + merged = mergeMapReduceSplittees(mapReducers, mr, splitOp); + } - log.info("Merged " + n + " map-reduce splittees."); + // case 5: multiple splittees and at least one of them has reducer + // and splitter has reducer + else { + + merged = mergeMapReduceSplittees(mapReducers, mr); + + } + + log.info("Merged " + merged + " map-reduce splittees."); - numMerges += n; + numMerges += merged; } - - // finally, add original store to the split operator - // if there is splittee that hasn't been merged + + // Finally, add original store to the split operator + // if there is a splittee that hasn't been merged into the splitter if (splitOp != null && (numMerges < numSplittees)) { @@ -187,7 +203,7 @@ throw new OptimizerException(msg, errCode, PigException.BUG, e); } } - + log.info("Merged " + numMerges + " out of total " + numSplittees + " splittees."); } @@ -357,6 +373,53 @@ return mergeList.size(); } + + private int mergeMapReduceSplittees(List<MapReduceOper> mapReducers, + MapReduceOper splitter) throws VisitorException { + + // In this case the splitter has non-empty reducer so we can't merge + // MR 
splittees into the splitter. What we'll do is to merge multiple + splittees (if they exist) into a new MR operator and connect it to the splitter. + + List<MapReduceOper> mergeList = getMergeList(mapReducers); + + if (mergeList.size() <= 1) { + // nothing to merge, just return + return 0; + } + + MapReduceOper mrOper = getMROper(); + + MapReduceOper splittee = mergeList.get(0); + PhysicalPlan pl = splittee.mapPlan; + POLoad load = (POLoad)pl.getRoots().get(0); + + mrOper.mapPlan.add(load); + + // add a dummy store operator, it'll be replaced by the split operator later. + try { + mrOper.mapPlan.addAsLeaf(getStore()); + } catch (PlanException e) { + int errCode = 2137; + String msg = "Internal Error. Unable to add store to the plan as leaf for optimization."; + throw new OptimizerException(msg, errCode, PigException.BUG, e); + } + + // connect the new MR operator to the splitter + try { + getPlan().add(mrOper); + getPlan().connect(splitter, mrOper); + } catch (PlanException e) { + int errCode = 2133; + String msg = "Internal Error. 
Unable to connect splitter with successors for optimization."; + throw new OptimizerException(msg, errCode, PigException.BUG, e); + } + + // merge the splittees into the new MR operator + mergeAllMapReduceSplittees(mergeList, mrOper, getSplit()); + + return (mergeList.size() - 1); + } private boolean hasSameMapKeyType(List<MapReduceOper> splittees) { boolean sameKeyType = true; @@ -1006,7 +1069,15 @@ private POSplit getSplit(){ return new POSplit(new OperatorKey(scope, nig.getNextNodeId(scope))); } - + + private MapReduceOper getMROper(){ + return new MapReduceOper(new OperatorKey(scope, nig.getNextNodeId(scope))); + } + + private POStore getStore(){ + return new POStore(new OperatorKey(scope, nig.getNextNodeId(scope))); + } + private PODemux getDemux(boolean sameMapKeyType, boolean inCombiner){ PODemux demux = new PODemux(new OperatorKey(scope, nig.getNextNodeId(scope))); demux.setSameMapKeyType(sameMapKeyType); Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java?rev=822382&r1=822381&r2=822382&view=diff ============================================================================== --- hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java (original) +++ hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java Tue Oct 6 17:58:32 2009 @@ -255,6 +255,66 @@ Assert.fail(); } } + + @Test + public void testMultiQueryJiraPig983() { + + System.out.println("===== multi-query Jira Pig-983 ====="); + + try { + myPig.setBatchOn(); + + myPig.registerQuery("a = load 'file:test/org/apache/pig/test/data/passwd' " + + "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);"); + myPig.registerQuery("b = filter a by uid < 5;"); + myPig.registerQuery("c = filter a by uid >= 5;"); + myPig.registerQuery("d = join b by uname, c by uname;"); + myPig.registerQuery("e = group d by b::gid;"); + myPig.registerQuery("e1 = foreach e generate 
group, COUNT(d.b::uid);"); + myPig.registerQuery("store e1 into '/tmp/output1';"); + myPig.registerQuery("f = group d by c::gid;"); + myPig.registerQuery("f1 = foreach f generate group, SUM(d.c::uid);"); + myPig.registerQuery("store f1 into '/tmp/output2';"); + + LogicalPlan lp = checkLogicalPlan(1, 2, 17); + + PhysicalPlan pp = checkPhysicalPlan(lp, 1, 2, 25); + + checkMRPlan(pp, 1, 1, 3); + + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(); + } + } + + @Test + public void testMultiQueryJiraPig983_2() { + + System.out.println("===== multi-query Jira Pig-983_2 ====="); + + try { + myPig.setBatchOn(); + + myPig.registerQuery("a = load 'file:test/org/apache/pig/test/data/passwd' " + + "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);"); + myPig.registerQuery("b = filter a by uid < 5;"); + myPig.registerQuery("c = filter a by uid >= 5;"); + myPig.registerQuery("d = join b by uname, c by uname;"); + myPig.registerQuery("e = group d by b::gid;"); + myPig.registerQuery("e1 = foreach e generate group, COUNT(d.b::uid);"); + myPig.registerQuery("store e1 into '/tmp/output1';"); + myPig.registerQuery("f = group d by c::gid;"); + myPig.registerQuery("f1 = foreach f generate group, SUM(d.c::uid);"); + myPig.registerQuery("store f1 into '/tmp/output2';"); + + myPig.executeBatch(); + + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(); + } + } @Test public void testMultiQueryPhase3WithoutCombiner() { @@ -1012,7 +1072,7 @@ PhysicalPlan pp = checkPhysicalPlan(lp, 1, 2, 22); - checkMRPlan(pp, 1, 2, 3); + checkMRPlan(pp, 1, 1, 2); } catch (Exception e) { e.printStackTrace(); @@ -1052,7 +1112,7 @@ PhysicalPlan pp = checkPhysicalPlan(lp, 1, 2, 32); - checkMRPlan(pp, 1, 2, 5); + checkMRPlan(pp, 1, 2, 4); } catch (Exception e) { e.printStackTrace();