Author: fhanik
Date: Wed Feb 22 13:16:25 2006
New Revision: 379904

URL: http://svn.apache.org/viewcvs?rev=379904&view=rev
Log:
Completed a first version of the independent GroupChannel. Still need to remove
all the JMX stuff from the core components; JMX monitoring should be done using
outside beans, not baked into the code.

Modified:
    
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/ClusterReceiver.java
    
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/group/ChannelCoordinator.java
    
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/group/ChannelInterceptorBase.java
    
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/group/GroupChannel.java
    
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationListener.java

Modified: 
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/ClusterReceiver.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/ClusterReceiver.java?rev=379904&r1=379903&r2=379904&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/ClusterReceiver.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/ClusterReceiver.java
 Wed Feb 22 13:16:25 2006
@@ -59,5 +59,8 @@
      * @return The port
      */
     public int getPort();
+    
+    public void setMessageListener(MessageListener listener);
+    public MessageListener getMessageListener();
 
 }

Modified: 
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/group/ChannelCoordinator.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/group/ChannelCoordinator.java?rev=379904&r1=379903&r2=379904&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/group/ChannelCoordinator.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/group/ChannelCoordinator.java
 Wed Feb 22 13:16:25 2006
@@ -22,6 +22,7 @@
 import org.apache.catalina.cluster.ClusterSender;
 import org.apache.catalina.cluster.ClusterReceiver;
 import org.apache.catalina.cluster.ClusterChannel;
+import java.io.IOException;
 
 
 /**
@@ -36,8 +37,19 @@
     private ClusterSender clusterSender;
     private MembershipService membershipService;
 
+    public ChannelCoordinator() {
+        
+    }
+    
+    public ChannelCoordinator(ClusterReceiver receiver,
+                              ClusterSender sender,
+                              MembershipService service) {
+        this();
+        this.setClusterReceiver(receiver);
+        this.setClusterSender(sender);
+        this.setMembershipService(service);
+    }
     
-
     /**
      * Send a message to one or more members in the cluster
      * @param destination Member[] - the destinations, null or zero length 
means all
@@ -45,9 +57,12 @@
      * @param options int - sender options, see class documentation
      * @return ClusterMessage[] - the replies from the members, if any.
      */
-    public ClusterMessage[] sendMessage(Member[] destination, ClusterMessage 
msg, int options) {
-        throw new UnsupportedOperationException();
-        //implement sending and receiving logic.
+    public ClusterMessage[] sendMessage(Member[] destination, ClusterMessage 
msg, int options) throws IOException {
+        if ( destination == null ) destination = 
membershipService.getMembers();
+        for ( int i=0; i<destination.length; i++ ) {
+            clusterSender.sendMessage(msg,destination[i]);
+        }
+        return null;
     }
 
 
@@ -97,6 +112,17 @@
         }
 
     }
