Author: gates Date: Mon Feb 11 15:19:36 2008 New Revision: 620665 URL: http://svn.apache.org/viewvc?rev=620665&view=rev Log: PIG-97: Turn off combiner in the case of Cogroup, as it doesn't work when more than one bag is involved.
Added: incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/POPrinter.java incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/POVisitor.java Removed: incubator/pig/trunk/src/org/apache/pig/backend/local/executionengine/POVisitor.java Modified: incubator/pig/trunk/CHANGES.txt incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/MapRedPhysicalPlan.java incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/MapreducePlanCompiler.java incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/POMapreduce.java incubator/pig/trunk/src/org/apache/pig/backend/local/executionengine/LocalPhysicalPlan.java incubator/pig/trunk/src/org/apache/pig/backend/local/executionengine/POCogroup.java incubator/pig/trunk/src/org/apache/pig/backend/local/executionengine/POEval.java incubator/pig/trunk/src/org/apache/pig/backend/local/executionengine/POLoad.java incubator/pig/trunk/src/org/apache/pig/backend/local/executionengine/POSort.java incubator/pig/trunk/src/org/apache/pig/backend/local/executionengine/POSplit.java incubator/pig/trunk/src/org/apache/pig/backend/local/executionengine/POStore.java incubator/pig/trunk/src/org/apache/pig/backend/local/executionengine/POUnion.java incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/PhysicalOperator.java Modified: incubator/pig/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/incubator/pig/trunk/CHANGES.txt?rev=620665&r1=620664&r2=620665&view=diff ============================================================================== --- incubator/pig/trunk/CHANGES.txt (original) +++ incubator/pig/trunk/CHANGES.txt Mon Feb 11 15:19:36 2008 @@ -91,3 +91,6 @@ PIG-95: Remove System.exit() statements from inside pig (joa23 via gates). PIG-65: convert tabs to spaces (groves via olgan) + + PIG-97: Turn off combiner in the case of Cogroup, as it doesn't work when + more than one bag is involved (gates). 
Modified: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/MapRedPhysicalPlan.java URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/MapRedPhysicalPlan.java?rev=620665&r1=620664&r2=620665&view=diff ============================================================================== --- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/MapRedPhysicalPlan.java (original) +++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/MapRedPhysicalPlan.java Mon Feb 11 15:19:36 2008 @@ -8,8 +8,9 @@ import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.backend.executionengine.ExecPhysicalOperator; import org.apache.pig.backend.executionengine.ExecPhysicalPlan; -import org.apache.pig.backend.local.executionengine.POVisitor; import org.apache.pig.impl.physicalLayer.PhysicalOperator; +import org.apache.pig.impl.physicalLayer.POPrinter; +import org.apache.pig.impl.physicalLayer.POVisitor; import org.apache.pig.impl.logicalLayer.OperatorKey; public class MapRedPhysicalPlan implements ExecPhysicalPlan { @@ -35,9 +36,9 @@ } public void explain(OutputStream out) { - POVisitor lprinter = new POVisitor(new PrintStream(out)); + POVisitor lprinter = new POPrinter(opTable, new PrintStream(out)); - ((PhysicalOperator)opTable.get(root)).visit(lprinter, ""); + ((PhysicalOperator)opTable.get(root)).visit(lprinter); } public Map<OperatorKey, ExecPhysicalOperator> getOpTable() { Modified: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/MapreducePlanCompiler.java URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/MapreducePlanCompiler.java?rev=620665&r1=620664&r2=620665&view=diff ============================================================================== --- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/MapreducePlanCompiler.java (original) +++ 
incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/MapreducePlanCompiler.java Mon Feb 11 15:19:36 2008 @@ -278,10 +278,9 @@ mro.mapParallelism = Math.max(mro.mapParallelism, lo.getRequestedParallelism()); } else { // push into "reduce" phase - EvalSpec spec = lo.getSpec(); - if (mro.toReduce == null && shouldCombine(spec)) { + if (mro.toReduce == null && shouldCombine(spec, mro)) { // Push this spec into the combiner. But we also need to // create a new spec with a changed expected projection to // push into the reducer. @@ -413,10 +412,16 @@ return sortJob; } - private boolean shouldCombine(EvalSpec spec) { + private boolean shouldCombine(EvalSpec spec, POMapreduce mro) { // Determine whether this something we can combine or not. // First, it must be a generate spec. if (!(spec instanceof GenerateSpec)) { + return false; + } + + // Can only combine if there is a single file being grouped, + // cogroups can't use the combiner at this point. + if (mro.groupFuncs.size() > 1) { return false; } Modified: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/POMapreduce.java URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/POMapreduce.java?rev=620665&r1=620664&r2=620665&view=diff ============================================================================== --- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/POMapreduce.java (original) +++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/POMapreduce.java Mon Feb 11 15:19:36 2008 @@ -27,7 +27,6 @@ import org.apache.hadoop.io.WritableComparator; import org.apache.pig.backend.executionengine.ExecPhysicalOperator; import org.apache.pig.backend.hadoop.executionengine.mapreduceExec.MapReduceLauncher; -import org.apache.pig.backend.local.executionengine.POVisitor; import org.apache.pig.data.Tuple; import org.apache.pig.impl.PigContext; import org.apache.pig.impl.eval.EvalSpec; @@ -36,6 +35,7 @@ 
import org.apache.pig.impl.logicalLayer.OperatorKey; import org.apache.pig.impl.logicalLayer.LogicalOperator; import org.apache.pig.impl.physicalLayer.PhysicalOperator; +import org.apache.pig.impl.physicalLayer.POVisitor; import org.apache.pig.impl.util.ObjectSerializer; import org.apache.pig.impl.util.PigLogger; @@ -252,10 +252,8 @@ toReduce = toReduce.addSpec(spec); } - public void visit(POVisitor v, String prefix) { - PrintStream ps = v.getPrintStream(); - - ps.println(prefix + "POMapReduce (" + this + "): scope id: " + this.scope); + public void visit(POVisitor v) { + v.visitMapreduce(this); } } Modified: incubator/pig/trunk/src/org/apache/pig/backend/local/executionengine/LocalPhysicalPlan.java URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/local/executionengine/LocalPhysicalPlan.java?rev=620665&r1=620664&r2=620665&view=diff ============================================================================== --- incubator/pig/trunk/src/org/apache/pig/backend/local/executionengine/LocalPhysicalPlan.java (original) +++ incubator/pig/trunk/src/org/apache/pig/backend/local/executionengine/LocalPhysicalPlan.java Mon Feb 11 15:19:36 2008 @@ -9,6 +9,8 @@ import org.apache.pig.backend.executionengine.ExecPhysicalOperator; import org.apache.pig.backend.executionengine.ExecPhysicalPlan; import org.apache.pig.impl.physicalLayer.PhysicalOperator; +import org.apache.pig.impl.physicalLayer.POVisitor; +import org.apache.pig.impl.physicalLayer.POPrinter; import org.apache.pig.impl.logicalLayer.OperatorKey; public class LocalPhysicalPlan implements ExecPhysicalPlan { @@ -33,9 +35,9 @@ } public void explain(OutputStream out) { - POVisitor lprinter = new POVisitor(new PrintStream(out)); + POVisitor lprinter = new POPrinter(opTable, new PrintStream(out)); - ((PhysicalOperator)opTable.get(root)).visit(lprinter, ""); + ((PhysicalOperator)opTable.get(root)).visit(lprinter); } public Map<OperatorKey, ExecPhysicalOperator> getOpTable() { Modified: 
incubator/pig/trunk/src/org/apache/pig/backend/local/executionengine/POCogroup.java URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/local/executionengine/POCogroup.java?rev=620665&r1=620664&r2=620665&view=diff ============================================================================== --- incubator/pig/trunk/src/org/apache/pig/backend/local/executionengine/POCogroup.java (original) +++ incubator/pig/trunk/src/org/apache/pig/backend/local/executionengine/POCogroup.java Mon Feb 11 15:19:36 2008 @@ -37,9 +37,10 @@ import org.apache.pig.impl.logicalLayer.OperatorKey; import org.apache.pig.impl.logicalLayer.LogicalOperator; import org.apache.pig.impl.physicalLayer.PhysicalOperator; +import org.apache.pig.impl.physicalLayer.POVisitor; // n-ary, blocking operator. Output has schema: < group_label, { <1>, <2>, ... }, { <a>, <b>, ... } > -class POCogroup extends PhysicalOperator { +public class POCogroup extends PhysicalOperator { /** * */ @@ -160,12 +161,8 @@ } - public void visit(POVisitor v, String prefix) { - PrintStream ps = v.getPrintStream(); - - ps.println(prefix + "POUnion (" + this + "): scope id: " + this.scope + - ", outputType: " + this.outputType); - + public void visit(POVisitor v) { + v.visitCogroup(this); } } Modified: incubator/pig/trunk/src/org/apache/pig/backend/local/executionengine/POEval.java URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/local/executionengine/POEval.java?rev=620665&r1=620664&r2=620665&view=diff ============================================================================== --- incubator/pig/trunk/src/org/apache/pig/backend/local/executionengine/POEval.java (original) +++ incubator/pig/trunk/src/org/apache/pig/backend/local/executionengine/POEval.java Mon Feb 11 15:19:36 2008 @@ -26,11 +26,12 @@ import org.apache.pig.impl.eval.EvalSpec; import org.apache.pig.impl.eval.collector.DataCollector; import org.apache.pig.impl.physicalLayer.PhysicalOperator; +import 
org.apache.pig.impl.physicalLayer.POVisitor; import org.apache.pig.impl.util.DataBuffer; import org.apache.pig.impl.logicalLayer.OperatorKey; // unary, non-blocking operator. -class POEval extends PhysicalOperator { +public class POEval extends PhysicalOperator { /** * */ @@ -110,13 +111,8 @@ } } - public void visit(POVisitor v, String prefix) { - PrintStream ps = v.getPrintStream(); - - ps.println(prefix + "POEval (" + this + "): scope id: " + this.scope + - ", outputType: " + this.outputType + ", spec: " + this.spec + - ", inputDrained: " + this.inputDrained + ", lastAdded: " + this.lastAdded); - + public void visit(POVisitor v) { + v.visitEval(this); } } Modified: incubator/pig/trunk/src/org/apache/pig/backend/local/executionengine/POLoad.java URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/local/executionengine/POLoad.java?rev=620665&r1=620664&r2=620665&view=diff ============================================================================== --- incubator/pig/trunk/src/org/apache/pig/backend/local/executionengine/POLoad.java (original) +++ incubator/pig/trunk/src/org/apache/pig/backend/local/executionengine/POLoad.java Mon Feb 11 15:19:36 2008 @@ -29,6 +29,7 @@ import org.apache.pig.impl.io.FileLocalizer; import org.apache.pig.impl.io.FileSpec; import org.apache.pig.impl.physicalLayer.PhysicalOperator; +import org.apache.pig.impl.physicalLayer.POVisitor; import org.apache.pig.impl.logicalLayer.OperatorKey; @@ -82,12 +83,8 @@ } @Override - public void visit(POVisitor v, String prefix) { - PrintStream ps = v.getPrintStream(); - - ps.println(prefix + "POLoad (" + this + "): scope id: " + this.scope + - ", outputType: " + this.outputType + ", filename: " + this.filename + - ", lf: " + this.lf + ", bound: " + this.bound); + public void visit(POVisitor v) { + v.visitLoad(this); } } Modified: incubator/pig/trunk/src/org/apache/pig/backend/local/executionengine/POSort.java URL: 
http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/local/executionengine/POSort.java?rev=620665&r1=620664&r2=620665&view=diff ============================================================================== --- incubator/pig/trunk/src/org/apache/pig/backend/local/executionengine/POSort.java (original) +++ incubator/pig/trunk/src/org/apache/pig/backend/local/executionengine/POSort.java Mon Feb 11 15:19:36 2008 @@ -29,6 +29,7 @@ import org.apache.pig.data.Tuple; import org.apache.pig.impl.eval.EvalSpec; import org.apache.pig.impl.physicalLayer.PhysicalOperator; +import org.apache.pig.impl.physicalLayer.POVisitor; import org.apache.pig.impl.logicalLayer.OperatorKey; @@ -71,12 +72,8 @@ } @Override - public void visit(POVisitor v, String prefix) { - PrintStream ps = v.getPrintStream(); - - ps.println(prefix + "POSort (" + this + "): scope id: " + this.scope + - ", outputType: " + this.outputType); - + public void visit(POVisitor v) { + v.visitSort(this); } } Modified: incubator/pig/trunk/src/org/apache/pig/backend/local/executionengine/POSplit.java URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/local/executionengine/POSplit.java?rev=620665&r1=620664&r2=620665&view=diff ============================================================================== --- incubator/pig/trunk/src/org/apache/pig/backend/local/executionengine/POSplit.java (original) +++ incubator/pig/trunk/src/org/apache/pig/backend/local/executionengine/POSplit.java Mon Feb 11 15:19:36 2008 @@ -26,9 +26,10 @@ import org.apache.pig.data.Tuple; import org.apache.pig.impl.eval.cond.Cond; import org.apache.pig.impl.physicalLayer.PhysicalOperator; +import org.apache.pig.impl.physicalLayer.POVisitor; import org.apache.pig.impl.logicalLayer.OperatorKey; -class POSplit extends PhysicalOperator { +public class POSplit extends PhysicalOperator { static final long serialVersionUID = 1L; protected ArrayList<Cond> conditions; protected int readFromCond; @@ -73,10 
+74,7 @@ return nextTuple; } - public void visit(POVisitor v, String prefix) { - PrintStream ps = v.getPrintStream(); - - ps.println(prefix + "POSplit (" + this + "): scope id: " + this.scope + - ", outputType: " + this.outputType + ", readFromCond: " + this.readFromCond); + public void visit(POVisitor v) { + v.visitSplit(this); } } Modified: incubator/pig/trunk/src/org/apache/pig/backend/local/executionengine/POStore.java URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/local/executionengine/POStore.java?rev=620665&r1=620664&r2=620665&view=diff ============================================================================== --- incubator/pig/trunk/src/org/apache/pig/backend/local/executionengine/POStore.java (original) +++ incubator/pig/trunk/src/org/apache/pig/backend/local/executionengine/POStore.java Mon Feb 11 15:19:36 2008 @@ -32,6 +32,7 @@ import org.apache.pig.impl.logicalLayer.LogicalOperator; import org.apache.pig.impl.logicalLayer.OperatorKey; import org.apache.pig.impl.physicalLayer.PhysicalOperator; +import org.apache.pig.impl.physicalLayer.POVisitor; public class POStore extends PhysicalOperator { @@ -125,12 +126,8 @@ } @Override - public void visit(POVisitor v, String prefix) { - PrintStream ps = v.getPrintStream(); - - ps.println(prefix + "POStore (" + this + "): scope id: " + this.scope + - ", ouutputType: " + this.outputType + ", " + this.f + ", funcSpec: " + - this.funcSpec + ", append: " + this.append); + public void visit(POVisitor v) { + v.visitStore(this); } } Modified: incubator/pig/trunk/src/org/apache/pig/backend/local/executionengine/POUnion.java URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/local/executionengine/POUnion.java?rev=620665&r1=620664&r2=620665&view=diff ============================================================================== --- incubator/pig/trunk/src/org/apache/pig/backend/local/executionengine/POUnion.java (original) +++ 
incubator/pig/trunk/src/org/apache/pig/backend/local/executionengine/POUnion.java Mon Feb 11 15:19:36 2008 @@ -26,9 +26,10 @@ import org.apache.pig.impl.logicalLayer.LogicalOperator; import org.apache.pig.impl.logicalLayer.OperatorKey; import org.apache.pig.impl.physicalLayer.PhysicalOperator; +import org.apache.pig.impl.physicalLayer.POVisitor; -class POUnion extends PhysicalOperator { +public class POUnion extends PhysicalOperator { /** * */ @@ -86,11 +87,8 @@ return null; } - public void visit(POVisitor v, String prefix) { - PrintStream ps = v.getPrintStream(); - - ps.println(prefix + "POUnion (" + this + "): scope id: " + this.scope + - ", outputType: " + this.outputType + ", curInput: " + this.currentInput); + public void visit(POVisitor v) { + v.visitUnion(this); } } Added: incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/POPrinter.java URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/POPrinter.java?rev=620665&view=auto ============================================================================== --- incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/POPrinter.java (added) +++ incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/POPrinter.java Mon Feb 11 15:19:36 2008 @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.impl.physicalLayer; + +import java.io.PrintStream; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Iterator; + +import org.apache.pig.backend.hadoop.executionengine.POMapreduce; +import org.apache.pig.backend.local.executionengine.POCogroup; +import org.apache.pig.backend.local.executionengine.POEval; +import org.apache.pig.backend.local.executionengine.POLoad; +import org.apache.pig.backend.local.executionengine.POSort; +import org.apache.pig.backend.local.executionengine.POSplit; +import org.apache.pig.backend.local.executionengine.POStore; +import org.apache.pig.backend.local.executionengine.POUnion; +import org.apache.pig.backend.executionengine.ExecPhysicalOperator; +import org.apache.pig.impl.eval.EvalSpec; +import org.apache.pig.impl.eval.EvalSpecPrinter; +import org.apache.pig.impl.io.FileSpec; +import org.apache.pig.impl.logicalLayer.OperatorKey; + + +public class POPrinter extends POVisitor { + + private PrintStream mStream = null; + + public POPrinter(Map<OperatorKey, ExecPhysicalOperator> opTable, + PrintStream ps) { + super(opTable); + mStream = ps; + } + + /** + * Only POMapreduce.visit() and subclass implementations of this function + * should ever call this method. 
+ */ + public void visitMapreduce(POMapreduce mr) { + mStream.println("MAPREDUCE"); + printHeader(mr); + + mStream.println("Map: "); + visitSpecs(mr.toMap); + + if (mr.toCombine != null) { + mStream.println("Combine: "); + //visitSpecs(mr.toCombine); + mr.toCombine.visit(new EvalSpecPrinter(mStream)); + } + + if (mr.toReduce != null) { + mStream.println("Reduce: "); + mr.toReduce.visit(new EvalSpecPrinter(mStream)); + } + + if (mr.groupFuncs != null) { + mStream.println("Grouping Funcs: "); + visitSpecs(mr.groupFuncs); + } + + mStream.print("Input Files: "); + Iterator<FileSpec> i = mr.inputFileSpecs.iterator(); + while (i.hasNext()) { + mStream.print(i.next().getFileName()); + if (i.hasNext()) mStream.print(", "); + } + mStream.println(); + + if (mr.outputFileSpec != null) { + mStream.println("Output File: " + mr.outputFileSpec.getFileName()); + } + + if (mr.partitionFunction != null) { + mStream.println("Partition Function: " + + mr.partitionFunction.getName()); + } + + super.visitMapreduce(mr); + } + + /** + * Only POLoad.visit() and subclass implementations of this function + * should ever call this method. + */ + public void visitLoad(POLoad load) { + mStream.println("LOAD"); + printHeader(load); + super.visitLoad(load); + } + + /** + * Only POSort.visit() and subclass implementations of this function + * should ever call this method. + */ + public void visitSort(POSort s) { + mStream.println("SORT"); + printHeader(s); + super.visitSort(s); + } + + /** + * Only POStore.visit() and subclass implementations of this function + * should ever call this method. 
+ */ + public void visitStore(POStore s) { + mStream.println("STORE"); + printHeader(s); + super.visitStore(s); + } + + private void visitSpecs(List<EvalSpec> specs) { + Iterator<EvalSpec> j = specs.iterator(); + while (j.hasNext()) { + j.next().visit(new EvalSpecPrinter(mStream)); + } + } + + private void printHeader(PhysicalOperator po) { + mStream.println("Object id: " + po.hashCode()); + mStream.print("Inputs: "); + for (int i = 0; i < po.inputs.length; i++) { + if (i != 0) mStream.print(", "); + mStream.print(po.inputs[i].hashCode()); + } + mStream.println(); + } + +} + + Added: incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/POVisitor.java URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/POVisitor.java?rev=620665&view=auto ============================================================================== --- incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/POVisitor.java (added) +++ incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/POVisitor.java Mon Feb 11 15:19:36 2008 @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.pig.impl.physicalLayer; + +import java.util.Map; + +import org.apache.pig.backend.hadoop.executionengine.POMapreduce; +import org.apache.pig.backend.local.executionengine.POCogroup; +import org.apache.pig.backend.local.executionengine.POEval; +import org.apache.pig.backend.local.executionengine.POLoad; +import org.apache.pig.backend.local.executionengine.POSort; +import org.apache.pig.backend.local.executionengine.POSplit; +import org.apache.pig.backend.local.executionengine.POStore; +import org.apache.pig.backend.local.executionengine.POUnion; +import org.apache.pig.backend.executionengine.ExecPhysicalOperator; +import org.apache.pig.impl.logicalLayer.OperatorKey; + +/** + * A visitor mechanism for navigating and operating on a tree of Physical + * Operators. This class contains the logic to navigate the tree, but does + * not do anything with or to the tree. In order to operate on or extract + * information from the tree, extend this class. You only need to implement + * the methods dealing with the physical operators you are concerned + * with. For example, if you wish to find every POMapreduce in a physical plan + * and perform some operation on it, your visitor would look like: + * class MyPOVisitor extends POVisitor { + * public void visitMapreduce(POMapreduce mr) { your logic here } + * } + * Any operators that you do not implement the visitX method for will then + * be navigated through by this class. + * + * *NOTE* When invoking a visitor, you should never call one of the + * methods in this class. You should pass your visitor as an argument to + * visit() on the object you want to visit. So: + * RIGHT: POEval myEval; MyVisitor v; myEval.visit(v); + * WRONG: POEval myEval; MyVisitor v; v.visitEval(myEval); + * These methods are only public to make them accessible to the PO* objects. 
+ */ +abstract public class POVisitor { + + protected Map<OperatorKey, ExecPhysicalOperator> mOpTable; + + /** + * @param opTable Operator table for the physical plan. + */ + protected POVisitor(Map<OperatorKey, ExecPhysicalOperator> opTable) { + mOpTable = opTable; + } + + /** + * Only POMapreduce.visit() and subclass implementations of this function + * should ever call this method. + */ + public void visitMapreduce(POMapreduce mr) { + basicVisit(mr); + } + + /** + * Only POLoad.visit() and subclass implementations of this function + * should ever call this method. + */ + public void visitLoad(POLoad load) { + basicVisit(load); + } + + /** + * Only POSort.visit() and subclass implementations of this function + * should ever call this method. + */ + public void visitSort(POSort s) { + basicVisit(s); + } + + /** + * Only POStore.visit() and subclass implementations of this function + * should ever call this method. + */ + public void visitStore(POStore s) { + basicVisit(s); + } + + /** + * Only POCogroup.visit() and subclass implementations of this function + * should ever call this method. + */ + public void visitCogroup(POCogroup g) { + basicVisit(g); + } + + /** + * Only POEval.visit() and subclass implementations of this function + * should ever call this method. + */ + public void visitEval(POEval e) { + basicVisit(e); + } + + /** + * Only POSplit.visit() and subclass implementations of this function + * should ever call this method. + */ + public void visitSplit(POSplit s) { + basicVisit(s); + } + + /** + * Only POUnion.visit() and subclass implementations of this function + * should ever call this method. 
+ */ + public void visitUnion(POUnion u) { + basicVisit(u); + } + + private void basicVisit(PhysicalOperator po) { + for (int i = 0; i < po.inputs.length; i++) { + ((PhysicalOperator)mOpTable.get(po.inputs[i])).visit(this); + } + } + +} + + Modified: incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/PhysicalOperator.java URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/PhysicalOperator.java?rev=620665&r1=620664&r2=620665&view=diff ============================================================================== --- incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/PhysicalOperator.java (original) +++ incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/PhysicalOperator.java Mon Feb 11 15:19:36 2008 @@ -26,7 +26,7 @@ import org.apache.pig.impl.logicalLayer.OperatorKey; import org.apache.pig.impl.util.LineageTracer; import org.apache.pig.backend.executionengine.ExecPhysicalOperator; -import org.apache.pig.backend.local.executionengine.POVisitor; +import org.apache.pig.impl.physicalLayer.POVisitor; public abstract class PhysicalOperator implements Serializable, ExecPhysicalOperator { @@ -89,5 +89,5 @@ return outputType; } - public abstract void visit(POVisitor v, String prfix); + public abstract void visit(POVisitor v); }