Author: gates Date: Thu Feb 28 08:18:43 2008 New Revision: 632041 URL: http://svn.apache.org/viewvc?rev=632041&view=rev Log: Added generic OperatorPlan and associated classes. These will be used by logical and physical plans.
Added: incubator/pig/branches/types/lib/commons-collections-3.2.jar (with props) incubator/pig/branches/types/src/org/apache/pig/impl/plan/ incubator/pig/branches/types/src/org/apache/pig/impl/plan/Operator.java incubator/pig/branches/types/src/org/apache/pig/impl/plan/OperatorPlan.java incubator/pig/branches/types/src/org/apache/pig/impl/plan/PlanVisitor.java incubator/pig/branches/types/test/org/apache/pig/test/TestOperatorPlan.java Modified: incubator/pig/branches/types/build.xml incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/OperatorKey.java Modified: incubator/pig/branches/types/build.xml URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/build.xml?rev=632041&r1=632040&r2=632041&view=diff ============================================================================== --- incubator/pig/branches/types/build.xml (original) +++ incubator/pig/branches/types/build.xml Thu Feb 28 08:18:43 2008 @@ -72,6 +72,7 @@ <fileset file="${lib.dir}/javacc.jar" /> <fileset file="${lib.dir}/jsch-0.1.33.jar" /> <fileset file="${lib.dir}/junit-4.1.jar" /> + <fileset file="${lib.dir}/commons-collections-3.2.jar" /> </path> <path id="test.classpath"> Added: incubator/pig/branches/types/lib/commons-collections-3.2.jar URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/lib/commons-collections-3.2.jar?rev=632041&view=auto ============================================================================== Binary file - no diff available. Propchange: incubator/pig/branches/types/lib/commons-collections-3.2.jar ------------------------------------------------------------------------------ svn:mime-type = application/octet-stream Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/OperatorKey.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/OperatorKey.java?rev=632041&r1=632040&r2=632041&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/OperatorKey.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/OperatorKey.java Thu Feb 28 08:18:43 2008 @@ -16,6 +16,8 @@ * limitations under the License. */ + // TODO Need to move this into the plan package. + package org.apache.pig.impl.logicalLayer; import java.io.Serializable; Added: incubator/pig/branches/types/src/org/apache/pig/impl/plan/Operator.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/plan/Operator.java?rev=632041&view=auto ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/plan/Operator.java (added) +++ incubator/pig/branches/types/src/org/apache/pig/impl/plan/Operator.java Thu Feb 28 08:18:43 2008 @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pig.impl.plan; + +import java.util.ArrayList; +import java.util.List; +import java.lang.StringBuilder; + +import org.apache.pig.impl.logicalLayer.OperatorKey; +import org.apache.pig.impl.logicalLayer.parser.ParseException; + + +/** + * Base class for all types of operators. + */ +abstract public class Operator { + private static final long serialVersionUID = 1L; + + /** + * OperatorKey associated with this operator. This key is used to find the + * operator in an OperatorPlan. + */ + protected OperatorKey mKey; + + /** + * @param - k Operator key to assign to this node. + */ + public Operator(OperatorKey k) { + mKey = k; + } + + /** + * Get the operator key for this operator. + */ + public OperatorKey getOperatorKey() { + return mKey; + } + + /** + * Visit this node with the provided visitor. This should only be called by + * the visitor class itself, never directly. + * @param v Visitor to visit with. + * @throws ParseException if the visitor has a problem. + */ + public abstract void visit(PlanVisitor v) throws ParseException; + + /** + * Indicates whether this operator supports multiple inputs. + * @return true if it does, otherwise false. + */ + public abstract boolean supportsMultipleInputs(); + + /** + * Indicates whether this operator supports multiple outputs. + * @return true if it does, otherwise false. + */ + public abstract boolean supportsMultipleOutputs(); + + public abstract String name(); + + public abstract String typeName(); + + @Override + public String toString() { + StringBuilder msg = new StringBuilder(); + + msg.append("(Name: " + name() + " Operator Key: " + mKey + ")"); + + return msg.toString(); + } +} Added: incubator/pig/branches/types/src/org/apache/pig/impl/plan/OperatorPlan.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/plan/OperatorPlan.java?rev=632041&view=auto ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/plan/OperatorPlan.java (added) +++ incubator/pig/branches/types/src/org/apache/pig/impl/plan/OperatorPlan.java Thu Feb 28 08:18:43 2008 @@ -0,0 +1,242 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.impl.plan; + +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import org.apache.pig.impl.logicalLayer.OperatorKey; + +import org.apache.commons.collections.map.MultiValueMap; + +/** + * A generic graphing class for use by LogicalPlan, PhysicalPlan, etc. + */ +public abstract class OperatorPlan implements Iterable { + protected Map<Operator, OperatorKey> mOps; + protected Map<OperatorKey, Operator> mKeys; + protected MultiValueMap mFromEdges; + protected MultiValueMap mToEdges; + + private List<Operator> mRoots; + private List<Operator> mLeaves; + + + public OperatorPlan() { + mRoots = new ArrayList<Operator>(); + mLeaves = new ArrayList<Operator>(); + mOps = new HashMap<Operator, OperatorKey>(); + mKeys = new HashMap<OperatorKey, Operator>(); + mFromEdges = new MultiValueMap(); + mToEdges = new MultiValueMap(); + } + + /** + * Get a list of all nodes in the graph that are roots. A root is defined to + * be a node that has no input. + */ + public List<Operator> getRoots() { + if (mRoots.size() == 0 && mOps.size() > 0) { + for (Operator op : mOps.keySet()) { + if (mToEdges.getCollection(op) == null) { + mRoots.add(op); + } + } + } + return mRoots; + } + + /** + * Get a list of all nodes in the graph that are leaves. A leaf is defined to + * be a node that has no output. + */ + public List<Operator> getLeaves() { + if (mLeaves.size() == 0 && mOps.size() > 0) { + for (Operator op : mOps.keySet()) { + if (mFromEdges.getCollection(op) == null) { + mLeaves.add(op); + } + } + } + return mLeaves; + } + + /** + * Given an operator, find its OperatorKey. + * @param op Logical operator. + * @return associated OperatorKey + */ + public OperatorKey getOperatorKey(Operator op) { + return mOps.get(op); + } + + /** + * Given an OperatorKey, find the associated operator. + * @param opKey OperatorKey + * @return associated operator. + */ + public Operator getOperator(OperatorKey opKey) { + return mKeys.get(opKey); + } + + /** + * Insert an operator into the plan. This only inserts it as a node in + * the graph, it does not connect it to any other operators. That should + * be done as a separate step using connect. + * @param op Operator to add to the plan. + */ + public void add(Operator op) { + markDirty(); + mOps.put(op, op.getOperatorKey()); + mKeys.put(op.getOperatorKey(), op); + } + + /** + * Create an edge between two nodes. The direction of the edge implies data + * flow. + * @param from Operator data will flow from. + * @param to Operator data will flow to. + * @throws IOException if this edge will create multiple inputs for an + * operator that does not support multiple inputs or create multiple outputs + * for an operator that does not support multiple outputs. + */ + public void connect(Operator from, Operator to) throws IOException { + markDirty(); + + // Check that both nodes are in the plan. + checkInPlan(from); + checkInPlan(to); + + // Check to see if the from operator already has outputs, and if so + // whether it supports multiple outputs. + if (mFromEdges.getCollection(from) != null && + !from.supportsMultipleOutputs()) { + throw new IOException("Attempt to give operator of type " + + from.typeName() + " multiple outputs. This operator does " + + "not support multiple outputs."); + } + + // Check to see if the to operator already has inputs, and if so + // whether it supports multiple inputs. + if (mToEdges.getCollection(to) != null && + !to.supportsMultipleInputs()) { + throw new IOException("Attempt to give operator of type " + + from.typeName() + " multiple inputs. This operator does " + + "not support multiple inputs."); + } + + mFromEdges.put(from, to); + mToEdges.put(to, from); + } + + /** + * Remove an edge from between two nodes. + * @param from Operator data would flow from. + * @param to Operator data would flow to. + * @return true if the nodes were connected according to the specified data + * flow, false otherwise. + */ + public boolean disconnect(Operator from, Operator to) { + markDirty(); + + boolean sawNull = false; + if (mFromEdges.remove(from, to) == null) sawNull = true; + if (mToEdges.remove(to, from) == null) sawNull = true; + + return !sawNull; + } + + /** + * Remove an operator from the plan. Any edges that the node has will + * be removed as well. + * @param op Operator to remove. + */ + public void remove(Operator op) { + markDirty(); + + removeEdges(op, mFromEdges, mToEdges); + removeEdges(op, mToEdges, mFromEdges); + + // Remove the operator from nodes + mOps.remove(op); + mKeys.remove(op.getOperatorKey()); + } + + /** + * Find all of the nodes that have edges to the indicated node from + * themselves. + * @param op Node to look to + * @return Collection of nodes. + */ + public Collection<Operator> getPredecessors(Operator op) { + return (Collection<Operator>)mToEdges.getCollection(op); + } + + + /** + * Find all of the nodes that have edges from the indicated node to + * themselves. + * @param op Node to look from + * @return Collection of nodes. + */ + public Collection<Operator> getSuccessors(Operator op) { + return (Collection<Operator>)mFromEdges.getCollection(op); + } + + public Iterator<Operator> iterator() { + return mOps.keySet().iterator(); + } + + private void markDirty() { + mRoots.clear(); + mLeaves.clear(); + } + + private void removeEdges(Operator op, + MultiValueMap fromMap, + MultiValueMap toMap) { + // Find all of the from edges, as I have to remove all the associated to + // edges. Need to make a copy so we can delete from the map without + // screwing up our iterator. + Collection c = fromMap.getCollection(op); + if (c == null) return; + + ArrayList al = new ArrayList(c); + Iterator i = al.iterator(); + while (i.hasNext()) { + Operator to = (Operator)i.next(); + toMap.remove(to, op); + fromMap.remove(op, to); + } + } + + private void checkInPlan(Operator op) throws IOException { + if (mOps.get(op) == null) { + throw new IOException("Attempt to connect operator " + + op.name() + " which is not in the plan."); + } + } + + +} Added: incubator/pig/branches/types/src/org/apache/pig/impl/plan/PlanVisitor.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/plan/PlanVisitor.java?rev=632041&view=auto ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/plan/PlanVisitor.java (added) +++ incubator/pig/branches/types/src/org/apache/pig/impl/plan/PlanVisitor.java Thu Feb 28 08:18:43 2008 @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.impl.plan; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Set; + +import org.apache.pig.impl.logicalLayer.parser.ParseException; + +/** + * A visitor mechanism for navigating and operating on a plan of + * Operators. This class contains the logic to traverse the plan. It does not + * visit individual nodes. That is left to implementing classes (such as + * LOVisitor). + * + */ +abstract public class PlanVisitor { + + protected OperatorPlan mPlan; + + /** + * Entry point for visiting the plan. + * @throws ParseException if an error is encountered while visiting. + */ + public abstract void visit() throws ParseException; + + /** + * @param plan OperatorPlan this visitor will visit. + */ + protected PlanVisitor(OperatorPlan plan) { + mPlan = plan; + } + + /** + * Visit the graph in a depth first traversal. + * @throws ParseException if the underlying visitor has a problem. + */ + protected void depthFirst() throws ParseException { + List<Operator> roots = mPlan.getRoots(); + Set<Operator> seen = new HashSet<Operator>(); + + depthFirst(null, roots, seen); + } + + private void depthFirst(Operator node, + Collection<Operator> successors, + Set<Operator> seen) throws ParseException { + if (successors == null) return; + + for (Operator suc : successors) { + if (seen.add(suc)) { + suc.visit(this); + Collection<Operator> newSuccessors = mPlan.getSuccessors(suc); + depthFirst(suc, newSuccessors, seen); + } + } + } + + /** + * Visit the graph in a way that guarantees that no node is visited before + * all the nodes it depends on (that is, all those giving it input) have + * already been visited. + * @throws ParseException if the underlying visitor has a problem. + */ + protected void dependencyOrder() throws ParseException { + // This is highly inneficient, but our graphs are small so it should be okay. + // The algorithm works by starting at any node in the graph, finding it's + // predecessors and calling itself for each of those predecessors. When it + // finds a node that has no unfinished predecessors it puts that node in the + // list. It then unwinds itself putting each of the other nodes in the list. + // It keeps track of what nodes it's seen as it goes so it doesn't put any + // nodes in the graph twice. + + List<Operator> fifo = new ArrayList<Operator>(); + Set<Operator> seen = new HashSet<Operator>(); + List<Operator> leaves = mPlan.getLeaves(); + if (leaves == null) return; + for (Operator op : leaves) { + doAllPredecessors(op, seen, fifo); + } + + for (Operator op: fifo) { + op.visit(this); + } + } + + private void doAllPredecessors(Operator node, + Set<Operator> seen, + Collection<Operator> fifo) throws ParseException { + if (!seen.contains(node)) { + // We haven't seen this one before. + Collection<Operator> preds = mPlan.getPredecessors(node); + if (preds != null && preds.size() > 0) { + // Do all our predecessors before ourself + for (Operator op : preds) { + doAllPredecessors(op, seen, fifo); + } + } + // Now do ourself + seen.add(node); + fifo.add(node); + } + } +} Added: incubator/pig/branches/types/test/org/apache/pig/test/TestOperatorPlan.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestOperatorPlan.java?rev=632041&view=auto ============================================================================== --- incubator/pig/branches/types/test/org/apache/pig/test/TestOperatorPlan.java (added) +++ incubator/pig/branches/types/test/org/apache/pig/test/TestOperatorPlan.java Thu Feb 28 08:18:43 2008 @@ -0,0 +1,429 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Set; + +import org.apache.pig.impl.logicalLayer.OperatorKey; +import org.apache.pig.impl.logicalLayer.parser.ParseException; +import org.apache.pig.impl.plan.*; + +import org.junit.Test; + +/** + * Test the generic operator classes (Operator, OperatorPlan, + * PlanVisitor). + */ + +public class TestOperatorPlan extends junit.framework.TestCase { + + private int mNextKey = 0; + + abstract class TOperator extends Operator { + TOperator(OperatorKey key) { + super(key); + } + } + + class SingleOperator extends TOperator { + private String mName; + + SingleOperator(String name) { + super(new OperatorKey("", mNextKey++)); + mName = name; + } + + public boolean supportsMultipleInputs() { + return false; + } + + public boolean supportsMultipleOutputs() { + return false; + } + + @Override + public void visit(PlanVisitor v) throws ParseException { + ((TVisitor)v).visitSingleOperator(this); + } + + public String name() { + return mName; + } + + public String typeName() { + return "Single"; + } + } + + class MultiOperator extends TOperator { + private String mName; + + MultiOperator(String name) { + super(new OperatorKey("", mNextKey++)); + mName = name; + } + + public boolean supportsMultipleInputs() { + return true; + } + + public boolean supportsMultipleOutputs() { + return true; + } + + public void visit(PlanVisitor v) throws ParseException { + ((TVisitor)v).visitMultiOperator(this); + } + + public String name() { + return mName; + } + + public String typeName() { + return "Multi"; + } + } + + class TPlan extends OperatorPlan { + + public String display() { + StringBuilder buf = new StringBuilder(); + + buf.append("Nodes: "); + for (Operator op : mOps.keySet()) { + buf.append(op.name()); + buf.append(' '); + } + + buf.append("FromEdges: "); + Iterator i = mFromEdges.keySet().iterator(); + while (i.hasNext()) { + Operator from = (Operator)i.next(); + Iterator j = mFromEdges.iterator(from); + while (j.hasNext()) { + buf.append(from.name()); + buf.append("->"); + buf.append(((Operator)j.next()).name()); + buf.append(' '); + } + } + + buf.append("ToEdges: "); + i = mToEdges.keySet().iterator(); + while (i.hasNext()) { + Operator from = (Operator)i.next(); + Iterator j = mToEdges.iterator(from); + while (j.hasNext()) { + buf.append(from.name()); + buf.append("->"); + buf.append(((Operator)j.next()).name()); + buf.append(' '); + } + } + return buf.toString(); + } + } + + abstract class TVisitor extends PlanVisitor { + protected StringBuilder mJournal; + + TVisitor(TPlan plan) { + super(plan); + mJournal = new StringBuilder(); + } + + public void visitSingleOperator(SingleOperator so) throws ParseException { + mJournal.append(so.name()); + mJournal.append(' '); + } + + public void visitMultiOperator(MultiOperator mo) throws ParseException { + mJournal.append(mo.name()); + mJournal.append(' '); + } + + public String getJournal() { + return mJournal.toString(); + } + } + + class TDepthVisitor extends TVisitor { + + TDepthVisitor(TPlan plan) { + super(plan); + } + + public void visit() throws ParseException { + depthFirst(); + } + } + + class TDependVisitor extends TVisitor { + + TDependVisitor(TPlan plan) { + super(plan); + } + + public void visit() throws ParseException { + dependencyOrder(); + } + } + + @Test + public void testAddRemove() throws Exception { + // Test that we can add and remove nodes from the plan. Also test + // that we can fetch the nodes by operator key, by operator, by + // roots, by leaves, that they have no predecessors and no + // successors. + + TPlan plan = new TPlan(); + Operator[] ops = new Operator[3]; + for (int i = 0; i < 3; i++) { + ops[i] = new SingleOperator(Integer.toString(i)); + plan.add(ops[i]); + } + + // All should be roots, as none are connected + List<Operator> roots = plan.getRoots(); + for (int i = 0; i < 3; i++) { + assertTrue("Roots should contain operator " + i, + roots.contains(ops[i])); + } + + // All should be leaves, as none are connected + List<Operator> leaves = plan.getLeaves(); + for (int i = 0; i < 3; i++) { + assertTrue("Leaves should contain operator " + i, + leaves.contains(ops[i])); + } + + // Each operator should have no successors or predecessors. + assertNull(plan.getSuccessors(ops[1])); + assertNull(plan.getPredecessors(ops[1])); + + // Make sure we find them all when we iterate through them. + Set<Operator> s = new HashSet<Operator>(); + Iterator<Operator> j = plan.iterator(); + while (j.hasNext()) { + s.add(j.next()); + } + + for (int i = 0; i < 3; i++) { + assertTrue("Iterator should contain operator " + i, + s.contains(ops[i])); + } + + // Test that we can find an operator by its key. + Operator op = plan.getOperator(new OperatorKey("", 1)); + assertEquals("Expected to get back ops[1]", ops[1], op); + + // Test that we can get an operator key by its operator + OperatorKey opkey = new OperatorKey("", 1); + assertTrue("Expected to get back key for ops[1]", + opkey.equals(plan.getOperatorKey(ops[1]))); + + // Test that we can remove operators + plan.remove(ops[2]); + + assertEquals("Should only have two roots now.", 2, + plan.getRoots().size()); + assertEquals("Should only have two leaves now.", 2, + plan.getLeaves().size()); + + j = plan.iterator(); + int k; + for (k = 0; j.hasNext(); k++) j.next(); + assertEquals("Iterator should only return two now", 2, k); + + // Remove all operators + plan.remove(ops[0]); + plan.remove(ops[1]); + + assertEquals("Should only have no roots now.", 0, + plan.getRoots().size()); + assertEquals("Should only have no leaves now.", 0, + plan.getLeaves().size()); + + j = plan.iterator(); + assertFalse("Iterator should return nothing now", j.hasNext()); + } + + @Test + public void testLinearGraph() throws Exception { + TPlan plan = new TPlan(); + Operator[] ops = new Operator[5]; + for (int i = 0; i < 5; i++) { + ops[i] = new SingleOperator(Integer.toString(i)); + plan.add(ops[i]); + if (i > 0) plan.connect(ops[i - 1], ops[i]); + } + + // Test that connecting a node not yet in the plan is detected. + Operator bogus = new SingleOperator("X"); + boolean sawError = false; + try { + plan.connect(ops[2], bogus); + } catch (IOException ioe) { + assertEquals("Attempt to connect operator X which is not in " + + "the plan.", ioe.getMessage()); + sawError = true; + } + assertTrue("Should have caught an error when we tried to connect a " + + "node that was not in the plan", sawError); + + // Get roots should just return ops[0] + List<Operator> roots = plan.getRoots(); + assertEquals(1, roots.size()); + assertEquals(roots.get(0), ops[0]); + + // Get leaves should just return ops[4] + List<Operator> leaves = plan.getLeaves(); + assertEquals(1, leaves.size()); + assertEquals(leaves.get(0), ops[4]); + + // Test that connecting another input to SingleOperator gives + // error. + plan.add(bogus); + sawError = false; + try { + plan.connect(bogus, ops[1]); + } catch (IOException ioe) { + assertEquals("Attempt to give operator of type Single " + + "multiple inputs. This operator does " + + "not support multiple inputs.", ioe.getMessage()); + sawError = true; + } + assertTrue("Should have caught an error when we tried to connect a " + + "second input to a Single", sawError); + + // Test that connecting another output to SingleOperator gives + // error. + sawError = false; + try { + plan.connect(ops[0], bogus); + } catch (IOException ioe) { + assertEquals("Attempt to give operator of type Single " + + "multiple outputs. This operator does " + + "not support multiple outputs.", ioe.getMessage()); + sawError = true; + } + assertTrue("Should have caught an error when we tried to connect a " + + "second output to a Single", sawError); + plan.remove(bogus); + + // Successor for ops[1] should be ops[2] + Collection s = plan.getSuccessors(ops[1]); + assertEquals(1, s.size()); + Iterator i = s.iterator(); + assertEquals(ops[2], i.next()); + + // Predecessor for ops[1] should be ops[0] + Collection p = plan.getPredecessors(ops[1]); + assertEquals(1, p.size()); + i = p.iterator(); + assertEquals(ops[0], i.next()); + + assertEquals("Nodes: 2 1 4 0 3 FromEdges: 2->3 1->2 0->1 3->4 ToEdges: 2->1 1->0 4->3 3->2 ", plan.display()); + + // Visit it depth first + TVisitor visitor = new TDepthVisitor(plan); + visitor.visit(); + assertEquals("0 1 2 3 4 ", visitor.getJournal()); + + // Visit it dependency order + visitor = new TDependVisitor(plan); + visitor.visit(); + assertEquals("0 1 2 3 4 ", visitor.getJournal()); + + // Test disconnect + plan.disconnect(ops[2], ops[3]); + assertEquals("Nodes: 2 1 4 0 3 FromEdges: 1->2 0->1 3->4 ToEdges: 2->1 1->0 4->3 ", plan.display()); + + // Test remove + plan.remove(ops[1]); + assertEquals("Nodes: 2 4 0 3 FromEdges: 3->4 ToEdges: 4->3 ", plan.display()); + } + + @Test + public void testDAG() throws Exception { + TPlan plan = new TPlan(); + Operator[] ops = new Operator[6]; + for (int i = 0; i < 6; i++) { + ops[i] = new MultiOperator(Integer.toString(i)); + plan.add(ops[i]); + } + plan.connect(ops[0], ops[2]); + plan.connect(ops[1], ops[2]); + plan.connect(ops[2], ops[3]); + plan.connect(ops[3], ops[4]); + plan.connect(ops[3], ops[5]); + + // Get roots should return ops[0] and ops[1] + List<Operator> roots = plan.getRoots(); + assertEquals(2, roots.size()); + assertTrue(roots.contains(ops[0])); + assertTrue(roots.contains(ops[1])); + + // Get leaves should return ops[4] and ops[5] + List<Operator> leaves = plan.getLeaves(); + assertEquals(2, leaves.size()); + assertTrue(leaves.contains(ops[4])); + assertTrue(leaves.contains(ops[5])); + + // Successor for ops[3] should be ops[4] and ops[5] + List<Operator> s = new ArrayList<Operator>(plan.getSuccessors(ops[3])); + assertEquals(2, s.size()); + assertTrue(s.contains(ops[4])); + assertTrue(s.contains(ops[5])); + + // Predecessor for ops[2] should be ops[0] and ops[1] + s = new ArrayList<Operator>(plan.getPredecessors(ops[2])); + assertEquals(2, s.size()); + assertTrue(s.contains(ops[0])); + assertTrue(s.contains(ops[1])); + + assertEquals("Nodes: 0 3 5 1 4 2 FromEdges: 0->2 3->4 3->5 1->2 2->3 ToEdges: 3->2 5->3 4->3 2->0 2->1 ", plan.display()); + + // Visit it depth first + TVisitor visitor = new TDepthVisitor(plan); + visitor.visit(); + assertEquals("0 2 3 4 5 1 ", visitor.getJournal()); + + // Visit it dependency order + visitor = new TDependVisitor(plan); + visitor.visit(); + assertEquals("0 1 2 3 5 4 ", visitor.getJournal()); + + // Test disconnect + plan.disconnect(ops[2], ops[3]); + assertEquals("Nodes: 0 3 5 1 4 2 FromEdges: 0->2 3->4 3->5 1->2 ToEdges: 5->3 4->3 2->0 2->1 ", plan.display()); + + // Test remove + plan.remove(ops[2]); + assertEquals("Nodes: 0 3 5 1 4 FromEdges: 3->4 3->5 ToEdges: 5->3 4->3 ", plan.display()); + } + + +} +