Author: gates
Date: Thu Feb 28 08:18:43 2008
New Revision: 632041

URL: http://svn.apache.org/viewvc?rev=632041&view=rev
Log:
Added generic OperatorPlan and associated classes.  These will be used by
logical and physical plans.


Added:
    incubator/pig/branches/types/lib/commons-collections-3.2.jar   (with props)
    incubator/pig/branches/types/src/org/apache/pig/impl/plan/
    incubator/pig/branches/types/src/org/apache/pig/impl/plan/Operator.java
    incubator/pig/branches/types/src/org/apache/pig/impl/plan/OperatorPlan.java
    incubator/pig/branches/types/src/org/apache/pig/impl/plan/PlanVisitor.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestOperatorPlan.java
Modified:
    incubator/pig/branches/types/build.xml
    
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/OperatorKey.java

Modified: incubator/pig/branches/types/build.xml
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/build.xml?rev=632041&r1=632040&r2=632041&view=diff
==============================================================================
--- incubator/pig/branches/types/build.xml (original)
+++ incubator/pig/branches/types/build.xml Thu Feb 28 08:18:43 2008
@@ -72,6 +72,7 @@
         <fileset file="${lib.dir}/javacc.jar" />
         <fileset file="${lib.dir}/jsch-0.1.33.jar" />
         <fileset file="${lib.dir}/junit-4.1.jar" />
+        <fileset file="${lib.dir}/commons-collections-3.2.jar" />
     </path>
     
     <path id="test.classpath">

Added: incubator/pig/branches/types/lib/commons-collections-3.2.jar
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/lib/commons-collections-3.2.jar?rev=632041&view=auto
==============================================================================
Binary file - no diff available.

Propchange: incubator/pig/branches/types/lib/commons-collections-3.2.jar
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Modified: 
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/OperatorKey.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/OperatorKey.java?rev=632041&r1=632040&r2=632041&view=diff
==============================================================================
--- 
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/OperatorKey.java
 (original)
+++ 
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/OperatorKey.java
 Thu Feb 28 08:18:43 2008
@@ -16,6 +16,8 @@
  * limitations under the License.
  */
 
+ // TODO Need to move this into the plan package.
+
 package org.apache.pig.impl.logicalLayer;
 
 import java.io.Serializable;

Added: incubator/pig/branches/types/src/org/apache/pig/impl/plan/Operator.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/plan/Operator.java?rev=632041&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/plan/Operator.java 
(added)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/plan/Operator.java Thu 
Feb 28 08:18:43 2008
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.impl.plan;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.lang.StringBuilder;
+
+import org.apache.pig.impl.logicalLayer.OperatorKey;
+import org.apache.pig.impl.logicalLayer.parser.ParseException;
+
+
+/**
+ * Base class for all types of operators.
+ */
+abstract public class Operator {
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * OperatorKey associated with this operator.  This key is used to find the
+     * operator in an OperatorPlan.
+     */
+    protected OperatorKey mKey;
+
+    /**
+     * @param - k Operator key to assign to this node.
+     */
+    public Operator(OperatorKey k) {
+        mKey = k;
+    }
+
+    /**
+     * Get the operator key for this operator.
+     */
+    public OperatorKey getOperatorKey() {
+        return mKey;
+    }
+
+    /**
+     * Visit this node with the provided visitor.  This should only be called 
by
+     * the visitor class itself, never directly.
+     * @param v Visitor to visit with.
+     * @throws ParseException if the visitor has a problem.
+     */
+    public abstract void visit(PlanVisitor v) throws ParseException;
+
+    /**
+     * Indicates whether this operator supports multiple inputs.
+     * @return true if it does, otherwise false.
+     */
+    public abstract boolean supportsMultipleInputs();
+
+    /**
+     * Indicates whether this operator supports multiple outputs.
+     * @return true if it does, otherwise false.
+     */
+    public abstract boolean supportsMultipleOutputs();
+    
+    public abstract String name();
+
+    public abstract String typeName();
+    
+    @Override
+    public String toString() {
+        StringBuilder msg = new StringBuilder();
+        
+        msg.append("(Name: " + name() + " Operator Key: " + mKey + ")");
+
+        return msg.toString();
+    }
+}

