cvs commit: jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp TcpReplicationThread.java

2005-03-25 Thread pero
pero2005/03/25 14:18:38

  Modified:modules/cluster/src/share/org/apache/catalina/cluster/tcp
TcpReplicationThread.java
  Log:
  use constant ACK byte array instead create new 3 byte buffer for every 
message ack
  better waitAck Handling
  
  Revision  ChangesPath
  1.14  +30 -22
jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/TcpReplicationThread.java
  
  Index: TcpReplicationThread.java
  ===
  RCS file: 
/home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/TcpReplicationThread.java,v
  retrieving revision 1.13
  retrieving revision 1.14
  diff -u -r1.13 -r1.14
  --- TcpReplicationThread.java 15 Feb 2005 09:31:45 -  1.13
  +++ TcpReplicationThread.java 25 Mar 2005 22:18:38 -  1.14
  @@ -15,26 +15,28 @@
*/
   
   package org.apache.catalina.cluster.tcp;
  +import java.io.IOException;
   import java.nio.ByteBuffer;
   import java.nio.channels.SelectionKey;
  -import java.io.IOException;
   import java.nio.channels.SocketChannel;
  +
   import org.apache.catalina.cluster.io.ObjectReader;
   
   /**
  - * A worker thread class which can drain channels and echo-back
  - * the input.  Each instance is constructed with a reference to
  - * the owning thread pool object. When started, the thread loops
  - * forever waiting to be awakened to service the channel associated
  - * with a SelectionKey object.
  - * The worker is tasked by calling its serviceChannel() method
  - * with a SelectionKey object.  The serviceChannel() method stores
  - * the key reference in the thread object then calls notify()
  - * to wake it up.  When the channel has been drained, the worker
  - * thread returns itself to its parent pool.
  - */
  -public class TcpReplicationThread extends WorkerThread
  -{
  + * A worker thread class which can drain channels and echo-back the input. 
Each
  + * instance is constructed with a reference to the owning thread pool object.
  + * When started, the thread loops forever waiting to be awakened to service 
the
  + * channel associated with a SelectionKey object. The worker is tasked by
  + * calling its serviceChannel() method with a SelectionKey object. The
  + * serviceChannel() method stores the key reference in the thread object then
  + * calls notify() to wake it up. When the channel has been drained, the 
worker
  + * thread returns itself to its parent pool.
  + * 
  + * @author Filip Hanik
  + * @version $Revision$, $Date$
  + */
  +public class TcpReplicationThread extends WorkerThread {
  +private static final byte[] ACK_COMMAND = new byte[] {6, 2, 3};
   private static org.apache.commons.logging.Log log =
   org.apache.commons.logging.LogFactory.getLog( 
TcpReplicationThread.class );
   private ByteBuffer buffer = ByteBuffer.allocate (1024);
  @@ -53,7 +55,8 @@
   // sleep and release object lock
   this.wait();
   } catch (InterruptedException e) {
  -log.info(TCP worker thread interrupted in cluster,e);
  +if(log.isInfoEnabled())
  +log.info(TCP worker thread interrupted in cluster,e);
   // clear interrupt status
   Thread.interrupted();
   }
  @@ -119,16 +122,16 @@
   // loop while data available, channel is non-blocking
   while ((count = channel.read (buffer))  0) {
   buffer.flip();   // make buffer readable
  -int pkgcnt = reader.append(buffer.array(),0,count);
  +reader.append(buffer.array(),0,count);
   buffer.clear();  // make buffer empty
   }
   //check to see if any data is available
   int pkgcnt = reader.execute();
  -while ( pkgcnt  0 ) {
  -if (waitForAck) {
  +if (waitForAck) {
  +while ( pkgcnt  0 ) {
   sendAck(key,channel);
  -} //end if
  -pkgcnt--;
  +pkgcnt--;
  +}
   }
   
   if (count  0) {
  @@ -149,10 +152,15 @@
   
   }
   
  +/**
  + * send a reply-acknowledgement (6,2,3)
  + * @param key
  + * @param channel
  + */
   private void sendAck(SelectionKey key, SocketChannel channel) {
  -//send a reply-acknowledgement
  +
   try {
  -channel.write(ByteBuffer.wrap(new byte[] {6, 2, 3}));
  +channel.write(ByteBuffer.wrap(ACK_COMMAND));
   } catch ( java.io.IOException x ) {
   log.warn(Unable to send ACK back through channel, channel 
disconnected?: +x.getMessage());
   }
  
  
  

-
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional 

cvs commit: jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp TcpReplicationThread.java

2004-04-15 Thread fhanik
fhanik  2004/04/15 07:12:37

  Modified:modules/cluster/src/share/org/apache/catalina/cluster/tcp
TcpReplicationThread.java
  Log:
  Added all the interestOps to the synchronized blocks to guarantee non blocking 
behaviour at all times. Thanks to Rainer Jung for this patch.
  
  Revision  ChangesPath
  1.11  +3 -3  
jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/TcpReplicationThread.java
  
  Index: TcpReplicationThread.java
  ===
  RCS file: 
/home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/TcpReplicationThread.java,v
  retrieving revision 1.10
  retrieving revision 1.11
  diff -u -r1.10 -r1.11
  --- TcpReplicationThread.java 7 Apr 2004 19:02:31 -   1.10
  +++ TcpReplicationThread.java 15 Apr 2004 14:12:37 -  1.11
  @@ -138,14 +138,14 @@
   channel.close();
   return;
   }
  -// resume interest in OP_READ, OP_WRITE
  -int resumeOps = key.interestOps() | SelectionKey.OP_READ;
  -long _debugstart = System.currentTimeMillis();
  +
   //acquire the interestOps mutex
   Object mutex = this.getPool().getInterestOpsMutex();
   synchronized (mutex) {
   // cycle the selector so this key is active again
   key.selector().wakeup();
  +// resume interest in OP_READ, OP_WRITE
  +int resumeOps = key.interestOps() | SelectionKey.OP_READ;
   key.interestOps(resumeOps);
   }
   
  
  
  

-
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]