Author: gates
Date: Mon Nov 10 17:23:00 2008
New Revision: 712900
URL: http://svn.apache.org/viewvc?rev=712900&view=rev
Log:
PIG-484: Work by Pradeep to use the streaming join optimization for group-by queries as well.
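
For context, the core of the change is that the chunk size used by the streaming optimization is now read once in the launchers, under the property name last.input.chunksize (previously join.biggest.input.chunksize), and shared by both the combiner rewrite and the join rewrite. A minimal sketch of the resulting wiring, using the names that appear in the diff below (the surrounding launcher code is assumed):

    // Read the chunk size once; POJoinPackage.DEFAULT_CHUNK_SIZE is the fallback.
    String lastInputChunkSize =
        pc.getProperties().getProperty(
                "last.input.chunksize", POJoinPackage.DEFAULT_CHUNK_SIZE);

    // Group-by path: the combiner optimizer can now stream the last input too.
    CombinerOptimizer co = new CombinerOptimizer(plan, lastInputChunkSize);
    co.visit();

    // Join path: CoGroupStreamingOptimizerVisitor, renamed to reflect its wider use.
    LastInputStreamingOptimizer liso =
        new MRCompiler.LastInputStreamingOptimizer(plan, lastInputChunkSize);
    liso.visit();
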
Modified:
hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java
hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/LocalLauncher.java
hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java
hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java
hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/POPackageAnnotator.java
Modified: hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java?rev=712900&r1=712899&r2=712900&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java Mon Nov 10 17:23:00 2008
@@ -25,6 +25,7 @@
import org.apache.pig.data.DataType;
import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler.LastInputStreamingOptimizer;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROpPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
@@ -97,9 +98,12 @@
private int mKeyField = -1;
private byte mKeyType = 0;
+
+ private String chunkSize;
- public CombinerOptimizer(MROperPlan plan) {
+ public CombinerOptimizer(MROperPlan plan, String chunkSize) {
super(plan, new DepthFirstWalker<MapReduceOper, MROperPlan>(plan));
+ this.chunkSize = chunkSize;
}
@Override
@@ -194,7 +198,12 @@
fixUpRearrange(clr);
mr.combinePlan.add(clr);
mr.combinePlan.connect(cfe, clr);
-
+
+ // stream input to the algebraics in the
+ // combine plan
+ LastInputStreamingOptimizer.replaceWithPOJoinPackage(mr.combinePlan, cp, cfe, chunkSize);
+
// Use the ExprType list returned from algebraic to tell
// POPostCombinerPackage which fields need projected and
// which placed in bags.
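
The new call above reuses the join rewrite on the combine plan: the POPackage (cp) and the POForEach (cfe) holding the algebraic functions are collapsed into a single POJoinPackage that streams the last input in chunks of last.input.chunksize tuples. Roughly, the combine plan changes shape like this (a sketch, not part of the diff):

    // before: cp (POPackage) -> cfe (POForEach) -> clr (POLocalRearrange)
    // after:  POJoinPackage(cp, cfe)            -> clr (POLocalRearrange)
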
Modified: hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/LocalLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/LocalLauncher.java?rev=712900&r1=712899&r2=712900&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/LocalLauncher.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/LocalLauncher.java Mon Nov 10 17:23:00 2008
@@ -18,7 +18,7 @@
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.impl.PigContext;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler.CoGroupStreamingOptimizerVisitor;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler.LastInputStreamingOptimizer;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MRPrinter;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MRStreamHandler;
@@ -116,9 +116,12 @@
comp.randomizeFileLocalizer();
comp.compile();
MROperPlan plan = comp.getMRPlan();
+ String lastInputChunkSize =
+ pc.getProperties().getProperty(
+ "last.input.chunksize", POJoinPackage.DEFAULT_CHUNK_SIZE);
String prop = System.getProperty("pig.exec.nocombiner");
if (!("true".equals(prop))) {
- CombinerOptimizer co = new CombinerOptimizer(plan);
+ CombinerOptimizer co = new CombinerOptimizer(plan, lastInputChunkSize);
co.visit();
}
@@ -131,9 +134,9 @@
checker.visit();
// optimize joins
- CoGroupStreamingOptimizerVisitor cgso = new MRCompiler.CoGroupStreamingOptimizerVisitor(plan,
- pc.getProperties().getProperty("join.biggest.input.chunksize", POJoinPackage.DEFAULT_CHUNK_SIZE));
- cgso.visit();
+ LastInputStreamingOptimizer liso =
+ new MRCompiler.LastInputStreamingOptimizer(plan, lastInputChunkSize);
+ liso.visit();
// figure out the type of the key for the map plan
// this is needed when the key is null to create
Modified: hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=712900&r1=712899&r2=712900&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Mon Nov 10 17:23:00 2008
@@ -1176,11 +1176,11 @@
return mro;
}
- static class CoGroupStreamingOptimizerVisitor extends MROpPlanVisitor {
+ static class LastInputStreamingOptimizer extends MROpPlanVisitor {
Log log = LogFactory.getLog(this.getClass());
String chunkSize;
- CoGroupStreamingOptimizerVisitor(MROperPlan plan, String chunkSize) {
+ LastInputStreamingOptimizer(MROperPlan plan, String chunkSize) {
super(plan, new DepthFirstWalker<MapReduceOper, MROperPlan>(plan));
this.chunkSize = chunkSize;
}
@@ -1199,12 +1199,14 @@
// 3. No combiner plan
// 4. POForEach nested plan only contains POProject in any depth
// 5. Inside POForEach, all occurrences of the last input are flattened
+
if (mr.mapPlan.isEmpty()) return;
if (mr.reducePlan.isEmpty()) return;
// Check combiner plan
- if (!mr.combinePlan.isEmpty())
+ if (!mr.combinePlan.isEmpty()) {
return;
+ }
// Check map plan
List<PhysicalOperator> mpLeaves = mr.mapPlan.getLeaves();
@@ -1313,44 +1315,50 @@
if (lastInputFlattened && allSimple && projOfLastInput != null)
{
// Now we can optimize the map-reduce plan
-
// Replace POPackage->POForeach to POJoinPackage
- String scope = pack.getOperatorKey().scope;
- NodeIdGenerator nig = NodeIdGenerator.getGenerator();
- POJoinPackage joinPackage;
- joinPackage = new POJoinPackage(
- new OperatorKey(scope, nig.getNextNodeId(scope)),
- -1, pack, forEach);
- joinPackage.setChunkSize(Long.parseLong(chunkSize));
- PhysicalOperator nextOp = null;
- List<PhysicalOperator> succs = mr.reducePlan.getSuccessors(forEach);
- if (succs!=null)
- {
- if (succs.size()!=1)
- {
- String msg = new String("forEach can only have one successor");
- log.error(msg);
- throw new VisitorException(msg);
- }
- nextOp = succs.get(0);
- }
- mr.reducePlan.remove(pack);
-
- try {
- mr.reducePlan.replace(forEach, joinPackage);
- } catch (PlanException e) {
- String msg = new String("Error rewrite POJoinPackage");
- log.error(msg);
- throw new VisitorException(msg, e);
- }
-
- log.info("Rewrite: POPackage->POForEach to POJoinPackage");
+ replaceWithPOJoinPackage(mr.reducePlan, pack, forEach, chunkSize);
}
}
}
+ public static void replaceWithPOJoinPackage(PhysicalPlan plan,
+ POPackage pack, POForEach forEach, String chunkSize) throws VisitorException {
+ String scope = pack.getOperatorKey().scope;
+ NodeIdGenerator nig = NodeIdGenerator.getGenerator();
+ POJoinPackage joinPackage;
+ joinPackage = new POJoinPackage(
+ new OperatorKey(scope, nig.getNextNodeId(scope)),
+ -1, pack, forEach);
+ joinPackage.setChunkSize(Long.parseLong(chunkSize));
+ List<PhysicalOperator> succs = plan.getSuccessors(forEach);
+ if (succs!=null)
+ {
+ if (succs.size()!=1)
+ {
+ String msg = new String("forEach can only have one successor");
+ LogFactory.
+ getLog(LastInputStreamingOptimizer.class).error(msg);
+ throw new VisitorException(msg);
+ }
+ }
+ plan.remove(pack);
+
+ try {
+ plan.replace(forEach, joinPackage);
+ } catch (PlanException e) {
+ String msg = new String("Error rewrite POJoinPackage");
+ LogFactory.
+ getLog(LastInputStreamingOptimizer.class).error(msg);
+ throw new VisitorException(msg, e);
+ }
+
+ LogFactory.
+ getLog(LastInputStreamingOptimizer.class).info("Rewrite: POPackage->POForEach to POJoinPackage");
+ }
+
}
+
private class RearrangeAdjuster extends MROpPlanVisitor {
RearrangeAdjuster(MROperPlan plan) {
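
Since the rewrite has been extracted into a public static helper, it can now be applied to any physical plan that ends in a POPackage/POForEach pair, not just the reduce plan produced for a join. The CombinerOptimizer above uses it on the combine plan; a call site looks like this (names as in the diff, usage sketch only):

    // plan      - the physical plan containing the pair (e.g. mr.combinePlan)
    // pack      - the POPackage to be removed
    // forEach   - the POForEach to be replaced by a POJoinPackage
    // chunkSize - string value of last.input.chunksize
    LastInputStreamingOptimizer.replaceWithPOJoinPackage(plan, pack, forEach, chunkSize);
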
Modified: hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=712900&r1=712899&r2=712900&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java Mon Nov 10 17:23:00 2008
@@ -33,7 +33,7 @@
import org.apache.pig.backend.hadoop.datastorage.HConfiguration;
import org.apache.pig.backend.hadoop.executionengine.HExecutionEngine;
import org.apache.pig.impl.PigContext;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler.CoGroupStreamingOptimizerVisitor;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler.LastInputStreamingOptimizer;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MRPrinter;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MRStreamHandler;
@@ -133,9 +133,12 @@
comp.randomizeFileLocalizer();
comp.compile();
MROperPlan plan = comp.getMRPlan();
+ String lastInputChunkSize =
+ pc.getProperties().getProperty(
+ "last.input.chunksize", POJoinPackage.DEFAULT_CHUNK_SIZE);
String prop = System.getProperty("pig.exec.nocombiner");
if (!("true".equals(prop))) {
- CombinerOptimizer co = new CombinerOptimizer(plan);
+ CombinerOptimizer co = new CombinerOptimizer(plan, lastInputChunkSize);
co.visit();
}
@@ -148,9 +151,9 @@
checker.visit();
// optimize joins
- CoGroupStreamingOptimizerVisitor cgso = new MRCompiler.CoGroupStreamingOptimizerVisitor(plan,
- pc.getProperties().getProperty("join.biggest.input.chunksize", POJoinPackage.DEFAULT_CHUNK_SIZE));
- cgso.visit();
+ LastInputStreamingOptimizer liso =
+ new MRCompiler.LastInputStreamingOptimizer(plan, lastInputChunkSize);
+ liso.visit();
// figure out the type of the key for the map plan
// this is needed when the key is null to create
Modified: hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java?rev=712900&r1=712899&r2=712900&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java Mon Nov 10 17:23:00 2008
@@ -39,6 +39,7 @@
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POJoinPackage;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
import org.apache.pig.data.DataType;
import org.apache.pig.data.TargetedTuple;
@@ -58,6 +59,8 @@
Reducer<PigNullableWritable, NullableTuple, PigNullableWritable, Writable> {
private final Log log = LogFactory.getLog(getClass());
+ private final static Tuple DUMMYTUPLE = null;
+
private byte keyType;
//The reduce plan
@@ -118,17 +121,40 @@
pigReporter.setRep(reporter);
- pack.attachInput(key, tupIter);
+ // In the case we optimize, we combine
+ // POPackage and POForeach - so we could get many
+ // tuples out of the getnext() call of POJoinPackage
+ // In this case, we process till we see EOP from
+ // POJoinPacakage.getNext()
+ if (pack instanceof POJoinPackage)
+ {
+ pack.attachInput(key, tupIter);
+ while (true)
+ {
+ if (processOnePackageOutput(oc))
+ break;
+ }
+ }
+ else {
+ // not optimized, so package will
+ // give only one tuple out for the key
+ pack.attachInput(key, tupIter);
+ processOnePackageOutput(oc);
+ }
+ }
+
+ // return: false-more output
+ // true- end of processing
+ public boolean processOnePackageOutput(OutputCollector<PigNullableWritable, Writable> oc) throws IOException {
try {
- Tuple t=null;
- Result res = pack.getNext(t);
+ Result res = pack.getNext(DUMMYTUPLE);
if(res.returnStatus==POStatus.STATUS_OK){
Tuple packRes = (Tuple)res.result;
if(cp.isEmpty()){
oc.collect(null, packRes);
- return;
+ return false;
}
cp.attachInput(packRes);
@@ -137,7 +163,7 @@
PhysicalOperator leaf = leaves.get(0);
while(true){
- Result redRes = leaf.getNext(t);
+ Result redRes = leaf.getNext(DUMMYTUPLE);
if(redRes.returnStatus==POStatus.STATUS_OK){
Tuple tuple = (Tuple)redRes.result;
@@ -157,7 +183,7 @@
}
if(redRes.returnStatus==POStatus.STATUS_EOP)
- return;
+ break;
if(redRes.returnStatus==POStatus.STATUS_NULL)
continue;
@@ -171,22 +197,27 @@
}
if(res.returnStatus==POStatus.STATUS_NULL)
- return;
+ return false;
if(res.returnStatus==POStatus.STATUS_ERR){
IOException ioe = new IOException("Packaging error while processing group");
throw ioe;
}
+
+ if(res.returnStatus==POStatus.STATUS_EOP) {
+ return true;
+ }
+ return false;
} catch (ExecException e) {
IOException ioe = new IOException(e.getMessage());
ioe.initCause(e.getCause());
throw ioe;
}
+
}
-
/**
* Will be called once all the intermediate keys and values are
* processed. So right place to stop the reporter thread.
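
The reduce() change above amounts to a drain loop: with a POJoinPackage, a single attachInput() can produce many output tuples, so processOnePackageOutput() is called until it reports end of processing (it returns true on STATUS_EOP, false when more output may follow); a plain POPackage still yields exactly one tuple per key. A condensed, equivalent sketch of that control flow:

    pack.attachInput(key, tupIter);
    if (pack instanceof POJoinPackage) {
        // optimized case: keep pulling until POJoinPackage.getNext() signals EOP
        while (!processOnePackageOutput(oc)) {
            // each call may emit zero or more tuples to the collector
        }
    } else {
        // unoptimized case: one package output per key
        processOnePackageOutput(oc);
    }
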
Modified: hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java?rev=712900&r1=712899&r2=712900&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java Mon Nov 10 17:23:00 2008
@@ -77,6 +77,7 @@
public class PigMapReduce {
public static JobConf sJobConf = null;
+ private final static Tuple DUMMYTUPLE = null;
public static class Map extends PigMapBase implements
Mapper<Text, TargetedTuple, PigNullableWritable, Writable> {
@@ -229,8 +230,7 @@
public boolean processOnePackageOutput(OutputCollector<PigNullableWritable, Writable> oc) throws IOException
{
try {
- Tuple t=null;
- Result res = pack.getNext(t);
+ Result res = pack.getNext(DUMMYTUPLE);
if(res.returnStatus==POStatus.STATUS_OK){
Tuple packRes = (Tuple)res.result;
@@ -277,8 +277,7 @@
protected void runPipeline(PhysicalOperator leaf) throws ExecException, IOException {
while(true)
{
- Tuple dummyTuple = null;
- Result redRes = leaf.getNext(dummyTuple);
+ Result redRes = leaf.getNext(DUMMYTUPLE);
if(redRes.returnStatus==POStatus.STATUS_OK){
outputCollector.collect(null, (Tuple)redRes.result);
continue;
@@ -406,8 +405,7 @@
pack.attachInput(key, tupIter);
try {
- Tuple t=null;
- Result res = pack.getNext(t);
+ Result res = pack.getNext(DUMMYTUPLE);
if(res.returnStatus==POStatus.STATUS_OK){
Tuple packRes = (Tuple)res.result;
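
The PigMapReduce changes are mechanical: the Tuple passed to getNext() was already null at every call site, so the repeated local 'Tuple t = null' / 'Tuple dummyTuple = null' declarations are replaced by one shared constant with no change in behaviour, e.g.:

    private final static Tuple DUMMYTUPLE = null;
    // ...
    Result res = pack.getNext(DUMMYTUPLE); // equivalent to the old per-call null
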
Modified: hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/POPackageAnnotator.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/POPackageAnnotator.java?rev=712900&r1=712899&r2=712900&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/POPackageAnnotator.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/POPackageAnnotator.java Mon Nov 10 17:23:00 2008
@@ -26,6 +26,7 @@
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POJoinPackage;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPostCombinerPackage;
@@ -54,7 +55,7 @@
@Override
public void visitMROp(MapReduceOper mr) throws VisitorException {
- // POPackage could be present in the combine plan
+ // POPackage OR POJoinPackage could be present in the combine plan
// OR in the reduce plan. POPostCombinerPackage could
// be present only in the reduce plan. Search in these two
// plans accordingly
@@ -139,6 +140,15 @@
};
/* (non-Javadoc)
+ * @see org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor#visitJoinPackage(org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POJoinPackage)
+ */
+ @Override
+ public void visitJoinPackage(POJoinPackage joinPackage)
+ throws VisitorException {
+ this.pkg = joinPackage;
+ }
+
+ /* (non-Javadoc)
* @see org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor#visitCombinerPackage(org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPostCombinerPackage)
*/
@Override