pero 2005/07/08 13:51:48 Modified: modules/cluster/src/share/org/apache/catalina/cluster/io ListenCallback.java SocketObjectReader.java modules/cluster/src/share/org/apache/catalina/cluster/tcp ClusterReceiverBase.java DataSender.java LocalStrings.properties PooledSocketSender.java ReplicationListener.java ReplicationTransmitter.java SendMessageData.java SimpleTcpCluster.java SocketReplicationListener.java SocketReplicationThread.java TcpReplicationThread.java mbeans-descriptors.xml Log: send ack before message is handled default is now that no wait ack is default. Revision Changes Path 1.4 +11 -4 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/io/ListenCallback.java Index: ListenCallback.java =================================================================== RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/io/ListenCallback.java,v retrieving revision 1.3 retrieving revision 1.4 diff -u -r1.3 -r1.4 --- ListenCallback.java 26 Jun 2005 21:21:49 -0000 1.3 +++ ListenCallback.java 8 Jul 2005 20:50:30 -0000 1.4 @@ -26,8 +26,6 @@ * @author Peter Rossbach * @version $Revision$, $Date$ */ - - public interface ListenCallback { /** @@ -35,6 +33,15 @@ * been received from one of the cluster nodes. * @param data - the message bytes received from the cluster/replication system */ - // public void messageDataReceived(byte[] data); public void messageDataReceived(ClusterData data); + + /** receiver must be send ack + */ + public boolean isSendAck() ; + + /** send ack + * + */ + public void sendAck() throws java.io.IOException ; + } \ No newline at end of file 1.4 +3 -1 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/io/SocketObjectReader.java Index: SocketObjectReader.java =================================================================== RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/io/SocketObjectReader.java,v retrieving revision 1.3 retrieving revision 1.4 diff -u -r1.3 -r1.4 --- SocketObjectReader.java 26 Jun 2005 21:21:49 -0000 1.3 +++ SocketObjectReader.java 8 Jul 2005 20:50:30 -0000 1.4 @@ -74,6 +74,8 @@ int pkgCnt = 0; while ( pkgExists ) { ClusterData cdata = buffer.extractPackage(true); + if(callback.isSendAck()) + callback.sendAck() ; callback.messageDataReceived(cdata); pkgCnt++; pkgExists = buffer.doesPackageExist(); 1.9 +9 -1 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ClusterReceiverBase.java Index: ClusterReceiverBase.java =================================================================== RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ClusterReceiverBase.java,v retrieving revision 1.8 retrieving revision 1.9 diff -u -r1.8 -r1.9 --- ClusterReceiverBase.java 1 Jul 2005 18:35:54 -0000 1.8 +++ ClusterReceiverBase.java 8 Jul 2005 20:50:30 -0000 1.9 @@ -505,4 +505,12 @@ } } } + + /* (non-Javadoc) + * @see org.apache.catalina.cluster.io.ListenCallback#sendAck() + */ + public void sendAck() throws IOException { + // do nothing + } + } 1.14 +36 -3 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/DataSender.java Index: DataSender.java =================================================================== RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/DataSender.java,v retrieving revision 1.13 retrieving revision 1.14 diff -u -r1.13 -r1.14 --- DataSender.java 26 Jun 2005 21:21:50 -0000 1.13 +++ DataSender.java 8 Jul 2005 20:50:30 -0000 1.14 @@ -120,6 +120,11 @@ protected long dataResendCounter = 0; /** + * number of data failure sends + */ + protected long dataFailureCounter = 0; + + /** * doProcessingStats */ protected boolean doProcessingStats = false; @@ -182,7 +187,7 @@ /** * wait for receiver Ack */ - private boolean waitForAck = true; + private boolean waitForAck = false; /** * number of socket close @@ -199,6 +204,12 @@ */ private int socketOpenFailureCounter = 0 ; + /** + * After failure make a resend + */ + private boolean resend = false ; + + // ------------------------------------------------------------- Constructor public DataSender(String domain,InetAddress host, int port) { @@ -378,6 +389,13 @@ return dataResendCounter; } + /** + * @return Returns the dataFailureCounter. + */ + public long getDataFailureCounter() { + return dataFailureCounter; + } + public InetAddress getAddress() { return address; } @@ -478,6 +496,18 @@ } /** + * @return Returns the resend. + */ + public boolean isResend() { + return resend; + } + /** + * @param resend The resend to set. + */ + public void setResend(boolean resend) { + this.resend = resend; + } + /** * @return Returns the socket. */ public Socket getSocket() { @@ -564,6 +594,7 @@ connectCounter = isConnected() ? 1 : 0; missingAckCounter = 0; dataResendCounter = 0; + dataFailureCounter = 0 ; socketOpenCounter =isConnected() ? 1 : 0; socketOpenFailureCounter = 0 ; socketCloseCounter = 0; @@ -727,7 +758,8 @@ writeData(data); messageTransfered = true ; } catch (java.io.IOException x) { - if(data.getResend() != ClusterMessage.FLAG_FORBIDDEN) { + if(data.getResend() == ClusterMessage.FLAG_ALLOWED || + (data.getResend() == ClusterMessage.FLAG_DEFAULT && isResend() )) { // second try with fresh connection dataResendCounter++; if (log.isTraceEnabled()) @@ -761,6 +793,7 @@ new Integer(port), data.getUniqueId(), new Long(data.getMessage().length))); } } else { + dataFailureCounter++; if (log.isWarnEnabled()) log.warn(sm.getString("IDataSender.send.lost", address.getHostAddress(), new Integer(port), data.getType(), data.getUniqueId()),exception); 1.14 +2 -0 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/LocalStrings.properties Index: LocalStrings.properties =================================================================== RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/LocalStrings.properties,v retrieving revision 1.13 retrieving revision 1.14 diff -u -r1.13 -r1.14 --- LocalStrings.properties 1 Jul 2005 18:35:54 -0000 1.13 +++ LocalStrings.properties 8 Jul 2005 20:50:30 -0000 1.14 @@ -27,6 +27,8 @@ PoolSocketSender.noMoreSender=No socket sender available for client [{0}:{1,number,integer}] did it disappear? ReplicationTransmitter.getProperty=get property {0} ReplicationTransmitter.setProperty=set property {0}: {1} old value {2} +ReplicationTransmitter.started=Start ClusterSender at cluster {0} with name {1} +ReplicationTransmitter.stopped=Stopped ClusterSender at cluster {0} with name {1} ReplicationValve.filter.loading=Loading request filters={0} ReplicationValve.filter.token=Request filter={0} ReplicationValve.filter.token.failure=Unable to compile filter={0} 1.16 +1 -0 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/PooledSocketSender.java Index: PooledSocketSender.java =================================================================== RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/PooledSocketSender.java,v retrieving revision 1.15 retrieving revision 1.16 diff -u -r1.15 -r1.16 --- PooledSocketSender.java 26 Jun 2005 21:21:50 -0000 1.15 +++ PooledSocketSender.java 8 Jul 2005 20:50:30 -0000 1.16 @@ -234,6 +234,7 @@ sender.setKeepAliveTimeout(parent.getKeepAliveTimeout()); sender.setAckTimeout(parent.getAckTimeout()); sender.setWaitForAck(parent.isWaitForAck()); + sender.setResend(parent.isResend()); return sender; } 1.23 +2 -1 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationListener.java Index: ReplicationListener.java =================================================================== RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationListener.java,v retrieving revision 1.22 retrieving revision 1.23 diff -u -r1.22 -r1.23 --- ReplicationListener.java 15 Apr 2005 20:14:14 -0000 1.22 +++ ReplicationListener.java 8 Jul 2005 20:50:30 -0000 1.23 @@ -244,4 +244,5 @@ } } + } 1.36 +10 -3 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationTransmitter.java Index: ReplicationTransmitter.java =================================================================== RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationTransmitter.java,v retrieving revision 1.35 retrieving revision 1.36 diff -u -r1.35 -r1.36 --- ReplicationTransmitter.java 30 Jun 2005 13:03:34 -0000 1.35 +++ ReplicationTransmitter.java 8 Jul 2005 20:50:30 -0000 1.36 @@ -35,7 +35,6 @@ import org.apache.catalina.core.StandardHost; import org.apache.catalina.util.StringManager; import org.apache.tomcat.util.IntrospectionUtils; -import org.apache.tomcat.util.digester.SetTopRule; /** * Transmit message to ohter cluster members create sender from replicationMode @@ -517,6 +516,7 @@ public void start() throws java.io.IOException { if (cluster != null) { ObjectName clusterName = cluster.getObjectName(); + ObjectName transmitterName = null ; try { MBeanServer mserver = cluster.getMBeanServer(); Container container = cluster.getContainer(); @@ -524,7 +524,7 @@ if (container instanceof StandardHost) { name += ",host=" + clusterName.getKeyProperty("host"); } - ObjectName transmitterName = new ObjectName(name); + transmitterName = new ObjectName(name); if (mserver.isRegistered(transmitterName)) { if (log.isWarnEnabled()) log.warn(sm.getString( @@ -535,6 +535,10 @@ setObjectName(transmitterName); mserver.registerMBean(cluster.getManagedBean(this), getObjectName()); + if(log.isInfoEnabled()) + log.info(sm.getString("ReplicationTransmitter.started", + clusterName, transmitterName)); + } catch (Exception e) { log.warn(e); } @@ -565,6 +569,9 @@ } catch (Exception e) { log.error(e); } + if(log.isInfoEnabled()) + log.info(sm.getString("ReplicationTransmitter.stopped", + cluster.getObjectName(), getObjectName())); } } 1.3 +1 -2 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/SendMessageData.java Index: SendMessageData.java =================================================================== RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/SendMessageData.java,v retrieving revision 1.2 retrieving revision 1.3 diff -u -r1.2 -r1.3 --- SendMessageData.java 10 Apr 2005 16:20:46 -0000 1.2 +++ SendMessageData.java 8 Jul 2005 20:50:30 -0000 1.3 @@ -16,7 +16,6 @@ package org.apache.catalina.cluster.tcp; -import org.apache.catalina.cluster.ClusterMessage; import org.apache.catalina.cluster.Member; /** 1.70 +7 -7 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/SimpleTcpCluster.java Index: SimpleTcpCluster.java =================================================================== RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/SimpleTcpCluster.java,v retrieving revision 1.69 retrieving revision 1.70 diff -u -r1.69 -r1.70 --- SimpleTcpCluster.java 1 Jul 2005 16:51:14 -0000 1.69 +++ SimpleTcpCluster.java 8 Jul 2005 20:50:30 -0000 1.70 @@ -181,7 +181,7 @@ private org.apache.catalina.cluster.ClusterDeployer clusterDeployer; - private boolean defaultMode = false ; + private boolean defaultMode = true ; /** * Listeners of messages @@ -701,7 +701,7 @@ clusterReceiver.setCompress(clusterSender.isCompress()); clusterReceiver.setCatalinaCluster(this); clusterReceiver.start(); - } else + } // start the sender. if(clusterSender != null && clusterReceiver != null) { @@ -746,8 +746,8 @@ * className="org.apache.catalina.cluster.mcast.McastService" * mcastAddr="228.0.0.4" * mcastPort="8012" - * mcastFrequency="500" - * mcastDropTime="3000"/> + * mcastFrequency="1000" + * mcastDropTime="30000"/> * </pre> */ protected void createDefaultMembershipService() { @@ -760,8 +760,8 @@ McastService mService= new McastService(); mService.setMcastAddr("228.0.0.4"); mService.setMcastPort(8012); - mService.setMcastFrequency(500); - mService.setMcastDropTime(3000); + mService.setMcastFrequency(1000); + mService.setMcastDropTime(30000); transferProperty("service",mService); setMembershipService(mService); } 1.6 +5 -7 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/SocketReplicationListener.java Index: SocketReplicationListener.java =================================================================== RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/SocketReplicationListener.java,v retrieving revision 1.5 retrieving revision 1.6 diff -u -r1.5 -r1.6 --- SocketReplicationListener.java 1 Jul 2005 18:35:54 -0000 1.5 +++ SocketReplicationListener.java 8 Jul 2005 20:50:30 -0000 1.6 @@ -21,7 +21,6 @@ import java.net.ServerSocket; import java.net.Socket; -import org.apache.catalina.cluster.io.SocketObjectReader; import org.apache.catalina.util.StringManager; /** @@ -32,7 +31,7 @@ // ---------------------------------------------------- Statics - public static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory + private static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory .getLog(SocketReplicationListener.class); /** @@ -151,8 +150,7 @@ Socket socket = serverSocket.accept(); if (doListen) { SocketReplicationThread t = new SocketReplicationThread( - this, socket, new SocketObjectReader(socket, - this), isSendAck()); + this, socket); t.setDaemon(true); t.start(); } @@ -254,5 +252,5 @@ unLockSocket(); doListen = false; } - -} + + } 1.2 +18 -10 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/SocketReplicationThread.java Index: SocketReplicationThread.java =================================================================== RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/SocketReplicationThread.java,v retrieving revision 1.1 retrieving revision 1.2 diff -u -r1.1 -r1.2 --- SocketReplicationThread.java 10 Apr 2005 16:20:46 -0000 1.1 +++ SocketReplicationThread.java 8 Jul 2005 20:50:30 -0000 1.2 @@ -19,6 +19,7 @@ import java.io.InputStream; import java.net.Socket; +import org.apache.catalina.cluster.io.ListenCallback; import org.apache.catalina.cluster.io.SocketObjectReader; /** @@ -27,7 +28,7 @@ * FIXME Socket timeout * @version $Revision$, $Date$ */ -public class SocketReplicationThread extends Thread { +public class SocketReplicationThread extends Thread implements ListenCallback { private static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory .getLog(SocketReplicationThread.class); @@ -43,8 +44,6 @@ private boolean keepRunning = true; - private boolean sendAck; - /** * Fork Listen Worker Thread! * @@ -52,13 +51,12 @@ * @param reader * @param sendAck */ - SocketReplicationThread(SocketReplicationListener master, Socket socket, - SocketObjectReader reader, boolean sendAck) { + SocketReplicationThread(SocketReplicationListener master, Socket socket + ) { super("ClusterListenThread-" + count++); this.master = master; this.socket = socket; - this.reader = reader; - this.sendAck = sendAck; + this.reader = new SocketObjectReader(socket,this); } /** @@ -83,6 +81,7 @@ if (log.isTraceEnabled()) { log.trace("sending " + ack + " ack packages to " + socket.getLocalPort() ); } + /** if (sendAck) { // ack only when message is complete receive while (ack > 0) { @@ -90,6 +89,7 @@ ack--; } } + **/ keepRunning = master.isDoListen(); } else // EOF @@ -110,13 +110,21 @@ socket = null; } } - + + public void messageDataReceived(ClusterData data) { + master.messageDataReceived(data); + } + + public boolean isSendAck() { + return master.isSendAck(); + } + /** * send a reply-acknowledgement * * @throws java.io.IOException */ - private void sendAck() throws java.io.IOException { + public void sendAck() throws java.io.IOException { socket.getOutputStream().write(ACK_COMMAND); if (log.isTraceEnabled()) { log.trace("ACK sent to " + socket.getPort()); 1.18 +2 -1 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/TcpReplicationThread.java Index: TcpReplicationThread.java =================================================================== RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/TcpReplicationThread.java,v retrieving revision 1.17 retrieving revision 1.18 diff -u -r1.17 -r1.18 --- TcpReplicationThread.java 12 Apr 2005 18:56:07 -0000 1.17 +++ TcpReplicationThread.java 8 Jul 2005 20:50:30 -0000 1.18 @@ -132,6 +132,7 @@ if (log.isTraceEnabled()) { log.trace("sending " + pkgcnt + " ack packages to " + channel.socket().getLocalPort() ); } + if (sendAck) { while ( pkgcnt > 0 ) { sendAck(key,channel); 1.15 +28 -0 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/mbeans-descriptors.xml Index: mbeans-descriptors.xml =================================================================== RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/mbeans-descriptors.xml,v retrieving revision 1.14 retrieving revision 1.15 diff -u -r1.14 -r1.15 --- mbeans-descriptors.xml 1 Jul 2005 18:35:54 -0000 1.14 +++ mbeans-descriptors.xml 8 Jul 2005 20:50:30 -0000 1.15 @@ -408,6 +408,10 @@ description="Connect time for keep alive" type="long" writeable="false"/> + <attribute name="resend" + description="after send failure make a resend" + is="true" + type="boolean" /> <attribute name="connected" is="true" description="socket connected" @@ -489,6 +493,10 @@ description="counts data resends" type="long" writeable="false"/> + <attribute name="dataFailureCounter" + description="counts data send failures" + type="long" + writeable="false"/> <attribute name="inQueueCounter" description="counts all queued messages" type="long" @@ -598,6 +606,10 @@ description="Connect time for keep alive" type="long" writeable="false"/> + <attribute name="resend" + description="after send failure make a resend" + is="true" + type="boolean" /> <attribute name="connected" is="true" description="socket connected" @@ -679,6 +691,10 @@ description="counts data resends" type="long" writeable="false"/> + <attribute name="dataFailureCounter" + description="counts data send failures" + type="long" + writeable="false"/> <attribute name="inQueueCounter" description="counts all queued messages" type="long" @@ -751,6 +767,10 @@ <attribute name="keepAliveMaxRequestCount" description="max request over this socket" type="int"/> + <attribute name="resend" + description="after send failure make a resend" + is="true" + type="boolean" /> <attribute name="connected" is="true" description="socket connected" @@ -832,6 +852,10 @@ description="Connect time for keep alive" type="long" writeable="false"/> + <attribute name="resend" + description="after send failure make a resend" + is="true" + type="boolean" /> <attribute name="connected" is="true" description="socket connected" @@ -917,6 +941,10 @@ description="counts data resends" type="long" writeable="false"/> + <attribute name="dataFailureCounter" + description="counts data send failures" + type="long" + writeable="false"/> <operation name="connect" description="connect to other replication node" impact="ACTION"
--------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]