Added: 
incubator/pig/branches/types/src/org/apache/pig/impl/plan/OperatorPlan.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/plan/OperatorPlan.java?rev=632041&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/plan/OperatorPlan.java 
(added)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/plan/OperatorPlan.java 
Thu Feb 28 08:18:43 2008
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.plan;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.pig.impl.logicalLayer.OperatorKey;
+
+import org.apache.commons.collections.map.MultiValueMap;
+
+/**
+ * A generic graphing class for use by LogicalPlan, PhysicalPlan, etc.
+ */
+public abstract class OperatorPlan implements Iterable {
+    protected Map<Operator, OperatorKey> mOps;
+    protected Map<OperatorKey, Operator> mKeys;
+    protected MultiValueMap mFromEdges;
+    protected MultiValueMap mToEdges;
+
+    private List<Operator> mRoots;
+    private List<Operator> mLeaves;
+
+    
+    public OperatorPlan() {
+        mRoots = new ArrayList<Operator>();
+        mLeaves = new ArrayList<Operator>();
+        mOps = new HashMap<Operator, OperatorKey>();
+        mKeys = new HashMap<OperatorKey, Operator>();
+        mFromEdges = new MultiValueMap();
+        mToEdges = new MultiValueMap();
+    }
+
+    /**
+     * Get a list of all nodes in the graph that are roots.  A root is defined 
to
+     * be a node that has no input.
+     */
+    public List<Operator> getRoots() {
+        if (mRoots.size() == 0 && mOps.size() > 0) {
+            for (Operator op : mOps.keySet()) {
+                if (mToEdges.getCollection(op) == null) {
+                    mRoots.add(op);
+                }
+            }
+        }
+        return mRoots;
+    }
+
+    /**
+     * Get a list of all nodes in the graph that are leaves.  A leaf is 
defined to
+     * be a node that has no output.
+     */
+    public List<Operator> getLeaves() {
+        if (mLeaves.size() == 0 && mOps.size() > 0) {
+            for (Operator op : mOps.keySet()) {
+                if (mFromEdges.getCollection(op) == null) {
+                    mLeaves.add(op);
+                }
+            }
+        }
+        return mLeaves;
+    }
+
+    /**
+     * Given an operator, find its OperatorKey.
+     * @param op Logical operator.
+     * @return associated OperatorKey
+     */
+    public OperatorKey getOperatorKey(Operator op) {
+        return mOps.get(op);
+    }
+
+    /**
+     * Given an OperatorKey, find the associated operator.
+     * @param opKey OperatorKey
+     * @return associated operator.
+     */
+    public Operator getOperator(OperatorKey opKey) {
+        return mKeys.get(opKey);
+    }
+
+    /**
+     * Insert an operator into the plan.  This only inserts it as a node in
+     * the graph, it does not connect it to any other operators.  That should
+     * be done as a separate step using connect.
+     * @param op Operator to add to the plan.
+     */
+    public void add(Operator op) {
+        markDirty();
+        mOps.put(op, op.getOperatorKey());
+        mKeys.put(op.getOperatorKey(), op);
+    }
+
+    /**
+     * Create an edge between two nodes.  The direction of the edge implies 
data
+     * flow.
+     * @param from Operator data will flow from.
+     * @param to Operator data will flow to.
+     * @throws IOException if this edge will create multiple inputs for an
+     * operator that does not support multiple inputs or create multiple 
outputs
+     * for an operator that does not support multiple outputs.
+     */
+    public void connect(Operator from, Operator to) throws IOException {
+        markDirty();
+
+        // Check that both nodes are in the plan.
+        checkInPlan(from);
+        checkInPlan(to);
+
+        // Check to see if the from operator already has outputs, and if so
+        // whether it supports multiple outputs.
+        if (mFromEdges.getCollection(from) != null &&
+                !from.supportsMultipleOutputs()) {
+            throw new IOException("Attempt to give operator of type " +
+                from.typeName() + " multiple outputs.  This operator does "
+                + "not support multiple outputs.");
+        }
+
+        // Check to see if the to operator already has inputs, and if so
+        // whether it supports multiple inputs.
+        if (mToEdges.getCollection(to) != null &&
+                !to.supportsMultipleInputs()) {
+            throw new IOException("Attempt to give operator of type " +
+                from.typeName() + " multiple inputs.  This operator does "
+                + "not support multiple inputs.");
+        }
+
+        mFromEdges.put(from, to);
+        mToEdges.put(to, from);
+    }
+
+    /**
+     * Remove an edge from between two nodes.
+     * @param from Operator data would flow from.
+     * @param to Operator data would flow to.
+     * @return true if the nodes were connected according to the specified data
+     * flow, false otherwise.
+     */
+    public boolean disconnect(Operator from, Operator to) {
+        markDirty();
+
+        boolean sawNull = false;
+        if (mFromEdges.remove(from, to) == null) sawNull = true;
+        if (mToEdges.remove(to, from) == null) sawNull = true;
+
+        return !sawNull;
+    }
+
+    /**
+     * Remove an operator from the plan.  Any edges that the node has will
+     * be removed as well.
+     * @param op Operator to remove.
+     */
+    public void remove(Operator op) {
+        markDirty();
+
+        removeEdges(op, mFromEdges, mToEdges);
+        removeEdges(op, mToEdges, mFromEdges);
+
+        // Remove the operator from nodes
+        mOps.remove(op);
+        mKeys.remove(op.getOperatorKey());
+    }
+
+    /**
+     * Find all of the nodes that have edges to the indicated node from
+     * themselves.
+     * @param op Node to look to
+     * @return Collection of nodes.
+     */
+    public Collection<Operator> getPredecessors(Operator op) {
+        return (Collection<Operator>)mToEdges.getCollection(op);
+    }
+
+
+    /**
+     * Find all of the nodes that have edges from the indicated node to
+     * themselves.
+     * @param op Node to look from
+     * @return Collection of nodes.
+     */
+    public Collection<Operator> getSuccessors(Operator op) {
+        return (Collection<Operator>)mFromEdges.getCollection(op);
+    }
+
+    public Iterator<Operator> iterator() { 
+        return mOps.keySet().iterator();
+    }
+
+    private void markDirty() {
+        mRoots.clear();
+        mLeaves.clear();
+    }
+
+    private void removeEdges(Operator op,
+                             MultiValueMap fromMap,
+                             MultiValueMap toMap) {
+        // Find all of the from edges, as I have to remove all the associated 
to
+        // edges.  Need to make a copy so we can delete from the map without
+        // screwing up our iterator.
+        Collection c = fromMap.getCollection(op);
+        if (c == null) return;
+
+        ArrayList al = new ArrayList(c);
+        Iterator i = al.iterator();
+        while (i.hasNext()) {
+            Operator to = (Operator)i.next();
+            toMap.remove(to, op);
+            fromMap.remove(op, to);
+        }
+    }
+
+    private void checkInPlan(Operator op) throws IOException {
+        if (mOps.get(op) == null) {
+            throw new IOException("Attempt to connect operator " +
+                op.name() + " which is not in the plan.");
+        }
+    }
+
+
+}

