Author: pradeepkth Date: Mon Mar 23 23:36:10 2009 New Revision: 757598 URL: http://svn.apache.org/viewvc?rev=757598&view=rev Log: PIG-627: multiquery support incremental patch (Richard Ding via pradeepkth)
Added: hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java hadoop/pig/branches/multiquery/src/org/apache/pig/impl/plan/ReverseDependencyOrderWalker.java Modified: hadoop/pig/branches/multiquery/CHANGES.txt hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSplit.java hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java hadoop/pig/branches/multiquery/src/org/apache/pig/builtin/BinStorage.java hadoop/pig/branches/multiquery/src/org/apache/pig/impl/plan/DependencyOrderWalker.java hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestMultiQuery.java Modified: hadoop/pig/branches/multiquery/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/CHANGES.txt?rev=757598&r1=757597&r2=757598&view=diff ============================================================================== --- hadoop/pig/branches/multiquery/CHANGES.txt (original) +++ hadoop/pig/branches/multiquery/CHANGES.txt Mon Mar 23 23:36:10 2009 @@ -414,3 +414,5 @@ PIG-627: multiquery support incremental patch (Richard Ding via pradeepkth) PIG-627: multiquery support incremental patch (hagleitn via pradeepkth) + + PIG-627: multiquery support incremental patch (Richard Ding via pradeepkth) Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=757598&r1=757597&r2=757598&view=diff ============================================================================== --- hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original) +++ hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Mon Mar 23 23:36:10 2009 @@ -64,7 +64,6 @@ import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion; -import org.apache.pig.impl.plan.DependencyOrderWalker; import org.apache.pig.impl.plan.DepthFirstWalker; import org.apache.pig.impl.plan.NodeIdGenerator; import org.apache.pig.impl.plan.Operator; @@ -240,9 +239,6 @@ la.visit(); la.adjust(); - MultiQueryAdjuster ma = new MultiQueryAdjuster(MRPlan); - ma.visit(); - return MRPlan; } @@ -309,11 +305,6 @@ return st; } - private POSplit getSplit(){ - POSplit sp = new POSplit(new OperatorKey(scope,nig.getNextNodeId(scope))); - return sp; - } - /** * A map MROper is an MROper whose map plan is still open * for taking more non-blocking operators. @@ -611,6 +602,7 @@ try{ FileSpec fSpec = op.getSplitStore(); MapReduceOper mro = endSingleInputPlanWithStr(fSpec); + mro.setSplitter(true); splitsSeen.put(op.getOperatorKey(), mro); curMROp = startNew(fSpec, mro); }catch(Exception e){ @@ -1665,257 +1657,4 @@ } } - /** - * An adjuster that merges all or part splitee MapReduceOpers into - * splitter MapReduceOper. The merge can produce a MROperPlan that has - * fewer MapReduceOpers than MapReduceOpers in the original MROperPlan. 
- * - * The MRCompler generates multiple MapReduceOpers whenever it encounters - * a split operator and connects the single splitter MapReduceOper to - * one or more splitee MapReduceOpers using store/load operators: - * - * ---- POStore (in splitter) -... ---- - * | | ... | - * | | ... | - * POLoad POLoad ... POLoad (in splitees) - * | | | - * - * This adjuster merges those MapReduceOpers by replacing POLoad/POStore - * combination with POSplit operator. - */ - private class MultiQueryAdjuster extends MROpPlanVisitor { - - MultiQueryAdjuster(MROperPlan plan) { - super(plan, new DependencyOrderWalker<MapReduceOper, MROperPlan>(plan, false)); - } - - @Override - public void visitMROp(MapReduceOper mr) throws VisitorException { - - if (!isSplitter(mr)) { - return; - } - - // find all the single load map-only MROpers in the splitees - List<MapReduceOper> mappers = new ArrayList<MapReduceOper>(); - List<MapReduceOper> multiLoadMappers = new ArrayList<MapReduceOper>(); - List<MapReduceOper> mapReducers = new ArrayList<MapReduceOper>(); - - List<MapReduceOper> successors = getPlan().getSuccessors(mr); - for (MapReduceOper successor : successors) { - if (isMapOnly(successor)) { - if (isSingleLoadMapperPlan(successor.mapPlan)) { - mappers.add(successor); - } else { - multiLoadMappers.add(successor); - } - } else { - mapReducers.add(successor); - } - } - - PhysicalPlan splitterPl = isMapOnly(mr) ? 
mr.mapPlan : mr.reducePlan; - PhysicalOperator storeOp = splitterPl.getLeaves().get(0); - List<PhysicalOperator> storePreds = splitterPl.getPredecessors(storeOp); - - if (mappers.size() == 1 && mapReducers.size() == 0 && multiLoadMappers.size() == 0) { - - // In this case, just add splitee's map plan to the splitter's plan - MapReduceOper mapper = mappers.get(0); - - PhysicalPlan pl = mapper.mapPlan; - PhysicalOperator load = pl.getRoots().get(0); - pl.remove(load); - - // make a copy before removing the store operator - List<PhysicalOperator> predsCopy = new ArrayList<PhysicalOperator>(storePreds); - splitterPl.remove(storeOp); - - // connect two plans - List<PhysicalOperator> roots = pl.getRoots(); - try { - splitterPl.merge(pl); - } catch (PlanException e) { - throw new VisitorException(e); - } - - for (PhysicalOperator pred : predsCopy) { - for (PhysicalOperator root : roots) { - try { - splitterPl.connect(pred, root); - } catch (PlanException e) { - throw new VisitorException(e); - } - } - } - - } else if (mappers.size() > 0) { - - // merge splitee's map plans into nested plan of - // the splitter operator - POSplit splitOp = getSplit(); - for (MapReduceOper mapper : mappers) { - - PhysicalPlan pl = mapper.mapPlan; - PhysicalOperator load = pl.getRoots().get(0); - pl.remove(load); - try { - splitOp.addPlan(pl); - } catch (PlanException e) { - throw new VisitorException(e); - } - } - - // add original store to the split operator - // if there is at least one MapReduce splitee - if (mapReducers.size() + multiLoadMappers.size() > 0) { - PhysicalPlan storePlan = new PhysicalPlan(); - try { - storePlan.addAsLeaf(storeOp); - splitOp.addPlan(storePlan); - } catch (PlanException e) { - throw new VisitorException(e); - } - } - - // replace store operator in the splitter with split operator - splitOp.setInputs(storePreds); - try { - splitterPl.replace(storeOp, splitOp);; - } catch (PlanException e) { - throw new VisitorException(e); - } - } - - // remove all the 
map-only splitees from the MROperPlan - for (MapReduceOper mapper : mappers) { - removeAndReconnect(mapper, mr); - } - - // TO-DO: merge the other splitees if possible - if (mapReducers.size() + multiLoadMappers.size() > 0 ) { - // XXX - } - } - - /** - * Removes the specified MR operator from the plan after the merge. - * Connects its predecessors and successors to the merged MR operator - * @param mr the MR operator to remove - * @param newMR the MR operator to be connected to the predecessors and - * the successors of the removed operator - * @throws VisitorException if connect operation fails - */ - private void removeAndReconnect(MapReduceOper mr, MapReduceOper newMR) throws VisitorException { - List<MapReduceOper> mapperSuccs = getPlan().getSuccessors(mr); - List<MapReduceOper> mapperPreds = getPlan().getPredecessors(mr); - - // make a copy before removing operator - ArrayList<MapReduceOper> succsCopy = null; - ArrayList<MapReduceOper> predsCopy = null; - if (mapperSuccs != null) { - succsCopy = new ArrayList<MapReduceOper>(mapperSuccs); - } - if (mapperPreds != null) { - predsCopy = new ArrayList<MapReduceOper>(mapperPreds); - } - getPlan().remove(mr); - - // reconnect the mapper's successors - if (succsCopy != null) { - for (MapReduceOper succ : succsCopy) { - try { - getPlan().connect(newMR, succ); - } catch (PlanException e) { - throw new VisitorException(e); - } - } - } - - // reconnect the mapper's predecessors - if (predsCopy != null) { - for (MapReduceOper pred : predsCopy) { - if (newMR.getOperatorKey().equals(pred.getOperatorKey())) { - continue; - } - try { - getPlan().connect(pred, newMR); - } catch (PlanException e) { - throw new VisitorException(e); - } - } - } - } - - /* - * Checks whether the specified MapReduce operator is a splitter - */ - private boolean isSplitter(MapReduceOper mr) { - - List<MapReduceOper> successors = getPlan().getSuccessors(mr); - if (successors == null || successors.size() == 0) { - return false; - } - - PhysicalPlan 
pl = isMapOnly(mr) ? mr.mapPlan : mr.reducePlan; - - List<PhysicalOperator> mapLeaves = pl.getLeaves(); - if (mapLeaves == null || mapLeaves.size() != 1) { - return false; - } - - PhysicalOperator mapLeaf = mapLeaves.get(0); - if (!(mapLeaf instanceof POStore)) { - return false; - } - - POStore store = (POStore)mapLeaf; - String fileName = store.getSFile().getFileName(); - - for (MapReduceOper mro : successors) { - List<PhysicalOperator> roots = mro.mapPlan.getRoots(); - boolean splitee = false; - for (PhysicalOperator root : roots) { - if (root instanceof POLoad) { - POLoad load = (POLoad)root; - if (fileName.compareTo(load.getLFile().getFileName()) == 0) { - splitee = true; - break; - } - } - } - if (!splitee) return false; - } - - return true; - } - - private boolean isMapOnly(MapReduceOper mr) { - return mr.reducePlan.isEmpty(); - } - - private boolean isSingleLoadMapperPlan(PhysicalPlan pl) { - List<PhysicalOperator> roots = pl.getRoots(); - if (roots.size() != 1) { - return false; - } - - PhysicalOperator root = roots.get(0); - if (!(root instanceof POLoad)) { - throw new IllegalStateException("Invalid root operator in mapper Splitee: " - + root.getClass().getName()); - } - List<PhysicalOperator> successors = pl.getSuccessors(root); - if (successors == null || successors.size() != 1) { - throw new IllegalStateException("Root in mapper Splitee has no successor: " - + successors.size()); - } - PhysicalOperator op = successors.get(0); - if (!(op instanceof POFilter)) { - throw new IllegalStateException("Invalid successor of load in mapper Splitee: " - + op.getClass().getName()); - } - return true; - } - } } Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=757598&r1=757597&r2=757598&view=diff 
============================================================================== --- hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java (original) +++ hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java Mon Mar 23 23:36:10 2009 @@ -183,6 +183,10 @@ // an appropriate NullableXXXWritable object KeyTypeDiscoveryVisitor kdv = new KeyTypeDiscoveryVisitor(plan); kdv.visit(); + + MultiQueryOptimizer mqOptimizer = new MultiQueryOptimizer(plan); + mqOptimizer.visit(); + return plan; } Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java?rev=757598&r1=757597&r2=757598&view=diff ============================================================================== --- hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java (original) +++ hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java Mon Mar 23 23:36:10 2009 @@ -109,6 +109,10 @@ // to add additional map reduce operator with 1 reducer after this long limit = -1; + // Indicates that this MROper is a splitter MROper. + // That is, this MROper ends due to a POSPlit operator. 
+ private boolean splitter = false; + public MapReduceOper(OperatorKey k) { super(k); mapPlan = new PhysicalPlan(); @@ -327,4 +331,12 @@ public int getRequestedParallelism() { return requestedParallelism; } + + public void setSplitter(boolean spl) { + splitter = spl; + } + + public boolean isSplitter() { + return splitter; + } } Added: hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java?rev=757598&view=auto ============================================================================== --- hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java (added) +++ hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java Mon Mar 23 23:36:10 2009 @@ -0,0 +1,375 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROpPlanVisitor; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore; +import org.apache.pig.data.DataType; +import org.apache.pig.impl.plan.NodeIdGenerator; +import org.apache.pig.impl.plan.OperatorKey; +import org.apache.pig.impl.plan.PlanException; +import org.apache.pig.impl.plan.ReverseDependencyOrderWalker; +import org.apache.pig.impl.plan.VisitorException; + +/** + * An optimizer that merges all or part splittee MapReduceOpers into + * splitter MapReduceOper. The merge can produce a MROperPlan that has + * fewer MapReduceOpers than MapReduceOpers in the original MROperPlan. + * + * The MRCompler generates multiple MapReduceOpers whenever it encounters + * a split operator and connects the single splitter MapReduceOper to + * one or more splittee MapReduceOpers using store/load operators: + * + * ---- POStore (in splitter) -... ---- + * | | ... | + * | | ... | + * POLoad POLoad ... POLoad (in splittees) + * | | | + * + * This optimizer merges those MapReduceOpers by replacing POLoad/POStore + * combination with POSplit operator. 
+ */ +class MultiQueryOptimizer extends MROpPlanVisitor { + + private Log log = LogFactory.getLog(getClass()); + + private NodeIdGenerator nig; + + private String scope; + + MultiQueryOptimizer(MROperPlan plan) { + super(plan, new ReverseDependencyOrderWalker<MapReduceOper, MROperPlan>(plan)); + nig = NodeIdGenerator.getGenerator(); + List<MapReduceOper> roots = plan.getRoots(); + scope = roots.get(0).getOperatorKey().getScope(); + } + + @Override + public void visitMROp(MapReduceOper mr) throws VisitorException { + + if (!mr.isSplitter()) { + return; + } + + // first classify all the splittees + List<MapReduceOper> mappers = new ArrayList<MapReduceOper>(); + List<MapReduceOper> multiLoadMROpers = new ArrayList<MapReduceOper>(); + List<MapReduceOper> mapReducers = new ArrayList<MapReduceOper>(); + + List<MapReduceOper> successors = getPlan().getSuccessors(mr); + for (MapReduceOper successor : successors) { + if (isMapOnly(successor)) { + if (isSingleLoadMapperPlan(successor.mapPlan)) { + mappers.add(successor); + } else { + multiLoadMROpers.add(successor); + } + } else { + if (isSingleLoadMapperPlan(successor.mapPlan)) { + mapReducers.add(successor); + } else { + multiLoadMROpers.add(successor); + } + } + } + + // case 1: exactly one splittee and it's map-only + if (mappers.size() == 1 && mapReducers.size() == 0 + && multiLoadMROpers.size() == 0 ) { + mergeOnlyMapperSplittee(mappers.get(0), mr); + return; + } + + // case 2: exactly one splittee and it has reducer + if (isMapOnly(mr) && mapReducers.size() == 1 + && mappers.size() == 0 && multiLoadMROpers.size() == 0) { + mergeOnlyMapReduceSplittee(mapReducers.get(0), mr); + return; + } + + PhysicalPlan splitterPl = isMapOnly(mr) ? 
mr.mapPlan : mr.reducePlan; + POStore storeOp = (POStore)splitterPl.getLeaves().get(0); + + POSplit splitOp = null; + + // case 3: multiple splittees and at least one of them is map-only + if (mappers.size() > 0) { + splitOp = getSplit(); + mergeAllMapOnlySplittees(mappers, mr, splitOp); + } + + boolean splitterMapOnly = isMapOnly(mr); + + // case 4: multiple splittees and at least one of them has reducer + if (splitterMapOnly && mapReducers.size() > 0) { + + // pick one to merge, prefer one that has a combiner + MapReduceOper mapReducer= mapReducers.get(0); + for (MapReduceOper mro : mapReducers) { + if (!mro.combinePlan.isEmpty()) { + mapReducer = mro; + break; + } + } + + PhysicalOperator leaf = splitterPl.getLeaves().get(0); + + splitOp = (leaf instanceof POStore) ? + getSplit() : (POSplit)leaf; + + mergeSingleMapReduceSplittee(mapReducer, mr, splitOp); + } + + // finally, add original store to the split operator + // if there is splittee that hasn't been merged + if (splitOp != null + && ((multiLoadMROpers.size() > 0) + || (mapReducers.size() > 1) + || (!splitterMapOnly && mapReducers.size() > 0))) { + + PhysicalPlan storePlan = new PhysicalPlan(); + try { + storePlan.addAsLeaf(storeOp); + splitOp.addPlan(storePlan); + } catch (PlanException e) { + throw new VisitorException(e); + } + } + } + + private void mergeOneMapPart(MapReduceOper mapper, MapReduceOper splitter) + throws VisitorException { + PhysicalPlan splitterPl = isMapOnly(splitter) ? 
splitter.mapPlan : splitter.reducePlan; + POStore storeOp = (POStore)splitterPl.getLeaves().get(0); + List<PhysicalOperator> storePreds = splitterPl.getPredecessors(storeOp); + + PhysicalPlan pl = mapper.mapPlan; + PhysicalOperator load = pl.getRoots().get(0); + pl.remove(load); + + // make a copy before removing the store operator + List<PhysicalOperator> predsCopy = new ArrayList<PhysicalOperator>(storePreds); + splitterPl.remove(storeOp); + + try { + splitterPl.merge(pl); + } catch (PlanException e) { + throw new VisitorException(e); + } + + // connect two plans + List<PhysicalOperator> roots = pl.getRoots(); + for (PhysicalOperator pred : predsCopy) { + for (PhysicalOperator root : roots) { + try { + splitterPl.connect(pred, root); + } catch (PlanException e) { + throw new VisitorException(e); + } + } + } + } + + private void mergeOnlyMapperSplittee(MapReduceOper mapper, MapReduceOper splitter) + throws VisitorException { + mergeOneMapPart(mapper, splitter); + removeAndReconnect(mapper, splitter); + } + + private void mergeOnlyMapReduceSplittee(MapReduceOper mapReducer, MapReduceOper splitter) + throws VisitorException { + mergeOneMapPart(mapReducer, splitter); + + splitter.setMapDone(true); + splitter.reducePlan = mapReducer.reducePlan; + splitter.setReduceDone(true); + + removeAndReconnect(mapReducer, splitter); + } + + private void mergeAllMapOnlySplittees(List<MapReduceOper> mappers, + MapReduceOper splitter, POSplit splitOp) throws VisitorException { + + PhysicalPlan splitterPl = isMapOnly(splitter) ? 
+ splitter.mapPlan : splitter.reducePlan; + PhysicalOperator storeOp = splitterPl.getLeaves().get(0); + List<PhysicalOperator> storePreds = splitterPl.getPredecessors(storeOp); + + // merge splitee's map plans into nested plan of + // the splitter operator + for (MapReduceOper mapper : mappers) { + PhysicalPlan pl = mapper.mapPlan; + PhysicalOperator load = pl.getRoots().get(0); + pl.remove(load); + try { + splitOp.addPlan(pl); + } catch (PlanException e) { + throw new VisitorException(e); + } + } + + // replace store operator in the splitter with split operator + splitOp.setInputs(storePreds); + try { + splitterPl.replace(storeOp, splitOp);; + } catch (PlanException e) { + throw new VisitorException(e); + } + + // remove all the map-only splittees from the MROperPlan + for (MapReduceOper mapper : mappers) { + removeAndReconnect(mapper, splitter); + } + } + + private void mergeSingleMapReduceSplittee(MapReduceOper mapReduce, + MapReduceOper splitter, POSplit splitOp) throws VisitorException { + + PhysicalPlan splitterPl = splitter.mapPlan; + PhysicalOperator leaf = splitterPl.getLeaves().get(0); + PhysicalOperator storeOp = splitterPl.getLeaves().get(0); + List<PhysicalOperator> storePreds = splitterPl.getPredecessors(storeOp); + + PhysicalPlan pl = mapReduce.mapPlan; + PhysicalOperator load = pl.getRoots().get(0); + pl.remove(load); + try { + splitOp.addPlan(pl); + } catch (PlanException e) { + throw new VisitorException(e); + } + + splitter.setMapDone(true); + splitter.reducePlan = mapReduce.reducePlan; + splitter.setReduceDone(true); + splitter.combinePlan = mapReduce.combinePlan; + + // replace store operator in the splitter with split operator + if (leaf instanceof POStore) { + splitOp.setInputs(storePreds); + try { + splitterPl.replace(storeOp, splitOp);; + } catch (PlanException e) { + throw new VisitorException(e); + } + } + + removeAndReconnect(mapReduce, splitter); + } + + /** + * Removes the specified MR operator from the plan after the merge. 
+ * Connects its predecessors and successors to the merged MR operator + * @param mr the MR operator to remove + * @param newMR the MR operator to be connected to the predecessors and + * the successors of the removed operator + * @throws VisitorException if connect operation fails + */ + private void removeAndReconnect(MapReduceOper mr, MapReduceOper newMR) throws VisitorException { + List<MapReduceOper> mapperSuccs = getPlan().getSuccessors(mr); + List<MapReduceOper> mapperPreds = getPlan().getPredecessors(mr); + + // make a copy before removing operator + ArrayList<MapReduceOper> succsCopy = null; + ArrayList<MapReduceOper> predsCopy = null; + if (mapperSuccs != null) { + succsCopy = new ArrayList<MapReduceOper>(mapperSuccs); + } + if (mapperPreds != null) { + predsCopy = new ArrayList<MapReduceOper>(mapperPreds); + } + getPlan().remove(mr); + + // reconnect the mapper's successors + if (succsCopy != null) { + for (MapReduceOper succ : succsCopy) { + try { + getPlan().connect(newMR, succ); + } catch (PlanException e) { + throw new VisitorException(e); + } + } + } + + // reconnect the mapper's predecessors + if (predsCopy != null) { + for (MapReduceOper pred : predsCopy) { + if (newMR.getOperatorKey().equals(pred.getOperatorKey())) { + continue; + } + try { + getPlan().connect(pred, newMR); + } catch (PlanException e) { + throw new VisitorException(e); + } + } + } + + mergeMROperProperties(mr, newMR); + } + + private void mergeMROperProperties(MapReduceOper from, MapReduceOper to) { + + if (from.isStreamInMap()) { + to.setStreamInMap(true); + } + + if (from.isStreamInReduce()) { + to.setStreamInReduce(true); + } + + if (from.getRequestedParallelism() > to.getRequestedParallelism()) { + to.requestedParallelism = from.requestedParallelism; + } + + if (!from.UDFs.isEmpty()) { + to.UDFs.addAll(from.UDFs); + } + + if (from.needsDistinctCombiner()) { + to.setNeedsDistinctCombiner(true); + } + + if (to.mapKeyType == DataType.UNKNOWN) { + to.mapKeyType = from.mapKeyType; 
+ } + } + + private boolean isMapOnly(MapReduceOper mr) { + return mr.reducePlan.isEmpty(); + } + + private boolean isSingleLoadMapperPlan(PhysicalPlan pl) { + List<PhysicalOperator> roots = pl.getRoots(); + return (roots.size() == 1); + } + + private POSplit getSplit(){ + POSplit sp = new POSplit(new OperatorKey(scope, nig.getNextNodeId(scope))); + return sp; + } +} Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSplit.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSplit.java?rev=757598&r1=757597&r2=757598&view=diff ============================================================================== --- hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSplit.java (original) +++ hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSplit.java Mon Mar 23 23:36:10 2009 @@ -18,6 +18,7 @@ package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators; import java.util.ArrayList; +import java.util.BitSet; import java.util.List; import org.apache.commons.logging.Log; @@ -84,17 +85,16 @@ * which is the job containing the split */ private FileSpec splitStore; - - /* - * The inner physical plan - */ - private PhysicalPlan myPlan = new PhysicalPlan(); - + /* * The list of sub-plans the inner plan is composed of */ private List<PhysicalPlan> myPlans = new ArrayList<PhysicalPlan>(); + private BitSet processedSet = new BitSet(); + + private static Result empty = new Result(POStatus.STATUS_NULL, null); + /** * Constructs an operator with the specified key * @param k the operator key @@ -170,16 +170,7 @@ } /** - * Returns the inner physical plan associated with this operator - * @return the inner plan - */ - public PhysicalPlan getPlan() { - return 
myPlan; - } - - /** - * Returns the list of nested plans. This is used by - * explain method to display the inner plans. + * Returns the list of nested plans. * @return the list of the nested plans * @see org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PlanPrinter */ @@ -193,67 +184,69 @@ * @param plan plan to be appended to the list */ public void addPlan(PhysicalPlan inPlan) throws PlanException { - myPlan.merge(inPlan); myPlans.add(inPlan); + processedSet.set(myPlans.size()-1); } @Override public Result getNext(Tuple t) throws ExecException { - Result inp = processInput(); + if (processedSet.cardinality() == myPlans.size()) { - if (inp.returnStatus == POStatus.STATUS_EOP) { - return inp; - } + Result inp = processInput(); + + if (inp.returnStatus == POStatus.STATUS_EOP) { + return inp; + } - // process the nested plans - myPlan.attachInput((Tuple)inp.result); - processPlan(); + Tuple tuple = (Tuple)inp.result; + for (PhysicalPlan pl : myPlans) { + pl.attachInput(tuple); + } - return new Result(POStatus.STATUS_NULL, null); + processedSet.clear(); + } + + return processPlan(); } - private void processPlan() throws ExecException { + private Result processPlan() throws ExecException { - List<PhysicalOperator> leaves = myPlan.getLeaves(); + int idx = processedSet.nextClearBit(0); + PhysicalOperator leaf = myPlans.get(idx).getLeaves().get(0); - for (PhysicalOperator leaf : leaves) { - - // TO-DO: other types of leaves are possible later - if (!(leaf instanceof POStore) && !(leaf instanceof POSplit)) { - throw new ExecException("Invalid operator type in the split plan: " - + leaf.getOperatorKey()); - } - - runPipeline(leaf); + Result res = runPipeline(leaf); + + if (res.returnStatus == POStatus.STATUS_EOP) { + processedSet.set(idx++); + if (idx < myPlans.size()) { + res = processPlan(); + } } + + return (res.returnStatus == POStatus.STATUS_OK) ? 
res : empty; } - private void runPipeline(PhysicalOperator leaf) throws ExecException { + private Result runPipeline(PhysicalOperator leaf) throws ExecException { + Result res = null; + while (true) { - Result res = leaf.getNext(dummyTuple); + res = leaf.getNext(dummyTuple); - if (res.returnStatus == POStatus.STATUS_OK || - res.returnStatus == POStatus.STATUS_NULL) { + if (res.returnStatus == POStatus.STATUS_OK) { + break; + } else if (res.returnStatus == POStatus.STATUS_NULL) { continue; - } - - if (res.returnStatus == POStatus.STATUS_EOP) { + } else if (res.returnStatus == POStatus.STATUS_EOP) { + break; + } else if (res.returnStatus == POStatus.STATUS_ERR) { break; } - - if (res.returnStatus == POStatus.STATUS_ERR) { - - // if there is an err message use it - String errMsg = (res.result != null) ? - "Received Error while processing the split plan: " + res.result : - "Received Error while processing the split plan."; - - throw new ExecException(errMsg); - } - } + } + + return res; } } Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java?rev=757598&r1=757597&r2=757598&view=diff ============================================================================== --- hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java (original) +++ hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java Mon Mar 23 23:36:10 2009 @@ -53,11 +53,13 @@ stores.add((POStore)leaf); } if (leaf instanceof POSplit) { - PhysicalPlan pl = ((POSplit)leaf).getPlan(); - List<POStore> nestedStores = getStores(pl); - stores.addAll(nestedStores); + List<PhysicalPlan> pls = ((POSplit)leaf).getPlans(); + for (PhysicalPlan pl : pls) { + List<POStore> nestedStores = 
getStores(pl); + stores.addAll(nestedStores); + } } - } + } return stores; } Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/builtin/BinStorage.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/builtin/BinStorage.java?rev=757598&r1=757597&r2=757598&view=diff ============================================================================== --- hadoop/pig/branches/multiquery/src/org/apache/pig/builtin/BinStorage.java (original) +++ hadoop/pig/branches/multiquery/src/org/apache/pig/builtin/BinStorage.java Mon Mar 23 23:36:10 2009 @@ -32,6 +32,7 @@ import org.apache.pig.ExecType; import org.apache.pig.ReversibleLoadStoreFunc; import org.apache.pig.backend.datastorage.DataStorage; +import org.apache.pig.backend.datastorage.ElementDescriptor; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.data.DataBag; import org.apache.pig.data.DataReaderWriter; @@ -213,17 +214,15 @@ public Schema determineSchema(String fileName, ExecType execType, DataStorage storage) throws IOException { - InputStream is = null; - - try { - is = FileLocalizer.open(fileName, execType, storage); - } catch (IOException e) { + if (!FileLocalizer.fileExists(fileName, storage)) { // At compile time in batch mode, the file may not exist // (such as intermediate file). Just return null - the - // same way as we could's get a valid record from the input. 
+ // same way as we would if we did not get a valid record return null; } + InputStream is = FileLocalizer.open(fileName, execType, storage); + bindTo(fileName, new BufferedPositionedInputStream(is), 0, Long.MAX_VALUE); // get the first record from the input file // and figure out the schema from the data in Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/impl/plan/DependencyOrderWalker.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/impl/plan/DependencyOrderWalker.java?rev=757598&r1=757597&r2=757598&view=diff ============================================================================== --- hadoop/pig/branches/multiquery/src/org/apache/pig/impl/plan/DependencyOrderWalker.java (original) +++ hadoop/pig/branches/multiquery/src/org/apache/pig/impl/plan/DependencyOrderWalker.java Mon Mar 23 23:36:10 2009 @@ -28,33 +28,20 @@ * DependencyOrderWalker traverses the graph in such a way that no node is visited * before all the nodes it depends on have been visited. Beyond this, it does not * guarantee any particular order. So, you have a graph with node 1 2 3 4, and - * edges 1->3, 2->3, and 3->4, this walker guarantees that 1 and 2 will be visited + * edges 1->3, 2->3, and 3->4, this walker guarnatees that 1 and 2 will be visited * before 3 and 3 before 4, but it does not guarantee whether 1 or 2 will be * visited first. */ public class DependencyOrderWalker <O extends Operator, P extends OperatorPlan<O>> extends PlanWalker<O, P> { - private boolean rootsFirst = true; - /** * @param plan Plan for this walker to traverse. 
*/ public DependencyOrderWalker(P plan) { - this(plan, true); - } - - /** - * Constructs a walker with the specified plan and walk direction - * @param plan plan for this walker to traverse - * @param rootsFirst flag that indicates walking up (from roots - * to leaves) or walking down (from leaves to roots) - */ - public DependencyOrderWalker(P plan, boolean rootsFirst) { super(plan); - this.rootsFirst = rootsFirst; } - + /** * Begin traversing the graph. * @param visitor Visitor this walker is being used by. @@ -71,14 +58,12 @@ List<O> fifo = new ArrayList<O>(); Set<O> seen = new HashSet<O>(); - List<O> nodes = rootsFirst ? mPlan.getLeaves() : mPlan.getRoots(); - - if (nodes == null) return; - - for (O op : nodes) { - doAllDependencies(op, seen, fifo); + List<O> leaves = mPlan.getLeaves(); + if (leaves == null) return; + for (O op : leaves) { + doAllPredecessors(op, seen, fifo); } - + for (O op: fifo) { op.visit(visitor); } @@ -88,28 +73,9 @@ return new DependencyOrderWalker<O, P>(plan); } - protected void doAllDependencies(O node, - Set<O> seen, - Collection<O> fifo) throws VisitorException { - if (!seen.contains(node)) { - // We haven't seen this one before. - Collection<O> nodes = rootsFirst ? - mPlan.getPredecessors(node) : mPlan.getSuccessors(node); - if (nodes != null) { - // Do all our predecessors before ourself - for (O op : nodes) { - doAllDependencies(op, seen, fifo); - } - } - // Now do ourself - seen.add(node); - fifo.add(node); - } - } - protected void doAllPredecessors(O node, - Set<O> seen, - Collection<O> fifo) throws VisitorException { + Set<O> seen, + Collection<O> fifo) throws VisitorException { if (!seen.contains(node)) { // We haven't seen this one before. 
 Collection<O> preds = mPlan.getPredecessors(node); @@ -120,9 +86,8 @@ } } // Now do ourself - seen.add(node); + seen.add(node); fifo.add(node); } } - } Added: hadoop/pig/branches/multiquery/src/org/apache/pig/impl/plan/ReverseDependencyOrderWalker.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/impl/plan/ReverseDependencyOrderWalker.java?rev=757598&view=auto ============================================================================== --- hadoop/pig/branches/multiquery/src/org/apache/pig/impl/plan/ReverseDependencyOrderWalker.java (added) +++ hadoop/pig/branches/multiquery/src/org/apache/pig/impl/plan/ReverseDependencyOrderWalker.java Mon Mar 23 23:36:10 2009 @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.impl.plan; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + + +/** + * ReverseDependencyOrderWalker traverses the graph in such a way that no node is visited + * before all the nodes that are its successors have been visited. Beyond this, it does not + * guarantee any particular order. 
 So, if you have a graph with node 1 2 3 4, and + * edges 1->3, 2->3, and 3->4, this walker guarantees that 4 will be visited + * before 3 and 3 before 1 and 2, but it does not guarantee whether 1 or 2 will be + * visited first. + */ +public class ReverseDependencyOrderWalker <O extends Operator, P extends OperatorPlan<O>> + extends PlanWalker<O, P> { + + /** + * @param plan Plan for this walker to traverse. + */ + public ReverseDependencyOrderWalker(P plan) { + super(plan); + } + + /** + * Begin traversing the graph. + * @param visitor Visitor this walker is being used by. + * @throws VisitorException if an error is encountered while walking. + */ + public void walk(PlanVisitor<O, P> visitor) throws VisitorException { + // This is highly inefficient, but our graphs are small so it should be okay. + // The algorithm works by starting at any node in the graph, finding its + // successors and calling itself for each of those successors. When it + // finds a node that has no unfinished successors it puts that node in the + // list. It then unwinds itself putting each of the other nodes in the list. + // It keeps track of what nodes it's seen as it goes so it doesn't put any + // nodes in the list twice. + + List<O> fifo = new ArrayList<O>(); + Set<O> seen = new HashSet<O>(); + List<O> roots = mPlan.getRoots(); + if (roots == null) return; + for (O op : roots) { + doAllSuccessors(op, seen, fifo); + } + + for (O op: fifo) { + op.visit(visitor); + } + } + + public PlanWalker<O, P> spawnChildWalker(P plan) { + return new ReverseDependencyOrderWalker<O, P>(plan); + } + + protected void doAllSuccessors(O node, + Set<O> seen, + Collection<O> fifo) throws VisitorException { + if (!seen.contains(node)) { + // We haven't seen this one before. 
+ Collection<O> succs = mPlan.getSuccessors(node); + if (succs != null && succs.size() > 0) { + // Do all our successors before ourself + for (O op : succs) { + doAllSuccessors(op, seen, fifo); + } + } + // Now do ourself + seen.add(node); + fifo.add(node); + } + } +}