Author: pradeepkth
Date: Fri Aug 14 17:53:23 2009
New Revision: 804309
URL: http://svn.apache.org/viewvc?rev=804309&view=rev
Log:
PIG-845: PERFORMANCE: Merge Join (ashutoshc via pradeepkth)
Added:
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java
hadoop/pig/trunk/src/org/apache/pig/impl/builtin/MergeJoinIndexer.java
hadoop/pig/trunk/src/org/apache/pig/impl/util/LinkedMultiMap.java
hadoop/pig/trunk/test/org/apache/pig/test/TestMergeJoin.java
hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC18.gld
Modified:
hadoop/pig/trunk/CHANGES.txt
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/MRStreamHandler.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java
hadoop/pig/trunk/src/org/apache/pig/impl/builtin/RandomSampleLoader.java
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOJoin.java
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
hadoop/pig/trunk/src/org/apache/pig/impl/util/MultiMap.java
hadoop/pig/trunk/test/org/apache/pig/test/TestMRCompiler.java
hadoop/pig/trunk/test/org/apache/pig/test/utils/LogicalPlanTester.java
Modified: hadoop/pig/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=804309&r1=804308&r2=804309&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Fri Aug 14 17:53:23 2009
@@ -25,6 +25,9 @@
PIG-734: Changed maps to only take strings as keys (gates).
IMPROVEMENTS
+
+PIG-845: PERFORMANCE: Merge Join (ashutoshc via pradeepkth)
+
PIG-893: Added string -> integer, long, float, and double casts (zjffdu via
gates).
PIG-833: Added Zebra, new columnar storage mechanism for HDFS (rangadi plus
many others via gates)
Modified:
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=804309&r1=804308&r2=804309&view=diff
==============================================================================
---
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
(original)
+++
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
Fri Aug 14 17:53:23 2009
@@ -110,6 +110,8 @@
public static final String LOG_DIR = "_logs";
+ public static final String END_OF_INP_IN_MAP = "pig.invoke.close.in.map";
+
// A mapping of job to pair of store locations and tmp locations for that
job
private Map<Job, Pair<List<POStore>, Path>> jobStoreMap;
@@ -442,11 +444,11 @@
jobConf.setMapperClass(PigMapOnly.Map.class);
jobConf.setNumReduceTasks(0);
jobConf.set("pig.mapPlan",
ObjectSerializer.serialize(mro.mapPlan));
- if(mro.isStreamInMap()) {
+ if(mro.isEndOfAllInputSetInMap()) {
// this is used in Map.close() to decide whether the
// pipeline needs to be rerun one more time in the close()
- // The pipeline is rerun only if there was a stream
- jobConf.set("pig.stream.in.map", "true");
+ // The pipeline is rerun if there either was a stream or
POMergeJoin
+ jobConf.set(END_OF_INP_IN_MAP, "true");
}
}
else{
@@ -470,14 +472,14 @@
jobConf.setNumReduceTasks(mro.requestedParallelism);
jobConf.set("pig.mapPlan",
ObjectSerializer.serialize(mro.mapPlan));
- if(mro.isStreamInMap()) {
+ if(mro.isEndOfAllInputSetInMap()) {
// this is used in Map.close() to decide whether the
// pipeline needs to be rerun one more time in the close()
- // The pipeline is rerun only if there was a stream
- jobConf.set("pig.stream.in.map", "true");
+ // The pipeline is rerun only if there was a stream or
merge-join.
+ jobConf.set(END_OF_INP_IN_MAP, "true");
}
jobConf.set("pig.reducePlan",
ObjectSerializer.serialize(mro.reducePlan));
- if(mro.isStreamInReduce()) {
+ if(mro.isEndOfAllInputSetInReduce()) {
// this is used in Map.close() to decide whether the
// pipeline needs to be rerun one more time in the close()
// The pipeline is rerun only if there was a stream
Modified:
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=804309&r1=804308&r2=804309&view=diff
==============================================================================
---
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
(original)
+++
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
Fri Aug 14 17:53:23 2009
@@ -18,6 +18,7 @@
package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
import java.io.IOException;
+import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -37,6 +38,7 @@
import org.apache.pig.data.DataType;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.builtin.FindQuantiles;
+import org.apache.pig.impl.builtin.MergeJoinIndexer;
import org.apache.pig.impl.builtin.TupleSize;
import org.apache.pig.impl.builtin.PartitionSkewedKeys;
import org.apache.pig.impl.builtin.RandomSampleLoader;
@@ -63,6 +65,7 @@
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
+import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackageLite;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPartitionRearrange;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
@@ -82,6 +85,7 @@
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.plan.CompilationMessageCollector.MessageType;
import org.apache.pig.impl.util.MultiMap;
+import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.impl.util.Pair;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
@@ -996,6 +1000,239 @@
}
}
+ /**
+ * Compiles a POMergeJoin. A merge join has exactly two inputs, so there are
+ * exactly two predecessor MROpers, identified as left and right. Instead of
+ * merging them, each one is used to generate its own MR job: the right one
+ * (rightMROpr) builds an on-the-fly index of the right input, and the left
+ * one (curMROp) performs the actual join.
+ *
+ * 1) Right MROper: if it is still in its map phase, its map plan must have a
+ *    single POLoad root. Anything after that POLoad is yanked out and handed
+ *    to joinOp via setupRightPipeline(). If it is in its reduce phase, it is
+ *    closed and a new MROper is started.
+ *    Either way, the POLoad's load function is then replaced by
+ *    MergeJoinIndexer (the original FuncSpec is kept on joinOp via
+ *    setRightLoaderFuncSpec()), and the job is completed with a
+ *    POLocalRearrange on the constant key "all" in the map plan, followed by
+ *    POPackage, POProject, POSort and a POStore to a temp file in the reduce
+ *    plan. That temp file is passed to joinOp via setIndexFile().
+ * 2) Left MROper: if it is in its map phase, joinOp is added as a leaf of its
+ *    map plan. If it is in its reduce phase, it is closed, a new MROper is
+ *    started and joinOp is added to the new MROper's map plan.
+ *
+ * Finally the index job is connected in MRPlan so that it runs before the
+ * join job.
+ *
+ * NOTE(review):
+ * - When either input is already in its reduce phase, a temporary POStore
+ *   (rightStore / leftStore) is created and given a temp FileSpec, but this
+ *   method never adds it to that operator's reducePlan before startNew()
+ *   loads from the FileSpec. Unless startNew() attaches it (not visible
+ *   here), nothing writes the intermediate file - confirm.
+ * - The left/right lookups evaluate reducePlan.getLeaves().get(0) whenever
+ *   the map-plan leaf does not match; for an MROper whose reduce plan is
+ *   still empty this may throw instead of reaching the error branches
+ *   (assumes getLeaves() returns an empty list for an empty plan).
+ * - The curMROp/rightMROpr null check is effectively unreachable: both were
+ *   taken from compiledInputs entries that had already been dereferenced.
+ * - The 1101 error message reports only compiledInputs.length, even when it
+ *   is joinOp.getInputs().size() that is not 2.
+ *
+ * @param joinOp the merge-join operator being compiled
+ * @throws VisitorException (MRCompilerException) on malformed inputs, or
+ *         wrapping a PlanException, IOException or
+ *         CloneNotSupportedException raised while building the two jobs
+ */
+
+ @Override
+ public void visitMergeJoin(POMergeJoin joinOp) throws VisitorException {
+
+ try{
+ if(compiledInputs.length != 2 || joinOp.getInputs().size() != 2)
+ throw new MRCompilerException("Merge Join must have exactly
two inputs. Found : "+compiledInputs.length, 1101);
+
+ OperatorKey leftPhyOpKey =
joinOp.getInputs().get(0).getOperatorKey();
+ OperatorKey rightPhyOpKey =
joinOp.getInputs().get(1).getOperatorKey();
+
+ // Currently we assume that physical operator succeeding
POMergeJoin in the physical plan is present in MROperators found in
compiledInputs[].
+ // This may not always hold. e.g., if there is an order-by before
merge join.
+
+
if(compiledInputs[0].mapPlan.getLeaves().get(0).getOperatorKey().equals(leftPhyOpKey)
||
compiledInputs[0].reducePlan.getLeaves().get(0).getOperatorKey().equals(leftPhyOpKey))
+ curMROp = compiledInputs[0];
+
+ else
if(compiledInputs[1].mapPlan.getLeaves().get(0).getOperatorKey().equals(leftPhyOpKey)
||
compiledInputs[1].reducePlan.getLeaves().get(0).getOperatorKey().equals(leftPhyOpKey))
+ curMROp = compiledInputs[1];
+
+ else{ // This implies predecessor of left input is not found in
compiled Inputs.
+ int errCode = 2169;
+ String errMsg = "Physical operator preceding left predicate
not found in compiled MR jobs.";
+ throw new MRCompilerException(errMsg,errCode,PigException.BUG);
+ }
+
+ MapReduceOper rightMROpr = null;
+
if(compiledInputs[1].mapPlan.getLeaves().get(0).getOperatorKey().equals(rightPhyOpKey)
||
compiledInputs[1].reducePlan.getLeaves().get(0).getOperatorKey().equals(rightPhyOpKey))
+ rightMROpr = compiledInputs[1];
+
+ else
if(compiledInputs[0].mapPlan.getLeaves().get(0).getOperatorKey().equals(rightPhyOpKey)
||
compiledInputs[0].reducePlan.getLeaves().get(0).getOperatorKey().equals(rightPhyOpKey))
+ rightMROpr = compiledInputs[0];
+
+ else{ // This implies predecessor of right input is not found in
compiled Inputs.
+ int errCode = 2169;
+ String errMsg = "Physical operator preceding right predicate
not found in compiled MR jobs.";
+ throw new MRCompilerException(errMsg,errCode,PigException.BUG);
+ }
+
+ if(curMROp == null || rightMROpr == null){
+
+ // This implies either of compiledInputs[0] or
compiledInputs[1] is null.
+ int errCode = 2173;
+ String errMsg = "One of the preceding compiled MR operator is
null. This is not expected.";
+ throw new MRCompilerException(errMsg,errCode,PigException.BUG);
+ }
+
+ if(curMROp.equals(rightMROpr)){
+ int errCode = 2170;
+ String errMsg = "Physical operator preceding both left and
right predicate found to be same. This is not expected.";
+ throw new MRCompilerException(errMsg,errCode,PigException.BUG);
+ }
+
+ // We will first operate on right side which is indexer job.
+ // First yank plan of the compiled right input and set that as an
inner plan of right operator.
+ if(!rightMROpr.mapDone){
+ PhysicalPlan rightMapPlan = rightMROpr.mapPlan;
+ if(rightMapPlan.getRoots().size() != 1){
+ int errCode = 2171;
+ String errMsg = "Expected one but found more then one root
physical operator in physical plan.";
+ throw new
MRCompilerException(errMsg,errCode,PigException.BUG);
+ }
+
+ PhysicalOperator rightLoader = rightMapPlan.getRoots().get(0);
+ if(! (rightLoader instanceof POLoad)){
+ int errCode = 2172;
+ String errMsg = "Expected physical operator at root to be
POLoad. Found : "+rightLoader.getClass().getCanonicalName();
+ throw new MRCompilerException(errMsg,errCode);
+ }
+
+ if (rightMapPlan.getSuccessors(rightLoader) == null ||
rightMapPlan.getSuccessors(rightLoader).isEmpty())
+ // Load - Join case.
+ joinOp.setupRightPipeline(null);
+
+ else{ // We got something on right side. Yank it and set it as
inner plan of right input.
+ PhysicalPlan rightPipelinePlan = rightMapPlan.clone();
+ PhysicalOperator root =
rightPipelinePlan.getRoots().get(0);
+ rightPipelinePlan.disconnect(root,
rightPipelinePlan.getSuccessors(root).get(0));
+ rightPipelinePlan.remove(root);
+ joinOp.setupRightPipeline(rightPipelinePlan);
+ rightMapPlan.trimBelow(rightLoader);
+ }
+ }
+
+ else if(!rightMROpr.reduceDone){
+ // Indexer must run in map. If we are in reduce, close it and
start new MROper.
+ // No need of yanking in this case. Since we are starting
brand new MR Operator and it will contain nothing.
+ joinOp.setupRightPipeline(null);
+ POStore rightStore = getStore();
+ FileSpec rightStrFile = getTempFileSpec();
+ rightStore.setSFile(rightStrFile); // NOTE(review): rightStore is never added to rightMROpr.reducePlan (no addAsLeaf) before startNew() below loads rightStrFile - confirm the intermediate file is actually written.
+ rightMROpr.setReduceDone(true);
+ rightMROpr = startNew(rightStrFile, rightMROpr);
+ }
+
+ else{
+ int errCode = 2022;
+ String msg = "Both map and reduce phases have been done. This
is unexpected while compiling.";
+ throw new PlanException(msg, errCode, PigException.BUG);
+ }
+
+ // At this point, we must be operating on map plan of right input
and it would contain nothing else other then a POLoad.
+ POLoad rightLoader = (POLoad)rightMROpr.mapPlan.getRoots().get(0);
+
joinOp.setRightLoaderFuncSpec(rightLoader.getLFile().getFuncSpec());
+
+ // Replace POLoad with indexer.
+ String[] indexerArgs = new String[2];
+ indexerArgs[0] = rightLoader.getLFile().getFuncName();
+ List<PhysicalPlan> rightInpPlans = joinOp.getInnerPlansOf(1); // NOTE(review): serialized below through a (Serializable) cast, which assumes the returned List implementation is Serializable - confirm.
+ indexerArgs[1] =
ObjectSerializer.serialize((Serializable)rightInpPlans);
+ FileSpec lFile = new
FileSpec(rightLoader.getLFile().getFileName(),new
FuncSpec(MergeJoinIndexer.class.getName(), indexerArgs));
+ rightLoader.setLFile(lFile);
+
+ // Loader of mro will return a tuple of form (key1, key2,
..,filename, offset)
+ // Now set up a POLocalRearrange which has "all" as the key and
tuple fetched
+ // by loader as the "value" of POLocalRearrange
+ // Sorting of index can possibly be achieved by using Hadoop
sorting between map and reduce instead of Pig doing sort. If that is so,
+ // it will simplify lot of the code below.
+
+ PhysicalPlan lrPP = new PhysicalPlan();
+ ConstantExpression ce = new ConstantExpression(new
OperatorKey(scope,nig.getNextNodeId(scope)));
+ ce.setValue("all");
+ ce.setResultType(DataType.CHARARRAY);
+ lrPP.add(ce);
+
+ List<PhysicalPlan> lrInnerPlans = new ArrayList<PhysicalPlan>();
+ lrInnerPlans.add(lrPP);
+
+ POLocalRearrange lr = new POLocalRearrange(new
OperatorKey(scope,nig.getNextNodeId(scope)));
+ lr.setIndex(0);
+ lr.setKeyType(DataType.CHARARRAY);
+ lr.setPlans(lrInnerPlans);
+ lr.setResultType(DataType.TUPLE);
+ rightMROpr.mapPlan.addAsLeaf(lr);
+
+ rightMROpr.setMapDone(true);
+
+ // On the reduce side of this indexing job, there will be a global
rearrange followed by POSort.
+ // Output of POSort will be index file dumped on the DFS.
+
+ // First add POPackage.
+ POPackage pkg = new POPackage(new
OperatorKey(scope,nig.getNextNodeId(scope)));
+ pkg.setKeyType(DataType.CHARARRAY);
+ pkg.setNumInps(1);
+ pkg.setInner(new boolean[]{false});
+ rightMROpr.reducePlan.add(pkg);
+
+ // Next project tuples from the bag created by POPackage.
+ POProject topPrj = new POProject(new
OperatorKey(scope,nig.getNextNodeId(scope)));
+ topPrj.setColumn(1);
+ topPrj.setResultType(DataType.TUPLE);
+ topPrj.setOverloaded(true);
+ rightMROpr.reducePlan.add(topPrj);
+ rightMROpr.reducePlan.connect(pkg, topPrj);
+
+ // Now create and add POSort. Sort plan is project *.
+ List<PhysicalPlan> sortPlans = new ArrayList<PhysicalPlan>(1);
+ PhysicalPlan innerSortPlan = new PhysicalPlan();
+ POProject prj = new POProject(new
OperatorKey(scope,nig.getNextNodeId(scope)));
+ prj.setStar(true);
+ prj.setOverloaded(false);
+ prj.setResultType(DataType.TUPLE);
+ innerSortPlan.add(prj);
+ sortPlans.add(innerSortPlan);
+
+ // Currently we assume all columns are in asc order.
+ // Add two because filename and offset are added by Indexer in
addition to keys.
+ List<Boolean> mAscCols = new
ArrayList<Boolean>(rightInpPlans.size()+2);
+ for(int i=0; i< rightInpPlans.size()+2; i++)
+ mAscCols.add(true);
+
+ POSort sortOp = new POSort(new
OperatorKey(scope,nig.getNextNodeId(scope)),1, null, sortPlans, mAscCols, null);
+ rightMROpr.reducePlan.add(sortOp);
+ rightMROpr.reducePlan.connect(topPrj, sortOp);
+
+ POStore st = getStore();
+ FileSpec strFile = getTempFileSpec();
+ st.setSFile(strFile);
+ rightMROpr.reducePlan.addAsLeaf(st);
+ rightMROpr.setReduceDone(true);
+
+ joinOp.setIndexFile(strFile);
+
+ // We are done with right side. Lets work on left now.
+ // Join will be materialized in leftMROper.
+ if(!curMROp.mapDone) // Life is easy
+ curMROp.mapPlan.addAsLeaf(joinOp);
+
+ else if(!curMROp.reduceDone){ // This is a map-side join. Close
this MROper and start afresh.
+ POStore leftStore = getStore();
+ FileSpec leftStrFile = getTempFileSpec();
+ leftStore.setSFile(leftStrFile); // NOTE(review): leftStore is never added to curMROp.reducePlan (no addAsLeaf) before startNew() below loads leftStrFile - confirm the intermediate file is actually written.
+ curMROp.setReduceDone(true);
+ curMROp = startNew(leftStrFile, curMROp);
+ curMROp.mapPlan.addAsLeaf(joinOp);
+ }
+
+ else{
+ int errCode = 2022;
+ String msg = "Both map and reduce phases have been done. This
is unexpected while compiling.";
+ throw new PlanException(msg, errCode, PigException.BUG);
+ }
+
+ // We want to ensure indexing job runs prior to actual join job.
So, connect them in order.
+ MRPlan.connect(rightMROpr, curMROp);
+ }
+ catch(PlanException e){
+ int errCode = 2034;
+ String msg = "Error compiling operator " +
joinOp.getClass().getCanonicalName();
+ throw new MRCompilerException(msg, errCode, PigException.BUG, e);
+ }
+ catch (IOException e){
+ int errCode = 3000;
+ String errMsg = "IOException caught while compiling POMergeJoin";
+ throw new MRCompilerException(errMsg, errCode,e);
+ }
+ catch(CloneNotSupportedException e){
+ int errCode = 2127;
+ String errMsg = "Cloning exception caught while compiling
POMergeJoin";
+ throw new MRCompilerException(errMsg, errCode, PigException.BUG, e);
+ }
+ }
+
@Override
public void visitDistinct(PODistinct op) throws VisitorException {
try{
Modified:
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=804309&r1=804308&r2=804309&view=diff
==============================================================================
---
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
(original)
+++
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
Fri Aug 14 17:53:23 2009
@@ -21,7 +21,6 @@
import java.io.IOException;
import java.io.PrintStream;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.List;
import java.util.LinkedList;
import java.util.Map;
@@ -43,10 +42,10 @@
import org.apache.pig.backend.hadoop.executionengine.HExecutionEngine;
import org.apache.pig.impl.PigContext;
import
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler.LastInputStreamingOptimizer;
+import
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.EndOfAllInputSetter;
import
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
import
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MRPrinter;
import
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.DotMRPrinter;
-import
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MRStreamHandler;
import
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.POPackageAnnotator;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POJoinPackage;
@@ -54,7 +53,6 @@
import org.apache.pig.impl.plan.CompilationMessageCollector;
import org.apache.pig.impl.plan.PlanException;
import org.apache.pig.impl.plan.VisitorException;
-import org.apache.pig.impl.plan.CompilationMessageCollector.Message;
import org.apache.pig.impl.plan.CompilationMessageCollector.MessageType;
import org.apache.pig.impl.util.ConfigurationValidator;
import org.apache.pig.impl.util.LogUtils;
@@ -361,7 +359,7 @@
// check whether stream operator is present
// after MultiQueryOptimizer because it can shift streams from
// map to reduce, etc.
- MRStreamHandler checker = new MRStreamHandler(plan);
+ EndOfAllInputSetter checker = new EndOfAllInputSetter(plan);
checker.visit();
return plan;
Modified:
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java?rev=804309&r1=804308&r2=804309&view=diff
==============================================================================
---
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java
(original)
+++
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java
Fri Aug 14 17:53:23 2009
@@ -68,13 +68,13 @@
//is complete
boolean reduceDone = false;
- // Indicates that there is POStream in the
+ // Indicates that there is an operator which uses endOfAllInput flag in
the
// map plan
- boolean streamInMap = false;
+ boolean endOfAllInputInMap = false;
- // Indicates that there is POStream in the
+ // Indicates that there is an operator which uses endOfAllInput flag in
the
// reduce plan
- boolean streamInReduce = false;
+ boolean endOfAllInputInReduce = false;;
//Indicates if this job is an order by job
boolean globalSort = false;
@@ -306,33 +306,32 @@
}
/**
- * @return whether there is a POStream in the map plan
+ * @return whether the map plan contains an operator that needs the
+ * pipeline to be rerun after all input is consumed (POStream or
+ * POMergeJoin, as detected by EndOfAllInputSetter)
*/
- public boolean isStreamInMap() {
- return streamInMap;
+ public boolean isEndOfAllInputSetInMap() {
+ return endOfAllInputInMap;
}
/**
- * @param streamInMap the streamInMap to set
+ * @param endOfAllInputInMap whether the map plan contains an operator that
+ * needs the end-of-all-input rerun
*/
- public void setStreamInMap(boolean streamInMap) {
- this.streamInMap = streamInMap;
+ public void setEndOfAllInputInMap(boolean endOfAllInputInMap) {
+ this.endOfAllInputInMap = endOfAllInputInMap;
}
/**
- * @return whether there is a POStream in the reduce plan
+ * @return whether the reduce plan contains an operator that needs the
+ * pipeline to be rerun after all input is consumed (POStream or
+ * POMergeJoin, as detected by EndOfAllInputSetter)
*/
- public boolean isStreamInReduce() {
- return streamInReduce;
+ public boolean isEndOfAllInputSetInReduce() {
+ return endOfAllInputInReduce;
}
/**
- * @param streamInReduce the streamInReduce to set
+ * @param endOfAllInputInReduce whether the reduce plan contains an operator
+ * that needs the end-of-all-input rerun
*/
- public void setStreamInReduce(boolean streamInReduce) {
- this.streamInReduce = streamInReduce;
- }
-
+ public void setEndOfAllInputInReduce(boolean endOfAllInputInReduce) {
+ this.endOfAllInputInReduce = endOfAllInputInReduce;
+ }
public int getFragment() {
return fragment;
}
Modified:
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java?rev=804309&r1=804308&r2=804309&view=diff
==============================================================================
---
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java
(original)
+++
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java
Fri Aug 14 17:53:23 2009
@@ -970,12 +970,12 @@
private void mergeMROperProperties(MapReduceOper from, MapReduceOper to) {
- if (from.isStreamInMap()) {
- to.setStreamInMap(true);
+ if (from.isEndOfAllInputSetInMap()) {
+ to.setEndOfAllInputInMap(true);
}
- if (from.isStreamInReduce()) {
- to.setStreamInReduce(true);
+ if (from.isEndOfAllInputSetInReduce()) {
+ to.setEndOfAllInputInReduce(true);
}
if (from.getRequestedParallelism() > to.getRequestedParallelism()) {
Modified:
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java?rev=804309&r1=804308&r2=804309&view=diff
==============================================================================
---
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java
(original)
+++
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java
Fri Aug 14 17:53:23 2009
@@ -252,6 +252,11 @@
}
@Override
+ public void visitMergeJoin(POMergeJoin join) throws VisitorException {
+ join.setParentPlan(parent); // same handling as visitSkewedJoin: hand the merge-join operator the 'parent' plan held by this setter
+ }
+
+ @Override
public void visitSkewedJoin(POSkewedJoin join) throws VisitorException {
join.setParentPlan(parent);
}
Modified:
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java?rev=804309&r1=804308&r2=804309&view=diff
==============================================================================
---
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java
(original)
+++
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java
Fri Aug 14 17:53:23 2009
@@ -33,7 +33,6 @@
import org.apache.log4j.PropertyConfigurator;
import org.apache.pig.PigException;
import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.data.TargetedTuple;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.PigNullableWritable;
@@ -45,7 +44,6 @@
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
-import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStoreImpl;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
@@ -96,13 +94,13 @@
return;
}
- if(PigMapReduce.sJobConf.get("pig.stream.in.map",
"false").equals("true")) {
- // If there is a stream in the pipeline we could
+ if(PigMapReduce.sJobConf.get(JobControlCompiler.END_OF_INP_IN_MAP,
"false").equals("true")) {
+ // If there is a stream in the pipeline or if this map job belongs
to merge-join we could
// potentially have more to process - so lets
// set the flag stating that all map input has been sent
// already and then lets run the pipeline one more time
// This will result in nothing happening in the case
- // where there is no stream in the pipeline
+ // where there is no stream or it is not a merge-join in the
pipeline
mp.endOfAllInput = true;
try {
runPipeline(leaf);
Added:
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java?rev=804309&view=auto
==============================================================================
---
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java
(added)
+++
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java
Fri Aug 14 17:53:23 2009
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans;
+
+import
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper;
+import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
+import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
+import org.apache.pig.impl.plan.DepthFirstWalker;
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * Visits every MROper in an MROperPlan and records, separately for its map
+ * plan and its reduce plan, whether that plan contains an operator that
+ * needs the pipeline to be run one more time after all input has been
+ * consumed (the "end of all input" flag). The operators detected here are
+ * POStream and POMergeJoin.
+ * <p>
+ * The result is stored on the MROper via setEndOfAllInputInMap and
+ * setEndOfAllInputInReduce. Flags are only ever set to true by this
+ * visitor, never cleared.
+ *
+ */
+public class EndOfAllInputSetter extends MROpPlanVisitor {
+
+ /**
+ * @param plan MR plan to visit; its MROpers are walked depth-first
+ */
+ public EndOfAllInputSetter(MROperPlan plan) {
+ super(plan, new DepthFirstWalker<MapReduceOper, MROperPlan>(plan));
+ }
+
+ @Override
+ public void visitMROp(MapReduceOper mr) throws VisitorException {
+ // Check the map plan and the reduce plan independently, each with a fresh checker.
+ EndOfAllInputChecker checker = new EndOfAllInputChecker(mr.mapPlan);
+ checker.visit();
+ if(checker.isEndOfAllInputPresent()) {
+ mr.setEndOfAllInputInMap(true);
+ }
+
+ checker = new EndOfAllInputChecker(mr.reducePlan);
+ checker.visit();
+ if(checker.isEndOfAllInputPresent()) {
+ mr.setEndOfAllInputInReduce(true);
+ }
+
+ }
+
+ static class EndOfAllInputChecker extends PhyPlanVisitor { // Walks one PhysicalPlan depth-first and remembers whether a POStream or POMergeJoin was visited.
+
+ private boolean endOfAllInputFlag = false;
+ public EndOfAllInputChecker(PhysicalPlan plan) {
+ super(plan, new DepthFirstWalker<PhysicalOperator,
PhysicalPlan>(plan));
+ }
+
+ /* (non-Javadoc)
+ * A POStream in the plan requires the end-of-all-input rerun.
+ * @see org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor#visitStream(org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream)
+ */
+ @Override
+ public void visitStream(POStream stream) throws VisitorException {
+ // stream present: the pipeline must be rerun once all input has been sent
+ endOfAllInputFlag = true;
+ }
+
+ @Override
+ public void visitMergeJoin(POMergeJoin join) throws VisitorException {
+ // merge join present: it also relies on the end-of-all-input rerun
+ endOfAllInputFlag = true;
+ }
+ /**
+ * @return true if a POStream or POMergeJoin was visited in this plan
+ */
+ public boolean isEndOfAllInputPresent() {
+ return endOfAllInputFlag;
+ }
+ }
+}
\ No newline at end of file
Modified:
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/MRStreamHandler.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/MRStreamHandler.java?rev=804309&r1=804308&r2=804309&view=diff
==============================================================================
---
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/MRStreamHandler.java
(original)
+++
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/MRStreamHandler.java
Fri Aug 14 17:53:23 2009
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans;
-
-import
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper;
-import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
-import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
-import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
-import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
-import org.apache.pig.impl.plan.DepthFirstWalker;
-import org.apache.pig.impl.plan.VisitorException;
-
-/**
- * This visitor visits the MRPlan and does the following
- * for each MROper
- * - If the map plan or the reduce plan of the MROper has
- * a POStream in it, this marks in the MROper whether the map
- * has a POStream or if the reduce has a POStream.
- *
- */
-public class MRStreamHandler extends MROpPlanVisitor {
-
- /**
- * @param plan MR plan to visit
- */
- public MRStreamHandler(MROperPlan plan) {
- super(plan, new DepthFirstWalker<MapReduceOper, MROperPlan>(plan));
- }
-
- @Override
- public void visitMROp(MapReduceOper mr) throws VisitorException {
-
- StreamChecker checker = new StreamChecker(mr.mapPlan);
- checker.visit();
- if(checker.isStreamPresent()) {
- mr.setStreamInMap(true);
- }
-
- checker = new StreamChecker(mr.reducePlan);
- checker.visit();
- if(checker.isStreamPresent()) {
- mr.setStreamInReduce(true);
- }
-
- }
-
- class StreamChecker extends PhyPlanVisitor {
-
- private boolean streamPresent = false;
- public StreamChecker(PhysicalPlan plan) {
- super(plan, new DepthFirstWalker<PhysicalOperator,
PhysicalPlan>(plan));
- }
-
- /* (non-Javadoc)
- * @see
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor#visitStream(org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStream)
- */
- @Override
- public void visitStream(POStream stream) throws VisitorException {
- // stream present
- streamPresent = true;
- }
-
- /**
- * @return if stream is present
- */
- public boolean isStreamPresent() {
- return streamPresent;
- }
- }
-}
-
Modified:
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java?rev=804309&r1=804308&r2=804309&view=diff
==============================================================================
---
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
(original)
+++
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
Fri Aug 14 17:53:23 2009
@@ -56,6 +56,7 @@
import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.LinkedMultiMap;
import org.apache.pig.impl.util.MultiMap;
public class LogToPhyTranslationVisitor extends LOVisitor {
@@ -723,20 +724,30 @@
LogToPhyMap.put(cg, poPackage);
}
- /**
- * Create the inner plans used to configure the Partition rearrange
operators
- */
@Override
protected void visit(LOJoin loj) throws VisitorException {
- List<LogicalOperator> inputs = loj.getInputs();
- MultiMap<PhysicalOperator, PhysicalPlan> joinPlans = new
MultiMap<PhysicalOperator, PhysicalPlan>();
+ String scope = loj.getOperatorKey().scope;
+
+ // List of join predicates
+ List<LogicalOperator> inputs = loj.getInputs();
+
+ // mapping of inner join physical plans corresponding to inner
physical operators.
+ MultiMap<PhysicalOperator, PhysicalPlan> joinPlans = new
LinkedMultiMap<PhysicalOperator, PhysicalPlan>();
+
+ // Outer list corresponds to join predicates. Inner list is inner
physical plan of each predicate.
+ List<List<PhysicalPlan>> ppLists = new ArrayList<List<PhysicalPlan>>();
+
+ // List of physical operator corresponding to join predicates.
List<PhysicalOperator> inp = new ArrayList<PhysicalOperator>();
+
+ // Outer list corresponds to join predicates and inner list
corresponds to type of keys for each predicate.
+ List<List<Byte>> keyTypes = new ArrayList<List<Byte>>();
+
for (LogicalOperator op : inputs) {
PhysicalOperator physOp = LogToPhyMap.get(op);
inp.add(physOp);
List<LogicalPlan> plans = (List<LogicalPlan>)
loj.getJoinPlans().get(op);
-
List<PhysicalPlan> exprPlans = new ArrayList<PhysicalPlan>();
currentPlans.push(currentPlan);
for (LogicalPlan lp : plans) {
@@ -744,17 +755,23 @@
PlanWalker<LogicalOperator, LogicalPlan> childWalker =
mCurrentWalker.spawnChildWalker(lp);
pushWalker(childWalker);
mCurrentWalker.walk(this);
- exprPlans.add((PhysicalPlan) currentPlan);
+ exprPlans.add(currentPlan);
popWalker();
}
currentPlan = currentPlans.pop();
- joinPlans.put(physOp, exprPlans);
+ ppLists.add(exprPlans);
+ joinPlans.put(physOp, exprPlans);
+
+ // Key could potentially be a tuple. So, we visit all exprPlans to
get types of members of tuples.
+ List<Byte> tupleKeyMemberTypes = new ArrayList<Byte>();
+ for(PhysicalPlan exprPlan : exprPlans)
+
tupleKeyMemberTypes.add(exprPlan.getLeaves().get(0).getResultType());
+ keyTypes.add(tupleKeyMemberTypes);
}
if (loj.getJoinType() == LOJoin.JOINTYPE.SKEWED) {
POSkewedJoin skj;
try {
- String scope = loj.getOperatorKey().scope;
skj = new POSkewedJoin(new
OperatorKey(scope,nodeGen.getNextNodeId(scope)),loj.getRequestedParallelism(),
inp);
skj.setJoinPlans(joinPlans);
@@ -777,40 +794,11 @@
}
}
LogToPhyMap.put(loj, skj);
- } else if(loj.getJoinType() == LOJoin.JOINTYPE.REPLICATED) {
- String scope = loj.getOperatorKey().scope;
- inputs = loj.getInputs();
- List<List<PhysicalPlan>> ppLists = new
ArrayList<List<PhysicalPlan>>();
- List<Byte> keyTypes = new ArrayList<Byte>();
+ }
+
+ else if(loj.getJoinType() == LOJoin.JOINTYPE.REPLICATED) {
int fragment = 0;
- inp = new ArrayList<PhysicalOperator>();
- for (LogicalOperator op : inputs) {
- inp.add(LogToPhyMap.get(op));
- List<LogicalPlan> plans = (List<LogicalPlan>)
loj.getJoinPlans()
- .get(op);
-
- List<PhysicalPlan> exprPlans = new
ArrayList<PhysicalPlan>();
- currentPlans.push(currentPlan);
- for (LogicalPlan lp : plans) {
- currentPlan = new PhysicalPlan();
- PlanWalker<LogicalOperator, LogicalPlan> childWalker =
mCurrentWalker
- .spawnChildWalker(lp);
- pushWalker(childWalker);
- mCurrentWalker.walk(this);
- exprPlans.add((PhysicalPlan) currentPlan);
- popWalker();
-
- }
- currentPlan = currentPlans.pop();
- ppLists.add(exprPlans);
-
- if (plans.size() > 1) {
- keyTypes.add(DataType.TUPLE);
- } else {
-
keyTypes.add(exprPlans.get(0).getLeaves().get(0).getResultType());
- }
- }
POFRJoin pfrj;
try {
pfrj = new POFRJoin(new
OperatorKey(scope,nodeGen.getNextNodeId(scope)),loj.getRequestedParallelism(),
@@ -832,10 +820,36 @@
}
}
LogToPhyMap.put(loj, pfrj);
- } else if(loj.getJoinType() == LOJoin.JOINTYPE.REGULAR) {
- String scope = loj.getOperatorKey().scope;
- inputs = loj.getInputs();
-
+ }
+
+ else if (loj.getJoinType() == LOJoin.JOINTYPE.MERGE &&
validateMergeJoin(loj)) {
+
+ POMergeJoin smj;
+ try {
+ smj = new POMergeJoin(new
OperatorKey(scope,nodeGen.getNextNodeId(scope)),loj.getRequestedParallelism(),inp,joinPlans,keyTypes);
+ }
+ catch (Exception e) {
+ int errCode = 2042;
+ String msg = "Merge Join creation failed";
+ throw new LogicalToPhysicalTranslatorException(msg, errCode,
PigException.BUG, e);
+ }
+
+ smj.setResultType(DataType.TUPLE);
+ currentPlan.add(smj);
+
+ for (LogicalOperator op : inputs) {
+ try {
+ currentPlan.connect(LogToPhyMap.get(op), smj);
+ } catch (PlanException e) {
+ int errCode = 2015;
+ String msg = "Invalid physical operators in the physical
plan" ;
+ throw new LogicalToPhysicalTranslatorException(msg,
errCode, PigException.BUG, e);
+ }
+ }
+ LogToPhyMap.put(loj, smj);
+ return;
+ }
+ else if (loj.getJoinType() == LOJoin.JOINTYPE.REGULAR){
POGlobalRearrange poGlobal = new POGlobalRearrange(new
OperatorKey(
scope, nodeGen.getNextNodeId(scope)), loj
.getRequestedParallelism());
@@ -850,8 +864,6 @@
try {
currentPlan.connect(poGlobal, poPackage);
- List<Boolean> flattenLst = Arrays.asList(true, true);
-
for (LogicalOperator op : inputs) {
List<LogicalPlan> plans = (List<LogicalPlan>)
loj.getJoinPlans()
.get(op);
@@ -944,127 +956,39 @@
String msg = "Invalid physical operators in the physical
plan" ;
throw new LogicalToPhysicalTranslatorException(msg,
errCode, PigException.BUG, e1);
}
- LogToPhyMap.put(loj, fe);
+ LogToPhyMap.put(loj, fe);
}
}
-
- /**
- * Add a local rearrange operator to the plan
- */
-/*
- private void addLocalRearrange(LogicalOperator lo, PhysicalPlan &plan,
List<PhysicalPlan> &exprPlans) throws VisitorException {
- String scope = lo.getOperatorKey().scope;
- POLocalRearrange physOp = new POLocalRearrange(new OperatorKey(
- scope, nodeGen.getNextNodeId(scope)),
lo.getRequestedParallelism());
-
- try {
- physOp.setPlans(exprPlans);
- } catch (PlanException pe) {
- int errCode = 2071;
- String msg = "Problem with setting up local rearrange's
plans.";
- throw new LogicalToPhysicalTranslatorException(msg,
errCode, PigException.BUG, pe);
- }
- try {
- physOp.setIndex(count++);
- } catch (ExecException e1) {
- int errCode = 2058;
- String msg = "Unable to set index on newly create
POLocalRearrange.";
- throw new VisitorException(msg, errCode,
PigException.BUG, e1);
- }
- if (plans.size() > 1) {
- type = DataType.TUPLE;
- physOp.setKeyType(type);
- } else {
- type =
exprPlans.get(0).getLeaves().get(0).getResultType();
- physOp.setKeyType(type);
- }
- physOp.setResultType(DataType.TUPLE);
-
- currentPlan.add(physOp);
-
- try {
- currentPlan.connect(LogToPhyMap.get(op), physOp);
- currentPlan.connect(physOp, poGlobal);
- } catch (PlanException e) {
- int errCode = 2015;
- String msg = "Invalid physical operators in the
physical plan" ;
- throw new LogicalToPhysicalTranslatorException(msg,
errCode, PigException.BUG, e);
- }
- }
- */
- /**
- * Create the inner plans used to configure the Local Rearrange
operators(ppLists)
- * Extract the keytypes and create the POFRJoin operator.
- */
-/* @Override
- protected void visit(LOFRJoin frj) throws VisitorException {
- String scope = frj.getOperatorKey().scope;
- List<LogicalOperator> inputs = frj.getInputs();
- List<List<PhysicalPlan>> ppLists = new ArrayList<List<PhysicalPlan>>();
- List<Byte> keyTypes = new ArrayList<Byte>();
+ private boolean validateMergeJoin(LOJoin loj) throws
LogicalToPhysicalTranslatorException{
+
+ List<LogicalOperator> preds = loj.getInputs();
+
+ int errCode = 1101;
+ String errMsg = "Merge Join must have exactly two inputs.";
+ if(preds.size() != 2)
+ throw new LogicalToPhysicalTranslatorException(errMsg+" Found:
"+preds.size(),errCode);
- int fragment = findFrag(inputs,frj.getFragOp());
- List<PhysicalOperator> inp = new ArrayList<PhysicalOperator>();
- for (LogicalOperator op : inputs) {
- inp.add(LogToPhyMap.get(op));
- List<LogicalPlan> plans = (List<LogicalPlan>) frj.getJoinColPlans()
- .get(op);
-
- List<PhysicalPlan> exprPlans = new ArrayList<PhysicalPlan>();
- currentPlans.push(currentPlan);
- for (LogicalPlan lp : plans) {
- currentPlan = new PhysicalPlan();
- PlanWalker<LogicalOperator, LogicalPlan> childWalker =
mCurrentWalker
- .spawnChildWalker(lp);
- pushWalker(childWalker);
- mCurrentWalker.walk(this);
- exprPlans.add((PhysicalPlan) currentPlan);
- popWalker();
-
- }
- currentPlan = currentPlans.pop();
- ppLists.add(exprPlans);
-
- if (plans.size() > 1) {
- keyTypes.add(DataType.TUPLE);
- } else {
-
keyTypes.add(exprPlans.get(0).getLeaves().get(0).getResultType());
- }
- }
- POFRJoin pfrj;
- try {
- pfrj = new POFRJoin(new
OperatorKey(scope,nodeGen.getNextNodeId(scope)),frj.getRequestedParallelism(),
- inp, ppLists, keyTypes, null,
fragment);
- } catch (ExecException e1) {
- int errCode = 2058;
- String msg = "Unable to set index on newly create
POLocalRearrange.";
- throw new VisitorException(msg, errCode, PigException.BUG, e1);
- }
- pfrj.setResultType(DataType.TUPLE);
- currentPlan.add(pfrj);
- for (LogicalOperator op : inputs) {
- try {
- currentPlan.connect(LogToPhyMap.get(op), pfrj);
- } catch (PlanException e) {
- int errCode = 2015;
- String msg = "Invalid physical operators in the physical plan"
;
- throw new LogicalToPhysicalTranslatorException(msg, errCode,
PigException.BUG, e);
- }
- }
- LogToPhyMap.put(frj, pfrj);
- }
-*/
- private int findFrag(List<LogicalOperator> inputs, LogicalOperator fragOp)
{
- int i=-1;
- for (LogicalOperator lop : inputs) {
- if(fragOp.getOperatorKey().equals(lop.getOperatorKey()))
- return ++i;
- }
- return -1;
+ return mergeJoinValidator(preds,loj.getPlan());
+ }
+
+ private boolean mergeJoinValidator(List<LogicalOperator>
preds,LogicalPlan lp) throws LogicalToPhysicalTranslatorException{
+
+ int errCode = 1103;
+ String errMsg = "Merge join only supports Filter, Foreach, filter
and Load as its predecessor. Found : ";
+ if(preds != null && !preds.isEmpty()){
+ for(LogicalOperator lo : preds){
+ if (!(lo instanceof LOFilter || lo instanceof LOForEach ||
lo instanceof LOLoad))
+ throw new LogicalToPhysicalTranslatorException(errMsg,
errCode);
+ // All is good at this level. Visit predecessors now.
+ mergeJoinValidator(lp.getPredecessors(lo),lp);
+ }
+ }
+ // We visited everything and all is good.
+ return true;
}
-
- @Override
+
+ @Override
public void visit(LOFilter filter) throws VisitorException {
String scope = filter.getOperatorKey().scope;
POFilter poFilter = new POFilter(new OperatorKey(scope, nodeGen
Modified:
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java?rev=804309&r1=804308&r2=804309&view=diff
==============================================================================
---
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
(original)
+++
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
Fri Aug 14 17:53:23 2009
@@ -243,7 +243,10 @@
public void visitFRJoin(POFRJoin join) throws VisitorException {
//do nothing
}
-
+
+ public void visitMergeJoin(POMergeJoin join) throws VisitorException {
+ //do nothing
+ }
/**
* @param stream
* @throws VisitorException
Modified:
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java?rev=804309&r1=804308&r2=804309&view=diff
==============================================================================
---
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java
(original)
+++
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java
Fri Aug 14 17:53:23 2009
@@ -39,7 +39,6 @@
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.data.BagFactory;
-import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
@@ -73,7 +72,7 @@
//Ex. join A by ($0+$1,$0-$1), B by ($0*$1,$0/$1);
private List<List<PhysicalPlan>> phyPlanLists;
//The key type for each Local Rearrange operator
- private List<Byte> keyTypes;
+ private List<List<Byte>> keyTypes;
//The Local Rearrange operators modeling the join key
private POLocalRearrange[] LRs;
//The set of files that represent the replicated inputs
@@ -109,7 +108,7 @@
this(k,rp,inp,null, null, null, -1);
}
- public POFRJoin(OperatorKey k, int rp, List<PhysicalOperator> inp,
List<List<PhysicalPlan>> ppLists, List<Byte> keyTypes, FileSpec[] replFiles,
int fragment) throws ExecException{
+ public POFRJoin(OperatorKey k, int rp, List<PhysicalOperator> inp,
List<List<PhysicalPlan>> ppLists, List<List<Byte>> keyTypes, FileSpec[]
replFiles, int fragment) throws ExecException{
super(k,rp,inp);
phyPlanLists = ppLists;
@@ -148,7 +147,7 @@
POLocalRearrange lr = new POLocalRearrange(genKey(old));
lr.setIndex(i);
lr.setResultType(DataType.TUPLE);
- lr.setKeyType(keyTypes.get(i));
+ lr.setKeyType(keyTypes.get(i).size() > 1 ? DataType.TUPLE :
keyTypes.get(i).get(0));
try {
lr.setPlans(ppLst);
} catch (PlanException pe) {
Modified:
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java?rev=804309&r1=804308&r2=804309&view=diff
==============================================================================
---
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java
(original)
+++
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java
Fri Aug 14 17:53:23 2009
@@ -64,6 +64,8 @@
boolean setUpDone = false;
// Indicates whether the filespec is splittable
boolean splittable = true;
+ // default offset.
+ private long offset = 0;
private final Log log = LogFactory.getLog(getClass());
@@ -76,6 +78,11 @@
this(k,-1,lFile, splittable);
}
+ public POLoad(OperatorKey k, FileSpec lFile, long offset, boolean
splittable){
+ this(k,-1,lFile, splittable);
+ this.offset = offset;
+ }
+
public POLoad(OperatorKey k, int rp, FileSpec lFile,boolean splittable) {
super(k, rp);
this.lFile = lFile;
@@ -86,7 +93,7 @@
* Set up the loader by
* 1) Instantiating the load func
* 2) Opening an input stream to the specified file and
- * 3) Binding to the input stream
+ * 3) Binding to the input stream at the specified offset.
* @throws IOException
*/
public void setUp() throws IOException{
@@ -95,7 +102,7 @@
is = FileLocalizer.open(filename, pc);
- loader.bindTo(filename , new BufferedPositionedInputStream(is), 0,
Long.MAX_VALUE);
+ loader.bindTo(filename , new BufferedPositionedInputStream(is),
this.offset, Long.MAX_VALUE);
}
/**