Author: pradeepkth
Date: Wed Mar 11 23:03:46 2009
New Revision: 752683
URL: http://svn.apache.org/viewvc?rev=752683&view=rev
Log:
PIG-627: multiquery support incremental patch (Richard Ding via pradeepkth)
Modified:
hadoop/pig/branches/multiquery/CHANGES.txt
hadoop/pig/branches/multiquery/src/org/apache/pig/PigServer.java
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSplit.java
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java
hadoop/pig/branches/multiquery/src/org/apache/pig/builtin/BinStorage.java
hadoop/pig/branches/multiquery/src/org/apache/pig/impl/plan/DependencyOrderWalker.java
Modified: hadoop/pig/branches/multiquery/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/CHANGES.txt?rev=752683&r1=752682&r2=752683&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/CHANGES.txt (original)
+++ hadoop/pig/branches/multiquery/CHANGES.txt Wed Mar 11 23:03:46 2009
@@ -410,3 +410,5 @@
PIG-627: multiquery support M1 (hagleitn via olgan)
PIG-627: multiquery support M2 (hagleitn via pradeepkth)
+
+ PIG-627: multiquery support incremental patch (Richard Ding via pradeepkth)
Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/PigServer.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/PigServer.java?rev=752683&r1=752682&r2=752683&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/PigServer.java (original)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/PigServer.java Wed Mar 11
23:03:46 2009
@@ -28,10 +28,12 @@
import java.util.Date;
import java.util.Enumeration;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.Set;
import java.util.Stack;
import org.apache.commons.logging.Log;
@@ -49,6 +51,9 @@
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.LOCogroup;
+import org.apache.pig.impl.logicalLayer.LOFRJoin;
+import org.apache.pig.impl.logicalLayer.LOLoad;
import org.apache.pig.impl.logicalLayer.LogicalOperator;
import org.apache.pig.impl.logicalLayer.LogicalPlan;
import org.apache.pig.impl.logicalLayer.LogicalPlanBuilder;
@@ -62,6 +67,7 @@
import org.apache.pig.impl.plan.CompilationMessageCollector;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.streaming.StreamingCommand;
+import org.apache.pig.impl.util.MultiMap;
import org.apache.pig.impl.util.PropertiesUtil;
import org.apache.pig.impl.logicalLayer.LODefine;
import org.apache.pig.impl.logicalLayer.LOStore;
@@ -838,9 +844,11 @@
private Map<String, LogicalOperator> aliasOp = new HashMap<String,
LogicalOperator>();
- private ArrayList<String> scriptCache = new ArrayList<String>();
+ private List<String> scriptCache = new ArrayList<String>();
- private Map<LogicalOperator, LogicalPlan> storeOpTable = new
HashMap<LogicalOperator, LogicalPlan>();
+ private Map<LOStore, LogicalPlan> storeOpTable = new HashMap<LOStore,
LogicalPlan>();
+
+ private Set<LOLoad> loadOps = new HashSet<LOLoad>();
private String jobName;
@@ -866,9 +874,7 @@
Map<String, LogicalOperator> getAliasOp() { return aliasOp; }
- ArrayList<String> getScriptCache() { return scriptCache; }
-
- Map<LogicalOperator, LogicalPlan> getStoreOpTable() { return
storeOpTable; }
+ List<String> getScriptCache() { return scriptCache; }
boolean isBatchOn() { return batchMode; };
@@ -922,8 +928,15 @@
}
} else {
if (0 == ignoreNumStores) {
- storeOpTable.put(op, tmpLp);
+ storeOpTable.put((LOStore)op, tmpLp);
lp.mergeSharedPlan(tmpLp);
+ List<LogicalOperator> roots = tmpLp.getRoots();
+ for (LogicalOperator root : roots) {
+ if (root instanceof LOLoad) {
+ loadOps.add((LOLoad)root);
+ }
+ }
+
} else {
--ignoreNumStores;
}
@@ -987,10 +1000,51 @@
graph.lp = graph.parseQuery(it.next(), lineNumber);
}
}
+ graph.postProcess();
} catch (IOException ioe) {
graph = null;
- }
+ }
return graph;
}
+
+ private void postProcess() throws IOException {
+
+ // The following code deals with store/load combination of
+ // intermediate files. In this case we replace the load operator
+ // with an (implicit) split operator.
+ for (LOLoad load : loadOps) {
+ for (LOStore store : storeOpTable.keySet()) {
+ String ifile = load.getInputFile().getFileName();
+ String ofile = store.getOutputFile().getFileName();
+ if (ofile.compareTo(ifile) == 0) {
+ LogicalOperator storePred =
lp.getPredecessors(store).get(0);
+
+ lp.disconnect(store, load);
+ lp.replace(load, storePred);
+
+ List<LogicalOperator> succs =
lp.getSuccessors(storePred);
+
+ for (LogicalOperator succ : succs) {
+ MultiMap<LogicalOperator, LogicalPlan> innerPls =
null;
+
+ // fix inner plans for cogroup and frjoin operators
+ if (succ instanceof LOCogroup) {
+ innerPls = ((LOCogroup)succ).getGroupByPlans();
+ } else if (succ instanceof LOFRJoin) {
+ innerPls = ((LOFRJoin)succ).getJoinColPlans();
+ }
+
+ if (innerPls != null) {
+ if (innerPls.containsKey(load)) {
+ Collection<LogicalPlan> pls =
innerPls.get(load);
+ innerPls.removeKey(load);
+ innerPls.put(storePred, pls);
+ }
+ }
+ }
+ }
+ }
+ }
+ }
}
}
Modified:
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=752683&r1=752682&r2=752683&view=diff
==============================================================================
---
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
(original)
+++
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
Wed Mar 11 23:03:46 2009
@@ -64,6 +64,7 @@
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
+import org.apache.pig.impl.plan.DependencyOrderWalker;
import org.apache.pig.impl.plan.DepthFirstWalker;
import org.apache.pig.impl.plan.NodeIdGenerator;
import org.apache.pig.impl.plan.Operator;
@@ -239,6 +240,9 @@
la.visit();
la.adjust();
+ MultiQueryAdjuster ma = new MultiQueryAdjuster(MRPlan);
+ ma.visit();
+
return MRPlan;
}
@@ -305,6 +309,11 @@
return st;
}
+ private POSplit getSplit(){
+ POSplit sp = new POSplit(new
OperatorKey(scope,nig.getNextNodeId(scope)));
+ return sp;
+ }
+
/**
* A map MROper is an MROper whose map plan is still open
* for taking more non-blocking operators.
@@ -491,7 +500,7 @@
MRPlan.connect(old, ret);
return ret;
}
-
+
/**
* Returns a temporary DFS Path
* @return
@@ -1655,5 +1664,258 @@
keyType = p.getResultType();
}
}
-
+
+ /**
+ * An adjuster that merges all or some of the splitee MapReduceOpers into
+ * the splitter MapReduceOper. The merge can produce an MROperPlan that has
+ * fewer MapReduceOpers than the original MROperPlan.
+ *
+ * The MRCompiler generates multiple MapReduceOpers whenever it encounters
+ * a split operator and connects the single splitter MapReduceOper to
+ * one or more splitee MapReduceOpers using store/load operators:
+ *
+ * ---- POStore (in splitter) -... ----
+ * | | ... |
+ * | | ... |
+ * POLoad POLoad ... POLoad (in splitees)
+ * | | |
+ *
+ * This adjuster merges those MapReduceOpers by replacing POLoad/POStore
+ * combination with POSplit operator.
+ */
+ private class MultiQueryAdjuster extends MROpPlanVisitor {
+
+ MultiQueryAdjuster(MROperPlan plan) {
+ super(plan, new DependencyOrderWalker<MapReduceOper,
MROperPlan>(plan, false));
+ }
+
+ @Override
+ public void visitMROp(MapReduceOper mr) throws VisitorException {
+
+ if (!isSplitter(mr)) {
+ return;
+ }
+
+ // find all the single load map-only MROpers in the splitees
+ List<MapReduceOper> mappers = new ArrayList<MapReduceOper>();
+ List<MapReduceOper> multiLoadMappers = new
ArrayList<MapReduceOper>();
+ List<MapReduceOper> mapReducers = new ArrayList<MapReduceOper>();
+
+ List<MapReduceOper> successors = getPlan().getSuccessors(mr);
+ for (MapReduceOper successor : successors) {
+ if (isMapOnly(successor)) {
+ if (isSingleLoadMapperPlan(successor.mapPlan)) {
+ mappers.add(successor);
+ } else {
+ multiLoadMappers.add(successor);
+ }
+ } else {
+ mapReducers.add(successor);
+ }
+ }
+
+ PhysicalPlan splitterPl = isMapOnly(mr) ? mr.mapPlan :
mr.reducePlan;
+ PhysicalOperator storeOp = splitterPl.getLeaves().get(0);
+ List<PhysicalOperator> storePreds =
splitterPl.getPredecessors(storeOp);
+
+ if (mappers.size() == 1 && mapReducers.size() == 0 &&
multiLoadMappers.size() == 0) {
+
+ // In this case, just add splitee's map plan to the splitter's
plan
+ MapReduceOper mapper = mappers.get(0);
+
+ PhysicalPlan pl = mapper.mapPlan;
+ PhysicalOperator load = pl.getRoots().get(0);
+ pl.remove(load);
+
+ // make a copy before removing the store operator
+ List<PhysicalOperator> predsCopy = new
ArrayList<PhysicalOperator>(storePreds);
+ splitterPl.remove(storeOp);
+
+ // connect two plans
+ List<PhysicalOperator> roots = pl.getRoots();
+ try {
+ splitterPl.merge(pl);
+ } catch (PlanException e) {
+ throw new VisitorException(e);
+ }
+
+ for (PhysicalOperator pred : predsCopy) {
+ for (PhysicalOperator root : roots) {
+ try {
+ splitterPl.connect(pred, root);
+ } catch (PlanException e) {
+ throw new VisitorException(e);
+ }
+ }
+ }
+
+ } else if (mappers.size() > 0) {
+
+ // merge splitee's map plans into nested plan of
+ // the splitter operator
+ POSplit splitOp = getSplit();
+ for (MapReduceOper mapper : mappers) {
+
+ PhysicalPlan pl = mapper.mapPlan;
+ PhysicalOperator load = pl.getRoots().get(0);
+ pl.remove(load);
+ try {
+ splitOp.addPlan(pl);
+ } catch (PlanException e) {
+ throw new VisitorException(e);
+ }
+ }
+
+ // add original store to the split operator
+ // if there is at least one MapReduce splitee
+ if (mapReducers.size() + multiLoadMappers.size() > 0) {
+ PhysicalPlan storePlan = new PhysicalPlan();
+ try {
+ storePlan.addAsLeaf(storeOp);
+ splitOp.addPlan(storePlan);
+ } catch (PlanException e) {
+ throw new VisitorException(e);
+ }
+ }
+
+ // replace store operator in the splitter with split operator
+ splitOp.setInputs(storePreds);
+ try {
+ splitterPl.replace(storeOp, splitOp);;
+ } catch (PlanException e) {
+ throw new VisitorException(e);
+ }
+ }
+
+ // remove all the map-only splitees from the MROperPlan
+ for (MapReduceOper mapper : mappers) {
+ removeAndReconnect(mapper, mr);
+ }
+
+ // TO-DO: merge the other splitees if possible
+ if (mapReducers.size() + multiLoadMappers.size() > 0 ) {
+ // XXX
+ }
+ }
+
+ /**
+ * Removes the specified MR operator from the plan after the merge.
+ * Connects its predecessors and successors to the merged MR operator
+ * @param mr the MR operator to remove
+ * @param newMR the MR operator to be connected to the predecessors
and
+ * the successors of the removed operator
+ * @throws VisitorException if connect operation fails
+ */
+ private void removeAndReconnect(MapReduceOper mr, MapReduceOper newMR)
throws VisitorException {
+ List<MapReduceOper> mapperSuccs = getPlan().getSuccessors(mr);
+ List<MapReduceOper> mapperPreds = getPlan().getPredecessors(mr);
+
+ // make a copy before removing operator
+ ArrayList<MapReduceOper> succsCopy = null;
+ ArrayList<MapReduceOper> predsCopy = null;
+ if (mapperSuccs != null) {
+ succsCopy = new ArrayList<MapReduceOper>(mapperSuccs);
+ }
+ if (mapperPreds != null) {
+ predsCopy = new ArrayList<MapReduceOper>(mapperPreds);
+ }
+ getPlan().remove(mr);
+
+ // reconnect the mapper's successors
+ if (succsCopy != null) {
+ for (MapReduceOper succ : succsCopy) {
+ try {
+ getPlan().connect(newMR, succ);
+ } catch (PlanException e) {
+ throw new VisitorException(e);
+ }
+ }
+ }
+
+ // reconnect the mapper's predecessors
+ if (predsCopy != null) {
+ for (MapReduceOper pred : predsCopy) {
+ if (newMR.getOperatorKey().equals(pred.getOperatorKey())) {
+ continue;
+ }
+ try {
+ getPlan().connect(pred, newMR);
+ } catch (PlanException e) {
+ throw new VisitorException(e);
+ }
+ }
+ }
+ }
+
+ /*
+ * Checks whether the specified MapReduce operator is a splitter
+ */
+ private boolean isSplitter(MapReduceOper mr) {
+
+ List<MapReduceOper> successors = getPlan().getSuccessors(mr);
+ if (successors == null || successors.size() == 0) {
+ return false;
+ }
+
+ PhysicalPlan pl = isMapOnly(mr) ? mr.mapPlan : mr.reducePlan;
+
+ List<PhysicalOperator> mapLeaves = pl.getLeaves();
+ if (mapLeaves == null || mapLeaves.size() != 1) {
+ return false;
+ }
+
+ PhysicalOperator mapLeaf = mapLeaves.get(0);
+ if (!(mapLeaf instanceof POStore)) {
+ return false;
+ }
+
+ POStore store = (POStore)mapLeaf;
+ String fileName = store.getSFile().getFileName();
+
+ for (MapReduceOper mro : successors) {
+ List<PhysicalOperator> roots = mro.mapPlan.getRoots();
+ boolean splitee = false;
+ for (PhysicalOperator root : roots) {
+ if (root instanceof POLoad) {
+ POLoad load = (POLoad)root;
+ if (fileName.compareTo(load.getLFile().getFileName())
== 0) {
+ splitee = true;
+ break;
+ }
+ }
+ }
+ if (!splitee) return false;
+ }
+
+ return true;
+ }
+
+ private boolean isMapOnly(MapReduceOper mr) {
+ return mr.reducePlan.isEmpty();
+ }
+
+ private boolean isSingleLoadMapperPlan(PhysicalPlan pl) {
+ List<PhysicalOperator> roots = pl.getRoots();
+ if (roots.size() != 1) {
+ return false;
+ }
+
+ PhysicalOperator root = roots.get(0);
+ if (!(root instanceof POLoad)) {
+ throw new IllegalStateException("Invalid root operator in
mapper Splitee: "
+ + root.getClass().getName());
+ }
+ List<PhysicalOperator> successors = pl.getSuccessors(root);
+ if (successors == null || successors.size() != 1) {
+ throw new IllegalStateException("Root in mapper Splitee has no
successor: "
+ + successors.size());
+ }
+ PhysicalOperator op = successors.get(0);
+ if (!(op instanceof POFilter)) {
+ throw new IllegalStateException("Invalid successor of load in
mapper Splitee: "
+ + op.getClass().getName());
+ }
+ return true;
+ }
+ }
}
Modified:
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java?rev=752683&r1=752682&r2=752683&view=diff
==============================================================================
---
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java
(original)
+++
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java
Wed Mar 11 23:03:46 2009
@@ -157,6 +157,9 @@
else if(node instanceof POForEach){
sb.append(planString(((POForEach)node).getInputPlans()));
}
+ else if (node instanceof POSplit) {
+ sb.append(planString(((POSplit)node).getPlans()));
+ }
else if(node instanceof POFRJoin){
POFRJoin frj = (POFRJoin)node;
List<List<PhysicalPlan>> joinPlans = frj.getJoinPlans();
Modified:
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSplit.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSplit.java?rev=752683&r1=752682&r2=752683&view=diff
==============================================================================
---
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSplit.java
(original)
+++
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSplit.java
Wed Mar 11 23:03:46 2009
@@ -17,12 +17,21 @@
*/
package
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
+import java.util.ArrayList;
import java.util.List;
-import org.apache.pig.impl.io.FileSpec;
-import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.PlanException;
import org.apache.pig.impl.plan.VisitorException;
/**
@@ -45,11 +54,11 @@
* ---- POSplit -... ----
* This is different than the existing implementation
* where the POSplit writes to sidefiles after filtering
- * and then loads the appropirate file.
+ * and then loads the appropriate file.
*
* The approach followed here is as good as the old
* approach if not better in many cases because
- * of the availablity of attachinInputs. An optimization
+ * of the availability of attachinInputs. An optimization
* that can ensue is if there are multiple loads that
* load the same file, they can be merged into one and
* then the operators that take input from the load
@@ -65,27 +74,61 @@
* job whenever necessary.
*/
public class POSplit extends PhysicalOperator {
+
+ private static final long serialVersionUID = 1L;
+
+ private Log log = LogFactory.getLog(getClass());
+
+ /*
+ * The filespec that is used to store and load the output of the split job
+ * which is the job containing the split
+ */
+ private FileSpec splitStore;
+
+ /*
+ * The inner physical plan
+ */
+ private PhysicalPlan myPlan = new PhysicalPlan();
+
+ /*
+ * The list of sub-plans the inner plan is composed of
+ */
+ private List<PhysicalPlan> myPlans = new ArrayList<PhysicalPlan>();
+
/**
- *
+ * Constructs an operator with the specified key
+ * @param k the operator key
*/
- private static final long serialVersionUID = 1L;
- //The filespec that is used to store
- //and load the output of the split job
- //which is the job containing the split
- FileSpec splitStore;
-
public POSplit(OperatorKey k) {
this(k,-1,null);
}
+ /**
+ * Constructs an operator with the specified key
+ * and degree of parallelism
+ * @param k the operator key
+ * @param rp the degree of parallelism requested
+ */
public POSplit(OperatorKey k, int rp) {
this(k,rp,null);
}
+ /**
+ * Constructs an operator with the specified key and inputs
+ * @param k the operator key
+ * @param inp the inputs that this operator will read data from
+ */
public POSplit(OperatorKey k, List<PhysicalOperator> inp) {
- this(k,-1,null);
+ this(k,-1,inp);
}
+ /**
+ * Constructs an operator with the specified key,
+ * degree of parallelism and inputs
+ * @param k the operator key
+ * @param rp the degree of parallelism requested
+ * @param inp the inputs that this operator will read data from
+ */
public POSplit(OperatorKey k, int rp, List<PhysicalOperator> inp) {
super(k, rp, inp);
}
@@ -110,12 +153,107 @@
return true;
}
+ /**
+ * Returns the FileSpec associated with this operator
+ * @return the FileSpec associated with this operator
+ */
public FileSpec getSplitStore() {
return splitStore;
}
+ /**
+ * Sets the FileSpec associated with this operator
+ * @param splitStore the FileSpec used to store the data
+ */
public void setSplitStore(FileSpec splitStore) {
this.splitStore = splitStore;
}
+ /**
+ * Returns the inner physical plan associated with this operator
+ * @return the inner plan
+ */
+ public PhysicalPlan getPlan() {
+ return myPlan;
+ }
+
+ /**
+ * Returns the list of nested plans. This is used by
+ * explain method to display the inner plans.
+ * @return the list of the nested plans
+ * @see
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PlanPrinter
+ */
+ public List<PhysicalPlan> getPlans() {
+ return myPlans;
+ }
+
+ /**
+ * Appends the specified plan to the end of
+ * the nested input plan list
+ * @param inPlan plan to be appended to the list
+ */
+ public void addPlan(PhysicalPlan inPlan) throws PlanException {
+ myPlan.merge(inPlan);
+ myPlans.add(inPlan);
+ }
+
+ @Override
+ public Result getNext(Tuple t) throws ExecException {
+
+ Result inp = processInput();
+
+ if (inp.returnStatus == POStatus.STATUS_EOP) {
+ return inp;
+ }
+
+ // process the nested plans
+ myPlan.attachInput((Tuple)inp.result);
+ processPlan();
+
+ return new Result(POStatus.STATUS_NULL, null);
+ }
+
+ private void processPlan() throws ExecException {
+
+ List<PhysicalOperator> leaves = myPlan.getLeaves();
+
+ for (PhysicalOperator leaf : leaves) {
+
+ // TO-DO: other types of leaves are possible later
+ if (!(leaf instanceof POStore) && !(leaf instanceof POSplit)) {
+ throw new ExecException("Invalid operator type in the split
plan: "
+ + leaf.getOperatorKey());
+ }
+
+ runPipeline(leaf);
+ }
+ }
+
+ private void runPipeline(PhysicalOperator leaf) throws ExecException {
+
+ while (true) {
+
+ Result res = leaf.getNext(dummyTuple);
+
+ if (res.returnStatus == POStatus.STATUS_OK ||
+ res.returnStatus == POStatus.STATUS_NULL) {
+ continue;
+ }
+
+ if (res.returnStatus == POStatus.STATUS_EOP) {
+ break;
+ }
+
+ if (res.returnStatus == POStatus.STATUS_ERR) {
+
+ // if there is an err message use it
+ String errMsg = (res.result != null) ?
+ "Received Error while processing the split plan: " +
res.result :
+ "Received Error while processing the split plan.";
+
+ throw new ExecException(errMsg);
+ }
+ }
+ }
+
}
Modified:
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java?rev=752683&r1=752682&r2=752683&view=diff
==============================================================================
---
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java
(original)
+++
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java
Wed Mar 11 23:03:46 2009
@@ -52,7 +52,12 @@
if (leaf instanceof POStore) {
stores.add((POStore)leaf);
}
- }
+ if (leaf instanceof POSplit) {
+ PhysicalPlan pl = ((POSplit)leaf).getPlan();
+ List<POStore> nestedStores = getStores(pl);
+ stores.addAll(nestedStores);
+ }
+ }
return stores;
}
@@ -88,4 +93,4 @@
return new Path("rel/"+pathStr).toString();
}
}
-}
\ No newline at end of file
+}
Modified:
hadoop/pig/branches/multiquery/src/org/apache/pig/builtin/BinStorage.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/builtin/BinStorage.java?rev=752683&r1=752682&r2=752683&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/builtin/BinStorage.java
(original)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/builtin/BinStorage.java
Wed Mar 11 23:03:46 2009
@@ -212,7 +212,18 @@
*/
public Schema determineSchema(String fileName, ExecType execType,
DataStorage storage) throws IOException {
- InputStream is = FileLocalizer.open(fileName, execType, storage);
+
+ InputStream is = null;
+
+ try {
+ is = FileLocalizer.open(fileName, execType, storage);
+ } catch (IOException e) {
+ // At compile time in batch mode, the file may not exist
+ // (such as an intermediate file). Just return null - the
+ // same way as when we couldn't get a valid record from the input.
+ return null;
+ }
+
bindTo(fileName, new BufferedPositionedInputStream(is), 0,
Long.MAX_VALUE);
// get the first record from the input file
// and figure out the schema from the data in
Modified:
hadoop/pig/branches/multiquery/src/org/apache/pig/impl/plan/DependencyOrderWalker.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/impl/plan/DependencyOrderWalker.java?rev=752683&r1=752682&r2=752683&view=diff
==============================================================================
---
hadoop/pig/branches/multiquery/src/org/apache/pig/impl/plan/DependencyOrderWalker.java
(original)
+++
hadoop/pig/branches/multiquery/src/org/apache/pig/impl/plan/DependencyOrderWalker.java
Wed Mar 11 23:03:46 2009
@@ -28,21 +28,34 @@
* DependencyOrderWalker traverses the graph in such a way that no node is
visited
* before all the nodes it depends on have been visited. Beyond this, it does
not
* guarantee any particular order. So, you have a graph with node 1 2 3 4, and
- * edges 1->3, 2->3, and 3->4, this walker guarnatees that 1 and 2 will be
visited
+ * edges 1->3, 2->3, and 3->4, this walker guarantees that 1 and 2 will be
visited
* before 3 and 3 before 4, but it does not guarantee whether 1 or 2 will be
* visited first.
*/
public class DependencyOrderWalker <O extends Operator, P extends
OperatorPlan<O>>
extends PlanWalker<O, P> {
+ private boolean rootsFirst = true;
+
/**
* @param plan Plan for this walker to traverse.
*/
public DependencyOrderWalker(P plan) {
- super(plan);
+ this(plan, true);
}
/**
+ * Constructs a walker with the specified plan and walk direction
+ * @param plan plan for this walker to traverse
+ * @param rootsFirst if true, walk from roots to leaves (predecessors
+ * are visited first); if false, walk from leaves to roots
+ */
+ public DependencyOrderWalker(P plan, boolean rootsFirst) {
+ super(plan);
+ this.rootsFirst = rootsFirst;
+ }
+
+ /**
* Begin traversing the graph.
* @param visitor Visitor this walker is being used by.
* @throws VisitorException if an error is encountered while walking.
@@ -58,12 +71,14 @@
List<O> fifo = new ArrayList<O>();
Set<O> seen = new HashSet<O>();
- List<O> leaves = mPlan.getLeaves();
- if (leaves == null) return;
- for (O op : leaves) {
- doAllPredecessors(op, seen, fifo);
+ List<O> nodes = rootsFirst ? mPlan.getLeaves() : mPlan.getRoots();
+
+ if (nodes == null) return;
+
+ for (O op : nodes) {
+ doAllDependencies(op, seen, fifo);
}
-
+
for (O op: fifo) {
op.visit(visitor);
}
@@ -73,9 +88,28 @@
return new DependencyOrderWalker<O, P>(plan);
}
+ protected void doAllDependencies(O node,
+ Set<O> seen,
+ Collection<O> fifo) throws
VisitorException {
+ if (!seen.contains(node)) {
+ // We haven't seen this one before.
+ Collection<O> nodes = rootsFirst ?
+ mPlan.getPredecessors(node) : mPlan.getSuccessors(node);
+ if (nodes != null) {
+ // Do all our dependencies before ourself
+ for (O op : nodes) {
+ doAllDependencies(op, seen, fifo);
+ }
+ }
+ // Now do ourself
+ seen.add(node);
+ fifo.add(node);
+ }
+ }
+
protected void doAllPredecessors(O node,
- Set<O> seen,
- Collection<O> fifo) throws VisitorException
{
+ Set<O> seen,
+ Collection<O> fifo) throws
VisitorException {
if (!seen.contains(node)) {
// We haven't seen this one before.
Collection<O> preds = mPlan.getPredecessors(node);
@@ -86,8 +120,9 @@
}
}
// Now do ourself
- seen.add(node);
+ seen.add(node);
fifo.add(node);
}
}
+
}