Added: 
incubator/pig/branches/types/src/org/apache/pig/impl/plan/PlanVisitor.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/plan/PlanVisitor.java?rev=632041&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/plan/PlanVisitor.java 
(added)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/plan/PlanVisitor.java 
Thu Feb 28 08:18:43 2008
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.plan;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.pig.impl.logicalLayer.parser.ParseException;
+
+/**
+ * A visitor mechanism for navigating and operating on a plan of 
+ * Operators.  This class contains the logic to traverse the plan.  It does not
+ * visit individual nodes.  That is left to implementing classes (such as
+ * LOVisitor).
+ *
+ */
+abstract public class PlanVisitor {
+
+    protected OperatorPlan mPlan;
+
+    /**
+     * Entry point for visiting the plan.
+     * @throws ParseException if an error is encountered while visiting.
+     */
+    public abstract void visit() throws ParseException;
+
+    /**
+     * @param plan OperatorPlan this visitor will visit.
+     */
+    protected PlanVisitor(OperatorPlan plan) {
+        mPlan = plan;
+    }
+
+    /**
+     * Visit the graph in a depth first traversal.
+     * @throws ParseException if the underlying visitor has a problem.
+     */
+    protected void depthFirst() throws ParseException {
+        List<Operator> roots = mPlan.getRoots();
+        Set<Operator> seen = new HashSet<Operator>();
+
+        depthFirst(null, roots, seen);
+    }
+
+    private void depthFirst(Operator node,
+                            Collection<Operator> successors,
+                            Set<Operator> seen) throws ParseException {
+        if (successors == null) return;
+
+        for (Operator suc : successors) {
+            if (seen.add(suc)) {
+                suc.visit(this);
+                Collection<Operator> newSuccessors = mPlan.getSuccessors(suc);
+                depthFirst(suc, newSuccessors, seen);
+            }
+        }
+    }
+
+    /**
+     * Visit the graph in a way that guarantees that no node is visited before
+     * all the nodes it depends on (that is, all those giving it input) have
+     * already been visited.
+     * @throws ParseException if the underlying visitor has a problem.
+     */
+    protected void dependencyOrder() throws ParseException {
+        // This is highly inneficient, but our graphs are small so it should 
be okay.
+        // The algorithm works by starting at any node in the graph, finding 
it's
+        // predecessors and calling itself for each of those predecessors.  
When it
+        // finds a node that has no unfinished predecessors it puts that node 
in the
+        // list.  It then unwinds itself putting each of the other nodes in 
the list.
+        // It keeps track of what nodes it's seen as it goes so it doesn't put 
any
+        // nodes in the graph twice.
+
+        List<Operator> fifo = new ArrayList<Operator>();
+        Set<Operator> seen = new HashSet<Operator>();
+        List<Operator> leaves = mPlan.getLeaves();
+        if (leaves == null) return;
+        for (Operator op : leaves) {
+            doAllPredecessors(op, seen, fifo);
+        }
+
+        for (Operator op: fifo) {
+            op.visit(this);
+        }
+    }
+
+    private void doAllPredecessors(Operator node,
+                                   Set<Operator> seen,
+                                   Collection<Operator> fifo) throws 
ParseException {
+        if (!seen.contains(node)) {
+            // We haven't seen this one before.
+            Collection<Operator> preds = mPlan.getPredecessors(node);
+            if (preds != null && preds.size() > 0) {
+                // Do all our predecessors before ourself
+                for (Operator op : preds) {
+                    doAllPredecessors(op, seen, fifo);
+                }
+            }
+            // Now do ourself
+            seen.add(node);
+            fifo.add(node);
+        }
+    }
+}

