fhanik 2003/12/19 13:22:14 Modified: modules/cluster/src/share/org/apache/catalina/cluster/io Jdk13ObjectReader.java ObjectReader.java XByteBuffer.java modules/cluster/src/share/org/apache/catalina/cluster/tcp ReplicationTransmitter.java SocketSender.java TcpReplicationThread.java Log: implemented compression of the data sent over the wire Revision Changes Path 1.2 +5 -5 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/io/Jdk13ObjectReader.java Index: Jdk13ObjectReader.java =================================================================== RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/io/Jdk13ObjectReader.java,v retrieving revision 1.1 retrieving revision 1.2 diff -u -r1.1 -r1.2 --- Jdk13ObjectReader.java 18 Dec 2003 04:20:14 -0000 1.1 +++ Jdk13ObjectReader.java 19 Dec 2003 21:22:13 -0000 1.2 @@ -93,7 +93,7 @@ this.buffer = new XByteBuffer(); } - public int append(byte[] data,int off,int len) { + public int append(byte[] data,int off,int len) throws java.io.IOException { boolean result = false; buffer.append(data,off,len); int pkgCnt = 0; @@ -107,7 +107,7 @@ return pkgCnt; } - public int execute() { + public int execute() throws java.io.IOException { return append(new byte[0],0,0); } 1.3 +6 -6 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/io/ObjectReader.java Index: ObjectReader.java =================================================================== RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/io/ObjectReader.java,v retrieving revision 1.2 retrieving revision 1.3 diff -u -r1.2 -r1.3 --- ObjectReader.java 18 Dec 2003 04:20:14 -0000 1.2 +++ ObjectReader.java 19 Dec 2003 21:22:13 -0000 1.3 @@ -102,7 +102,7 @@ return this.channel; } - public int append(byte[] data,int off,int len) { + public int append(byte[] data,int off,int len) throws java.io.IOException { boolean result = false; buffer.append(data,off,len); int pkgCnt = 0; @@ -116,7 +116,7 @@ return pkgCnt; } - public int execute() { + public int execute() throws java.io.IOException { return append(new byte[0],0,0); } 1.3 +28 -9 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/io/XByteBuffer.java Index: XByteBuffer.java =================================================================== RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/io/XByteBuffer.java,v retrieving revision 1.2 retrieving revision 1.3 diff -u -r1.2 -r1.3 --- XByteBuffer.java 16 Nov 2003 22:22:45 -0000 1.2 +++ XByteBuffer.java 19 Dec 2003 21:22:13 -0000 1.3 @@ -214,16 +214,29 @@ * @param clearFromBuffer - if true, the package will be removed from the byte buffer * @return - returns the actual message bytes (header, size and footer not included). */ - public synchronized byte[] extractPackage(boolean clearFromBuffer) { + public synchronized byte[] extractPackage(boolean clearFromBuffer) throws java.io.IOException { int size = packageExists(); if ( size == 0 ) throw new java.lang.IllegalStateException("No package exists in XByteBuffer"); - byte[] result = new byte[size]; - System.arraycopy(buf,START_DATA.length+4,result,0,size); + byte[] data = new byte[size]; + System.arraycopy(buf,START_DATA.length+4,data,0,size); if ( clearFromBuffer ) { int totalsize = START_DATA.length + 4 + size + END_DATA.length; bufSize = bufSize - totalsize; System.arraycopy(buf, totalsize, buf, 0, bufSize); } + java.io.ByteArrayInputStream bin = new java.io.ByteArrayInputStream(data); + java.util.zip.GZIPInputStream gin = new java.util.zip.GZIPInputStream(bin); + byte[] tmp = new byte[1024]; + byte[] result = new byte[0]; + int length = gin.read(tmp); + while ( length > 0 ) { + byte[] tmpdata = result; + result = new byte[result.length+length]; + System.arraycopy(tmpdata,0,result,0,tmpdata.length); + System.arraycopy(tmp,0,result,tmpdata.length,length); + length = gin.read(tmp); + } + gin.close(); return result; }//extractPackage @@ -354,7 +367,13 @@ * @param data - the message data to be contained within the package * @return - a full package (header,size,data,footer) */ - public static byte[] createDataPackage(byte[] data) { + public static byte[] createDataPackage(byte[] indata) throws java.io.IOException { + java.io.ByteArrayOutputStream bout = new java.io.ByteArrayOutputStream(indata.length/2); + java.util.zip.GZIPOutputStream gout = new java.util.zip.GZIPOutputStream(bout); + gout.write(indata); + gout.flush(); + gout.close(); + byte[] data = bout.toByteArray(); byte[] result = new byte[START_DATA.length+4+data.length+END_DATA.length]; System.arraycopy(START_DATA,0,result,0,START_DATA.length); System.arraycopy(toBytes(data.length),0,result,START_DATA.length,4); @@ -378,4 +397,4 @@ System.out.println("After=" + toLong(d, 0)); } -}//class \ No newline at end of file +}//class 1.9 +16 -3 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationTransmitter.java Index: ReplicationTransmitter.java =================================================================== RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationTransmitter.java,v retrieving revision 1.8 retrieving revision 1.9 diff -u -r1.8 -r1.9 --- ReplicationTransmitter.java 18 Dec 2003 04:20:15 -0000 1.8 +++ ReplicationTransmitter.java 19 Dec 2003 21:22:13 -0000 1.9 @@ -77,6 +77,18 @@ for ( int i=0; i<senders.length; i++) map.put(senders[i].getAddress().getHostAddress()+":"+senders[i].getPort(),senders[i]); } + + private static long nrOfRequests = 0; + private static long totalBytes = 0; + private static synchronized void addStats(int length) { + nrOfRequests++; + totalBytes+=length; + if ( (nrOfRequests % 100) == 0 ) { + log.info("Nr of bytes sent="+totalBytes+" over "+nrOfRequests+" =="+(totalBytes/nrOfRequests)+" bytes/request"); + } + + } + public synchronized void add(IDataSender sender) { String key = sender.getAddress().getHostAddress()+":"+sender.getPort(); @@ -132,6 +144,7 @@ if (!sender.isConnected()) sender.connect(); sender.sendMessage(sessionId,data); + addStats(data.length); sender.setSuspect(false); }catch ( Exception x) { 1.7 +4 -4 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/SocketSender.java Index: SocketSender.java =================================================================== RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/SocketSender.java,v retrieving revision 1.6 retrieving revision 1.7 diff -u -r1.6 -r1.7 --- SocketSender.java 18 Dec 2003 04:20:15 -0000 1.6 +++ SocketSender.java 19 Dec 2003 21:22:13 -0000 1.7 @@ -108,7 +108,7 @@ public void connect() throws java.io.IOException { sc = new Socket(getAddress(),getPort()); - sc.setSoTimeout((int)ackTimeout); + //sc.setSoTimeout((int)ackTimeout); isSocketConnected = true; this.keepAliveCount = 0; this.keepAliveConnectTime = System.currentTimeMillis(); 1.4 +4 -6 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/TcpReplicationThread.java Index: TcpReplicationThread.java =================================================================== RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/TcpReplicationThread.java,v retrieving revision 1.3 retrieving revision 1.4 diff -u -r1.3 -r1.4 --- TcpReplicationThread.java 18 Dec 2003 04:20:15 -0000 1.3 +++ TcpReplicationThread.java 19 Dec 2003 21:22:13 -0000 1.4 @@ -197,8 +197,6 @@ private void sendAck(SelectionKey key, SocketChannel channel) throws java.io.IOException { //send a reply-acknowledgement - java.nio.ByteBuffer buf = java.nio.ByteBuffer.wrap(new byte[] {6,2,3}); - channel.write(buf); - buf.clear(); + channel.write(ByteBuffer.wrap(new byte[] {6,2,3})); } }
--------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]