fhanik      2003/12/19 13:22:14

  Modified:    modules/cluster/src/share/org/apache/catalina/cluster/io
                        Jdk13ObjectReader.java ObjectReader.java
                        XByteBuffer.java
               modules/cluster/src/share/org/apache/catalina/cluster/tcp
                        ReplicationTransmitter.java SocketSender.java
                        TcpReplicationThread.java
  Log:
  implemented compression of the data sent over the wire
  
  Revision  Changes    Path
  1.2       +5 -5      
jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/io/Jdk13ObjectReader.java
  
  Index: Jdk13ObjectReader.java
  ===================================================================
  RCS file: 
/home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/io/Jdk13ObjectReader.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- Jdk13ObjectReader.java    18 Dec 2003 04:20:14 -0000      1.1
  +++ Jdk13ObjectReader.java    19 Dec 2003 21:22:13 -0000      1.2
  @@ -93,7 +93,7 @@
           this.buffer = new XByteBuffer();
       }
   
  -    public int append(byte[] data,int off,int len) {
  +    public int append(byte[] data,int off,int len) throws java.io.IOException {
           boolean result = false;
           buffer.append(data,off,len);
           int pkgCnt = 0;
  @@ -107,7 +107,7 @@
           return pkgCnt;
       }
   
  -    public int execute() {
  +    public int execute() throws java.io.IOException {
           return append(new byte[0],0,0);
       }
   
  
  
  
  1.3       +6 -6      
jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/io/ObjectReader.java
  
  Index: ObjectReader.java
  ===================================================================
  RCS file: 
/home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/io/ObjectReader.java,v
  retrieving revision 1.2
  retrieving revision 1.3
  diff -u -r1.2 -r1.3
  --- ObjectReader.java 18 Dec 2003 04:20:14 -0000      1.2
  +++ ObjectReader.java 19 Dec 2003 21:22:13 -0000      1.3
  @@ -102,7 +102,7 @@
           return this.channel;
       }
   
  -    public int append(byte[] data,int off,int len) {
  +    public int append(byte[] data,int off,int len) throws java.io.IOException {
           boolean result = false;
           buffer.append(data,off,len);
           int pkgCnt = 0;
  @@ -116,7 +116,7 @@
           return pkgCnt;
       }
   
  -    public int execute() {
  +    public int execute() throws java.io.IOException {
           return append(new byte[0],0,0);
       }
   
  
  
  
  1.3       +28 -9     
jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/io/XByteBuffer.java
  
  Index: XByteBuffer.java
  ===================================================================
  RCS file: 
/home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/io/XByteBuffer.java,v
  retrieving revision 1.2
  retrieving revision 1.3
  diff -u -r1.2 -r1.3
  --- XByteBuffer.java  16 Nov 2003 22:22:45 -0000      1.2
  +++ XByteBuffer.java  19 Dec 2003 21:22:13 -0000      1.3
  @@ -214,16 +214,29 @@
        * @param clearFromBuffer - if true, the package will be removed from the byte 
buffer
        * @return - returns the actual message bytes (header, size and footer not 
included).
        */
  -    public synchronized byte[] extractPackage(boolean clearFromBuffer) {
  +    public synchronized byte[] extractPackage(boolean clearFromBuffer) throws 
java.io.IOException {
           int size = packageExists();
           if ( size == 0 ) throw new java.lang.IllegalStateException("No package 
exists in XByteBuffer");
  -        byte[] result = new byte[size];
  -        System.arraycopy(buf,START_DATA.length+4,result,0,size);
  +        byte[] data = new byte[size];
  +        System.arraycopy(buf,START_DATA.length+4,data,0,size);
           if ( clearFromBuffer ) {
               int totalsize = START_DATA.length + 4 + size + END_DATA.length;
               bufSize = bufSize - totalsize;
               System.arraycopy(buf, totalsize, buf, 0, bufSize);
           }
  +        java.io.ByteArrayInputStream bin = new java.io.ByteArrayInputStream(data);
  +        java.util.zip.GZIPInputStream gin = new java.util.zip.GZIPInputStream(bin);
  +        byte[] tmp = new byte[1024];
  +        byte[] result = new byte[0];
  +        int length = gin.read(tmp);
  +        while ( length > 0 ) {
  +            byte[] tmpdata = result;
  +            result = new byte[result.length+length];
  +            System.arraycopy(tmpdata,0,result,0,tmpdata.length);
  +            System.arraycopy(tmp,0,result,tmpdata.length,length);
  +            length = gin.read(tmp);
  +        }
  +        gin.close();
           return result;
       }//extractPackage
   
  @@ -354,7 +367,13 @@
        * @param data - the message data to be contained within the package
        * @return - a full package (header,size,data,footer)
        */
  -    public static byte[] createDataPackage(byte[] data)  {
  +    public static byte[] createDataPackage(byte[] indata) throws 
java.io.IOException  {
  +        java.io.ByteArrayOutputStream bout = new 
java.io.ByteArrayOutputStream(indata.length/2);
  +        java.util.zip.GZIPOutputStream gout = new 
java.util.zip.GZIPOutputStream(bout);
  +        gout.write(indata);
  +        gout.flush();
  +        gout.close();
  +        byte[] data = bout.toByteArray();
           byte[] result = new byte[START_DATA.length+4+data.length+END_DATA.length];
           System.arraycopy(START_DATA,0,result,0,START_DATA.length);
           System.arraycopy(toBytes(data.length),0,result,START_DATA.length,4);
  @@ -378,4 +397,4 @@
          System.out.println("After=" + toLong(d, 0));
       }
   
  -}//class
  \ No newline at end of file
  +}//class
  
  
  
  1.9       +16 -3     
jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationTransmitter.java
  
  Index: ReplicationTransmitter.java
  ===================================================================
  RCS file: 
/home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationTransmitter.java,v
  retrieving revision 1.8
  retrieving revision 1.9
  diff -u -r1.8 -r1.9
  --- ReplicationTransmitter.java       18 Dec 2003 04:20:15 -0000      1.8
  +++ ReplicationTransmitter.java       19 Dec 2003 21:22:13 -0000      1.9
  @@ -77,6 +77,18 @@
           for ( int i=0; i<senders.length; i++)
               
map.put(senders[i].getAddress().getHostAddress()+":"+senders[i].getPort(),senders[i]);
       }
  +
  +    private static long nrOfRequests = 0;
  +    private static long totalBytes = 0;
  +    private static synchronized void addStats(int length) {
  +        nrOfRequests++;
  +        totalBytes+=length;
  +        if ( (nrOfRequests % 100) == 0 ) {
  +            log.info("Nr of bytes sent="+totalBytes+" over "+nrOfRequests+" 
=="+(totalBytes/nrOfRequests)+" bytes/request");
  +        }
  +
  +    }
  +
       public synchronized void add(IDataSender sender)
       {
           String key = sender.getAddress().getHostAddress()+":"+sender.getPort();
  @@ -132,6 +144,7 @@
               if (!sender.isConnected())
                   sender.connect();
               sender.sendMessage(sessionId,data);
  +            addStats(data.length);
               sender.setSuspect(false);
           }catch ( Exception x)
           {
  
  
  
  1.7       +4 -4      
jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/SocketSender.java
  
  Index: SocketSender.java
  ===================================================================
  RCS file: 
/home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/SocketSender.java,v
  retrieving revision 1.6
  retrieving revision 1.7
  diff -u -r1.6 -r1.7
  --- SocketSender.java 18 Dec 2003 04:20:15 -0000      1.6
  +++ SocketSender.java 19 Dec 2003 21:22:13 -0000      1.7
  @@ -108,7 +108,7 @@
       public void connect() throws java.io.IOException
       {
           sc = new Socket(getAddress(),getPort());
  -        sc.setSoTimeout((int)ackTimeout);
  +        //sc.setSoTimeout((int)ackTimeout);
           isSocketConnected = true;
           this.keepAliveCount = 0;
           this.keepAliveConnectTime = System.currentTimeMillis();
  
  
  
  1.4       +4 -6      
jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/TcpReplicationThread.java
  
  Index: TcpReplicationThread.java
  ===================================================================
  RCS file: 
/home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/TcpReplicationThread.java,v
  retrieving revision 1.3
  retrieving revision 1.4
  diff -u -r1.3 -r1.4
  --- TcpReplicationThread.java 18 Dec 2003 04:20:15 -0000      1.3
  +++ TcpReplicationThread.java 19 Dec 2003 21:22:13 -0000      1.4
  @@ -197,8 +197,6 @@
   
       private void sendAck(SelectionKey key, SocketChannel channel) throws 
java.io.IOException {
           //send a reply-acknowledgement
  -        java.nio.ByteBuffer buf = java.nio.ByteBuffer.wrap(new byte[] {6,2,3});
  -        channel.write(buf);
  -        buf.clear();
  +        channel.write(ByteBuffer.wrap(new byte[] {6,2,3}));
       }
   }
  
  
  

---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to