cvs commit: jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp PooledSocketSender.java ReplicationTransmitter.java
fhanik 2005/04/14 13:07:59 Modified:modules/cluster/src/share/org/apache/catalina/cluster/tcp PooledSocketSender.java ReplicationTransmitter.java Log: Fixed auto format Revision ChangesPath 1.12 +3 -11 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/PooledSocketSender.java Index: PooledSocketSender.java === RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/PooledSocketSender.java,v retrieving revision 1.11 retrieving revision 1.12 diff -u -r1.11 -r1.12 --- PooledSocketSender.java 10 Apr 2005 16:20:46 - 1.11 +++ PooledSocketSender.java 14 Apr 2005 20:07:59 - 1.12 @@ -115,8 +115,7 @@ } SocketSender sender = senderQueue.getSender(0); if (sender == null) { -log.warn(sm.getString(PoolSocketSender.noMoreSender, this -.getAddress(), new Integer(this.getPort(; +log.warn(sm.getString(PoolSocketSender.noMoreSender, this.getAddress(), new Integer(this.getPort(; return; } //send the message @@ -199,14 +198,7 @@ mutex.wait(timeout); } catch (Exception x) { PooledSocketSender.log -.warn( -sm -.getString( - PoolSocketSender.senderQueue.sender.failed, - parent.getAddress(), -new Integer(parent - .getPort())), -x); + .warn(sm.getString(PoolSocketSender.senderQueue.sender.failed,parent.getAddress(),new Integer(parent.getPort())),x); }//catch }//end if if (sender != null) { @@ -266,4 +258,4 @@ } } } -} \ No newline at end of file +} 1.26 +3 -6 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationTransmitter.java Index: ReplicationTransmitter.java === RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationTransmitter.java,v retrieving revision 1.25 retrieving revision 1.26 diff -u -r1.25 -r1.26 --- ReplicationTransmitter.java 12 Apr 2005 18:56:07 - 1.25 +++ ReplicationTransmitter.java 14 Apr 2005 20:07:59 - 1.26 @@ -815,10 +815,7 @@ } catch (Exception x) { if (log.isWarnEnabled()) { if (!sender.getSuspect()) { -log -.warn( -Unable to send replicated message, is server down?, -x); +log.warn(Unable to send replicated message, is server down?,x); } } sender.setSuspect(true); @@ -840,4 +837,4 @@ } -} \ No newline at end of file +} - To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]
cvs commit: jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp PooledSocketSender.java
pero2005/03/25 14:22:46 Modified:modules/cluster/src/share/org/apache/catalina/cluster/tcp PooledSocketSender.java Log: Add Fixme for 5.5.10 Revision ChangesPath 1.10 +2 -0 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/PooledSocketSender.java Index: PooledSocketSender.java === RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/PooledSocketSender.java,v retrieving revision 1.9 retrieving revision 1.10 diff -u -r1.9 -r1.10 --- PooledSocketSender.java 15 Feb 2005 09:31:45 - 1.9 +++ PooledSocketSender.java 25 Mar 2005 22:22:46 - 1.10 @@ -23,6 +23,8 @@ /** * Send cluster messages with a pool of sockets (25). * + * FIXME support processing stats + * * @author Filip Hanik * @author Peter Rossbach * @version 1.2 - To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]
cvs commit: jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp PooledSocketSender.java
fhanik 2004/01/14 20:19:50 Modified:modules/cluster/src/share/org/apache/catalina/cluster/session DeltaManager.java modules/cluster/src/share/org/apache/catalina/cluster/tcp PooledSocketSender.java Log: Implemented the ability to not expire sessions on shutdown, since that will expire them across the cluster. Catching if the socket sender has disappeared, avoiding nullpointer exception Revision ChangesPath 1.7 +18 -15 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/session/DeltaManager.java Index: DeltaManager.java === RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/session/DeltaManager.java,v retrieving revision 1.6 retrieving revision 1.7 diff -u -r1.6 -r1.7 --- DeltaManager.java 13 Jan 2004 05:26:59 - 1.6 +++ DeltaManager.java 15 Jan 2004 04:19:50 - 1.7 @@ -711,17 +711,20 @@ started = false; // Expire all active sessions -Session sessions[] = findSessions(); -for (int i = 0; i sessions.length; i++) { -DeltaSession session = (DeltaSession) sessions[i]; -if (!session.isValid()) -continue; -try { -session.expire(); -} catch (Throwable t) { -; -} -} +if ( this.getExpireSessionsOnShutdown() ) { +Session sessions[] = findSessions(); +for (int i = 0; i sessions.length; i++) { +DeltaSession session = (DeltaSession) sessions[i]; +if (!session.isValid()) +continue; +try { +session.expire(); +} +catch (Throwable t) { +; +} //catch +} //for +}//end if // Require a new random number generator if we are restarted this.random = null; 1.4 +5 -5 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/PooledSocketSender.java Index: PooledSocketSender.java === RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/PooledSocketSender.java,v retrieving revision 1.3 retrieving revision 1.4 diff -u -r1.3 -r1.4 --- PooledSocketSender.java 13 Jan 2004 05:46:21 - 1.3 +++ PooledSocketSender.java 15 Jan 2004 04:19:50 - 1.4 @@ -158,8 +158,8 @@ { //get a socket sender from the pool SocketSender sender = senderQueue.getSender(0); -if ( sender == null isConnected() ) { -log.warn(No socket sender available for client=+this.getAddress()+:+this.getPort()); +if ( sender == null ) { +log.warn(No socket sender available for client=+this.getAddress()+:+this.getPort()+ did it disappear?); return; }//end if //send the message - To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]
cvs commit: jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp PooledSocketSender.java ReplicationTransmitter.java SimpleTcpCluster.java
fhanik 2004/01/12 20:22:28 Modified:modules/cluster/src/share/org/apache/catalina/cluster/session DeltaManager.java DeltaRequest.java DeltaSession.java modules/cluster/src/share/org/apache/catalina/cluster/tcp PooledSocketSender.java ReplicationTransmitter.java SimpleTcpCluster.java Log: Fixed a bug in a dead lock with the pooled socket sender when a member crashes Recycling the delta request objects to avoid object instantiation, although I actually think this is slower Fixed the call back with the session and the delta request execution Revision ChangesPath 1.5 +9 -10 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/session/DeltaManager.java Index: DeltaManager.java === RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/session/DeltaManager.java,v retrieving revision 1.4 retrieving revision 1.5 diff -u -r1.4 -r1.5 --- DeltaManager.java 13 Jan 2004 00:07:18 - 1.4 +++ DeltaManager.java 13 Jan 2004 04:22:28 - 1.5 @@ -406,24 +406,23 @@ } -private DeltaRequest loadDeltaRequest(byte[] data) throws +private DeltaRequest loadDeltaRequest(DeltaSession session, byte[] data) throws ClassNotFoundException, IOException { ByteArrayInputStream fis = null; ReplicationStream ois = null; Loader loader = null; ClassLoader classLoader = null; fis = new ByteArrayInputStream(data); -BufferedInputStream bis = new BufferedInputStream(fis); ois = new ReplicationStream(fis,container.getLoader().getClassLoader()); -DeltaRequest dreq = (DeltaRequest)ois.readObject(); +session.getDeltaRequest().readExternal(ois); ois.close(); -return dreq; +return session.getDeltaRequest(); } private byte[] unloadDeltaRequest(DeltaRequest deltaRequest) throws IOException { ByteArrayOutputStream bos = new ByteArrayOutputStream(); ObjectOutputStream oos = new ObjectOutputStream(bos); -oos.writeObject(deltaRequest); +deltaRequest.writeExternal(oos); oos.flush(); oos.close(); return bos.toByteArray(); @@ -874,8 +873,8 @@ } case SessionMessage.EVT_SESSION_DELTA : { byte[] delta = msg.getSession(); - DeltaRequest dreq = loadDeltaRequest(delta); DeltaSession session = (DeltaSession)findSession(msg.getSessionID()); + DeltaRequest dreq = loadDeltaRequest(session,delta); dreq.execute(session); session.setPrimarySession(false); 1.4 +71 -23 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/session/DeltaRequest.java Index: DeltaRequest.java === RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/session/DeltaRequest.java,v retrieving revision 1.3 retrieving revision 1.4 diff -u -r1.3 -r1.4 --- DeltaRequest.java 13 Jan 2004 00:07:18 - 1.3 +++ DeltaRequest.java 13 Jan 2004 04:22:28 - 1.4 @@ -66,7 +66,9 @@ /** * This class is used to track the series of actions that happens when - * a request is executed. These actions will then + * a request is executed. These actions will then translate into invokations of methods + * on the actual session. + * This class is NOT thread safe. One DeltaRequest per session * @author a href=mailto:[EMAIL PROTECTED]Filip Hanik/a * @version 1.0 */ @@ -95,6 +97,8 @@ private String sessionId; private LinkedList actions = new LinkedList(); +private LinkedList actionPool = new LinkedList(); + private boolean recordAllActions = false; public DeltaRequest() { @@ -140,7 +144,13 @@ int action, String name, Object value) { -AttributeInfo info = new AttributeInfo(type,action,name,value); +AttributeInfo info = null; +if ( this.actionPool.size() 0 ) { +info = (AttributeInfo)actionPool.removeFirst(); +info.init(type,action,name,value); +} else { +info = new AttributeInfo(type, action, name, value); +} //if we have already done something to this attribute, make sure //we don't send multiple actions across the wire if ( !recordAllActions)
cvs commit: jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp PooledSocketSender.java
fhanik 2004/01/12 21:46:21 Modified:modules/cluster/src/share/org/apache/catalina/cluster/tcp PooledSocketSender.java Log: print out a warning if no socket is returned from the pool and we are still connected Revision ChangesPath 1.3 +11 -5 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/PooledSocketSender.java Index: PooledSocketSender.java === RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/PooledSocketSender.java,v retrieving revision 1.2 retrieving revision 1.3 diff -u -r1.2 -r1.3 --- PooledSocketSender.java 13 Jan 2004 04:22:28 - 1.2 +++ PooledSocketSender.java 13 Jan 2004 05:46:21 - 1.3 @@ -86,9 +86,9 @@ private InetAddress address; private int port; private Socket sc = null; -private boolean isSocketConnected = false; +private boolean isSocketConnected = true; private boolean suspect; -private long ackTimeout = 150*1000; //15 seconds socket read timeout (for acknowledgement) +private long ackTimeout = 15*1000; //15 seconds socket read timeout (for acknowledgement) private long keepAliveTimeout = 60*1000; //keep socket open for no more than one min private int keepAliveMaxRequestCount = 100; //max 100 requests before reconnecting private long keepAliveConnectTime = 0; @@ -118,11 +118,13 @@ { //do nothing, happens in the socket sender itself senderQueue.open(); +isSocketConnected = true; } public void disconnect() { senderQueue.close(); +isSocketConnected = false; } public boolean isConnected() @@ -156,6 +158,10 @@ { //get a socket sender from the pool SocketSender sender = senderQueue.getSender(0); +if ( sender == null isConnected() ) { +log.warn(No socket sender available for client=+this.getAddress()+:+this.getPort()); +return; +}//end if //send the message sender.sendMessage(sessionId,data); //return the connection to the pool - To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]
cvs commit: jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp PooledSocketSender.java IDataSenderFactory.java SimpleTcpCluster.java SocketSender.java TcpReplicationThread.java ThreadPool.java
fhanik 2004/01/09 15:24:09 Modified:modules/cluster/src/share/org/apache/catalina/cluster/io ObjectReader.java XByteBuffer.java modules/cluster/src/share/org/apache/catalina/cluster/session SimpleTcpReplicationManager.java modules/cluster/src/share/org/apache/catalina/cluster/tcp IDataSenderFactory.java SimpleTcpCluster.java SocketSender.java TcpReplicationThread.java ThreadPool.java Added: modules/cluster/src/share/org/apache/catalina/cluster/tcp PooledSocketSender.java Log: Implemented socket pool for replication since the synchronized send became a bottleneck. This is a dramatic performance improvement Revision ChangesPath 1.4 +9 -8 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/io/ObjectReader.java Index: ObjectReader.java === RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/io/ObjectReader.java,v retrieving revision 1.3 retrieving revision 1.4 diff -u -r1.3 -r1.4 --- ObjectReader.java 19 Dec 2003 21:22:13 - 1.3 +++ ObjectReader.java 9 Jan 2004 23:24:08 - 1.4 @@ -105,6 +105,11 @@ public int append(byte[] data,int off,int len) throws java.io.IOException { boolean result = false; buffer.append(data,off,len); +int pkgCnt = buffer.countPackages(); +return pkgCnt; +} + +public int execute() throws java.io.IOException { int pkgCnt = 0; boolean pkgExists = buffer.doesPackageExist(); while ( pkgExists ) { @@ -114,10 +119,6 @@ pkgExists = buffer.doesPackageExist(); }//end if return pkgCnt; -} - -public int execute() throws java.io.IOException { -return append(new byte[0],0,0); } public int write(ByteBuffer buf) 1.5 +66 -24 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/io/XByteBuffer.java Index: XByteBuffer.java === RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/io/XByteBuffer.java,v retrieving revision 1.4 retrieving revision 1.5 diff -u -r1.4 -r1.5 --- XByteBuffer.java 20 Dec 2003 00:48:52 - 1.4 +++ XByteBuffer.java 9 Jan 2004 23:24:08 - 1.5 @@ -180,24 +180,35 @@ * within the buffer * @return - true if a complete package (header,size,data,footer) exists within the buffer */ -protected int packageExists() +public int countPackages() { +int cnt = 0; int pos = START_DATA.length; -//first check start header -int index = this.firstIndexOf(buf,0,START_DATA); -//if the header (START_DATA) isn't the first thing or -//the buffer isn't even 10 bytes -if ( index != 0 || (bufSize10) ) return 0; -//then get the size 4 bytes -int size = toInt(buf,pos); -//now the total buffer has to be long enough to hold -//START_DATA.length+4+size+END_DATA.length -pos = START_DATA.length+4+size; -if ( (pos+END_DATA.length) bufSize ) return 0; -//and finally check the footer of the package END_DATA -int newpos = firstIndexOf(buf,pos,END_DATA); -if ( newpos != pos ) return 0; -return size; +int start = 0; + +while ( start bufSize ) { +//first check start header +int index = this.firstIndexOf(buf,start,START_DATA); +//if the header (START_DATA) isn't the first thing or +//the buffer isn't even 10 bytes +if ( index != start || ((bufSize-start)10) ) break; +//then get the size 4 bytes +int size = toInt(buf, pos); +//now the total buffer has to be long enough to hold +//START_DATA.length+4+size+END_DATA.length +pos = start + START_DATA.length + 4 + size; +if ( (pos + END_DATA.length) bufSize) break; +//and finally check the footer of the package END_DATA +int newpos = firstIndexOf(buf, pos, END_DATA); +//mismatch, there is no package +if (newpos != pos) break; +//increase the packet count +cnt++; +//reset the values +start = pos + END_DATA.length; +pos = start + START_DATA.length; +}//while +return cnt; }//getSize /** @@ -205,7 +216,7 @@ * @return - true if a complete package (header,size,data,footer)