Author: gates Date: Tue Mar 18 21:45:46 2008 New Revision: 638706 URL: http://svn.apache.org/viewvc?rev=638706&view=rev Log: Moved OperatorPlan off of Apache Commons MultiValueMap to local implementation of MultiMap. This was necessary so that OperatorPlan could implement Serializable.
Added: incubator/pig/branches/types/src/org/apache/pig/impl/plan/MultiMap.java Removed: incubator/pig/branches/types/lib/commons-collections-3.2.jar Modified: incubator/pig/branches/types/build.xml incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/Schema.java incubator/pig/branches/types/src/org/apache/pig/impl/plan/OperatorPlan.java incubator/pig/branches/types/test/org/apache/pig/test/TestOperatorPlan.java Modified: incubator/pig/branches/types/build.xml URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/build.xml?rev=638706&r1=638705&r2=638706&view=diff ============================================================================== --- incubator/pig/branches/types/build.xml (original) +++ incubator/pig/branches/types/build.xml Tue Mar 18 21:45:46 2008 @@ -132,7 +132,7 @@ </target> <target name="compile-sources"> - <javac encoding="${build.encoding}" srcdir="${sources}" includes="**/plan/*.java, **/logicalLayer/LogicalPlan.java, **/logicalLayer/LOEval.java, **/logicalLayer/LOSort.java, **/logicalLayer/LOGenerate.java, **/logicalLayer/LOVisitor.java " destdir="${dist}" debug="${javac.debug}" optimize="${javac.optimize}" target="${javac.version}" source="${javac.version}" deprecation="${javac.deprecation}"> + <javac encoding="${build.encoding}" srcdir="${sources}" includes="**/plan/*.java, **/test/TestOperatorPlan.java, **/logicalLayer/LogicalPlan.java, **/logicalLayer/LOEval.java, **/logicalLayer/LOSort.java, **/logicalLayer/LOGenerate.java, **/logicalLayer/LOVisitor.java, **/logicalLayer/schema/Schema.java " destdir="${dist}" debug="${javac.debug}" optimize="${javac.optimize}" target="${javac.version}" source="${javac.version}" deprecation="${javac.deprecation}"> <compilerarg line="${javac.args} ${javac.args.warnings}" /> <classpath refid="${cp}" /> <!-- Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/Schema.java URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/Schema.java?rev=638706&r1=638705&r2=638706&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/Schema.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/Schema.java Tue Mar 18 21:45:46 2008 @@ -66,6 +66,18 @@ type = DataType.TUPLE; schema = s; } + + @Override + public boolean equals(Object other) { + if (!(other instanceof FieldSchema)) return false; + FieldSchema fs = (FieldSchema)other; + // Fields can have different names and still be equal. But + // types and schemas (if they're a tuple) must match. + if (type != fs.type) return false; + if (schema != fs.schema) return false; + + return true; + } } private List<FieldSchema> mFields; @@ -152,7 +164,22 @@ if (otherFs.type != DataType.UNKNOWN) ourFs.type = otherFs.type; if (otherFs.schema != null) ourFs.schema = otherFs.schema; } + } + @Override + public boolean equals(Object other) { + if (!(other instanceof Schema)) return false; + + Schema s = (Schema)other; + + if (s.size() != size()) return false; + + Iterator<FieldSchema> i = mFields.iterator(); + Iterator<FieldSchema> j = s.mFields.iterator(); + while (i.hasNext()) { + if (!(i.next().equals(j.next()))) return false; + } + return true; } } Added: incubator/pig/branches/types/src/org/apache/pig/impl/plan/MultiMap.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/plan/MultiMap.java?rev=638706&view=auto ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/plan/MultiMap.java (added) +++ incubator/pig/branches/types/src/org/apache/pig/impl/plan/MultiMap.java Tue Mar 18 21:45:46 2008 @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.impl.plan; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; + +/** + * An implementation of multi-map. We can't use Apache commons + * MultiValueMap because it isn't serializable. And we don't want to use + * MultiHashMap, as it is marked deprecated. + * + * This class can't extend Map, because it needs to change the semantics of + * put, so that you give it one key and one value, and it either creates a + * new entry with the key and a new collection of value (if the key is not yet + * in the map) or adds the values to the existing collection for the key + * (if the key is already in the map). + */ +public class MultiMap<K, V> implements Serializable { + + private HashMap<K, ArrayList<V>> mMap = new HashMap<K, ArrayList<V>>(); + + /** + * Add an element to the map. + * @param key The key to store the value under. If the key already + * exists the value will be added to the collection for that key, it + * will not replace the existing value (as in a standard map). + * @param value value to store. 
+ */ + public void put(K key, V value) { + ArrayList<V> list = mMap.get(key); + if (list == null) { + list = new ArrayList<V>(); + list.add(value); + mMap.put(key, list); + } else { + list.add(value); + } + } + + /** + * Get the collection of values associated with a given key. + * @param key Key to fetch values for. + * @return collection of values, or null if the key is not in the map. + */ + public Collection<V> get(K key) { + return mMap.get(key); + } + + /** + * Remove one value from an existing key. If that is the last value + * for the key, then remove the key too. + * @param key Key to remove the value from. + * @param value Value to remove. + * @return The value being removed, or null if the key or value does + * not exist. + */ + public V remove(K key, V value) { + ArrayList<V> list = mMap.get(key); + if (list == null) return null; + + Iterator<V> i = list.iterator(); + V keeper = null; + while (i.hasNext()) { + keeper = i.next(); + if (keeper.equals(value)) { + i.remove(); + break; + } + } + + if (list.size() == 0) { + mMap.remove(key); + } + + return keeper; + } + + /** + * Get a set of all the keys in this map. + * @return Set of keys. 
+ */ + public Set<K> keySet() { + return mMap.keySet(); + } + + + +} Modified: incubator/pig/branches/types/src/org/apache/pig/impl/plan/OperatorPlan.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/plan/OperatorPlan.java?rev=638706&r1=638705&r2=638706&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/plan/OperatorPlan.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/impl/plan/OperatorPlan.java Tue Mar 18 21:45:46 2008 @@ -28,16 +28,16 @@ import org.apache.pig.impl.logicalLayer.OperatorKey; -import org.apache.commons.collections.map.MultiValueMap; +//import org.apache.commons.collections.map.MultiValueMap; /** * A generic graphing class for use by LogicalPlan, PhysicalPlan, etc. */ -public abstract class OperatorPlan<E extends Operator> implements Iterable { +public abstract class OperatorPlan<E extends Operator> implements Iterable, Serializable { protected Map<E, OperatorKey> mOps; protected Map<OperatorKey, E> mKeys; - protected MultiValueMap mFromEdges; - protected MultiValueMap mToEdges; + protected MultiMap<E, E> mFromEdges; + protected MultiMap<E, E> mToEdges; private List<E> mRoots; private List<E> mLeaves; @@ -48,8 +48,8 @@ mLeaves = new ArrayList<E>(); mOps = new HashMap<E, OperatorKey>(); mKeys = new HashMap<OperatorKey, E>(); - mFromEdges = new MultiValueMap(); - mToEdges = new MultiValueMap(); + mFromEdges = new MultiMap<E, E>(); + mToEdges = new MultiMap<E, E>(); } /** @@ -59,7 +59,7 @@ public List<E> getRoots() { if (mRoots.size() == 0 && mOps.size() > 0) { for (E op : mOps.keySet()) { - if (mToEdges.getCollection(op) == null) { + if (mToEdges.get(op) == null) { mRoots.add(op); } } @@ -74,7 +74,7 @@ public List<E> getLeaves() { if (mLeaves.size() == 0 && mOps.size() > 0) { for (E op : mOps.keySet()) { - if (mFromEdges.getCollection(op) == null) { + if (mFromEdges.get(op) == null) { 
mLeaves.add(op); } } @@ -130,7 +130,7 @@ // Check to see if the from operator already has outputs, and if so // whether it supports multiple outputs. - if (mFromEdges.getCollection(from) != null && + if (mFromEdges.get(from) != null && !from.supportsMultipleOutputs()) { throw new IOException("Attempt to give operator of type " + from.typeName() + " multiple outputs. This operator does " @@ -139,7 +139,7 @@ // Check to see if the to operator already has inputs, and if so // whether it supports multiple inputs. - if (mToEdges.getCollection(to) != null && + if (mToEdges.get(to) != null && !to.supportsMultipleInputs()) { throw new IOException("Attempt to give operator of type " + from.typeName() + " multiple inputs. This operator does " @@ -190,7 +190,7 @@ * @return Collection of nodes. */ public List<E> getPredecessors(E op) { - return (List<E>)mToEdges.getCollection(op); + return (List<E>)mToEdges.get(op); } @@ -201,7 +201,7 @@ * @return Collection of nodes. */ public List<E> getSuccessors(E op) { - return (List<E>)mFromEdges.getCollection(op); + return (List<E>)mFromEdges.get(op); } public Iterator<E> iterator() { @@ -214,12 +214,12 @@ } private void removeEdges(E op, - MultiValueMap fromMap, - MultiValueMap toMap) { + MultiMap<E, E> fromMap, + MultiMap<E, E> toMap) { // Find all of the from edges, as I have to remove all the associated to // edges. Need to make a copy so we can delete from the map without // screwing up our iterator. 
- Collection c = fromMap.getCollection(op); + Collection c = fromMap.get(op); if (c == null) return; ArrayList al = new ArrayList(c); Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestOperatorPlan.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestOperatorPlan.java?rev=638706&r1=638705&r2=638706&view=diff ============================================================================== --- incubator/pig/branches/types/test/org/apache/pig/test/TestOperatorPlan.java (original) +++ incubator/pig/branches/types/test/org/apache/pig/test/TestOperatorPlan.java Tue Mar 18 21:45:46 2008 @@ -105,26 +105,26 @@ } } - class TPlan extends OperatorPlan { + class TPlan extends OperatorPlan<TOperator> { public String display() { StringBuilder buf = new StringBuilder(); buf.append("Nodes: "); - for (Operator op : mOps.keySet()) { + for (TOperator op : mOps.keySet()) { buf.append(op.name()); buf.append(' '); } buf.append("FromEdges: "); - Iterator i = mFromEdges.keySet().iterator(); + Iterator<TOperator> i = mFromEdges.keySet().iterator(); while (i.hasNext()) { - Operator from = (Operator)i.next(); - Iterator j = mFromEdges.iterator(from); + TOperator from = i.next(); + Iterator<TOperator> j = mFromEdges.get(from).iterator(); while (j.hasNext()) { buf.append(from.name()); buf.append("->"); - buf.append(((Operator)j.next()).name()); + buf.append(j.next().name()); buf.append(' '); } } @@ -132,12 +132,12 @@ buf.append("ToEdges: "); i = mToEdges.keySet().iterator(); while (i.hasNext()) { - Operator from = (Operator)i.next(); - Iterator j = mToEdges.iterator(from); + TOperator from = i.next(); + Iterator<TOperator> j = mToEdges.get(from).iterator(); while (j.hasNext()) { buf.append(from.name()); buf.append("->"); - buf.append(((Operator)j.next()).name()); + buf.append(j.next().name()); buf.append(' '); } } @@ -198,21 +198,21 @@ // successors. 
TPlan plan = new TPlan(); - Operator[] ops = new Operator[3]; + TOperator[] ops = new TOperator[3]; for (int i = 0; i < 3; i++) { ops[i] = new SingleOperator(Integer.toString(i)); plan.add(ops[i]); } // All should be roots, as none are connected - List<Operator> roots = plan.getRoots(); + List<TOperator> roots = plan.getRoots(); for (int i = 0; i < 3; i++) { assertTrue("Roots should contain operator " + i, roots.contains(ops[i])); } // All should be leaves, as none are connected - List<Operator> leaves = plan.getLeaves(); + List<TOperator> leaves = plan.getLeaves(); for (int i = 0; i < 3; i++) { assertTrue("Leaves should contain operator " + i, leaves.contains(ops[i])); @@ -223,8 +223,8 @@ assertNull(plan.getPredecessors(ops[1])); // Make sure we find them all when we iterate through them. - Set<Operator> s = new HashSet<Operator>(); - Iterator<Operator> j = plan.iterator(); + Set<TOperator> s = new HashSet<TOperator>(); + Iterator<TOperator> j = plan.iterator(); while (j.hasNext()) { s.add(j.next()); } @@ -235,7 +235,7 @@ } // Test that we can find an operator by its key. - Operator op = plan.getOperator(new OperatorKey("", 1)); + TOperator op = plan.getOperator(new OperatorKey("", 1)); assertEquals("Expected to get back ops[1]", ops[1], op); // Test that we can get an operator key by its operator @@ -272,7 +272,7 @@ @Test public void testLinearGraph() throws Exception { TPlan plan = new TPlan(); - Operator[] ops = new Operator[5]; + TOperator[] ops = new TOperator[5]; for (int i = 0; i < 5; i++) { ops[i] = new SingleOperator(Integer.toString(i)); plan.add(ops[i]); @@ -280,7 +280,7 @@ } // Test that connecting a node not yet in the plan is detected. 
- Operator bogus = new SingleOperator("X"); + TOperator bogus = new SingleOperator("X"); boolean sawError = false; try { plan.connect(ops[2], bogus); @@ -293,12 +293,12 @@ + "node that was not in the plan", sawError); // Get roots should just return ops[0] - List<Operator> roots = plan.getRoots(); + List<TOperator> roots = plan.getRoots(); assertEquals(1, roots.size()); assertEquals(roots.get(0), ops[0]); // Get leaves should just return ops[4] - List<Operator> leaves = plan.getLeaves(); + List<TOperator> leaves = plan.getLeaves(); assertEquals(1, leaves.size()); assertEquals(leaves.get(0), ops[4]); @@ -344,7 +344,7 @@ i = p.iterator(); assertEquals(ops[0], i.next()); - assertEquals("Nodes: 2 1 4 0 3 FromEdges: 2->3 1->2 0->1 3->4 ToEdges: 2->1 1->0 4->3 3->2 ", plan.display()); + assertEquals("Nodes: 2 0 1 3 4 FromEdges: 2->3 0->1 1->2 3->4 ToEdges: 2->1 1->0 3->2 4->3 ", plan.display()); // Visit it depth first TVisitor visitor = new TDepthVisitor(plan); @@ -358,17 +358,17 @@ // Test disconnect plan.disconnect(ops[2], ops[3]); - assertEquals("Nodes: 2 1 4 0 3 FromEdges: 1->2 0->1 3->4 ToEdges: 2->1 1->0 4->3 ", plan.display()); + assertEquals("Nodes: 2 0 1 3 4 FromEdges: 0->1 1->2 3->4 ToEdges: 2->1 1->0 4->3 ", plan.display()); // Test remove plan.remove(ops[1]); - assertEquals("Nodes: 2 4 0 3 FromEdges: 3->4 ToEdges: 4->3 ", plan.display()); + assertEquals("Nodes: 2 0 3 4 FromEdges: 3->4 ToEdges: 4->3 ", plan.display()); } @Test public void testDAG() throws Exception { TPlan plan = new TPlan(); - Operator[] ops = new Operator[6]; + TOperator[] ops = new TOperator[6]; for (int i = 0; i < 6; i++) { ops[i] = new MultiOperator(Integer.toString(i)); plan.add(ops[i]); @@ -380,30 +380,30 @@ plan.connect(ops[3], ops[5]); // Get roots should return ops[0] and ops[1] - List<Operator> roots = plan.getRoots(); + List<TOperator> roots = plan.getRoots(); assertEquals(2, roots.size()); assertTrue(roots.contains(ops[0])); assertTrue(roots.contains(ops[1])); // Get leaves 
should return ops[4] and ops[5] - List<Operator> leaves = plan.getLeaves(); + List<TOperator> leaves = plan.getLeaves(); assertEquals(2, leaves.size()); assertTrue(leaves.contains(ops[4])); assertTrue(leaves.contains(ops[5])); // Successor for ops[3] should be ops[4] and ops[5] - List<Operator> s = new ArrayList<Operator>(plan.getSuccessors(ops[3])); + List<TOperator> s = new ArrayList<TOperator>(plan.getSuccessors(ops[3])); assertEquals(2, s.size()); assertTrue(s.contains(ops[4])); assertTrue(s.contains(ops[5])); // Predecessor for ops[2] should be ops[0] and ops[1] - s = new ArrayList<Operator>(plan.getPredecessors(ops[2])); + s = new ArrayList<TOperator>(plan.getPredecessors(ops[2])); assertEquals(2, s.size()); assertTrue(s.contains(ops[0])); assertTrue(s.contains(ops[1])); - assertEquals("Nodes: 0 3 5 1 4 2 FromEdges: 0->2 3->4 3->5 1->2 2->3 ToEdges: 3->2 5->3 4->3 2->0 2->1 ", plan.display()); + assertEquals("Nodes: 4 5 0 2 1 3 FromEdges: 0->2 2->3 1->2 3->4 3->5 ToEdges: 4->3 5->3 2->0 2->1 3->2 ", plan.display()); // Visit it depth first TVisitor visitor = new TDepthVisitor(plan); @@ -413,15 +413,15 @@ // Visit it dependency order visitor = new TDependVisitor(plan); visitor.visit(); - assertEquals("0 1 2 3 5 4 ", visitor.getJournal()); + assertEquals("0 1 2 3 4 5 ", visitor.getJournal()); // Test disconnect plan.disconnect(ops[2], ops[3]); - assertEquals("Nodes: 0 3 5 1 4 2 FromEdges: 0->2 3->4 3->5 1->2 ToEdges: 5->3 4->3 2->0 2->1 ", plan.display()); + assertEquals("Nodes: 4 5 0 2 1 3 FromEdges: 0->2 1->2 3->4 3->5 ToEdges: 4->3 5->3 2->0 2->1 ", plan.display()); // Test remove plan.remove(ops[2]); - assertEquals("Nodes: 0 3 5 1 4 FromEdges: 3->4 3->5 ToEdges: 5->3 4->3 ", plan.display()); + assertEquals("Nodes: 4 5 0 1 3 FromEdges: 3->4 3->5 ToEdges: 4->3 5->3 ", plan.display()); }