Author: fhanik Date: Fri Mar 17 08:35:32 2006 New Revision: 386668 URL: http://svn.apache.org/viewcvs?rev=386668&view=rev Log: Added new todo items, Added a replicated hash map, all-to-all replication Simplified BioSender, writeData isn't needed if it is a 2 line method
Added: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/ReplicatedMap.java Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioSender.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java tomcat/container/tc5.5.x/modules/groupcom/to-do.txt Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioSender.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioSender.java?rev=386668&r1=386667&r2=386668&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioSender.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioSender.java Fri Mar 17 08:35:32 2006 @@ -267,22 +267,12 @@ keepalive(); if ( reconnect ) closeSocket(); if (!isConnected()) openSocket(); - writeData(data); - } - - /** - * Sent real cluster Message to socket stream - * FIXME send compress - * @param data - * @throws IOException - * @since 5.5.10 - */ - protected void writeData(byte[] data) throws IOException { soOut.write(data); soOut.flush(); if (getWaitForAck()) waitForAck(); - } + } + /** * Wait for Acknowledgement from other server * FIXME Please, not wait only for three charcters, better control that the wait ack message is correct. Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java?rev=386668&r1=386667&r2=386668&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java Fri Mar 17 08:35:32 2006 @@ -15,19 +15,11 @@ */ package org.apache.catalina.tribes.tipis; -import java.io.Externalizable; -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; -import java.io.ObjectOutputStream; import java.io.Serializable; -import java.io.UnsupportedEncodingException; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.Iterator; -import java.util.LinkedHashMap; import java.util.LinkedHashSet; import java.util.Map; import java.util.Set; @@ -37,9 +29,6 @@ import org.apache.catalina.tribes.ChannelListener; import org.apache.catalina.tribes.Member; import org.apache.catalina.tribes.MembershipListener; -import org.apache.catalina.tribes.io.DirectByteArrayOutputStream; -import org.apache.catalina.tribes.io.XByteBuffer; -import org.apache.catalina.tribes.mcast.MemberImpl; /** * A smart implementation of a stateful replicated map. uses primary/secondary backup strategy. Added: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/ReplicatedMap.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/ReplicatedMap.java?rev=386668&view=auto ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/ReplicatedMap.java (added) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/ReplicatedMap.java Fri Mar 17 08:35:32 2006 @@ -0,0 +1,281 @@ +/* + * Copyright 1999,2004-2006 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.catalina.tribes.tipis; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.LinkedHashSet; +import java.util.Map; +import java.util.Set; + +import org.apache.catalina.tribes.Channel; +import org.apache.catalina.tribes.ChannelException; +import org.apache.catalina.tribes.ChannelListener; +import org.apache.catalina.tribes.Member; +import org.apache.catalina.tribes.MembershipListener; + +/** + * All-to-all replication for a hash map implementation. Each node in the cluster will carry an identical + * copy of the map.<br><br> + * This map implementation doesn't have a background thread running to replicate changes. + * If you do have changes without invoking put/remove then you need to invoke one of the following methods: + * <ul> + * <li><code>replicate(Object,boolean)</code> - replicates only the object that belongs to the key</li> + * <li><code>replicate(boolean)</code> - Scans the entire map for changes and replicates data</li> + * </ul> + * the <code>boolean</code> value in the <code>replicate</code> method used to decide + * whether to only replicate objects that implement the <code>ReplicatedMapEntry</code> interface + * or to replicate all objects. If an object doesn't implement the <code>ReplicatedMapEntry</code> interface + * each time the object gets replicated the entire object gets serialized, hence a call to <code>replicate(true)</code> + * will replicate all objects in this map that are using this node as primary. + * + * <br><br><b>REMBER TO CALL <code>breakdown()</code> or <code>finalize()</code> when you are done with the map to + * avoid memory leaks.<br><br> + * @todo implement periodic sync/transfer thread + * @author Filip Hanik + * @version 1.0 + */ +public class ReplicatedMap extends AbstractReplicatedMap implements RpcCallback, ChannelListener, MembershipListener { + + protected static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog(ReplicatedMap.class); + +//------------------------------------------------------------------------------ +// CONSTRUCTORS / DESTRUCTORS +//------------------------------------------------------------------------------ + /** + * Creates a new map + * @param channel The channel to use for communication + * @param timeout long - timeout for RPC messags + * @param mapContextName String - unique name for this map, to allow multiple maps per channel + * @param initialCapacity int - the size of this map, see HashMap + * @param loadFactor float - load factor, see HashMap + */ + public ReplicatedMap(Channel channel, long timeout, String mapContextName, int initialCapacity, + float loadFactor) { + super(channel, timeout, mapContextName, initialCapacity, loadFactor); + } + + /** + * Creates a new map + * @param channel The channel to use for communication + * @param timeout long - timeout for RPC messags + * @param mapContextName String - unique name for this map, to allow multiple maps per channel + * @param initialCapacity int - the size of this map, see HashMap + */ + public ReplicatedMap(Channel channel, long timeout, String mapContextName, int initialCapacity) { + super(channel, timeout, mapContextName, initialCapacity); + } + + /** + * Creates a new map + * @param channel The channel to use for communication + * @param timeout long - timeout for RPC messags + * @param mapContextName String - unique name for this map, to allow multiple maps per channel + */ + public ReplicatedMap(Channel channel, long timeout, String mapContextName) { + super(channel, timeout, mapContextName); + } + +//------------------------------------------------------------------------------ +// METHODS TO OVERRIDE +//------------------------------------------------------------------------------ + /** + * publish info about a map pair (key/value) to other nodes in the cluster + * @param key Object + * @param value Object + * @return Member - the backup node + * @throws ChannelException + */ + protected Member[] publishEntryInfo(Object key, Object value) throws ChannelException { + //select a backup node + Member[] backup = getMapMembers(); + + if (backup == null || backup.length == 0) return null; + + //publish the data out to all nodes + MapMessage msg = new MapMessage(getMapContextName(), MapMessage.MSG_BACKUP, false, + (Serializable) key, null, null, backup); + + getChannel().send(getMapMembers(), msg); + + return backup; + } + + public Object get(Object key) { + MapEntry entry = (MapEntry)super.get(key); + if (entry == null) { + return null; + } + return entry.getValue(); + } + + /** + * Returns true if the key has an entry in the map. + * The entry can be a proxy or a backup entry, invoking <code>get(key)</code> + * will make this entry primary for the group + * @param key Object + * @return boolean + */ + public boolean containsKey(Object key) { + return super.containsKey(key); + } + + public Object put(Object key, Object value) { + if (! (key instanceof Serializable))throw new IllegalArgumentException("Key is not serializable:" +key.getClass().getName()); + if (value == null)return remove(key); + if (! (value instanceof Serializable))throw new IllegalArgumentException("Value is not serializable:" +value.getClass().getName()); + + MapEntry entry = new MapEntry( (Serializable) key, (Serializable) value); + entry.setBackup(false); + entry.setProxy(false); + + Object old = null; + + //make sure that any old values get removed + if (containsKey(key)) old = remove(key); + try { + Member[] backup = publishEntryInfo(key, value); + entry.setBackupNodes(backup); + } catch (ChannelException x) { + log.error("Unable to replicate out data for a LazyReplicatedMap.put operation", x); + } + super.put(key, entry); + return old; + } + + /** + * Copies all values from one map to this instance + * @param m Map + * @todo send one bulk message + */ + public void putAll(Map m) { + Iterator i = m.entrySet().iterator(); + while (i.hasNext()) { + Map.Entry entry = (Map.Entry) i.next(); + put(entry.getKey(), entry.getValue()); + } + } + + /** + * Removes an object from this map, it will also remove it from + * + * @param key Object + * @return Object + */ + public Object remove(Object key) { + MapEntry entry = (MapEntry)super.remove(key); + MapMessage msg = new MapMessage(getMapContextName(), MapMessage.MSG_REMOVE, false, (Serializable) key, null, null, null); + try { + getChannel().send(getMapMembers(), msg); + } catch (ChannelException x) { + log.error("Unable to replicate out data for a LazyReplicatedMap.remove operation", x); + } + return entry != null ? entry.getValue() : null; + } + + public void clear() { + //only delete active keys + Iterator keys = keySet().iterator(); + while (keys.hasNext()) remove(keys.next()); + } + + public boolean containsValue(Object value) { + if (value == null) { + return super.containsValue(value); + } else { + Iterator i = super.entrySet().iterator(); + while (i.hasNext()) { + Map.Entry e = (Map.Entry) i.next(); + MapEntry entry = (MapEntry) e.getValue(); + if ( value.equals(entry.getValue()) ) return true; + } //while + return false; + } //end if + } + + public Object clone() { + throw new UnsupportedOperationException("This operation is not valid on a replicated map"); + } + + /** + * Returns the entire contents of the map + * Map.Entry.getValue() will return a LazyReplicatedMap.MapEntry object containing all the information + * about the object. + * @return Set + */ + public Set entrySetFull() { + return super.entrySet(); + } + + public Set keySetFull() { + return super.keySet(); + } + + public Set entrySet() { + LinkedHashSet set = new LinkedHashSet(super.size()); + Iterator i = super.entrySet().iterator(); + while (i.hasNext()) { + Map.Entry e = (Map.Entry) i.next(); + MapEntry entry = (MapEntry) e.getValue(); + if (entry.isPrimary()) set.add(entry.getValue()); + } + return Collections.unmodifiableSet(set); + } + + public Set keySet() { + //todo implement + //should only return keys where this is active. + LinkedHashSet set = new LinkedHashSet(super.size()); + Iterator i = super.entrySet().iterator(); + while (i.hasNext()) { + Map.Entry e = (Map.Entry) i.next(); + MapEntry entry = (MapEntry) e.getValue(); + if (entry.isPrimary()) set.add(entry.getKey()); + } + return Collections.unmodifiableSet(set); + } + + public int sizeFull() { + return super.size(); + } + + public int size() { + return super.size(); + } + + protected boolean removeEldestEntry(Map.Entry eldest) { + return false; + } + + public boolean isEmpty() { + return size() == 0; + } + + public Collection values() { + ArrayList values = new ArrayList(super.size()); + Iterator i = super.entrySet().iterator(); + while (i.hasNext()) { + Map.Entry e = (Map.Entry) i.next(); + MapEntry entry = (MapEntry) e.getValue(); + if (entry.isPrimary()) values.add(entry.getValue()); + } + return Collections.unmodifiableCollection(values); + } + +} \ No newline at end of file Modified: tomcat/container/tc5.5.x/modules/groupcom/to-do.txt URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/to-do.txt?rev=386668&r1=386667&r2=386668&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/to-do.txt (original) +++ tomcat/container/tc5.5.x/modules/groupcom/to-do.txt Fri Mar 17 08:35:32 2006 @@ -8,12 +8,19 @@ Code Tasks: =========================================== -18. Implement SSL encryption over message transfers +21. Implement a WAN membership layer, using a WANMbrInterceptor and a + WAN Router/Forwarder (Tipi on top of a ManagedChannel) + +20. Implement a TCP membership interceptor, for guaranteed functionality, not just discovery + +19. Implement a hardcoded tcp membership + +18. Implement SSL encryption over message transfers, BIO and NIO 8. WaitForCompletionInterceptor - waits for the message to get processed by all receivers before returning - (This is useful when synchronized=false and waitForAck=false, to improve -parallel processing, but you want to have all messages sent in parallel and -don't return until all have been processed on the remote end.) + (This is useful when synchronized=false and waitForAck=false, to improve + parallel processing, but you want to have all messages sent in parallel and + don't return until all have been processed on the remote end.) 9. CoordinatorInterceptor - manages the selection of a cluster coordinator just had a brilliant idea, if GroupChannel keeps its own view of members, --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]