+    
+    public void memberAdded(Member member){
+        if ( clusterSender!=null ) clusterSender.add(member);
+        super.memberAdded(member);
+    }
+    
+    public void memberDisappeared(Member member){
+        if ( clusterSender!=null ) clusterSender.remove(member);
+        super.memberDisappeared(member);
+    }
+
 
     public ClusterReceiver getClusterReceiver() {
         return clusterReceiver;
@@ -111,7 +137,13 @@
     }
 
     public void setClusterReceiver(ClusterReceiver clusterReceiver) {
-        this.clusterReceiver = clusterReceiver;
+        if ( clusterReceiver != null ) {
+            this.clusterReceiver = clusterReceiver;
+            this.clusterReceiver.setMessageListener(this);
+        } else {
+            if  (this.clusterReceiver!=null ) 
this.clusterReceiver.setMessageListener(null);
+            this.clusterReceiver = null;
+        }
     }
 
     public void setClusterSender(ClusterSender clusterSender) {

Modified: 
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/group/ChannelInterceptorBase.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/group/ChannelInterceptorBase.java?rev=379904&r1=379903&r2=379904&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/group/ChannelInterceptorBase.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/group/ChannelInterceptorBase.java
 Wed Feb 22 13:16:25 2006
@@ -19,6 +19,7 @@
 import org.apache.catalina.cluster.Member;
 import org.apache.catalina.cluster.MembershipListener;
 import org.apache.catalina.cluster.MessageListener;
+import java.io.IOException;
 
 /**
  * Abstract class for the interceptor base class.
@@ -51,7 +52,7 @@
         return previous;
     }
 
-    public ClusterMessage[] sendMessage(Member[] destination, ClusterMessage 
msg, int options) {
+    public ClusterMessage[] sendMessage(Member[] destination, ClusterMessage 
msg, int options) throws IOException {
         return getNext().sendMessage(destination, msg,options);
     }
     

Modified: 
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/group/GroupChannel.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/group/GroupChannel.java?rev=379904&r1=379903&r2=379904&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/group/GroupChannel.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/group/GroupChannel.java
 Wed Feb 22 13:16:25 2006
@@ -71,7 +71,17 @@
      * @return ClusterMessage[] - the replies from the members, if any.
      */
     public ClusterMessage[] send(Member[] destination, ClusterMessage msg, int 
options) throws ChannelException {
-        throw new UnsupportedOperationException("Method send not yet 
implemented.");
+        if ( msg == null ) return null;
+        msg.setAddress(getMembershipService().getLocalMember());
+        msg.setCompress(msg.FLAG_ALLOWED);
+        msg.setTimestamp(System.currentTimeMillis());
+        msg.setResend(msg.FLAG_FORBIDDEN);
+        try {
+            if (interceptors != null)return 
interceptors.sendMessage(destination, msg, options);
+            else return this.coordinator.sendMessage(destination, msg, 
options);
+        }catch ( Exception x ) {
+            throw new ChannelException(x);
+        }
     }
     
     /**

Modified: 
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationListener.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationListener.java?rev=379904&r1=379903&r2=379904&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationListener.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationListener.java
 Wed Feb 22 13:16:25 2006
@@ -16,6 +16,7 @@
 
 package org.apache.catalina.cluster.tcp;
 
+import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
 import java.nio.channels.SelectableChannel;
@@ -25,13 +26,14 @@
 import java.nio.channels.SocketChannel;
 import java.util.Iterator;
 
-import org.apache.catalina.cluster.io.ObjectReader;
-import org.apache.catalina.cluster.io.ListenCallback;
-import org.apache.catalina.cluster.ClusterReceiver;
-import org.apache.catalina.util.StringManager;
-import java.io.IOException;
 import org.apache.catalina.cluster.ClusterMessage;
+import org.apache.catalina.cluster.ClusterReceiver;
+import org.apache.catalina.cluster.group.ChannelInterceptorBase;
+import org.apache.catalina.cluster.io.ListenCallback;
+import org.apache.catalina.cluster.io.ObjectReader;
 import org.apache.catalina.cluster.io.XByteBuffer;
+import org.apache.catalina.util.StringManager;
+import org.apache.catalina.cluster.MessageListener;
 
 /**
  * @author Filip Hanik
@@ -70,7 +72,7 @@
 
 
     private Object interestOpsMutex = new Object();
-
+    private MessageListener listener = null;
     public ReplicationListener() {
     }
 
@@ -309,7 +311,20 @@
     }
 
     public void messageDataReceived(ClusterData data) {
-        //nothing to do yet
+        if ( this.listener != null ) {
+            try {
+                ClusterMessage msg = deserialize(data);
+                listener.messageReceived(msg);
+            }catch ( java.io.IOException x ) {
+                if ( log.isErrorEnabled() ) {
+                    log.error("Unable to receive and deserialize cluster data. 
IOException.",x);
+                }
+            }catch ( java.lang.ClassNotFoundException cx ) {
+                if ( log.isErrorEnabled() ) {
+                    log.error("Unable to receive and deserialize cluster data. 
ClassNotFoundException.",cx);
+                }
+            }
+        }
     }
 
     /**
@@ -385,8 +400,16 @@
         return tcpListenPort;
     }
 
+    public MessageListener getMessageListener() {
+        return listener;
+    }
+
     public void setTcpListenPort(int tcpListenPort) {
         this.tcpListenPort = tcpListenPort;
+    }
+
+    public void setMessageListener(MessageListener listener) {
+        this.listener = listener;
     }
 
     public String getHost() {



---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to