Author: pradeepkth Date: Wed Mar 11 23:03:46 2009 New Revision: 752683 URL: http://svn.apache.org/viewvc?rev=752683&view=rev Log: PIG-627: multiquery support incremental patch (Richard Ding via pradeepkth)
Modified: hadoop/pig/branches/multiquery/CHANGES.txt hadoop/pig/branches/multiquery/src/org/apache/pig/PigServer.java hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSplit.java hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java hadoop/pig/branches/multiquery/src/org/apache/pig/builtin/BinStorage.java hadoop/pig/branches/multiquery/src/org/apache/pig/impl/plan/DependencyOrderWalker.java Modified: hadoop/pig/branches/multiquery/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/CHANGES.txt?rev=752683&r1=752682&r2=752683&view=diff ============================================================================== --- hadoop/pig/branches/multiquery/CHANGES.txt (original) +++ hadoop/pig/branches/multiquery/CHANGES.txt Wed Mar 11 23:03:46 2009 @@ -410,3 +410,5 @@ PIG-627: multiquery support M1 (hagleitn via olgan) PIG-627: multiquery support M2 (hagleitn via pradeepkth) + + PIG-627: multiquery support incremental patch (Richard Ding via pradeepkth) Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/PigServer.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/PigServer.java?rev=752683&r1=752682&r2=752683&view=diff ============================================================================== --- hadoop/pig/branches/multiquery/src/org/apache/pig/PigServer.java (original) +++ hadoop/pig/branches/multiquery/src/org/apache/pig/PigServer.java Wed Mar 11 23:03:46 2009 @@ -28,10 +28,12 @@ import java.util.Date; import java.util.Enumeration; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.Set; import java.util.Stack; import org.apache.commons.logging.Log; @@ -49,6 +51,9 @@ import org.apache.pig.impl.PigContext; import org.apache.pig.impl.io.FileLocalizer; import org.apache.pig.impl.logicalLayer.FrontendException; +import org.apache.pig.impl.logicalLayer.LOCogroup; +import org.apache.pig.impl.logicalLayer.LOFRJoin; +import org.apache.pig.impl.logicalLayer.LOLoad; import org.apache.pig.impl.logicalLayer.LogicalOperator; import org.apache.pig.impl.logicalLayer.LogicalPlan; import org.apache.pig.impl.logicalLayer.LogicalPlanBuilder; @@ -62,6 +67,7 @@ import org.apache.pig.impl.plan.CompilationMessageCollector; import org.apache.pig.impl.plan.OperatorKey; import org.apache.pig.impl.streaming.StreamingCommand; +import org.apache.pig.impl.util.MultiMap; import org.apache.pig.impl.util.PropertiesUtil; import org.apache.pig.impl.logicalLayer.LODefine; import org.apache.pig.impl.logicalLayer.LOStore; @@ -838,9 +844,11 @@ private Map<String, LogicalOperator> aliasOp = new HashMap<String, LogicalOperator>(); - private ArrayList<String> scriptCache = new ArrayList<String>(); + private List<String> scriptCache = new ArrayList<String>(); - private Map<LogicalOperator, LogicalPlan> storeOpTable = new HashMap<LogicalOperator, LogicalPlan>(); + private Map<LOStore, LogicalPlan> storeOpTable = new HashMap<LOStore, LogicalPlan>(); + + private Set<LOLoad> loadOps = new HashSet<LOLoad>(); private String jobName; @@ -866,9 +874,7 @@ Map<String, LogicalOperator> getAliasOp() { return aliasOp; } - ArrayList<String> getScriptCache() { return scriptCache; } - - Map<LogicalOperator, LogicalPlan> getStoreOpTable() { return storeOpTable; } + List<String> getScriptCache() { return scriptCache; } boolean isBatchOn() { return batchMode; }; @@ -922,8 +928,15 @@ } } else { if (0 == ignoreNumStores) { - storeOpTable.put(op, tmpLp); + storeOpTable.put((LOStore)op, tmpLp); lp.mergeSharedPlan(tmpLp); + List<LogicalOperator> roots = tmpLp.getRoots(); + for (LogicalOperator root : roots) { + if (root instanceof LOLoad) { + loadOps.add((LOLoad)root); + } + } + } else { --ignoreNumStores; } @@ -987,10 +1000,51 @@ graph.lp = graph.parseQuery(it.next(), lineNumber); } } + graph.postProcess(); } catch (IOException ioe) { graph = null; - } + } return graph; } + + private void postProcess() throws IOException { + + // The following code deals with store/load combination of + // intermediate files. In this case we replace the load operator + // with a (implicit) split operator. + for (LOLoad load : loadOps) { + for (LOStore store : storeOpTable.keySet()) { + String ifile = load.getInputFile().getFileName(); + String ofile = store.getOutputFile().getFileName(); + if (ofile.compareTo(ifile) == 0) { + LogicalOperator storePred = lp.getPredecessors(store).get(0); + + lp.disconnect(store, load); + lp.replace(load, storePred); + + List<LogicalOperator> succs = lp.getSuccessors(storePred); + + for (LogicalOperator succ : succs) { + MultiMap<LogicalOperator, LogicalPlan> innerPls = null; + + // fix inner plans for cogroup and frjoin operators + if (succ instanceof LOCogroup) { + innerPls = ((LOCogroup)succ).getGroupByPlans(); + } else if (succ instanceof LOFRJoin) { + innerPls = ((LOFRJoin)succ).getJoinColPlans(); + } + + if (innerPls != null) { + if (innerPls.containsKey(load)) { + Collection<LogicalPlan> pls = innerPls.get(load); + innerPls.removeKey(load); + innerPls.put(storePred, pls); + } + } + } + } + } + } + } } } Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=752683&r1=752682&r2=752683&view=diff ============================================================================== --- hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original) +++ hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Wed Mar 11 23:03:46 2009 @@ -64,6 +64,7 @@ import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion; +import org.apache.pig.impl.plan.DependencyOrderWalker; import org.apache.pig.impl.plan.DepthFirstWalker; import org.apache.pig.impl.plan.NodeIdGenerator; import org.apache.pig.impl.plan.Operator; @@ -239,6 +240,9 @@ la.visit(); la.adjust(); + MultiQueryAdjuster ma = new MultiQueryAdjuster(MRPlan); + ma.visit(); + return MRPlan; } @@ -305,6 +309,11 @@ return st; } + private POSplit getSplit(){ + POSplit sp = new POSplit(new OperatorKey(scope,nig.getNextNodeId(scope))); + return sp; + } + /** * A map MROper is an MROper whose map plan is still open * for taking more non-blocking operators. @@ -491,7 +500,7 @@ MRPlan.connect(old, ret); return ret; } - + /** * Returns a temporary DFS Path * @return @@ -1655,5 +1664,258 @@ keyType = p.getResultType(); } } - + + /** + * An adjuster that merges all or part splitee MapReduceOpers into + * splitter MapReduceOper. The merge can produce a MROperPlan that has + * fewer MapReduceOpers than MapReduceOpers in the original MROperPlan. + * + * The MRCompler generates multiple MapReduceOpers whenever it encounters + * a split operator and connects the single splitter MapReduceOper to + * one or more splitee MapReduceOpers using store/load operators: + * + * ---- POStore (in splitter) -... ---- + * | | ... | + * | | ... | + * POLoad POLoad ... POLoad (in splitees) + * | | | + * + * This adjuster merges those MapReduceOpers by replacing POLoad/POStore + * combination with POSplit operator. + */ + private class MultiQueryAdjuster extends MROpPlanVisitor { + + MultiQueryAdjuster(MROperPlan plan) { + super(plan, new DependencyOrderWalker<MapReduceOper, MROperPlan>(plan, false)); + } + + @Override + public void visitMROp(MapReduceOper mr) throws VisitorException { + + if (!isSplitter(mr)) { + return; + } + + // find all the single load map-only MROpers in the splitees + List<MapReduceOper> mappers = new ArrayList<MapReduceOper>(); + List<MapReduceOper> multiLoadMappers = new ArrayList<MapReduceOper>(); + List<MapReduceOper> mapReducers = new ArrayList<MapReduceOper>(); + + List<MapReduceOper> successors = getPlan().getSuccessors(mr); + for (MapReduceOper successor : successors) { + if (isMapOnly(successor)) { + if (isSingleLoadMapperPlan(successor.mapPlan)) { + mappers.add(successor); + } else { + multiLoadMappers.add(successor); + } + } else { + mapReducers.add(successor); + } + } + + PhysicalPlan splitterPl = isMapOnly(mr) ? mr.mapPlan : mr.reducePlan; + PhysicalOperator storeOp = splitterPl.getLeaves().get(0); + List<PhysicalOperator> storePreds = splitterPl.getPredecessors(storeOp); + + if (mappers.size() == 1 && mapReducers.size() == 0 && multiLoadMappers.size() == 0) { + + // In this case, just add splitee's map plan to the splitter's plan + MapReduceOper mapper = mappers.get(0); + + PhysicalPlan pl = mapper.mapPlan; + PhysicalOperator load = pl.getRoots().get(0); + pl.remove(load); + + // make a copy before removing the store operator + List<PhysicalOperator> predsCopy = new ArrayList<PhysicalOperator>(storePreds); + splitterPl.remove(storeOp); + + // connect two plans + List<PhysicalOperator> roots = pl.getRoots(); + try { + splitterPl.merge(pl); + } catch (PlanException e) { + throw new VisitorException(e); + } + + for (PhysicalOperator pred : predsCopy) { + for (PhysicalOperator root : roots) { + try { + splitterPl.connect(pred, root); + } catch (PlanException e) { + throw new VisitorException(e); + } + } + } + + } else if (mappers.size() > 0) { + + // merge splitee's map plans into nested plan of + // the splitter operator + POSplit splitOp = getSplit(); + for (MapReduceOper mapper : mappers) { + + PhysicalPlan pl = mapper.mapPlan; + PhysicalOperator load = pl.getRoots().get(0); + pl.remove(load); + try { + splitOp.addPlan(pl); + } catch (PlanException e) { + throw new VisitorException(e); + } + } + + // add original store to the split operator + // if there is at least one MapReduce splitee + if (mapReducers.size() + multiLoadMappers.size() > 0) { + PhysicalPlan storePlan = new PhysicalPlan(); + try { + storePlan.addAsLeaf(storeOp); + splitOp.addPlan(storePlan); + } catch (PlanException e) { + throw new VisitorException(e); + } + } + + // replace store operator in the splitter with split operator + splitOp.setInputs(storePreds); + try { + splitterPl.replace(storeOp, splitOp);; + } catch (PlanException e) { + throw new VisitorException(e); + } + } + + // remove all the map-only splitees from the MROperPlan + for (MapReduceOper mapper : mappers) { + removeAndReconnect(mapper, mr); + } + + // TO-DO: merge the other splitees if possible + if (mapReducers.size() + multiLoadMappers.size() > 0 ) { + // XXX + } + } + + /** + * Removes the specified MR operator from the plan after the merge. + * Connects its predecessors and successors to the merged MR operator + * @param mr the MR operator to remove + * @param newMR the MR operator to be connected to the predecessors and + * the successors of the removed operator + * @throws VisitorException if connect operation fails + */ + private void removeAndReconnect(MapReduceOper mr, MapReduceOper newMR) throws VisitorException { + List<MapReduceOper> mapperSuccs = getPlan().getSuccessors(mr); + List<MapReduceOper> mapperPreds = getPlan().getPredecessors(mr); + + // make a copy before removing operator + ArrayList<MapReduceOper> succsCopy = null; + ArrayList<MapReduceOper> predsCopy = null; + if (mapperSuccs != null) { + succsCopy = new ArrayList<MapReduceOper>(mapperSuccs); + } + if (mapperPreds != null) { + predsCopy = new ArrayList<MapReduceOper>(mapperPreds); + } + getPlan().remove(mr); + + // reconnect the mapper's successors + if (succsCopy != null) { + for (MapReduceOper succ : succsCopy) { + try { + getPlan().connect(newMR, succ); + } catch (PlanException e) { + throw new VisitorException(e); + } + } + } + + // reconnect the mapper's predecessors + if (predsCopy != null) { + for (MapReduceOper pred : predsCopy) { + if (newMR.getOperatorKey().equals(pred.getOperatorKey())) { + continue; + } + try { + getPlan().connect(pred, newMR); + } catch (PlanException e) { + throw new VisitorException(e); + } + } + } + } + + /* + * Checks whether the specified MapReduce operator is a splitter + */ + private boolean isSplitter(MapReduceOper mr) { + + List<MapReduceOper> successors = getPlan().getSuccessors(mr); + if (successors == null || successors.size() == 0) { + return false; + } + + PhysicalPlan pl = isMapOnly(mr) ? mr.mapPlan : mr.reducePlan; + + List<PhysicalOperator> mapLeaves = pl.getLeaves(); + if (mapLeaves == null || mapLeaves.size() != 1) { + return false; + } + + PhysicalOperator mapLeaf = mapLeaves.get(0); + if (!(mapLeaf instanceof POStore)) { + return false; + } + + POStore store = (POStore)mapLeaf; + String fileName = store.getSFile().getFileName(); + + for (MapReduceOper mro : successors) { + List<PhysicalOperator> roots = mro.mapPlan.getRoots(); + boolean splitee = false; + for (PhysicalOperator root : roots) { + if (root instanceof POLoad) { + POLoad load = (POLoad)root; + if (fileName.compareTo(load.getLFile().getFileName()) == 0) { + splitee = true; + break; + } + } + } + if (!splitee) return false; + } + + return true; + } + + private boolean isMapOnly(MapReduceOper mr) { + return mr.reducePlan.isEmpty(); + } + + private boolean isSingleLoadMapperPlan(PhysicalPlan pl) { + List<PhysicalOperator> roots = pl.getRoots(); + if (roots.size() != 1) { + return false; + } + + PhysicalOperator root = roots.get(0); + if (!(root instanceof POLoad)) { + throw new IllegalStateException("Invalid root operator in mapper Splitee: " + + root.getClass().getName()); + } + List<PhysicalOperator> successors = pl.getSuccessors(root); + if (successors == null || successors.size() != 1) { + throw new IllegalStateException("Root in mapper Splitee has no successor: " + + successors.size()); + } + PhysicalOperator op = successors.get(0); + if (!(op instanceof POFilter)) { + throw new IllegalStateException("Invalid successor of load in mapper Splitee: " + + op.getClass().getName()); + } + return true; + } + } } Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java?rev=752683&r1=752682&r2=752683&view=diff ============================================================================== --- hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java (original) +++ hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java Wed Mar 11 23:03:46 2009 @@ -157,6 +157,9 @@ else if(node instanceof POForEach){ sb.append(planString(((POForEach)node).getInputPlans())); } + else if (node instanceof POSplit) { + sb.append(planString(((POSplit)node).getPlans())); + } else if(node instanceof POFRJoin){ POFRJoin frj = (POFRJoin)node; List<List<PhysicalPlan>> joinPlans = frj.getJoinPlans(); Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSplit.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSplit.java?rev=752683&r1=752682&r2=752683&view=diff ============================================================================== --- hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSplit.java (original) +++ hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSplit.java Wed Mar 11 23:03:46 2009 @@ -17,12 +17,21 @@ */ package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators; +import java.util.ArrayList; import java.util.List; -import org.apache.pig.impl.io.FileSpec; -import org.apache.pig.impl.plan.OperatorKey; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.pig.backend.executionengine.ExecException; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; +import org.apache.pig.data.Tuple; +import org.apache.pig.impl.io.FileSpec; +import org.apache.pig.impl.plan.OperatorKey; +import org.apache.pig.impl.plan.PlanException; import org.apache.pig.impl.plan.VisitorException; /** @@ -45,11 +54,11 @@ * ---- POSplit -... ---- * This is different than the existing implementation * where the POSplit writes to sidefiles after filtering - * and then loads the appropirate file. + * and then loads the appropriate file. * * The approach followed here is as good as the old * approach if not better in many cases because - * of the availablity of attachinInputs. An optimization + * of the availability of attachinInputs. An optimization * that can ensue is if there are multiple loads that * load the same file, they can be merged into one and * then the operators that take input from the load @@ -65,27 +74,61 @@ * job whenever necessary. */ public class POSplit extends PhysicalOperator { + + private static final long serialVersionUID = 1L; + + private Log log = LogFactory.getLog(getClass()); + + /* + * The filespec that is used to store and load the output of the split job + * which is the job containing the split + */ + private FileSpec splitStore; + + /* + * The inner physical plan + */ + private PhysicalPlan myPlan = new PhysicalPlan(); + + /* + * The list of sub-plans the inner plan is composed of + */ + private List<PhysicalPlan> myPlans = new ArrayList<PhysicalPlan>(); + /** - * + * Constructs an operator with the specified key + * @param k the operator key */ - private static final long serialVersionUID = 1L; - //The filespec that is used to store - //and load the output of the split job - //which is the job containing the split - FileSpec splitStore; - public POSplit(OperatorKey k) { this(k,-1,null); } + /** + * Constructs an operator with the specified key + * and degree of parallelism + * @param k the operator key + * @param rp the degree of parallelism requested + */ public POSplit(OperatorKey k, int rp) { this(k,rp,null); } + /** + * Constructs an operator with the specified key and inputs + * @param k the operator key + * @param inp the inputs that this operator will read data from + */ public POSplit(OperatorKey k, List<PhysicalOperator> inp) { - this(k,-1,null); + this(k,-1,inp); } + /** + * Constructs an operator with the specified key, + * degree of parallelism and inputs + * @param k the operator key + * @param rp the degree of parallelism requested + * @param inp the inputs that this operator will read data from + */ public POSplit(OperatorKey k, int rp, List<PhysicalOperator> inp) { super(k, rp, inp); } @@ -110,12 +153,107 @@ return true; } + /** + * Returns the name of the file associated with this operator + * @return the FileSpec associated with this operator + */ public FileSpec getSplitStore() { return splitStore; } + /** + * Sets the name of the file associated with this operator + * @param splitStore the FileSpec used to store the data + */ public void setSplitStore(FileSpec splitStore) { this.splitStore = splitStore; } + /** + * Returns the inner physical plan associated with this operator + * @return the inner plan + */ + public PhysicalPlan getPlan() { + return myPlan; + } + + /** + * Returns the list of nested plans. This is used by + * explain method to display the inner plans. + * @return the list of the nested plans + * @see org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PlanPrinter + */ + public List<PhysicalPlan> getPlans() { + return myPlans; + } + + /** + * Appends the specified plan to the end of + * the nested input plan list + * @param plan plan to be appended to the list + */ + public void addPlan(PhysicalPlan inPlan) throws PlanException { + myPlan.merge(inPlan); + myPlans.add(inPlan); + } + + @Override + public Result getNext(Tuple t) throws ExecException { + + Result inp = processInput(); + + if (inp.returnStatus == POStatus.STATUS_EOP) { + return inp; + } + + // process the nested plans + myPlan.attachInput((Tuple)inp.result); + processPlan(); + + return new Result(POStatus.STATUS_NULL, null); + } + + private void processPlan() throws ExecException { + + List<PhysicalOperator> leaves = myPlan.getLeaves(); + + for (PhysicalOperator leaf : leaves) { + + // TO-DO: other types of leaves are possible later + if (!(leaf instanceof POStore) && !(leaf instanceof POSplit)) { + throw new ExecException("Invalid operator type in the split plan: " + + leaf.getOperatorKey()); + } + + runPipeline(leaf); + } + } + + private void runPipeline(PhysicalOperator leaf) throws ExecException { + + while (true) { + + Result res = leaf.getNext(dummyTuple); + + if (res.returnStatus == POStatus.STATUS_OK || + res.returnStatus == POStatus.STATUS_NULL) { + continue; + } + + if (res.returnStatus == POStatus.STATUS_EOP) { + break; + } + + if (res.returnStatus == POStatus.STATUS_ERR) { + + // if there is an err message use it + String errMsg = (res.result != null) ? + "Received Error while processing the split plan: " + res.result : + "Received Error while processing the split plan."; + + throw new ExecException(errMsg); + } + } + } + } Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java?rev=752683&r1=752682&r2=752683&view=diff ============================================================================== --- hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java (original) +++ hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java Wed Mar 11 23:03:46 2009 @@ -52,7 +52,12 @@ if (leaf instanceof POStore) { stores.add((POStore)leaf); } - } + if (leaf instanceof POSplit) { + PhysicalPlan pl = ((POSplit)leaf).getPlan(); + List<POStore> nestedStores = getStores(pl); + stores.addAll(nestedStores); + } + } return stores; } @@ -88,4 +93,4 @@ return new Path("rel/"+pathStr).toString(); } } -} \ No newline at end of file +} Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/builtin/BinStorage.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/builtin/BinStorage.java?rev=752683&r1=752682&r2=752683&view=diff ============================================================================== --- hadoop/pig/branches/multiquery/src/org/apache/pig/builtin/BinStorage.java (original) +++ hadoop/pig/branches/multiquery/src/org/apache/pig/builtin/BinStorage.java Wed Mar 11 23:03:46 2009 @@ -212,7 +212,18 @@ */ public Schema determineSchema(String fileName, ExecType execType, DataStorage storage) throws IOException { - InputStream is = FileLocalizer.open(fileName, execType, storage); + + InputStream is = null; + + try { + is = FileLocalizer.open(fileName, execType, storage); + } catch (IOException e) { + // At compile time in batch mode, the file may not exist + // (such as intermediate file). Just return null - the + // same way as we could's get a valid record from the input. + return null; + } + bindTo(fileName, new BufferedPositionedInputStream(is), 0, Long.MAX_VALUE); // get the first record from the input file // and figure out the schema from the data in Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/impl/plan/DependencyOrderWalker.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/impl/plan/DependencyOrderWalker.java?rev=752683&r1=752682&r2=752683&view=diff ============================================================================== --- hadoop/pig/branches/multiquery/src/org/apache/pig/impl/plan/DependencyOrderWalker.java (original) +++ hadoop/pig/branches/multiquery/src/org/apache/pig/impl/plan/DependencyOrderWalker.java Wed Mar 11 23:03:46 2009 @@ -28,21 +28,34 @@ * DependencyOrderWalker traverses the graph in such a way that no node is visited * before all the nodes it depends on have been visited. Beyond this, it does not * guarantee any particular order. So, you have a graph with node 1 2 3 4, and - * edges 1->3, 2->3, and 3->4, this walker guarnatees that 1 and 2 will be visited + * edges 1->3, 2->3, and 3->4, this walker guarantees that 1 and 2 will be visited * before 3 and 3 before 4, but it does not guarantee whether 1 or 2 will be * visited first. */ public class DependencyOrderWalker <O extends Operator, P extends OperatorPlan<O>> extends PlanWalker<O, P> { + private boolean rootsFirst = true; + /** * @param plan Plan for this walker to traverse. */ public DependencyOrderWalker(P plan) { - super(plan); + this(plan, true); } /** + * Constructs a walker with the specified plan and walk direction + * @param plan plan for this walker to traverse + * @param rootsFirst flag that indicates walking up (from roots + * to leaves) or walking down (from leaves to roots) + */ + public DependencyOrderWalker(P plan, boolean rootsFirst) { + super(plan); + this.rootsFirst = rootsFirst; + } + + /** * Begin traversing the graph. * @param visitor Visitor this walker is being used by. * @throws VisitorException if an error is encountered while walking. @@ -58,12 +71,14 @@ List<O> fifo = new ArrayList<O>(); Set<O> seen = new HashSet<O>(); - List<O> leaves = mPlan.getLeaves(); - if (leaves == null) return; - for (O op : leaves) { - doAllPredecessors(op, seen, fifo); + List<O> nodes = rootsFirst ? mPlan.getLeaves() : mPlan.getRoots(); + + if (nodes == null) return; + + for (O op : nodes) { + doAllDependencies(op, seen, fifo); } - + for (O op: fifo) { op.visit(visitor); } @@ -73,9 +88,28 @@ return new DependencyOrderWalker<O, P>(plan); } + protected void doAllDependencies(O node, + Set<O> seen, + Collection<O> fifo) throws VisitorException { + if (!seen.contains(node)) { + // We haven't seen this one before. + Collection<O> nodes = rootsFirst ? + mPlan.getPredecessors(node) : mPlan.getSuccessors(node); + if (nodes != null) { + // Do all our predecessors before ourself + for (O op : nodes) { + doAllDependencies(op, seen, fifo); + } + } + // Now do ourself + seen.add(node); + fifo.add(node); + } + } + protected void doAllPredecessors(O node, - Set<O> seen, - Collection<O> fifo) throws VisitorException { + Set<O> seen, + Collection<O> fifo) throws VisitorException { if (!seen.contains(node)) { // We haven't seen this one before. Collection<O> preds = mPlan.getPredecessors(node); @@ -86,8 +120,9 @@ } } // Now do ourself - seen.add(node); + seen.add(node); fifo.add(node); } } + }