Added: 
incubator/pig/branches/types/test/org/apache/pig/test/TestOperatorPlan.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestOperatorPlan.java?rev=632041&view=auto
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestOperatorPlan.java 
(added)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestOperatorPlan.java 
Thu Feb 28 08:18:43 2008
@@ -0,0 +1,429 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.pig.impl.logicalLayer.OperatorKey;
+import org.apache.pig.impl.logicalLayer.parser.ParseException;
+import org.apache.pig.impl.plan.*;
+
+import org.junit.Test;
+
+/**
+ * Test the generic operator classes (Operator, OperatorPlan,
+ * PlanVisitor).
+ */
+
+public class TestOperatorPlan extends junit.framework.TestCase {
+
+    private int mNextKey = 0;
+
+    abstract class TOperator extends Operator {
+        TOperator(OperatorKey key) {
+            super(key);
+        }
+    }
+
+    class SingleOperator extends TOperator {
+        private String mName;
+
+        SingleOperator(String name) {
+            super(new OperatorKey("", mNextKey++));
+            mName = name;
+        }
+
+        public boolean supportsMultipleInputs() {
+            return false;
+        }
+
+        public boolean supportsMultipleOutputs() {
+            return false;
+        }
+
+        @Override
+        public void visit(PlanVisitor v) throws ParseException {
+            ((TVisitor)v).visitSingleOperator(this);
+        }
+
+        public String name() {
+            return mName;
+        }
+
+        public String typeName() {
+            return "Single";
+        }
+    }
+
+    class MultiOperator extends TOperator {
+        private String mName;
+
+        MultiOperator(String name) {
+            super(new OperatorKey("", mNextKey++));
+            mName = name;
+        }
+
+        public boolean supportsMultipleInputs() {
+            return true;
+        }
+
+        public boolean supportsMultipleOutputs() {
+            return true;
+        }
+
+        public void visit(PlanVisitor v) throws ParseException {
+            ((TVisitor)v).visitMultiOperator(this);
+        }
+
+        public String name() {
+            return mName;
+        }
+
+        public String typeName() {
+            return "Multi";
+        }
+    }
+
+    class TPlan extends OperatorPlan {
+
+        public String display() {
+            StringBuilder buf = new StringBuilder();
+
+            buf.append("Nodes: ");
+            for (Operator op : mOps.keySet()) {
+                buf.append(op.name());
+                buf.append(' ');
+            }
+
+            buf.append("FromEdges: ");
+            Iterator i = mFromEdges.keySet().iterator();
+            while (i.hasNext()) {
+                Operator from = (Operator)i.next();
+                Iterator j = mFromEdges.iterator(from);
+                while (j.hasNext()) {
+                    buf.append(from.name());
+                    buf.append("->");
+                    buf.append(((Operator)j.next()).name());
+                    buf.append(' ');
+                }
+            }
+
+            buf.append("ToEdges: ");
+            i = mToEdges.keySet().iterator();
+            while (i.hasNext()) {
+                Operator from = (Operator)i.next();
+                Iterator j = mToEdges.iterator(from);
+                while (j.hasNext()) {
+                    buf.append(from.name());
+                    buf.append("->");
+                    buf.append(((Operator)j.next()).name());
+                    buf.append(' ');
+                }
+            }
+            return buf.toString();
+        }
+    }
+
+    abstract class TVisitor extends PlanVisitor {
+        protected StringBuilder mJournal;
+
+        TVisitor(TPlan plan) {
+            super(plan);
+            mJournal = new StringBuilder();
+        }
+
+        public void visitSingleOperator(SingleOperator so) throws 
ParseException {
+            mJournal.append(so.name());
+            mJournal.append(' ');
+        }
+
+        public void visitMultiOperator(MultiOperator mo) throws ParseException 
{
+            mJournal.append(mo.name());
+            mJournal.append(' ');
+        }
+
+        public String getJournal() {
+            return mJournal.toString();
+        }
+    }
+
+    class TDepthVisitor extends TVisitor {
+
+        TDepthVisitor(TPlan plan) {
+            super(plan);
+        }
+
+        public void visit() throws ParseException {
+            depthFirst();
+        }
+    }
+
+    class TDependVisitor extends TVisitor {
+
+        TDependVisitor(TPlan plan) {
+            super(plan);
+        }
+
+        public void visit() throws ParseException {
+            dependencyOrder();
+        }
+    }
+
+    @Test
+    public void testAddRemove() throws Exception {
+        // Test that we can add and remove nodes from the plan.  Also test
+        // that we can fetch the nodes by operator key, by operator, by
+        // roots, by leaves, that they have no predecessors and no
+        // successors.
+
+        TPlan plan = new TPlan();
+        Operator[] ops = new Operator[3];
+        for (int i = 0; i < 3; i++) {
+            ops[i] = new SingleOperator(Integer.toString(i));
+            plan.add(ops[i]);
+        }
+
+        // All should be roots, as none are connected
+        List<Operator> roots = plan.getRoots();
+        for (int i = 0; i < 3; i++) {
+            assertTrue("Roots should contain operator " + i,
+                roots.contains(ops[i]));
+        }
+
+        // All should be leaves, as none are connected
+        List<Operator> leaves = plan.getLeaves();
+        for (int i = 0; i < 3; i++) {
+            assertTrue("Leaves should contain operator " + i,
+                leaves.contains(ops[i]));
+        }
+
+        // Each operator should have no successors or predecessors.
+        assertNull(plan.getSuccessors(ops[1]));
+        assertNull(plan.getPredecessors(ops[1]));
+
+        // Make sure we find them all when we iterate through them.
+        Set<Operator> s = new HashSet<Operator>();
+        Iterator<Operator> j = plan.iterator();
+        while (j.hasNext()) {
+            s.add(j.next());
+        }
+
+        for (int i = 0; i < 3; i++) {
+            assertTrue("Iterator should contain operator " + i,
+                s.contains(ops[i]));
+        }
+
+        // Test that we can find an operator by its key.
+        Operator op = plan.getOperator(new OperatorKey("", 1));
+        assertEquals("Expected to get back ops[1]", ops[1], op);
+
+        // Test that we can get an operator key by its operator
+        OperatorKey opkey = new OperatorKey("", 1);
+        assertTrue("Expected to get back key for ops[1]",
+            opkey.equals(plan.getOperatorKey(ops[1])));
+
+        // Test that we can remove operators
+        plan.remove(ops[2]);
+
+        assertEquals("Should only have two roots now.", 2,
+            plan.getRoots().size());
+        assertEquals("Should only have two leaves now.", 2,
+            plan.getLeaves().size());
+
+        j = plan.iterator();
+        int k;
+        for (k = 0; j.hasNext(); k++) j.next();
+        assertEquals("Iterator should only return two now", 2, k);
+
+        // Remove all operators
+        plan.remove(ops[0]);
+        plan.remove(ops[1]);
+
+        assertEquals("Should only have no roots now.", 0,
+            plan.getRoots().size());
+        assertEquals("Should only have no leaves now.", 0,
+            plan.getLeaves().size());
+
+        j = plan.iterator();
+        assertFalse("Iterator should return nothing now", j.hasNext());
+    }
+
+    @Test
+    public void testLinearGraph() throws Exception {
+        TPlan plan = new TPlan();
+        Operator[] ops = new Operator[5];
+        for (int i = 0; i < 5; i++) {
+            ops[i] = new SingleOperator(Integer.toString(i));
+            plan.add(ops[i]);
+            if (i > 0) plan.connect(ops[i - 1], ops[i]);
+        }
+
+        // Test that connecting a node not yet in the plan is detected.
+        Operator bogus = new SingleOperator("X");
+        boolean sawError = false;
+        try {
+            plan.connect(ops[2], bogus);
+        } catch (IOException ioe) {
+            assertEquals("Attempt to connect operator X which is not in "
+                + "the plan.", ioe.getMessage());
+            sawError = true;
+        }
+        assertTrue("Should have caught an error when we tried to connect a "
+            + "node that was not in the plan", sawError);
+
+        // Get roots should just return ops[0]
+        List<Operator> roots = plan.getRoots();
+        assertEquals(1, roots.size());
+        assertEquals(roots.get(0), ops[0]);
+
+        // Get leaves should just return ops[4]
+        List<Operator> leaves = plan.getLeaves();
+        assertEquals(1, leaves.size());
+        assertEquals(leaves.get(0), ops[4]);
+
+        // Test that connecting another input to SingleOperator gives
+        // error.
+        plan.add(bogus);
+        sawError = false;
+        try {
+            plan.connect(bogus, ops[1]);
+        } catch (IOException ioe) {
+            assertEquals("Attempt to give operator of type Single " +
+                "multiple inputs.  This operator does "
+                + "not support multiple inputs.", ioe.getMessage());
+            sawError = true;
+        }
+        assertTrue("Should have caught an error when we tried to connect a "
+            + "second input to a Single", sawError);
+
+        // Test that connecting another output to SingleOperator gives
+        // error.
+        sawError = false;
+        try {
+            plan.connect(ops[0], bogus);
+        } catch (IOException ioe) {
+            assertEquals("Attempt to give operator of type Single " +
+                "multiple outputs.  This operator does "
+                + "not support multiple outputs.", ioe.getMessage());
+            sawError = true;
+        }
+        assertTrue("Should have caught an error when we tried to connect a "
+            + "second output to a Single", sawError);
+        plan.remove(bogus);
+
+        // Successor for ops[1] should be ops[2]
+        Collection s = plan.getSuccessors(ops[1]);
+        assertEquals(1, s.size());
+        Iterator i = s.iterator();
+        assertEquals(ops[2], i.next());
+
+        // Predecessor for ops[1] should be ops[0]
+        Collection p = plan.getPredecessors(ops[1]);
+        assertEquals(1, p.size());
+        i = p.iterator();
+        assertEquals(ops[0], i.next());
+
+        assertEquals("Nodes: 2 1 4 0 3 FromEdges: 2->3 1->2 0->1 3->4 ToEdges: 
2->1 1->0 4->3 3->2 ", plan.display());
+
+        // Visit it depth first
+        TVisitor visitor = new TDepthVisitor(plan);
+        visitor.visit();
+        assertEquals("0 1 2 3 4 ", visitor.getJournal());
+
+        // Visit it dependency order
+        visitor = new TDependVisitor(plan);
+        visitor.visit();
+        assertEquals("0 1 2 3 4 ", visitor.getJournal());
+
+        // Test disconnect
+        plan.disconnect(ops[2], ops[3]);
+        assertEquals("Nodes: 2 1 4 0 3 FromEdges: 1->2 0->1 3->4 ToEdges: 2->1 
1->0 4->3 ", plan.display());
+
+        // Test remove
+        plan.remove(ops[1]);
+        assertEquals("Nodes: 2 4 0 3 FromEdges: 3->4 ToEdges: 4->3 ", 
plan.display());
+    }
+
+    @Test
+    public void testDAG() throws Exception {
+        TPlan plan = new TPlan();
+        Operator[] ops = new Operator[6];
+        for (int i = 0; i < 6; i++) {
+            ops[i] = new MultiOperator(Integer.toString(i));
+            plan.add(ops[i]);
+        }
+        plan.connect(ops[0], ops[2]);
+        plan.connect(ops[1], ops[2]);
+        plan.connect(ops[2], ops[3]);
+        plan.connect(ops[3], ops[4]);
+        plan.connect(ops[3], ops[5]);
+
+        // Get roots should return ops[0] and ops[1]
+        List<Operator> roots = plan.getRoots();
+        assertEquals(2, roots.size());
+        assertTrue(roots.contains(ops[0]));
+        assertTrue(roots.contains(ops[1]));
+
+        // Get leaves should return ops[4] and ops[5]
+        List<Operator> leaves = plan.getLeaves();
+        assertEquals(2, leaves.size());
+        assertTrue(leaves.contains(ops[4]));
+        assertTrue(leaves.contains(ops[5]));
+
+        // Successor for ops[3] should be ops[4] and ops[5]
+        List<Operator> s = new ArrayList<Operator>(plan.getSuccessors(ops[3]));
+        assertEquals(2, s.size());
+        assertTrue(s.contains(ops[4]));
+        assertTrue(s.contains(ops[5]));
+        
+        // Predecessor for ops[2] should be ops[0] and ops[1]
+        s = new ArrayList<Operator>(plan.getPredecessors(ops[2]));
+        assertEquals(2, s.size());
+        assertTrue(s.contains(ops[0]));
+        assertTrue(s.contains(ops[1]));
+
+        assertEquals("Nodes: 0 3 5 1 4 2 FromEdges: 0->2 3->4 3->5 1->2 2->3 
ToEdges: 3->2 5->3 4->3 2->0 2->1 ", plan.display());
+
+        // Visit it depth first
+        TVisitor visitor = new TDepthVisitor(plan);
+        visitor.visit();
+        assertEquals("0 2 3 4 5 1 ", visitor.getJournal());
+
+        // Visit it dependency order
+        visitor = new TDependVisitor(plan);
+        visitor.visit();
+        assertEquals("0 1 2 3 5 4 ", visitor.getJournal());
+
+        // Test disconnect
+        plan.disconnect(ops[2], ops[3]);
+        assertEquals("Nodes: 0 3 5 1 4 2 FromEdges: 0->2 3->4 3->5 1->2 
ToEdges: 5->3 4->3 2->0 2->1 ", plan.display());
+
+        // Test remove
+        plan.remove(ops[2]);
+        assertEquals("Nodes: 0 3 5 1 4 FromEdges: 3->4 3->5 ToEdges: 5->3 4->3 
", plan.display());
+    }
+
+
+}
+


Reply via email to