Modified: 
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/binaryExprOps/comparators/NotEqualToExpr.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/binaryExprOps/comparators/NotEqualToExpr.java?rev=649715&r1=649714&r2=649715&view=diff
==============================================================================
--- 
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/binaryExprOps/comparators/NotEqualToExpr.java
 (original)
+++ 
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/binaryExprOps/comparators/NotEqualToExpr.java
 Fri Apr 18 15:32:08 2008
@@ -26,11 +26,11 @@
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.logicalLayer.OperatorKey;
-import org.apache.pig.impl.logicalLayer.parser.ParseException;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.impl.physicalLayer.POStatus;
 import org.apache.pig.impl.physicalLayer.Result;
 import org.apache.pig.impl.physicalLayer.plans.ExprPlanVisitor;
+import org.apache.pig.impl.plan.VisitorException;
 
 public class NotEqualToExpr extends ComparisonOperator {
 
@@ -45,7 +45,7 @@
     }
 
     @Override
-    public void visit(ExprPlanVisitor v) throws ParseException {
+    public void visit(ExprPlanVisitor v) throws VisitorException {
         v.visitNotEqualTo(this);
     }
 

Added: 
incubator/pig/branches/types/src/org/apache/pig/impl/plan/DependencyOrderWalker.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/plan/DependencyOrderWalker.java?rev=649715&view=auto
==============================================================================
--- 
incubator/pig/branches/types/src/org/apache/pig/impl/plan/DependencyOrderWalker.java
 (added)
+++ 
incubator/pig/branches/types/src/org/apache/pig/impl/plan/DependencyOrderWalker.java
 Fri Apr 18 15:32:08 2008
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.plan;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+
+/**
+ * DependencyOrderWalker traverses the graph in such a way that no node is 
visited
+ * before all the nodes it depends on have been visited.  Beyond this, it does 
not
+ * guarantee any particular order.  So, if you have a graph with nodes 1 2 3 4, and
+ * edges 1->3, 2->3, and 3->4, this walker guarantees that 1 and 2 will be 
visited
+ * before 3 and 3 before 4, but it does not guarantee whether 1 or 2 will be
+ * visited first.
+ */
+public class DependencyOrderWalker <O extends Operator, P extends 
OperatorPlan<O>>
+    extends PlanWalker<O, P> {
+
+    /**
+     * @param plan Plan for this walker to traverse.
+     */
+    public DependencyOrderWalker(P plan) {
+        super(plan);
+    }
+
+    /**
+     * Begin traversing the graph.
+     * @param visitor Visitor this walker is being used by.
+     * @throws VisitorException if an error is encountered while walking.
+     */
+    public void walk(PlanVisitor<O, P> visitor) throws VisitorException {
+        // This is highly inefficient, but our graphs are small so it should 
be okay.
+        // The algorithm works by starting at any node in the graph, finding 
its
+        // predecessors and calling itself for each of those predecessors.  
When it
+        // finds a node that has no unfinished predecessors it puts that node 
in the
+        // list.  It then unwinds itself putting each of the other nodes in 
the list.
+        // It keeps track of what nodes it's seen as it goes so it doesn't put 
any
+        // nodes in the graph twice.
+
+        List<O> fifo = new ArrayList<O>();
+        Set<O> seen = new HashSet<O>();
+        List<O> leaves = mPlan.getLeaves();
+        if (leaves == null) return;
+        for (O op : leaves) {
+            doAllPredecessors(op, seen, fifo);
+        }
+
+        for (O op: fifo) {
+            op.visit(visitor);
+        }
+    }
+
+    public PlanWalker<O, P> spawnChildWalker(P plan) { 
+        return new DependencyOrderWalker<O, P>(plan);
+    }
+
+    private void doAllPredecessors(O node,
+                                   Set<O> seen,
+                                   Collection<O> fifo) throws VisitorException 
{
+        if (!seen.contains(node)) {
+            // We haven't seen this one before.
+            Collection<O> preds = mPlan.getPredecessors(node);
+            if (preds != null && preds.size() > 0) {
+                // Do all our predecessors before ourself
+                for (O op : preds) {
+                    doAllPredecessors(op, seen, fifo);
+                }
+            }
+            // Now do ourself
+            seen.add(node);
+            fifo.add(node);
+        }
+    }
+}

Added: 
incubator/pig/branches/types/src/org/apache/pig/impl/plan/DepthFirstWalker.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/plan/DepthFirstWalker.java?rev=649715&view=auto
==============================================================================
--- 
incubator/pig/branches/types/src/org/apache/pig/impl/plan/DepthFirstWalker.java 
(added)
+++ 
incubator/pig/branches/types/src/org/apache/pig/impl/plan/DepthFirstWalker.java 
Fri Apr 18 15:32:08 2008
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.plan;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+
+/**
+ * DepthFirstWalker traverses a plan in a depth first manner.  One important 
note
+ * is that, in compliance with the PlanWalker contract, it only visits each 
node in
+ * the graph once.
+ */
+public class DepthFirstWalker <O extends Operator, P extends OperatorPlan<O>>
+    extends PlanWalker<O, P> {
+
+    /**
+     * @param plan Plan for this walker to traverse.
+     */
+    public DepthFirstWalker(P plan) {
+        super(plan);
+    }
+
+    /**
+     * Begin traversing the graph.
+     * @param visitor Visitor this walker is being used by.
+     * @throws VisitorException if an error is encountered while walking.
+     */
+    public void walk(PlanVisitor<O, P> visitor) throws VisitorException {
+        List<O> roots = mPlan.getRoots();
+        Set<O> seen = new HashSet<O>();
+
+        depthFirst(null, roots, seen, visitor);
+    }
+
+    public PlanWalker<O, P> spawnChildWalker(P plan) {
+        return new DepthFirstWalker<O, P>(plan);
+    }
+
+    private void depthFirst(O node,
+                            Collection<O> successors,
+                            Set<O> seen,
+                            PlanVisitor<O, P> visitor) throws VisitorException 
{
+        if (successors == null) return;
+
+        for (O suc : successors) {
+            if (seen.add(suc)) {
+                suc.visit(visitor);
+                Collection<O> newSuccessors = mPlan.getSuccessors(suc);
+                depthFirst(suc, newSuccessors, seen, visitor);
+            }
+        }
+    }
+}

Modified: 
incubator/pig/branches/types/src/org/apache/pig/impl/plan/Operator.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/plan/Operator.java?rev=649715&r1=649714&r2=649715&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/plan/Operator.java 
(original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/plan/Operator.java Fri 
Apr 18 15:32:08 2008
@@ -24,7 +24,6 @@
 import java.util.List;
 
 import org.apache.pig.impl.logicalLayer.OperatorKey;
-import org.apache.pig.impl.logicalLayer.parser.ParseException;
 
 /**
  * Base class for all types of operators.
@@ -59,10 +58,10 @@
      * 
      * @param v
      *            Visitor to visit with.
-     * @throws ParseException
+     * @throws VisitorException
      *             if the visitor has a problem.
      */
-    public abstract void visit(V v) throws ParseException;
+    public abstract void visit(V v) throws VisitorException;
 
     /**
      * Indicates whether this operator supports multiple inputs.

Modified: 
incubator/pig/branches/types/src/org/apache/pig/impl/plan/PlanVisitor.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/plan/PlanVisitor.java?rev=649715&r1=649714&r2=649715&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/plan/PlanVisitor.java 
(original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/plan/PlanVisitor.java 
Fri Apr 18 15:32:08 2008
@@ -23,8 +23,7 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
-
-import org.apache.pig.impl.logicalLayer.parser.ParseException;
+import java.util.Stack;
 
 /**
  * A visitor mechanism for navigating and operating on a plan of 
@@ -38,86 +37,54 @@
     protected P mPlan;
 
     /**
+     * Guaranteed to always point to the walker currently being used.
+     */
+    protected PlanWalker<O, P> mCurrentWalker;
+
+    private Stack<PlanWalker<O, P>> mWalkers;
+
+    /**
      * Entry point for visiting the plan.
-     * @throws ParseException if an error is encountered while visiting.
+     * @throws VisitorException if an error is encountered while visiting.
      */
-    public abstract void visit() throws ParseException;
+    public void visit() throws VisitorException {
+        mCurrentWalker.walk(this);
+    }
+
+    public P getPlan() {
+        return mPlan;
+    }
 
     /**
      * @param plan OperatorPlan this visitor will visit.
+     * @param walker PlanWalker this visitor will use to traverse the plan.
      */
-    protected PlanVisitor(P plan) {
+    protected PlanVisitor(P plan, PlanWalker<O, P> walker) {
         mPlan = plan;
+        mCurrentWalker = walker;
+        mWalkers = new Stack<PlanWalker<O, P>>();
     }
 
     /**
-     * Visit the graph in a depth first traversal.
-     * @throws ParseException if the underlying visitor has a problem.
+     * Push the current walker onto the stack of saved walkers and begin using
+     * the newly passed walker as the current walker.
+     * @param walker new walker to set as the current walker.
      */
-    protected void depthFirst() throws ParseException {
-        List<O> roots = mPlan.getRoots();
-        Set<O> seen = new HashSet<O>();
-
-        depthFirst(null, roots, seen);
-    }
-
-    private void depthFirst(O node,
-                            Collection<O> successors,
-                            Set<O> seen) throws ParseException {
-        if (successors == null) return;
-
-        for (O suc : successors) {
-            if (seen.add(suc)) {
-                suc.visit(this);
-                Collection<O> newSuccessors = mPlan.getSuccessors(suc);
-                depthFirst(suc, newSuccessors, seen);
-            }
-        }
+    protected void pushWalker(PlanWalker<O, P> walker) {
+        mWalkers.push(mCurrentWalker);
+        mCurrentWalker = walker;
     }
 
     /**
-     * Visit the graph in a way that guarantees that no node is visited before
-     * all the nodes it depends on (that is, all those giving it input) have
-     * already been visited.
-     * @throws ParseException if the underlying visitor has a problem.
+     * Pop the most recently saved walker off of the stack and set it as the 
current
+     * walker.  This will drop the reference to the current walker.
+     * @throws VisitorException if there are no more walkers on the stack.  In
+     * this case the current walker is not reset.
      */
-    protected void dependencyOrder() throws ParseException {
-        // This is highly inefficient, but our graphs are small so it should 
be okay.
-        // The algorithm works by starting at any node in the graph, finding 
it's
-        // predecessors and calling itself for each of those predecessors.  
When it
-        // finds a node that has no unfinished predecessors it puts that node 
in the
-        // list.  It then unwinds itself putting each of the other nodes in 
the list.
-        // It keeps track of what nodes it's seen as it goes so it doesn't put 
any
-        // nodes in the graph twice.
-
-        List<O> fifo = new ArrayList<O>();
-        Set<O> seen = new HashSet<O>();
-        List<O> leaves = mPlan.getLeaves();
-        if (leaves == null) return;
-        for (O op : leaves) {
-            doAllPredecessors(op, seen, fifo);
-        }
-
-        for (O op: fifo) {
-            op.visit(this);
-        }
-    }
-
-    private void doAllPredecessors(O node,
-                                   Set<O> seen,
-                                   Collection<O> fifo) throws ParseException {
-        if (!seen.contains(node)) {
-            // We haven't seen this one before.
-            Collection<O> preds = mPlan.getPredecessors(node);
-            if (preds != null && preds.size() > 0) {
-                // Do all our predecessors before ourself
-                for (O op : preds) {
-                    doAllPredecessors(op, seen, fifo);
-                }
-            }
-            // Now do ourself
-            seen.add(node);
-            fifo.add(node);
+    protected void popWalker() throws VisitorException {
+        if (mWalkers.empty()) {
+            throw new VisitorException("No more walkers to pop.");
         }
+        mCurrentWalker = mWalkers.pop();
     }
 }

Added: incubator/pig/branches/types/src/org/apache/pig/impl/plan/PlanWalker.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/plan/PlanWalker.java?rev=649715&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/plan/PlanWalker.java 
(added)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/plan/PlanWalker.java 
Fri Apr 18 15:32:08 2008
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.plan;
+
+/**
+ * PlanWalker encapsulates the logic to traverse a plan.  It is used only by
+ * visitors.
+ *
+ * All walkers must be constructed in a way that they only visit a given node
+ * once in a traversal.
+ */
+public abstract class PlanWalker <O extends Operator,
+       P extends OperatorPlan<O>> {
+
+    protected P mPlan;
+
+    /**
+     * @param plan Plan for this walker to traverse.
+     */
+    public PlanWalker(P plan) {
+        mPlan = plan;
+    }
+
+    /**
+     * Begin traversing the graph.
+     * @param visitor Visitor this walker is being used by.  This can't be set 
in
+     * the constructor because the visitor is constructing this class, and does
+     * not yet have a 'this' pointer to send as an argument.
+     * @throws VisitorException if an error is encountered while walking.
+     */
+    public abstract void walk(PlanVisitor<O, P> visitor) throws 
VisitorException;
+
+    /**
+     * Return a new instance of this same type of walker for a subplan.
+     * When this method is called the same type of walker with the
+     * provided plan set as the plan, must be returned.  This can then be
+     * used to walk subplans.  This allows abstract visitors to clone
+     * walkers without knowing the type of walker their subclasses used.
+     * @param plan Plan for the new walker.
+     * @return Instance of the same type of walker with mPlan set to plan.
+     */
+    public abstract PlanWalker<O, P> spawnChildWalker(P plan);
+
+}
+
+

Added: 
incubator/pig/branches/types/src/org/apache/pig/impl/plan/VisitorException.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/plan/VisitorException.java?rev=649715&view=auto
==============================================================================
--- 
incubator/pig/branches/types/src/org/apache/pig/impl/plan/VisitorException.java 
(added)
+++ 
incubator/pig/branches/types/src/org/apache/pig/impl/plan/VisitorException.java 
Fri Apr 18 15:32:08 2008
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.plan;
+
+import org.apache.pig.impl.logicalLayer.FrontendException;
+
+public class VisitorException extends FrontendException {
+
+    public VisitorException (String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public VisitorException() {
+        this(null, null);
+    }
+    
+    public VisitorException(String message) {
+        this(message, null);
+    }
+    
+    public VisitorException(Throwable cause) {
+        this(null, cause);
+    }
+}

Added: 
incubator/pig/branches/types/src/org/apache/pig/impl/plan/optimizer/PlanOptimizer.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/plan/optimizer/PlanOptimizer.java?rev=649715&view=auto
==============================================================================
--- 
incubator/pig/branches/types/src/org/apache/pig/impl/plan/optimizer/PlanOptimizer.java
 (added)
+++ 
incubator/pig/branches/types/src/org/apache/pig/impl/plan/optimizer/PlanOptimizer.java
 Fri Apr 18 15:32:08 2008
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.plan.optimizer;
+
+import java.util.List;
+
+import org.apache.pig.impl.plan.Operator;
+import org.apache.pig.impl.plan.OperatorPlan;
+
+/******************************************************************************
+ * A class to optimize plans.  This class need not be subclassed for a
+ * particular type of plan.  It can be instantiated with a set of Rules and
+ * then optimize called.
+ *
+ */
+
+public class PlanOptimizer<O extends Operator, P extends OperatorPlan<O>> {
+    
+    private List<Rule> mRules;
+    private P mPlan;
+
+    /**
+     * @param plan Plan to optimize
+     * @param rules List of rules to attempt to apply.
+     */
+    public PlanOptimizer(P plan,
+                         List<Rule> rules) {
+        mRules = rules;
+        mPlan = plan;
+    }
+
+    /**
+     * Run the optimizer.  This method attempts to match each of the Rules
+     * against the plan.  If a Rule matches, it then calls the check
+     * method of the associated Transformer to give it a chance to
+     * check whether it really wants to do the optimization.  If that
+     * returns true as well, then Transformer.transform is called. 
+     */
+    public void optimize() {
+        RuleMatcher matcher = new RuleMatcher();
+        for (Rule rule : mRules) {
+            if (matcher.match(rule)) {
+                // It matches the pattern.  Now check if the transformer
+                // approves as well.
+                List<O> matches = matcher.getMatches();
+                if (rule.transformer.check(matches)) {
+                    // The transformer approves.
+                    rule.transformer.transform(matches);
+                }
+            }
+        }
+    }
+}

Added: 
incubator/pig/branches/types/src/org/apache/pig/impl/plan/optimizer/Rule.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/plan/optimizer/Rule.java?rev=649715&view=auto
==============================================================================
--- 
incubator/pig/branches/types/src/org/apache/pig/impl/plan/optimizer/Rule.java 
(added)
+++ 
incubator/pig/branches/types/src/org/apache/pig/impl/plan/optimizer/Rule.java 
Fri Apr 18 15:32:08 2008
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.plan.optimizer;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.pig.impl.plan.Operator;
+import org.apache.pig.impl.plan.OperatorPlan;
+
+/**
+ * A rule for optimizing a plan.  The rule contains a pattern that must be
+ * matched in the plan before the optimizer can consider applying the rule
+ * and a transformer to do further checks and possibly transform the plan.
+ * The rule pattern is expressed as a list of node names, a map of edges in
+ * the plan, and a list of boolean values indicating whether the node is
+ * required.  For example, a rule pattern could be expressed as:
+ * [Filter, Filter] {[0, 1]} [true, true], which would indicate this rule
+ * matches two nodes of class name Filter, with an edge between the two,
+ * and both are required.
+ */
+public class Rule<O extends Operator, P extends OperatorPlan<O>> {
+
+    public List<String> nodes;
+       public Map<Integer, Integer> edges;
+       public List<Boolean> required;
+       public Transformer<O, P> transformer;
+
+    /**
+     * @param n List of node types to look for.
+     * @param e Map of integers to integers.  Each integer
+     * represents the offset into nodes list.
+     * @param r List of boolean indicating whether given nodes are
+     * required for the pattern to match.
+     * @param t Transformer to apply if the rule matches.
+     */
+    public Rule(List<String> n,
+                Map<Integer, Integer> e, 
+                List<Boolean> r,
+                Transformer<O, P> t) {
+        nodes = n;
+        edges = e;
+        required = r;
+        transformer = t;
+    }
+}

Added: 
incubator/pig/branches/types/src/org/apache/pig/impl/plan/optimizer/RuleMatcher.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/plan/optimizer/RuleMatcher.java?rev=649715&view=auto
==============================================================================
--- 
incubator/pig/branches/types/src/org/apache/pig/impl/plan/optimizer/RuleMatcher.java
 (added)
+++ 
incubator/pig/branches/types/src/org/apache/pig/impl/plan/optimizer/RuleMatcher.java
 Fri Apr 18 15:32:08 2008
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.plan.optimizer;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.pig.impl.plan.Operator;
+import org.apache.pig.impl.plan.OperatorPlan;
+
+/**
+ * RuleMatcher contains the logic to determine whether a given rule matches.
+ * This alone does not mean the rule will be applied.  Transformer.check()
+ * still has to pass before Transformer.transform() is called. 
+ *
+ */
+
+public class RuleMatcher<O extends Operator, P extends OperatorPlan<O>> {
+
+    private Rule<O, P> mRule;
+       private List<O> mMatches;
+    private P mPlan; // for convenience.
+
+    /**
+     * Test a rule to see if it matches the current plan.
+     * @param rule Rule to test for a match.
+     * @return true if the plan matches.
+     */
+    public boolean match(Rule<O, P> rule) {
+        mRule = rule;
+        int sz = mRule.nodes.size();
+        mMatches = new ArrayList<O>(sz);
+        mPlan = mRule.transformer.getPlan();
+        // Add sufficient slots in the matches array
+        for (int i = 0; i < sz; i++) mMatches.add(null);
+           return beginMatch(mPlan.getRoots());
+    }
+
+       /**
+        * @return a list with the instances of nodes that matched the
+     * pattern defined by
+        * the rule.  The nodes will be in the vector in the order they are
+        * specified in the rule.
+        */
+       List<O> getMatches() {
+        return mMatches;
+    }
+
+       /*
+     * This pattern matching is fairly simple and makes some important
+     * assumptions.
+     * 1)  The pattern to be matched must be expressible as a simple list.  
That
+     *     is it can match patterns like: 1->2, 2->3.  It cannot match patterns
+     *     like 1->2, 1->3.
+     * 2)  The pattern must always begin with the first node in the nodes 
array.
+     *     After that it can go where it wants.  So 1->3, 3->4 is legal.  A
+     *     pattern of 2->1 is not.
+     *
+     */
+    private boolean beginMatch(List<O> nodes) {
+        if (nodes == null) return false;
+        for (O op : nodes) {
+                   List<O> successors = new ArrayList<O>();
+                   if (op.getClass().getName().equals(mRule.nodes.get(0))) {
+                           mMatches.set(0, op);
+                           // Follow the edge to see the next node we should 
be looking for.
+                           Integer nextOpNum = mRule.edges.get(0);
+                if (nextOpNum == null) {
+                                   // This was looking for a single node
+                                   return true;
+                           }
+                           successors = mPlan.getSuccessors(op);
+                if (successors == null) return false;
+                for (O successorOp : successors) {
+                                   if (continueMatch(successorOp, nextOpNum)) 
return true;
+                           }
+                   }
+
+                   // That node didn't match.  Go to this node's successors and 
see if
+                   // any of them match.
+                   successors = mPlan.getSuccessors(op);
+                   if (beginMatch(successors)) return true;
+           }
+           // If we get here we haven't found it.
+           return false;
+    }
+
+       private boolean continueMatch(O current, Integer nodeNumber) {
+           if (current.getClass().getName() == mRule.nodes.get(nodeNumber)) {
+                   mMatches.set(nodeNumber, current);
+
+                   // Follow the edge to see the next node we should be 
looking for.
+                   Integer nextOpNum = mRule.edges.get(nodeNumber);
+                   if (nextOpNum == null) {
+                           // We've completed the match
+                           return true;
+                   }
+            List<O> successors = new ArrayList<O>();
+                   successors = mPlan.getSuccessors(current);
+            if (successors == null) return false;
+            for (O successorOp : successors) {
+                           if (continueMatch(successorOp, nextOpNum)) return 
true;
+                   }
+           } else if (!mRule.required.get(nodeNumber)) {
+                   // This node was optional, so it's okay if we don't match, 
keep
+                   // going anyway.  Keep looking for the current node (don't 
find our
+                   // successors, but look for the next edge).
+                   Integer nextOpNum = mRule.edges.get(nodeNumber);
+                   if (nextOpNum == null) {
+                           // We've completed the match
+                           return true;
+                   }
+                   if (continueMatch(current, nextOpNum)) return true;
+           }
+
+           // We can arrive here either because we didn't match at this node or
+           // further down the line.  One way or another we need to remove 
ourselves
+           // from the match vector and return false.
+           mMatches.set(nodeNumber, null);
+           return false;
+    }
+
+}

Added: 
incubator/pig/branches/types/src/org/apache/pig/impl/plan/optimizer/Transformer.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/plan/optimizer/Transformer.java?rev=649715&view=auto
==============================================================================
--- 
incubator/pig/branches/types/src/org/apache/pig/impl/plan/optimizer/Transformer.java
 (added)
+++ 
incubator/pig/branches/types/src/org/apache/pig/impl/plan/optimizer/Transformer.java
 Fri Apr 18 15:32:08 2008
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.plan.optimizer;
+
+import java.util.List;
+
+import org.apache.pig.impl.plan.Operator;
+import org.apache.pig.impl.plan.OperatorPlan;
+import org.apache.pig.impl.plan.PlanVisitor;
+import org.apache.pig.impl.plan.PlanWalker;
+
+/**
+ * Transformer represents one transform that the optimizer can
+ * apply to a graph.  This class is a special case of a PlanVisitor, so it
+ * can navigate the plan.
+ */
+public abstract class Transformer<O extends Operator, P extends 
OperatorPlan<O>> extends PlanVisitor<O, P> {
+
+    /**
+     * @param plan OperatorPlan to be optimized.
+     */
+    protected Transformer(P plan, PlanWalker<O, P> walker) {
+        super(plan, walker);
+    }
+
+       /**
+        * check if the transform should be done.  If this is being called then
+        * the pattern matches, but there may be other criteria that must be met
+        * as well.
+        * @param nodes - List of nodes declared in transform ($1 = nodes[0],
+        * etc.)  Remember that some entries in nodes[] may be NULL since they 
may
+        * not be created until after the transform.
+        * @return true if the transform should be done.
+        */
+       public abstract boolean check(List<O> nodes);
+
+       /**
+        * Transform the tree
+        * @param nodes - List of nodes declared in transform ($1 = nodes[0],
+        * etc.)  This call must destruct any nodes that are being removed as 
part
+        * of the transform and remove them from the nodes vector and construct
+        * any that are being created as part of the transform and add them at 
the
+        * appropriate point to the nodes vector.
+        */
+       public abstract void transform(List<O> nodes);
+
+}
+

Modified: 
incubator/pig/branches/types/test/org/apache/pig/test/TestOperatorPlan.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestOperatorPlan.java?rev=649715&r1=649714&r2=649715&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestOperatorPlan.java 
(original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestOperatorPlan.java 
Fri Apr 18 15:32:08 2008
@@ -20,6 +20,7 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
@@ -27,14 +28,15 @@
 import java.util.TreeSet;
 
 import org.apache.pig.impl.logicalLayer.OperatorKey;
-import org.apache.pig.impl.logicalLayer.parser.ParseException;
 import org.apache.pig.impl.plan.*;
+import org.apache.pig.impl.plan.optimizer.*;
 
 import org.junit.Test;
 
 /**
  * Test the generic operator classes (Operator, OperatorPlan,
- * PlanVisitor).
+ * PlanVisitor).  Also includes tests for optimizer framework, since that
+ * can use the same generic test operators.
  */
 
 public class TestOperatorPlan extends junit.framework.TestCase {
@@ -74,7 +76,7 @@
         }
 
         @Override
-        public void visit(PlanVisitor v) throws ParseException {
+        public void visit(PlanVisitor v) throws VisitorException {
             ((TVisitor)v).visitSingleOperator(this);
         }
 
@@ -97,7 +99,7 @@
             return true;
         }
 
-        public void visit(PlanVisitor v) throws ParseException {
+        public void visit(PlanVisitor v) throws VisitorException {
             ((TVisitor)v).visitMultiOperator(this);
         }
 
@@ -153,20 +155,20 @@
         }
     }
 
-    abstract class TVisitor extends PlanVisitor {
+    abstract class TVisitor extends PlanVisitor<TOperator, TPlan> {
         protected StringBuilder mJournal;
 
-        TVisitor(TPlan plan) {
-            super(plan);
+        TVisitor(TPlan plan, PlanWalker<TOperator, TPlan> walker) {
+            super(plan, walker);
             mJournal = new StringBuilder();
         }
 
-        public void visitSingleOperator(SingleOperator so) throws 
ParseException {
+        public void visitSingleOperator(SingleOperator so) throws 
VisitorException {
             mJournal.append(so.name());
             mJournal.append(' ');
         }
 
-        public void visitMultiOperator(MultiOperator mo) throws ParseException 
{
+        public void visitMultiOperator(MultiOperator mo) throws 
VisitorException {
             mJournal.append(mo.name());
             mJournal.append(' ');
         }
@@ -179,22 +181,14 @@
     class TDepthVisitor extends TVisitor {
 
         TDepthVisitor(TPlan plan) {
-            super(plan);
-        }
-
-        public void visit() throws ParseException {
-            depthFirst();
+            super(plan, new DepthFirstWalker(plan));
         }
     }
 
     class TDependVisitor extends TVisitor {
 
         TDependVisitor(TPlan plan) {
-            super(plan);
-        }
-
-        public void visit() throws ParseException {
-            dependencyOrder();
+            super(plan, new DependencyOrderWalker(plan));
         }
     }
 
@@ -441,6 +435,334 @@
         plan.remove(ops[2]);
         assertEquals("Nodes: 0 1 3 4 5 FromEdges: 3->4 3->5 ToEdges: 4->3 5->3 
", plan.display());
     }
+
+    class AlwaysTransform extends Transformer<TOperator, TPlan> {
+        public boolean mTransformed = false;
+
+        AlwaysTransform(TPlan plan) {
+            super(plan, new DepthFirstWalker<TOperator, TPlan>(plan));
+        }
+
+        public boolean check(List<TOperator> nodes) {
+            return true;
+        }
+
+        public void transform(List<TOperator> nodes) {
+            mTransformed = true;
+        }
+    }
+
+    // Test that we don't match when nodes don't match pattern.  Will give
+    // a pattern of S->S->M and a plan of S->M->S.
+    @Test
+    public void testOptimizerDifferentNodes() throws Exception {
+        // Build a plan
+        TPlan plan = new TPlan();
+        TOperator[] ops = new TOperator[3];
+        ops[0] = new SingleOperator("1");
+        plan.add(ops[0]);
+        ops[1] = new MultiOperator("2");
+        plan.add(ops[1]);
+        ops[2] = new SingleOperator("3");
+        plan.add(ops[2]);
+        plan.connect(ops[0], ops[1]);
+        plan.connect(ops[1], ops[2]);
+
+        // Create our rule
+        ArrayList<String> nodes = new ArrayList<String>(3);
+        nodes.add("org.apache.pig.test.TestOperatorPlan$SingleOperator");
+        nodes.add("org.apache.pig.test.TestOperatorPlan$SingleOperator");
+        nodes.add("org.apache.pig.test.TestOperatorPlan$MultiOperator");
+        HashMap<Integer, Integer> edges = new HashMap<Integer, Integer>(2);
+        edges.put(0, 1);
+        edges.put(1, 2);
+        ArrayList<Boolean> required = new ArrayList<Boolean>(3);
+        required.add(true);
+        required.add(true);
+        required.add(true);
+        AlwaysTransform transformer = new AlwaysTransform(plan);
+        Rule<TOperator, TPlan> r =
+            new Rule<TOperator, TPlan>(nodes, edges, required, transformer);
+
+        ArrayList<Rule> rules = new ArrayList<Rule>(1);
+        rules.add(r);
+
+        PlanOptimizer<TOperator, TPlan> optimizer =
+            new PlanOptimizer<TOperator, TPlan>(plan, rules);
+
+        optimizer.optimize();
+        assertFalse(transformer.mTransformed);
+    }
+
+    // Test that we don't match when edges don't match pattern.  Will give
+    // a pattern of S->S->M and a plan of S->S M.
+    @Test
+    public void testOptimizerDifferentEdges() throws Exception {
+        // Build a plan
+        TPlan plan = new TPlan();
+        TOperator[] ops = new TOperator[3];
+        ops[0] = new SingleOperator("1");
+        plan.add(ops[0]);
+        ops[1] = new SingleOperator("2");
+        plan.add(ops[1]);
+        ops[2] = new MultiOperator("3");
+        plan.add(ops[2]);
+        plan.connect(ops[0], ops[1]);
+
+        // Create our rule
+        ArrayList<String> nodes = new ArrayList<String>(3);
+        nodes.add("org.apache.pig.test.TestOperatorPlan$SingleOperator");
+        nodes.add("org.apache.pig.test.TestOperatorPlan$SingleOperator");
+        nodes.add("org.apache.pig.test.TestOperatorPlan$MultiOperator");
+        HashMap<Integer, Integer> edges = new HashMap<Integer, Integer>(2);
+        edges.put(0, 1);
+        edges.put(1, 2);
+        ArrayList<Boolean> required = new ArrayList<Boolean>(3);
+        required.add(true);
+        required.add(true);
+        required.add(true);
+        AlwaysTransform transformer = new AlwaysTransform(plan);
+        Rule<TOperator, TPlan> r =
+            new Rule<TOperator, TPlan>(nodes, edges, required, transformer);
+
+        ArrayList<Rule> rules = new ArrayList<Rule>(1);
+        rules.add(r);
+
+        PlanOptimizer<TOperator, TPlan> optimizer =
+            new PlanOptimizer<TOperator, TPlan>(plan, rules);
+
+        optimizer.optimize();
+        assertFalse(transformer.mTransformed);
+    }
+
+    // Test that we match when appropriate.  Will give
+    // a pattern of S->S->M and a plan of S->S->M.
+    @Test
+    public void testOptimizerMatches() throws Exception {
+        // Build a plan
+        TPlan plan = new TPlan();
+        TOperator[] ops = new TOperator[3];
+        ops[0] = new SingleOperator("1");
+        plan.add(ops[0]);
+        ops[1] = new SingleOperator("2");
+        plan.add(ops[1]);
+        ops[2] = new MultiOperator("3");
+        plan.add(ops[2]);
+        plan.connect(ops[0], ops[1]);
+        plan.connect(ops[1], ops[2]);
+
+        // Create our rule
+        ArrayList<String> nodes = new ArrayList<String>(3);
+        nodes.add("org.apache.pig.test.TestOperatorPlan$SingleOperator");
+        nodes.add("org.apache.pig.test.TestOperatorPlan$SingleOperator");
+        nodes.add("org.apache.pig.test.TestOperatorPlan$MultiOperator");
+        HashMap<Integer, Integer> edges = new HashMap<Integer, Integer>(2);
+        edges.put(0, 1);
+        edges.put(1, 2);
+        ArrayList<Boolean> required = new ArrayList<Boolean>(3);
+        required.add(true);
+        required.add(true);
+        required.add(true);
+        AlwaysTransform transformer = new AlwaysTransform(plan);
+        Rule<TOperator, TPlan> r =
+            new Rule<TOperator, TPlan>(nodes, edges, required, transformer);
+
+        ArrayList<Rule> rules = new ArrayList<Rule>(1);
+        rules.add(r);
+
+        PlanOptimizer<TOperator, TPlan> optimizer =
+            new PlanOptimizer<TOperator, TPlan>(plan, rules);
+
+        optimizer.optimize();
+        assertTrue(transformer.mTransformed);
+    }
+
+    // Test that we match when only part of the plan matches.  Will give
+    // a pattern of S->S->M and a plan of S->S->S->M.
+    @Test
+    public void testOptimizerMatchesPart() throws Exception {
+        // Build a plan
+        TPlan plan = new TPlan();
+        TOperator[] ops = new TOperator[4];
+        ops[0] = new SingleOperator("1");
+        plan.add(ops[0]);
+        ops[1] = new SingleOperator("2");
+        plan.add(ops[1]);
+        ops[2] = new SingleOperator("3");
+        plan.add(ops[2]);
+        ops[3] = new MultiOperator("4");
+        plan.add(ops[3]);
+        plan.connect(ops[0], ops[1]);
+        plan.connect(ops[1], ops[2]);
+        plan.connect(ops[2], ops[3]);
+
+        // Create our rule
+        ArrayList<String> nodes = new ArrayList<String>(3);
+        nodes.add("org.apache.pig.test.TestOperatorPlan$SingleOperator");
+        nodes.add("org.apache.pig.test.TestOperatorPlan$SingleOperator");
+        nodes.add("org.apache.pig.test.TestOperatorPlan$MultiOperator");
+        HashMap<Integer, Integer> edges = new HashMap<Integer, Integer>(2);
+        edges.put(0, 1);
+        edges.put(1, 2);
+        ArrayList<Boolean> required = new ArrayList<Boolean>(3);
+        required.add(true);
+        required.add(true);
+        required.add(true);
+        AlwaysTransform transformer = new AlwaysTransform(plan);
+        Rule<TOperator, TPlan> r =
+            new Rule<TOperator, TPlan>(nodes, edges, required, transformer);
+
+        ArrayList<Rule> rules = new ArrayList<Rule>(1);
+        rules.add(r);
+
+        PlanOptimizer<TOperator, TPlan> optimizer =
+            new PlanOptimizer<TOperator, TPlan>(plan, rules);
+
+        optimizer.optimize();
+        assertTrue(transformer.mTransformed);
+    }
+
+    // Test that we match when a node is optional and the optional node is
+    // present.  Will give
+    // a pattern of S->S->M (with second S optional) and a plan of S->S->M.
+    @Test
+    public void testOptimizerOptionalMatches() throws Exception {
+        // Build a plan
+        TPlan plan = new TPlan();
+        TOperator[] ops = new TOperator[3];
+        ops[0] = new SingleOperator("1");
+        plan.add(ops[0]);
+        ops[1] = new SingleOperator("2");
+        plan.add(ops[1]);
+        ops[2] = new MultiOperator("3");
+        plan.add(ops[2]);
+        plan.connect(ops[0], ops[1]);
+        plan.connect(ops[1], ops[2]);
+
+        // Create our rule
+        ArrayList<String> nodes = new ArrayList<String>(3);
+        nodes.add("org.apache.pig.test.TestOperatorPlan$SingleOperator");
+        nodes.add("org.apache.pig.test.TestOperatorPlan$SingleOperator");
+        nodes.add("org.apache.pig.test.TestOperatorPlan$MultiOperator");
+        HashMap<Integer, Integer> edges = new HashMap<Integer, Integer>(2);
+        edges.put(0, 1);
+        edges.put(1, 2);
+        ArrayList<Boolean> required = new ArrayList<Boolean>(3);
+        required.add(true);
+        required.add(false);
+        required.add(true);
+        AlwaysTransform transformer = new AlwaysTransform(plan);
+        Rule<TOperator, TPlan> r =
+            new Rule<TOperator, TPlan>(nodes, edges, required, transformer);
+
+        ArrayList<Rule> rules = new ArrayList<Rule>(1);
+        rules.add(r);
+
+        PlanOptimizer<TOperator, TPlan> optimizer =
+            new PlanOptimizer<TOperator, TPlan>(plan, rules);
+
+        optimizer.optimize();
+        assertTrue(transformer.mTransformed);
+    }
+
+    // Test that we match when a node is optional and the optional node is
+    // missing.  Will give
+    // a pattern of S->S->M (with second S optional) and a plan of S->M.
+    @Test
+    public void testOptimizerOptionalMissing() throws Exception {
+        // Build a plan
+        TPlan plan = new TPlan();
+        TOperator[] ops = new TOperator[2];
+        ops[0] = new SingleOperator("1");
+        plan.add(ops[0]);
+        ops[1] = new MultiOperator("2");
+        plan.add(ops[1]);
+        plan.connect(ops[0], ops[1]);
+
+        // Create our rule
+        ArrayList<String> nodes = new ArrayList<String>(3);
+        nodes.add("org.apache.pig.test.TestOperatorPlan$SingleOperator");
+        nodes.add("org.apache.pig.test.TestOperatorPlan$SingleOperator");
+        nodes.add("org.apache.pig.test.TestOperatorPlan$MultiOperator");
+        HashMap<Integer, Integer> edges = new HashMap<Integer, Integer>(2);
+        edges.put(0, 1);
+        edges.put(1, 2);
+        ArrayList<Boolean> required = new ArrayList<Boolean>(3);
+        required.add(true);
+        required.add(false);
+        required.add(true);
+        AlwaysTransform transformer = new AlwaysTransform(plan);
+        Rule<TOperator, TPlan> r =
+            new Rule<TOperator, TPlan>(nodes, edges, required, transformer);
+
+        ArrayList<Rule> rules = new ArrayList<Rule>(1);
+        rules.add(r);
+
+        PlanOptimizer<TOperator, TPlan> optimizer =
+            new PlanOptimizer<TOperator, TPlan>(plan, rules);
+
+        optimizer.optimize();
+        assertTrue(transformer.mTransformed);
+    }
+
+    class NeverTransform extends Transformer<TOperator, TPlan> {
+        public boolean mTransformed = false;
+
+        NeverTransform(TPlan plan) {
+            super(plan, new DepthFirstWalker<TOperator, TPlan>(plan));
+        }
+
+        public boolean check(List<TOperator> nodes) {
+            return false;
+        }
+
+        public void transform(List<TOperator> nodes) {
+            mTransformed = true;
+        }
+    }
+
+    // Test that even if we match, if check returns false then the optimization
+    // is not done.
+    @Test
+    public void testCheck() throws Exception {
+        // Build a plan
+        TPlan plan = new TPlan();
+        TOperator[] ops = new TOperator[3];
+        ops[0] = new SingleOperator("1");
+        plan.add(ops[0]);
+        ops[1] = new SingleOperator("2");
+        plan.add(ops[1]);
+        ops[2] = new MultiOperator("3");
+        plan.add(ops[2]);
+        plan.connect(ops[0], ops[1]);
+        plan.connect(ops[1], ops[2]);
+
+        // Create our rule
+        ArrayList<String> nodes = new ArrayList<String>(3);
+        nodes.add("org.apache.pig.test.TestOperatorPlan$SingleOperator");
+        nodes.add("org.apache.pig.test.TestOperatorPlan$SingleOperator");
+        nodes.add("org.apache.pig.test.TestOperatorPlan$MultiOperator");
+        HashMap<Integer, Integer> edges = new HashMap<Integer, Integer>(2);
+        edges.put(0, 1);
+        edges.put(1, 2);
+        ArrayList<Boolean> required = new ArrayList<Boolean>(3);
+        required.add(true);
+        required.add(true);
+        required.add(true);
+        NeverTransform transformer = new NeverTransform(plan);
+        Rule<TOperator, TPlan> r =
+            new Rule<TOperator, TPlan>(nodes, edges, required, transformer);
+
+        ArrayList<Rule> rules = new ArrayList<Rule>(1);
+        rules.add(r);
+
+        PlanOptimizer<TOperator, TPlan> optimizer =
+            new PlanOptimizer<TOperator, TPlan>(plan, rules);
+
+        optimizer.optimize();
+        assertFalse(transformer.mTransformed);
+    }
+
 
 
 }


Reply via email to