Author: fhanik Date: Wed Feb 22 13:16:25 2006 New Revision: 379904 URL: http://svn.apache.org/viewcvs?rev=379904&view=rev Log: Completed a first version of the independent GroupChannel. Still need to remove all the JMX stuff from the core components; JMX monitoring should be done using outside beans, not be baked into the code.
Modified: tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/ClusterReceiver.java tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/group/ChannelCoordinator.java tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/group/ChannelInterceptorBase.java tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/group/GroupChannel.java tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationListener.java Modified: tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/ClusterReceiver.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/ClusterReceiver.java?rev=379904&r1=379903&r2=379904&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/ClusterReceiver.java (original) +++ tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/ClusterReceiver.java Wed Feb 22 13:16:25 2006 @@ -59,5 +59,8 @@ * @return The port */ public int getPort(); + + public void setMessageListener(MessageListener listener); + public MessageListener getMessageListener(); } Modified: tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/group/ChannelCoordinator.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/group/ChannelCoordinator.java?rev=379904&r1=379903&r2=379904&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/group/ChannelCoordinator.java (original) +++ tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/group/ChannelCoordinator.java Wed Feb 22 13:16:25 2006 @@ -22,6 +22,7 @@ import 
org.apache.catalina.cluster.ClusterSender; import org.apache.catalina.cluster.ClusterReceiver; import org.apache.catalina.cluster.ClusterChannel; +import java.io.IOException; /** @@ -36,8 +37,19 @@ private ClusterSender clusterSender; private MembershipService membershipService; + public ChannelCoordinator() { + + } + + public ChannelCoordinator(ClusterReceiver receiver, + ClusterSender sender, + MembershipService service) { + this(); + this.setClusterReceiver(receiver); + this.setClusterSender(sender); + this.setMembershipService(service); + } - /** * Send a message to one or more members in the cluster * @param destination Member[] - the destinations, null or zero length means all @@ -45,9 +57,12 @@ * @param options int - sender options, see class documentation * @return ClusterMessage[] - the replies from the members, if any. */ - public ClusterMessage[] sendMessage(Member[] destination, ClusterMessage msg, int options) { - throw new UnsupportedOperationException(); - //implement sending and receiving logic. 
+ public ClusterMessage[] sendMessage(Member[] destination, ClusterMessage msg, int options) throws IOException { + if ( destination == null ) destination = membershipService.getMembers(); + for ( int i=0; i<destination.length; i++ ) { + clusterSender.sendMessage(msg,destination[i]); + } + return null; } @@ -97,6 +112,17 @@ } } + + public void memberAdded(Member member){ + if ( clusterSender!=null ) clusterSender.add(member); + super.memberAdded(member); + } + + public void memberDisappeared(Member member){ + if ( clusterSender!=null ) clusterSender.remove(member); + super.memberDisappeared(member); + } + public ClusterReceiver getClusterReceiver() { return clusterReceiver; @@ -111,7 +137,13 @@ } public void setClusterReceiver(ClusterReceiver clusterReceiver) { - this.clusterReceiver = clusterReceiver; + if ( clusterReceiver != null ) { + this.clusterReceiver = clusterReceiver; + this.clusterReceiver.setMessageListener(this); + } else { + if (this.clusterReceiver!=null ) this.clusterReceiver.setMessageListener(null); + this.clusterReceiver = null; + } } public void setClusterSender(ClusterSender clusterSender) { Modified: tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/group/ChannelInterceptorBase.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/group/ChannelInterceptorBase.java?rev=379904&r1=379903&r2=379904&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/group/ChannelInterceptorBase.java (original) +++ tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/group/ChannelInterceptorBase.java Wed Feb 22 13:16:25 2006 @@ -19,6 +19,7 @@ import org.apache.catalina.cluster.Member; import org.apache.catalina.cluster.MembershipListener; import org.apache.catalina.cluster.MessageListener; +import java.io.IOException; /** * 
Abstract class for the interceptor base class. @@ -51,7 +52,7 @@ return previous; } - public ClusterMessage[] sendMessage(Member[] destination, ClusterMessage msg, int options) { + public ClusterMessage[] sendMessage(Member[] destination, ClusterMessage msg, int options) throws IOException { return getNext().sendMessage(destination, msg,options); } Modified: tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/group/GroupChannel.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/group/GroupChannel.java?rev=379904&r1=379903&r2=379904&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/group/GroupChannel.java (original) +++ tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/group/GroupChannel.java Wed Feb 22 13:16:25 2006 @@ -71,7 +71,17 @@ * @return ClusterMessage[] - the replies from the members, if any. 
*/ public ClusterMessage[] send(Member[] destination, ClusterMessage msg, int options) throws ChannelException { - throw new UnsupportedOperationException("Method send not yet implemented."); + if ( msg == null ) return null; + msg.setAddress(getMembershipService().getLocalMember()); + msg.setCompress(msg.FLAG_ALLOWED); + msg.setTimestamp(System.currentTimeMillis()); + msg.setResend(msg.FLAG_FORBIDDEN); + try { + if (interceptors != null)return interceptors.sendMessage(destination, msg, options); + else return this.coordinator.sendMessage(destination, msg, options); + }catch ( Exception x ) { + throw new ChannelException(x); + } } /** Modified: tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationListener.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationListener.java?rev=379904&r1=379903&r2=379904&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationListener.java (original) +++ tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationListener.java Wed Feb 22 13:16:25 2006 @@ -16,6 +16,7 @@ package org.apache.catalina.cluster.tcp; +import java.io.IOException; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.nio.channels.SelectableChannel; @@ -25,13 +26,14 @@ import java.nio.channels.SocketChannel; import java.util.Iterator; -import org.apache.catalina.cluster.io.ObjectReader; -import org.apache.catalina.cluster.io.ListenCallback; -import org.apache.catalina.cluster.ClusterReceiver; -import org.apache.catalina.util.StringManager; -import java.io.IOException; import org.apache.catalina.cluster.ClusterMessage; +import org.apache.catalina.cluster.ClusterReceiver; +import org.apache.catalina.cluster.group.ChannelInterceptorBase; +import 
org.apache.catalina.cluster.io.ListenCallback; +import org.apache.catalina.cluster.io.ObjectReader; import org.apache.catalina.cluster.io.XByteBuffer; +import org.apache.catalina.util.StringManager; +import org.apache.catalina.cluster.MessageListener; /** * @author Filip Hanik @@ -70,7 +72,7 @@ private Object interestOpsMutex = new Object(); - + private MessageListener listener = null; public ReplicationListener() { } @@ -309,7 +311,20 @@ } public void messageDataReceived(ClusterData data) { - //nothing to do yet + if ( this.listener != null ) { + try { + ClusterMessage msg = deserialize(data); + listener.messageReceived(msg); + }catch ( java.io.IOException x ) { + if ( log.isErrorEnabled() ) { + log.error("Unable to receive and deserialize cluster data. IOException.",x); + } + }catch ( java.lang.ClassNotFoundException cx ) { + if ( log.isErrorEnabled() ) { + log.error("Unable to receive and deserialize cluster data. ClassNotFoundException.",cx); + } + } + } } /** @@ -385,8 +400,16 @@ return tcpListenPort; } + public MessageListener getMessageListener() { + return listener; + } + public void setTcpListenPort(int tcpListenPort) { this.tcpListenPort = tcpListenPort; + } + + public void setMessageListener(MessageListener listener) { + this.listener = listener; } public String getHost() { --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]