Author: pradeepkth
Date: Mon Mar 23 23:36:10 2009
New Revision: 757598
URL: http://svn.apache.org/viewvc?rev=757598&view=rev
Log:
PIG-627: multiquery support incremental patch (Richard Ding via pradeepkth)
Added:
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java
hadoop/pig/branches/multiquery/src/org/apache/pig/impl/plan/ReverseDependencyOrderWalker.java
Modified:
hadoop/pig/branches/multiquery/CHANGES.txt
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSplit.java
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java
hadoop/pig/branches/multiquery/src/org/apache/pig/builtin/BinStorage.java
hadoop/pig/branches/multiquery/src/org/apache/pig/impl/plan/DependencyOrderWalker.java
hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestMultiQuery.java
Modified: hadoop/pig/branches/multiquery/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/CHANGES.txt?rev=757598&r1=757597&r2=757598&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/CHANGES.txt (original)
+++ hadoop/pig/branches/multiquery/CHANGES.txt Mon Mar 23 23:36:10 2009
@@ -414,3 +414,5 @@
PIG-627: multiquery support incremental patch (Richard Ding via pradeepkth)
PIG-627: multiquery support incremental patch (hagleitn via pradeepkth)
+
+ PIG-627: multiquery support incremental patch (Richard Ding via pradeepkth)
Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=757598&r1=757597&r2=757598&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Mon Mar 23 23:36:10 2009
@@ -64,7 +64,6 @@
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
-import org.apache.pig.impl.plan.DependencyOrderWalker;
import org.apache.pig.impl.plan.DepthFirstWalker;
import org.apache.pig.impl.plan.NodeIdGenerator;
import org.apache.pig.impl.plan.Operator;
@@ -240,9 +239,6 @@
la.visit();
la.adjust();
- MultiQueryAdjuster ma = new MultiQueryAdjuster(MRPlan);
- ma.visit();
-
return MRPlan;
}
@@ -309,11 +305,6 @@
return st;
}
- private POSplit getSplit(){
- POSplit sp = new POSplit(new OperatorKey(scope,nig.getNextNodeId(scope)));
- return sp;
- }
-
/**
* A map MROper is an MROper whose map plan is still open
* for taking more non-blocking operators.
@@ -611,6 +602,7 @@
try{
FileSpec fSpec = op.getSplitStore();
MapReduceOper mro = endSingleInputPlanWithStr(fSpec);
+ mro.setSplitter(true);
splitsSeen.put(op.getOperatorKey(), mro);
curMROp = startNew(fSpec, mro);
}catch(Exception e){
@@ -1665,257 +1657,4 @@
}
}
- /**
- * An adjuster that merges all or part splitee MapReduceOpers into
- * splitter MapReduceOper. The merge can produce a MROperPlan that has
- * fewer MapReduceOpers than MapReduceOpers in the original MROperPlan.
- *
- * The MRCompler generates multiple MapReduceOpers whenever it encounters
- * a split operator and connects the single splitter MapReduceOper to
- * one or more splitee MapReduceOpers using store/load operators:
- *
- * ---- POStore (in splitter) -... ----
- * | | ... |
- * | | ... |
- * POLoad POLoad ... POLoad (in splitees)
- * | | |
- *
- * This adjuster merges those MapReduceOpers by replacing POLoad/POStore
- * combination with POSplit operator.
- */
- private class MultiQueryAdjuster extends MROpPlanVisitor {
-
- MultiQueryAdjuster(MROperPlan plan) {
- super(plan, new DependencyOrderWalker<MapReduceOper, MROperPlan>(plan, false));
- }
-
- @Override
- public void visitMROp(MapReduceOper mr) throws VisitorException {
-
- if (!isSplitter(mr)) {
- return;
- }
-
- // find all the single load map-only MROpers in the splitees
- List<MapReduceOper> mappers = new ArrayList<MapReduceOper>();
- List<MapReduceOper> multiLoadMappers = new ArrayList<MapReduceOper>();
- List<MapReduceOper> mapReducers = new ArrayList<MapReduceOper>();
-
- List<MapReduceOper> successors = getPlan().getSuccessors(mr);
- for (MapReduceOper successor : successors) {
- if (isMapOnly(successor)) {
- if (isSingleLoadMapperPlan(successor.mapPlan)) {
- mappers.add(successor);
- } else {
- multiLoadMappers.add(successor);
- }
- } else {
- mapReducers.add(successor);
- }
- }
-
- PhysicalPlan splitterPl = isMapOnly(mr) ? mr.mapPlan : mr.reducePlan;
- PhysicalOperator storeOp = splitterPl.getLeaves().get(0);
- List<PhysicalOperator> storePreds = splitterPl.getPredecessors(storeOp);
-
- if (mappers.size() == 1 && mapReducers.size() == 0 && multiLoadMappers.size() == 0) {
-
- // In this case, just add splitee's map plan to the splitter's plan
- MapReduceOper mapper = mappers.get(0);
-
- PhysicalPlan pl = mapper.mapPlan;
- PhysicalOperator load = pl.getRoots().get(0);
- pl.remove(load);
-
- // make a copy before removing the store operator
- List<PhysicalOperator> predsCopy = new ArrayList<PhysicalOperator>(storePreds);
- splitterPl.remove(storeOp);
-
- // connect two plans
- List<PhysicalOperator> roots = pl.getRoots();
- try {
- splitterPl.merge(pl);
- } catch (PlanException e) {
- throw new VisitorException(e);
- }
-
- for (PhysicalOperator pred : predsCopy) {
- for (PhysicalOperator root : roots) {
- try {
- splitterPl.connect(pred, root);
- } catch (PlanException e) {
- throw new VisitorException(e);
- }
- }
- }
-
- } else if (mappers.size() > 0) {
-
- // merge splitee's map plans into nested plan of
- // the splitter operator
- POSplit splitOp = getSplit();
- for (MapReduceOper mapper : mappers) {
-
- PhysicalPlan pl = mapper.mapPlan;
- PhysicalOperator load = pl.getRoots().get(0);
- pl.remove(load);
- try {
- splitOp.addPlan(pl);
- } catch (PlanException e) {
- throw new VisitorException(e);
- }
- }
-
- // add original store to the split operator
- // if there is at least one MapReduce splitee
- if (mapReducers.size() + multiLoadMappers.size() > 0) {
- PhysicalPlan storePlan = new PhysicalPlan();
- try {
- storePlan.addAsLeaf(storeOp);
- splitOp.addPlan(storePlan);
- } catch (PlanException e) {
- throw new VisitorException(e);
- }
- }
-
- // replace store operator in the splitter with split operator
- splitOp.setInputs(storePreds);
- try {
- splitterPl.replace(storeOp, splitOp);;
- } catch (PlanException e) {
- throw new VisitorException(e);
- }
- }
-
- // remove all the map-only splitees from the MROperPlan
- for (MapReduceOper mapper : mappers) {
- removeAndReconnect(mapper, mr);
- }
-
- // TO-DO: merge the other splitees if possible
- if (mapReducers.size() + multiLoadMappers.size() > 0 ) {
- // XXX
- }
- }
-
- /**
- * Removes the specified MR operator from the plan after the merge.
- * Connects its predecessors and successors to the merged MR operator
- * @param mr the MR operator to remove
- * @param newMR the MR operator to be connected to the predecessors and
- * the successors of the removed operator
- * @throws VisitorException if connect operation fails
- */
- private void removeAndReconnect(MapReduceOper mr, MapReduceOper newMR) throws VisitorException {
- List<MapReduceOper> mapperSuccs = getPlan().getSuccessors(mr);
- List<MapReduceOper> mapperPreds = getPlan().getPredecessors(mr);
-
- // make a copy before removing operator
- ArrayList<MapReduceOper> succsCopy = null;
- ArrayList<MapReduceOper> predsCopy = null;
- if (mapperSuccs != null) {
- succsCopy = new ArrayList<MapReduceOper>(mapperSuccs);
- }
- if (mapperPreds != null) {
- predsCopy = new ArrayList<MapReduceOper>(mapperPreds);
- }
- getPlan().remove(mr);
-
- // reconnect the mapper's successors
- if (succsCopy != null) {
- for (MapReduceOper succ : succsCopy) {
- try {
- getPlan().connect(newMR, succ);
- } catch (PlanException e) {
- throw new VisitorException(e);
- }
- }
- }
-
- // reconnect the mapper's predecessors
- if (predsCopy != null) {
- for (MapReduceOper pred : predsCopy) {
- if (newMR.getOperatorKey().equals(pred.getOperatorKey())) {
- continue;
- }
- try {
- getPlan().connect(pred, newMR);
- } catch (PlanException e) {
- throw new VisitorException(e);
- }
- }
- }
- }
-
- /*
- * Checks whether the specified MapReduce operator is a splitter
- */
- private boolean isSplitter(MapReduceOper mr) {
-
- List<MapReduceOper> successors = getPlan().getSuccessors(mr);
- if (successors == null || successors.size() == 0) {
- return false;
- }
-
- PhysicalPlan pl = isMapOnly(mr) ? mr.mapPlan : mr.reducePlan;
-
- List<PhysicalOperator> mapLeaves = pl.getLeaves();
- if (mapLeaves == null || mapLeaves.size() != 1) {
- return false;
- }
-
- PhysicalOperator mapLeaf = mapLeaves.get(0);
- if (!(mapLeaf instanceof POStore)) {
- return false;
- }
-
- POStore store = (POStore)mapLeaf;
- String fileName = store.getSFile().getFileName();
-
- for (MapReduceOper mro : successors) {
- List<PhysicalOperator> roots = mro.mapPlan.getRoots();
- boolean splitee = false;
- for (PhysicalOperator root : roots) {
- if (root instanceof POLoad) {
- POLoad load = (POLoad)root;
- if (fileName.compareTo(load.getLFile().getFileName()) == 0) {
- splitee = true;
- break;
- }
- }
- }
- if (!splitee) return false;
- }
-
- return true;
- }
-
- private boolean isMapOnly(MapReduceOper mr) {
- return mr.reducePlan.isEmpty();
- }
-
- private boolean isSingleLoadMapperPlan(PhysicalPlan pl) {
- List<PhysicalOperator> roots = pl.getRoots();
- if (roots.size() != 1) {
- return false;
- }
-
- PhysicalOperator root = roots.get(0);
- if (!(root instanceof POLoad)) {
- throw new IllegalStateException("Invalid root operator in
mapper Splitee: "
- + root.getClass().getName());
- }
- List<PhysicalOperator> successors = pl.getSuccessors(root);
- if (successors == null || successors.size() != 1) {
- throw new IllegalStateException("Root in mapper Splitee has no
successor: "
- + successors.size());
- }
- PhysicalOperator op = successors.get(0);
- if (!(op instanceof POFilter)) {
- throw new IllegalStateException("Invalid successor of load in
mapper Splitee: "
- + op.getClass().getName());
- }
- return true;
- }
- }
}
Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=757598&r1=757597&r2=757598&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java (original)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java Mon Mar 23 23:36:10 2009
@@ -183,6 +183,10 @@
// an appropriate NullableXXXWritable object
KeyTypeDiscoveryVisitor kdv = new KeyTypeDiscoveryVisitor(plan);
kdv.visit();
+
+ MultiQueryOptimizer mqOptimizer = new MultiQueryOptimizer(plan);
+ mqOptimizer.visit();
+
return plan;
}
Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java?rev=757598&r1=757597&r2=757598&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java (original)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java Mon Mar 23 23:36:10 2009
@@ -109,6 +109,10 @@
// to add additional map reduce operator with 1 reducer after this
long limit = -1;
+ // Indicates that this MROper is a splitter MROper.
+ // That is, this MROper ends due to a POSplit operator.
+ private boolean splitter = false;
+
public MapReduceOper(OperatorKey k) {
super(k);
mapPlan = new PhysicalPlan();
@@ -327,4 +331,12 @@
public int getRequestedParallelism() {
return requestedParallelism;
}
+
+ public void setSplitter(boolean spl) {
+ splitter = spl;
+ }
+
+ public boolean isSplitter() {
+ return splitter;
+ }
}
Added: hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java?rev=757598&view=auto
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java (added)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java Mon Mar 23 23:36:10 2009
@@ -0,0 +1,375 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROpPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.data.DataType;
+import org.apache.pig.impl.plan.NodeIdGenerator;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.PlanException;
+import org.apache.pig.impl.plan.ReverseDependencyOrderWalker;
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * An optimizer that merges all or part of the splittee MapReduceOpers into
+ * the splitter MapReduceOper. The merge can produce an MROperPlan that has
+ * fewer MapReduceOpers than the original MROperPlan.
+ *
+ * The MRCompiler generates multiple MapReduceOpers whenever it encounters
+ * a split operator and connects the single splitter MapReduceOper to
+ * one or more splittee MapReduceOpers using store/load operators:
+ *
+ * ---- POStore (in splitter) -... ----
+ * | | ... |
+ * | | ... |
+ * POLoad POLoad ... POLoad (in splittees)
+ * | | |
+ *
+ * This optimizer merges those MapReduceOpers by replacing the POLoad/POStore
+ * combination with a POSplit operator.
+ */
+class MultiQueryOptimizer extends MROpPlanVisitor {
+
+ private Log log = LogFactory.getLog(getClass());
+
+ private NodeIdGenerator nig;
+
+ private String scope;
+
+ MultiQueryOptimizer(MROperPlan plan) {
+ super(plan, new ReverseDependencyOrderWalker<MapReduceOper, MROperPlan>(plan));
+ nig = NodeIdGenerator.getGenerator();
+ List<MapReduceOper> roots = plan.getRoots();
+ scope = roots.get(0).getOperatorKey().getScope();
+ }
+
+ @Override
+ public void visitMROp(MapReduceOper mr) throws VisitorException {
+
+ if (!mr.isSplitter()) {
+ return;
+ }
+
+ // first classify all the splittees
+ List<MapReduceOper> mappers = new ArrayList<MapReduceOper>();
+ List<MapReduceOper> multiLoadMROpers = new ArrayList<MapReduceOper>();
+ List<MapReduceOper> mapReducers = new ArrayList<MapReduceOper>();
+
+ List<MapReduceOper> successors = getPlan().getSuccessors(mr);
+ for (MapReduceOper successor : successors) {
+ if (isMapOnly(successor)) {
+ if (isSingleLoadMapperPlan(successor.mapPlan)) {
+ mappers.add(successor);
+ } else {
+ multiLoadMROpers.add(successor);
+ }
+ } else {
+ if (isSingleLoadMapperPlan(successor.mapPlan)) {
+ mapReducers.add(successor);
+ } else {
+ multiLoadMROpers.add(successor);
+ }
+ }
+ }
+
+ // case 1: exactly one splittee and it's map-only
+ if (mappers.size() == 1 && mapReducers.size() == 0
+ && multiLoadMROpers.size() == 0 ) {
+ mergeOnlyMapperSplittee(mappers.get(0), mr);
+ return;
+ }
+
+ // case 2: exactly one splittee and it has reducer
+ if (isMapOnly(mr) && mapReducers.size() == 1
+ && mappers.size() == 0 && multiLoadMROpers.size() == 0) {
+ mergeOnlyMapReduceSplittee(mapReducers.get(0), mr);
+ return;
+ }
+
+ PhysicalPlan splitterPl = isMapOnly(mr) ? mr.mapPlan : mr.reducePlan;
+ POStore storeOp = (POStore)splitterPl.getLeaves().get(0);
+
+ POSplit splitOp = null;
+
+ // case 3: multiple splittees and at least one of them is map-only
+ if (mappers.size() > 0) {
+ splitOp = getSplit();
+ mergeAllMapOnlySplittees(mappers, mr, splitOp);
+ }
+
+ boolean splitterMapOnly = isMapOnly(mr);
+
+ // case 4: multiple splittees and at least one of them has reducer
+ if (splitterMapOnly && mapReducers.size() > 0) {
+
+ // pick one to merge, prefer one that has a combiner
+ MapReduceOper mapReducer= mapReducers.get(0);
+ for (MapReduceOper mro : mapReducers) {
+ if (!mro.combinePlan.isEmpty()) {
+ mapReducer = mro;
+ break;
+ }
+ }
+
+ PhysicalOperator leaf = splitterPl.getLeaves().get(0);
+
+ splitOp = (leaf instanceof POStore) ?
+ getSplit() : (POSplit)leaf;
+
+ mergeSingleMapReduceSplittee(mapReducer, mr, splitOp);
+ }
+
+ // finally, add original store to the split operator
+ // if there is a splittee that hasn't been merged
+ if (splitOp != null
+ && ((multiLoadMROpers.size() > 0)
+ || (mapReducers.size() > 1)
+ || (!splitterMapOnly && mapReducers.size() > 0))) {
+
+ PhysicalPlan storePlan = new PhysicalPlan();
+ try {
+ storePlan.addAsLeaf(storeOp);
+ splitOp.addPlan(storePlan);
+ } catch (PlanException e) {
+ throw new VisitorException(e);
+ }
+ }
+ }
+
+ private void mergeOneMapPart(MapReduceOper mapper, MapReduceOper splitter) throws VisitorException {
+ PhysicalPlan splitterPl = isMapOnly(splitter) ? splitter.mapPlan : splitter.reducePlan;
+ POStore storeOp = (POStore)splitterPl.getLeaves().get(0);
+ List<PhysicalOperator> storePreds = splitterPl.getPredecessors(storeOp);
+
+ PhysicalPlan pl = mapper.mapPlan;
+ PhysicalOperator load = pl.getRoots().get(0);
+ pl.remove(load);
+
+ // make a copy before removing the store operator
+ List<PhysicalOperator> predsCopy = new ArrayList<PhysicalOperator>(storePreds);
+ splitterPl.remove(storeOp);
+
+ try {
+ splitterPl.merge(pl);
+ } catch (PlanException e) {
+ throw new VisitorException(e);
+ }
+
+ // connect two plans
+ List<PhysicalOperator> roots = pl.getRoots();
+ for (PhysicalOperator pred : predsCopy) {
+ for (PhysicalOperator root : roots) {
+ try {
+ splitterPl.connect(pred, root);
+ } catch (PlanException e) {
+ throw new VisitorException(e);
+ }
+ }
+ }
+ }
+
+ private void mergeOnlyMapperSplittee(MapReduceOper mapper, MapReduceOper splitter)
+ throws VisitorException {
+ mergeOneMapPart(mapper, splitter);
+ removeAndReconnect(mapper, splitter);
+ }
+
+ private void mergeOnlyMapReduceSplittee(MapReduceOper mapReducer, MapReduceOper splitter)
+ throws VisitorException {
+ mergeOneMapPart(mapReducer, splitter);
+
+ splitter.setMapDone(true);
+ splitter.reducePlan = mapReducer.reducePlan;
+ splitter.setReduceDone(true);
+
+ removeAndReconnect(mapReducer, splitter);
+ }
+
+ private void mergeAllMapOnlySplittees(List<MapReduceOper> mappers,
+ MapReduceOper splitter, POSplit splitOp) throws VisitorException {
+
+ PhysicalPlan splitterPl = isMapOnly(splitter) ?
+ splitter.mapPlan : splitter.reducePlan;
+ PhysicalOperator storeOp = splitterPl.getLeaves().get(0);
+ List<PhysicalOperator> storePreds = splitterPl.getPredecessors(storeOp);
+
+ // merge splittee's map plans into nested plan of
+ // the splitter operator
+ for (MapReduceOper mapper : mappers) {
+ PhysicalPlan pl = mapper.mapPlan;
+ PhysicalOperator load = pl.getRoots().get(0);
+ pl.remove(load);
+ try {
+ splitOp.addPlan(pl);
+ } catch (PlanException e) {
+ throw new VisitorException(e);
+ }
+ }
+
+ // replace store operator in the splitter with split operator
+ splitOp.setInputs(storePreds);
+ try {
+ splitterPl.replace(storeOp, splitOp);
+ } catch (PlanException e) {
+ throw new VisitorException(e);
+ }
+
+ // remove all the map-only splittees from the MROperPlan
+ for (MapReduceOper mapper : mappers) {
+ removeAndReconnect(mapper, splitter);
+ }
+ }
+
+ private void mergeSingleMapReduceSplittee(MapReduceOper mapReduce,
+ MapReduceOper splitter, POSplit splitOp) throws VisitorException {
+
+ PhysicalPlan splitterPl = splitter.mapPlan;
+ PhysicalOperator leaf = splitterPl.getLeaves().get(0);
+ PhysicalOperator storeOp = splitterPl.getLeaves().get(0);
+ List<PhysicalOperator> storePreds = splitterPl.getPredecessors(storeOp);
+
+ PhysicalPlan pl = mapReduce.mapPlan;
+ PhysicalOperator load = pl.getRoots().get(0);
+ pl.remove(load);
+ try {
+ splitOp.addPlan(pl);
+ } catch (PlanException e) {
+ throw new VisitorException(e);
+ }
+
+ splitter.setMapDone(true);
+ splitter.reducePlan = mapReduce.reducePlan;
+ splitter.setReduceDone(true);
+ splitter.combinePlan = mapReduce.combinePlan;
+
+ // replace store operator in the splitter with split operator
+ if (leaf instanceof POStore) {
+ splitOp.setInputs(storePreds);
+ try {
+ splitterPl.replace(storeOp, splitOp);
+ } catch (PlanException e) {
+ throw new VisitorException(e);
+ }
+ }
+
+ removeAndReconnect(mapReduce, splitter);
+ }
+
+ /**
+ * Removes the specified MR operator from the plan after the merge.
+ * Connects its predecessors and successors to the merged MR operator
+ * @param mr the MR operator to remove
+ * @param newMR the MR operator to be connected to the predecessors and
+ * the successors of the removed operator
+ * @throws VisitorException if connect operation fails
+ */
+ private void removeAndReconnect(MapReduceOper mr, MapReduceOper newMR) throws VisitorException {
+ List<MapReduceOper> mapperSuccs = getPlan().getSuccessors(mr);
+ List<MapReduceOper> mapperPreds = getPlan().getPredecessors(mr);
+
+ // make a copy before removing operator
+ ArrayList<MapReduceOper> succsCopy = null;
+ ArrayList<MapReduceOper> predsCopy = null;
+ if (mapperSuccs != null) {
+ succsCopy = new ArrayList<MapReduceOper>(mapperSuccs);
+ }
+ if (mapperPreds != null) {
+ predsCopy = new ArrayList<MapReduceOper>(mapperPreds);
+ }
+ getPlan().remove(mr);
+
+ // reconnect the mapper's successors
+ if (succsCopy != null) {
+ for (MapReduceOper succ : succsCopy) {
+ try {
+ getPlan().connect(newMR, succ);
+ } catch (PlanException e) {
+ throw new VisitorException(e);
+ }
+ }
+ }
+
+ // reconnect the mapper's predecessors
+ if (predsCopy != null) {
+ for (MapReduceOper pred : predsCopy) {
+ if (newMR.getOperatorKey().equals(pred.getOperatorKey())) {
+ continue;
+ }
+ try {
+ getPlan().connect(pred, newMR);
+ } catch (PlanException e) {
+ throw new VisitorException(e);
+ }
+ }
+ }
+
+ mergeMROperProperties(mr, newMR);
+ }
+
+ private void mergeMROperProperties(MapReduceOper from, MapReduceOper to) {
+
+ if (from.isStreamInMap()) {
+ to.setStreamInMap(true);
+ }
+
+ if (from.isStreamInReduce()) {
+ to.setStreamInReduce(true);
+ }
+
+ if (from.getRequestedParallelism() > to.getRequestedParallelism()) {
+ to.requestedParallelism = from.requestedParallelism;
+ }
+
+ if (!from.UDFs.isEmpty()) {
+ to.UDFs.addAll(from.UDFs);
+ }
+
+ if (from.needsDistinctCombiner()) {
+ to.setNeedsDistinctCombiner(true);
+ }
+
+ if (to.mapKeyType == DataType.UNKNOWN) {
+ to.mapKeyType = from.mapKeyType;
+ }
+ }
+
+ private boolean isMapOnly(MapReduceOper mr) {
+ return mr.reducePlan.isEmpty();
+ }
+
+ private boolean isSingleLoadMapperPlan(PhysicalPlan pl) {
+ List<PhysicalOperator> roots = pl.getRoots();
+ return (roots.size() == 1);
+ }
+
+ private POSplit getSplit(){
+ POSplit sp = new POSplit(new OperatorKey(scope, nig.getNextNodeId(scope)));
+ return sp;
+ }
+}
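
A note on the merge logic in the class above: visitMROp drives its four cases off a classification of the splitter's successors into single-load map-only splittees, single-load map-reduce splittees, and multi-load splittees. The stand-alone Java sketch below illustrates only that bucketing step; SketchMROper and its fields are invented stand-ins for the real MapReduceOper API, not Pig code.

import java.util.ArrayList;
import java.util.List;

// Invented stand-in for MapReduceOper: just the two properties the classification looks at.
class SketchMROper {
    List<String> mapLoads = new ArrayList<>(); // stand-in for mapPlan.getRoots() (the POLoads)
    boolean hasReduce = false;                 // stand-in for !reducePlan.isEmpty()
}

public class SplitteeClassificationSketch {
    public static void main(String[] args) {
        SketchMROper a = new SketchMROper();             // map-only, single load
        a.mapLoads.add("tmp-split-output");
        SketchMROper b = new SketchMROper();             // single load, but has a reduce phase
        b.mapLoads.add("tmp-split-output");
        b.hasReduce = true;
        SketchMROper c = new SketchMROper();             // multi-load (e.g. a join), cannot be merged
        c.mapLoads.add("tmp-split-output");
        c.mapLoads.add("other-input");

        List<SketchMROper> mappers = new ArrayList<>();
        List<SketchMROper> mapReducers = new ArrayList<>();
        List<SketchMROper> multiLoadMROpers = new ArrayList<>();

        for (SketchMROper succ : List.of(a, b, c)) {
            if (succ.mapLoads.size() != 1) {
                multiLoadMROpers.add(succ);   // keeps loading the splitter's temp file via the store plan
            } else if (!succ.hasReduce) {
                mappers.add(succ);            // merge candidates for cases 1 and 3 above
            } else {
                mapReducers.add(succ);        // merge candidates for cases 2 and 4 above
            }
        }

        System.out.println("map-only splittees:   " + mappers.size());
        System.out.println("map-reduce splittees: " + mapReducers.size());
        System.out.println("multi-load splittees: " + multiLoadMROpers.size());
    }
}

Only the first two buckets are merge candidates; multi-load splittees keep reading the splitter's temporary output, which is why visitMROp adds the original store back to the POSplit when any splittee is left unmerged.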
Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSplit.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSplit.java?rev=757598&r1=757597&r2=757598&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSplit.java (original)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSplit.java Mon Mar 23 23:36:10 2009
@@ -18,6 +18,7 @@
package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
import java.util.ArrayList;
+import java.util.BitSet;
import java.util.List;
import org.apache.commons.logging.Log;
@@ -84,17 +85,16 @@
* which is the job containing the split
*/
private FileSpec splitStore;
-
- /*
- * The inner physical plan
- */
- private PhysicalPlan myPlan = new PhysicalPlan();
-
+
/*
* The list of sub-plans the inner plan is composed of
*/
private List<PhysicalPlan> myPlans = new ArrayList<PhysicalPlan>();
+ private BitSet processedSet = new BitSet();
+
+ private static Result empty = new Result(POStatus.STATUS_NULL, null);
+
/**
* Constructs an operator with the specified key
* @param k the operator key
@@ -170,16 +170,7 @@
}
/**
- * Returns the inner physical plan associated with this operator
- * @return the inner plan
- */
- public PhysicalPlan getPlan() {
- return myPlan;
- }
-
- /**
- * Returns the list of nested plans. This is used by
- * explain method to display the inner plans.
+ * Returns the list of nested plans.
* @return the list of the nested plans
* @see org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PlanPrinter
*/
@@ -193,67 +184,69 @@
* @param plan plan to be appended to the list
*/
public void addPlan(PhysicalPlan inPlan) throws PlanException {
- myPlan.merge(inPlan);
myPlans.add(inPlan);
+ processedSet.set(myPlans.size()-1);
}
@Override
public Result getNext(Tuple t) throws ExecException {
- Result inp = processInput();
+ if (processedSet.cardinality() == myPlans.size()) {
- if (inp.returnStatus == POStatus.STATUS_EOP) {
- return inp;
- }
+ Result inp = processInput();
+
+ if (inp.returnStatus == POStatus.STATUS_EOP) {
+ return inp;
+ }
- // process the nested plans
- myPlan.attachInput((Tuple)inp.result);
- processPlan();
+ Tuple tuple = (Tuple)inp.result;
+ for (PhysicalPlan pl : myPlans) {
+ pl.attachInput(tuple);
+ }
- return new Result(POStatus.STATUS_NULL, null);
+ processedSet.clear();
+ }
+
+ return processPlan();
}
- private void processPlan() throws ExecException {
+ private Result processPlan() throws ExecException {
- List<PhysicalOperator> leaves = myPlan.getLeaves();
+ int idx = processedSet.nextClearBit(0);
+ PhysicalOperator leaf = myPlans.get(idx).getLeaves().get(0);
- for (PhysicalOperator leaf : leaves) {
-
- // TO-DO: other types of leaves are possible later
- if (!(leaf instanceof POStore) && !(leaf instanceof POSplit)) {
- throw new ExecException("Invalid operator type in the split
plan: "
- + leaf.getOperatorKey());
- }
-
- runPipeline(leaf);
+ Result res = runPipeline(leaf);
+
+ if (res.returnStatus == POStatus.STATUS_EOP) {
+ processedSet.set(idx++);
+ if (idx < myPlans.size()) {
+ res = processPlan();
+ }
}
+
+ return (res.returnStatus == POStatus.STATUS_OK) ? res : empty;
}
- private void runPipeline(PhysicalOperator leaf) throws ExecException {
+ private Result runPipeline(PhysicalOperator leaf) throws ExecException {
+ Result res = null;
+
while (true) {
- Result res = leaf.getNext(dummyTuple);
+ res = leaf.getNext(dummyTuple);
- if (res.returnStatus == POStatus.STATUS_OK ||
- res.returnStatus == POStatus.STATUS_NULL) {
+ if (res.returnStatus == POStatus.STATUS_OK) {
+ break;
+ } else if (res.returnStatus == POStatus.STATUS_NULL) {
continue;
- }
-
- if (res.returnStatus == POStatus.STATUS_EOP) {
+ } else if (res.returnStatus == POStatus.STATUS_EOP) {
+ break;
+ } else if (res.returnStatus == POStatus.STATUS_ERR) {
break;
}
-
- if (res.returnStatus == POStatus.STATUS_ERR) {
-
- // if there is an err message use it
- String errMsg = (res.result != null) ?
- "Received Error while processing the split plan: " +
res.result :
- "Received Error while processing the split plan.";
-
- throw new ExecException(errMsg);
- }
- }
+ }
+
+ return res;
}
}
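
On the rewritten POSplit above: getNext now attaches each input tuple to every nested plan, drains the nested plans one at a time, and uses a BitSet to remember which plans have already returned EOP, so the next input tuple is fetched only after all nested plans are exhausted. The following is a rough, self-contained sketch of that BitSet bookkeeping only; plain Java iterators stand in for the nested PhysicalPlans and none of this is the actual Pig code.

import java.util.ArrayList;
import java.util.BitSet;
import java.util.Iterator;
import java.util.List;

public class SplitDrainSketch {
    public static void main(String[] args) {
        List<List<String>> nestedOutputs = List.of(
                List.of("p0-a", "p0-b"),   // what nested plan 0 emits for one input tuple
                List.of("p1-a"));          // what nested plan 1 emits for one input tuple

        List<Iterator<String>> plans = new ArrayList<>();
        BitSet processed = new BitSet();
        // Start in the "all plans exhausted" state so the first pass attaches a new input.
        processed.set(0, nestedOutputs.size());

        for (int input = 0; input < 2; input++) {           // two input tuples from processInput()
            if (processed.cardinality() == nestedOutputs.size()) {
                plans.clear();                               // attachInput: restart every nested plan
                for (List<String> out : nestedOutputs) plans.add(out.iterator());
                processed.clear();
            }
            int idx;
            // Drain the plans in order, marking each exhausted one in the BitSet (like processPlan).
            while ((idx = processed.nextClearBit(0)) < plans.size()) {
                Iterator<String> plan = plans.get(idx);
                if (plan.hasNext()) {
                    System.out.println("input " + input + " -> " + plan.next());
                } else {
                    processed.set(idx);                      // EOP for this nested plan
                }
            }
        }
    }
}

Unlike the real operator, this sketch drains every plan for an input in one pass rather than returning one Result per call, but the exhaustion tracking is the same idea.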
Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java?rev=757598&r1=757597&r2=757598&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java (original)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java Mon Mar 23 23:36:10 2009
@@ -53,11 +53,13 @@
stores.add((POStore)leaf);
}
if (leaf instanceof POSplit) {
- PhysicalPlan pl = ((POSplit)leaf).getPlan();
- List<POStore> nestedStores = getStores(pl);
- stores.addAll(nestedStores);
+ List<PhysicalPlan> pls = ((POSplit)leaf).getPlans();
+ for (PhysicalPlan pl : pls) {
+ List<POStore> nestedStores = getStores(pl);
+ stores.addAll(nestedStores);
+ }
}
- }
+ }
return stores;
}
Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/builtin/BinStorage.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/builtin/BinStorage.java?rev=757598&r1=757597&r2=757598&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/builtin/BinStorage.java (original)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/builtin/BinStorage.java Mon Mar 23 23:36:10 2009
@@ -32,6 +32,7 @@
import org.apache.pig.ExecType;
import org.apache.pig.ReversibleLoadStoreFunc;
import org.apache.pig.backend.datastorage.DataStorage;
+import org.apache.pig.backend.datastorage.ElementDescriptor;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataReaderWriter;
@@ -213,17 +214,15 @@
public Schema determineSchema(String fileName, ExecType execType,
DataStorage storage) throws IOException {
- InputStream is = null;
-
- try {
- is = FileLocalizer.open(fileName, execType, storage);
- } catch (IOException e) {
+ if (!FileLocalizer.fileExists(fileName, storage)) {
// At compile time in batch mode, the file may not exist
// (such as intermediate file). Just return null - the
- // same way as we could's get a valid record from the input.
+ // same way as we would if we did not get a valid record
return null;
}
+ InputStream is = FileLocalizer.open(fileName, execType, storage);
+
bindTo(fileName, new BufferedPositionedInputStream(is), 0,
Long.MAX_VALUE);
// get the first record from the input file
// and figure out the schema from the data in
Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/impl/plan/DependencyOrderWalker.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/impl/plan/DependencyOrderWalker.java?rev=757598&r1=757597&r2=757598&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/impl/plan/DependencyOrderWalker.java (original)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/impl/plan/DependencyOrderWalker.java Mon Mar 23 23:36:10 2009
@@ -28,33 +28,20 @@
* DependencyOrderWalker traverses the graph in such a way that no node is visited
* before all the nodes it depends on have been visited. Beyond this, it does not
* guarantee any particular order. So, you have a graph with node 1 2 3 4, and
- * edges 1->3, 2->3, and 3->4, this walker guarantees that 1 and 2 will be visited
+ * edges 1->3, 2->3, and 3->4, this walker guarnatees that 1 and 2 will be visited
* before 3 and 3 before 4, but it does not guarantee whether 1 or 2 will be
* visited first.
*/
public class DependencyOrderWalker <O extends Operator, P extends OperatorPlan<O>>
extends PlanWalker<O, P> {
- private boolean rootsFirst = true;
-
/**
* @param plan Plan for this walker to traverse.
*/
public DependencyOrderWalker(P plan) {
- this(plan, true);
- }
-
- /**
- * Constructs a walker with the specified plan and walk direction
- * @param plan plan for this walker to traverse
- * @param rootsFirst flag that indicates walking up (from roots
- * to leaves) or walking down (from leaves to roots)
- */
- public DependencyOrderWalker(P plan, boolean rootsFirst) {
super(plan);
- this.rootsFirst = rootsFirst;
}
-
+
/**
* Begin traversing the graph.
* @param visitor Visitor this walker is being used by.
@@ -71,14 +58,12 @@
List<O> fifo = new ArrayList<O>();
Set<O> seen = new HashSet<O>();
- List<O> nodes = rootsFirst ? mPlan.getLeaves() : mPlan.getRoots();
-
- if (nodes == null) return;
-
- for (O op : nodes) {
- doAllDependencies(op, seen, fifo);
+ List<O> leaves = mPlan.getLeaves();
+ if (leaves == null) return;
+ for (O op : leaves) {
+ doAllPredecessors(op, seen, fifo);
}
-
+
for (O op: fifo) {
op.visit(visitor);
}
@@ -88,28 +73,9 @@
return new DependencyOrderWalker<O, P>(plan);
}
- protected void doAllDependencies(O node,
- Set<O> seen,
- Collection<O> fifo) throws VisitorException {
- if (!seen.contains(node)) {
- // We haven't seen this one before.
- Collection<O> nodes = rootsFirst ?
- mPlan.getPredecessors(node) : mPlan.getSuccessors(node);
- if (nodes != null) {
- // Do all our predecessors before ourself
- for (O op : nodes) {
- doAllDependencies(op, seen, fifo);
- }
- }
- // Now do ourself
- seen.add(node);
- fifo.add(node);
- }
- }
-
protected void doAllPredecessors(O node,
- Set<O> seen,
- Collection<O> fifo) throws VisitorException {
+ Set<O> seen,
+ Collection<O> fifo) throws VisitorException {
if (!seen.contains(node)) {
// We haven't seen this one before.
Collection<O> preds = mPlan.getPredecessors(node);
@@ -120,9 +86,8 @@
}
}
// Now do ourself
- seen.add(node);
+ seen.add(node);
fifo.add(node);
}
}
-
}
Added: hadoop/pig/branches/multiquery/src/org/apache/pig/impl/plan/ReverseDependencyOrderWalker.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/impl/plan/ReverseDependencyOrderWalker.java?rev=757598&view=auto
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/impl/plan/ReverseDependencyOrderWalker.java (added)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/impl/plan/ReverseDependencyOrderWalker.java Mon Mar 23 23:36:10 2009
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.plan;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+
+/**
+ * ReverseDependencyOrderWalker traverses the graph in such a way that no node is visited
+ * before all the nodes that are its successors have been visited. Beyond this, it does not
+ * guarantee any particular order. So, you have a graph with node 1 2 3 4, and
+ * edges 1->3, 2->3, and 3->4, this walker guarantees that 4 will be visited
+ * before 3 and 3 before 1 and 2, but it does not guarantee whether 1 or 2 will be
+ * visited first.
+ */
+public class ReverseDependencyOrderWalker <O extends Operator, P extends OperatorPlan<O>>
+ extends PlanWalker<O, P> {
+
+ /**
+ * @param plan Plan for this walker to traverse.
+ */
+ public ReverseDependencyOrderWalker(P plan) {
+ super(plan);
+ }
+
+ /**
+ * Begin traversing the graph.
+ * @param visitor Visitor this walker is being used by.
+ * @throws VisitorException if an error is encountered while walking.
+ */
+ public void walk(PlanVisitor<O, P> visitor) throws VisitorException {
+ // This is highly inefficient, but our graphs are small so it should be okay.
+ // The algorithm works by starting at any node in the graph, finding its
+ // successors and calling itself for each of those successors. When it
+ // finds a node that has no unfinished successors it puts that node in the
+ // list. It then unwinds itself putting each of the other nodes in the list.
+ // It keeps track of what nodes it's seen as it goes so it doesn't put any
+ // nodes in the graph twice.
+
+ List<O> fifo = new ArrayList<O>();
+ Set<O> seen = new HashSet<O>();
+ List<O> roots = mPlan.getRoots();
+ if (roots == null) return;
+ for (O op : roots) {
+ doAllSuccessors(op, seen, fifo);
+ }
+
+ for (O op: fifo) {
+ op.visit(visitor);
+ }
+ }
+
+ public PlanWalker<O, P> spawnChildWalker(P plan) {
+ return new ReverseDependencyOrderWalker<O, P>(plan);
+ }
+
+ protected void doAllSuccessors(O node,
+ Set<O> seen,
+ Collection<O> fifo) throws VisitorException {
+ if (!seen.contains(node)) {
+ // We haven't seen this one before.
+ Collection<O> succs = mPlan.getSuccessors(node);
+ if (succs != null && succs.size() > 0) {
+ // Do all our successors before ourself
+ for (O op : succs) {
+ doAllSuccessors(op, seen, fifo);
+ }
+ }
+ // Now do ourself
+ seen.add(node);
+ fifo.add(node);
+ }
+ }
+}
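
To make the ordering contrast with DependencyOrderWalker concrete, here is a small, self-contained sketch using the example graph from the javadoc above (edges 1->3, 2->3, 3->4). Plain maps and integers stand in for Operator/OperatorPlan, so this only illustrates the traversal order, not the Pig classes.

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class WalkOrderSketch {
    // Successor map for the javadoc example: 1->3, 2->3, 3->4.
    static Map<Integer, List<Integer>> succs = Map.of(
            1, List.of(3), 2, List.of(3), 3, List.of(4), 4, List.of());

    // Post-order over successors: a node is emitted only after everything downstream of it,
    // which is the reverse dependency order used by MultiQueryOptimizer.
    static void doAllSuccessors(int node, Set<Integer> seen, List<Integer> fifo) {
        if (seen.contains(node)) return;
        for (int s : succs.get(node)) doAllSuccessors(s, seen, fifo);
        seen.add(node);
        fifo.add(node);
    }

    public static void main(String[] args) {
        List<Integer> fifo = new ArrayList<>();
        Set<Integer> seen = new HashSet<>();
        for (int root : List.of(1, 2)) doAllSuccessors(root, seen, fifo);
        System.out.println("reverse dependency order: " + fifo);   // [4, 3, 1, 2]
        Collections.reverse(fifo);
        System.out.println("a valid dependency order: " + fifo);   // [2, 1, 3, 4]
    }
}

Reversing the printed reverse-dependency order yields one valid dependency order for the same graph, as the second output line shows.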