pero 2005/04/12 11:56:07
Modified: modules/cluster/src/share/org/apache/catalina/cluster
ClusterSender.java
modules/cluster/src/share/org/apache/catalina/cluster/io
XByteBuffer.java
modules/cluster/src/share/org/apache/catalina/cluster/tcp
DataSender.java FastAsyncSocketSender.java
ReplicationTransmitter.java SimpleTcpCluster.java
TcpReplicationThread.java mbeans-descriptors.xml
modules/cluster to-do.txt
Added: modules/cluster/test/src/share/org/apache/catalina/cluster/tcp
ReplicationTransmitterTest.java
Log:
Optimize cluster send message
add some useful mbean attributes
Refactor ReplicationTransmitter
Revision Changes Path
1.7 +2 -2
jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/ClusterSender.java
Index: ClusterSender.java
===================================================================
RCS file:
/home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/ClusterSender.java,v
retrieving revision 1.6
retrieving revision 1.7
diff -u -r1.6 -r1.7
--- ClusterSender.java 10 Apr 2005 16:20:45 -0000 1.6
+++ ClusterSender.java 12 Apr 2005 18:56:07 -0000 1.7
@@ -39,9 +39,9 @@
public void backgroundProcess() ;
- public void sendMessage(String messageId, byte[] indata, Member member)
throws java.io.IOException;
+ public void sendMessage(ClusterMessage message, Member member) throws
java.io.IOException;
- public void sendMessage(String messageId, byte[] indata) throws
java.io.IOException;
+ public void sendMessage(ClusterMessage message) throws
java.io.IOException;
public boolean isWaitForAck();
public void setWaitForAck(boolean isWaitForAck);
1.13 +2 -1
jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/io/XByteBuffer.java
Index: XByteBuffer.java
===================================================================
RCS file:
/home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/io/XByteBuffer.java,v
retrieving revision 1.12
retrieving revision 1.13
diff -u -r1.12 -r1.13
--- XByteBuffer.java 5 Apr 2005 18:05:52 -0000 1.12
+++ XByteBuffer.java 12 Apr 2005 18:56:07 -0000 1.13
@@ -366,6 +366,7 @@
* @param indata - the message data to be contained within the package
* @param compress - compress message data or not
* @return - a full package (header,size,data,footer)
+ * @deprecated since 5.5.10
*/
public static byte[] createDataPackage(byte[] indata, boolean compress)
throws java.io.IOException {
1.7 +18 -6
jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/DataSender.java
Index: DataSender.java
===================================================================
RCS file:
/home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/DataSender.java,v
retrieving revision 1.6
retrieving revision 1.7
diff -u -r1.6 -r1.7
--- DataSender.java 10 Apr 2005 16:20:46 -0000 1.6
+++ DataSender.java 12 Apr 2005 18:56:07 -0000 1.7
@@ -17,10 +17,12 @@
package org.apache.catalina.cluster.tcp;
import java.io.IOException;
+import java.io.OutputStream;
import java.net.InetAddress;
import java.net.Socket;
import java.net.SocketException;
+import org.apache.catalina.cluster.io.XByteBuffer;
import org.apache.catalina.util.StringManager;
/**
@@ -47,7 +49,7 @@
/**
* The descriptive information about this implementation.
*/
- private static final String info = "DataSender/1.4";
+ private static final String info = "DataSender/2.0";
/**
* receiver address
@@ -227,6 +229,13 @@
}
/**
+ * @return Returns the avg totalBytes/nrOfRequests.
+ */
+ public double getAvgMessageSize() {
+ return ((double)totalBytes) / nrOfRequests;
+ }
+
+ /**
* @return Returns the avg processingTime/nrOfRequests.
*/
public double getAvgProcessingTime() {
@@ -693,7 +702,7 @@
openSocket();
}
try {
- writeData(data);
+ writeData(data);
} catch (java.io.IOException x) {
// second try with fresh connection
dataResendCounter++;
@@ -724,8 +733,12 @@
* @throws IOException
*/
protected void writeData(byte[] data) throws IOException {
- socket.getOutputStream().write(data);
- socket.getOutputStream().flush();
+ OutputStream out = socket.getOutputStream();
+ out.write(XByteBuffer.START_DATA);
+ out.write(XByteBuffer.toBytes(data.length));
+ out.write(data);
+ out.write(XByteBuffer.END_DATA);
+ out.flush();
if (isWaitForAck())
waitForAck(ackTimeout);
@@ -733,7 +746,6 @@
/**
* Wait for Acknowledgement from other server
- * FIXME Handle SocketTimeoutException - Retry message ?
* @param timeout
* @throws java.io.IOException
* @throws java.net.SocketTimeoutException
1.5 +51 -28
jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/FastAsyncSocketSender.java
Index: FastAsyncSocketSender.java
===================================================================
RCS file:
/home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/FastAsyncSocketSender.java,v
retrieving revision 1.4
retrieving revision 1.5
diff -u -r1.4 -r1.5
--- FastAsyncSocketSender.java 10 Apr 2005 16:54:06 -0000 1.4
+++ FastAsyncSocketSender.java 12 Apr 2005 18:56:07 -0000 1.5
@@ -54,7 +54,7 @@
/**
* The descriptive information about this implementation.
*/
- private static final String info = "FastAsyncSocketSender/1.1";
+ private static final String info = "FastAsyncSocketSender/2.0";
// ----------------------------------------------------- Instance
Variables
@@ -109,6 +109,7 @@
}
+
/**
* get current add wait timeout
* @return current wait timeout
@@ -386,6 +387,8 @@
*/
private long queuedNrOfBytes = 0;
+
+
/**
* Only use inside FastAsyncSocketSender
* @param sender
@@ -396,9 +399,12 @@
this.queue = queue;
this.sender = sender;
}
-
- protected long getQueuedNrOfBytes() {
- return queuedNrOfBytes ;
+
+ /**
+ * @return Returns the queuedNrOfBytes.
+ */
+ public long getQueuedNrOfBytes() {
+ return queuedNrOfBytes;
}
protected synchronized void setQueuedNrOfBytes(long queuedNrOfBytes)
{
@@ -417,35 +423,17 @@
public void stopRunning() {
keepRunning = false;
}
-
+
+
/* Get the objects from queue and send all mesages to the sender.
* @see java.lang.Runnable#run()
*/
public void run() {
while (keepRunning) {
- // get a link list of all queued objects
- if(log.isTraceEnabled())
- log.trace("Queuesize before=" +
((FastQueue)queue).getSize());
- LinkObject entry = queue.remove();
- if(log.isTraceEnabled())
- log.trace("Queuesize after=" + ((FastQueue)queue).getSize());
+ long queueSize;
+ LinkObject entry = getQueuedMessage();
if (entry != null) {
- do {
- int messagesize = 0;
- try {
- byte[] data = (byte[]) entry.data();
- messagesize = data.length;
- sender.pushMessage((String) entry.getKey(),
data);
- outQueueCounter++;
- } catch (Exception x) {
- log.warn(sm.getString(
- "AsyncSocketSender.send.error", entry
- .getKey()),x);
- } finally {
- decQueuedNrOfBytes(messagesize);
- }
- entry = entry.next();
- } while (entry != null);
+ pushQueuedMessages(entry);
} else {
if (keepRunning) {
log.warn(sm.getString("AsyncSocketSender.queue.empty",
@@ -456,6 +444,41 @@
}
}
+ /**
+ * @return
+ */
+ protected LinkObject getQueuedMessage() {
+ // get a link list of all queued objects
+ if (log.isTraceEnabled())
+ log.trace("Queuesize before=" + ((FastQueue)
queue).getSize());
+ LinkObject entry = queue.remove();
+ if (log.isTraceEnabled())
+ log.trace("Queuesize after=" + ((FastQueue)
queue).getSize());
+ return entry;
+ }
+
+ /**
+ * @param entry
+ */
+ protected void pushQueuedMessages(LinkObject entry) {
+ do {
+ int messagesize = 0;
+ try {
+ byte[] data = (byte[]) entry.data();
+ messagesize = data.length;
+ sender.pushMessage((String) entry.getKey(), data);
+ outQueueCounter++;
+ } catch (Exception x) {
+ log.warn(sm.getString(
+ "AsyncSocketSender.send.error", entry
+ .getKey()), x);
+ } finally {
+ decQueuedNrOfBytes(messagesize);
+ }
+ entry = entry.next();
+ } while (entry != null);
+ }
+
}
}
\ No newline at end of file
1.25 +154 -38
jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationTransmitter.java
Index: ReplicationTransmitter.java
===================================================================
RCS file:
/home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationTransmitter.java,v
retrieving revision 1.24
retrieving revision 1.25
diff -u -r1.24 -r1.25
--- ReplicationTransmitter.java 10 Apr 2005 16:20:46 -0000 1.24
+++ ReplicationTransmitter.java 12 Apr 2005 18:56:07 -0000 1.25
@@ -16,14 +16,18 @@
package org.apache.catalina.cluster.tcp;
+import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.io.ObjectOutputStream;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
+import java.util.zip.GZIPOutputStream;
import javax.management.MBeanServer;
import javax.management.ObjectName;
+import org.apache.catalina.cluster.ClusterMessage;
import org.apache.catalina.cluster.ClusterSender;
import org.apache.catalina.cluster.Member;
import org.apache.catalina.cluster.io.XByteBuffer;
@@ -49,7 +53,7 @@
/**
* The descriptive information about this implementation.
*/
- private static final String info = "ReplicationTransmitter/1.3";
+ private static final String info = "ReplicationTransmitter/2.0";
/**
* The string manager for this package.
@@ -109,9 +113,29 @@
/**
* Compress message data bytes
*/
- private boolean compress = true;
+ private boolean compress = false;
/**
+ * doTransmitterProcessingStats
+ */
+ protected boolean doTransmitterProcessingStats = false;
+
+ /**
+ * proessingTime
+ */
+ protected long processingTime = 0;
+
+ /**
+ * min proessingTime
+ */
+ protected long minProcessingTime = Long.MAX_VALUE ;
+
+ /**
+ * max proessingTime
+ */
+ protected long maxProcessingTime = 0;
+
+ /**
* dynamic sender <code>properties</code>
*/
private Map properties = new HashMap();
@@ -187,6 +211,49 @@
}
/**
+ * @return Returns the avg processingTime/nrOfRequests.
+ */
+ public double getAvgProcessingTime() {
+ return ((double)processingTime) / nrOfRequests;
+ }
+
+ /**
+ * @return Returns the maxProcessingTime.
+ */
+ public long getMaxProcessingTime() {
+ return maxProcessingTime;
+ }
+
+ /**
+ * @return Returns the minProcessingTime.
+ */
+ public long getMinProcessingTime() {
+ return minProcessingTime;
+ }
+
+ /**
+ * @return Returns the processingTime.
+ */
+ public long getProcessingTime() {
+ return processingTime;
+ }
+
+ /**
+ * @return Returns the doTransmitterProcessingStats.
+ */
+ public boolean isDoTransmitterProcessingStats() {
+ return doTransmitterProcessingStats;
+ }
+
+ /**
+ * @param doTransmitterProcessingStats The doTransmitterProcessingStats
to set.
+ */
+ public void setDoTransmitterProcessingStats(boolean doProcessingStats) {
+ this.doTransmitterProcessingStats = doProcessingStats;
+ }
+
+
+ /**
* Transmitter ObjectName
*
* @param name
@@ -344,42 +411,59 @@
}
// ------------------------------------------------------------- public
-
+
/**
* Send data to one member
- *
- * @see
org.apache.catalina.cluster.ClusterSender#sendMessage(java.lang.String,
- * byte[], org.apache.catalina.cluster.Member)
+ * FIXME set filtering messages
+ * @see
org.apache.catalina.cluster.ClusterSender#sendMessage(org.apache.catalina.cluster.ClusterMessage,
org.apache.catalina.cluster.Member)
*/
- public void sendMessage(String sessionId, byte[] indata, Member member)
- throws java.io.IOException {
- byte[] data = convertSenderData(indata);
- String key = getKey(member);
- IDataSender sender = (IDataSender) map.get(key);
- sendMessageData(sessionId, data, sender);
+ public void sendMessage(ClusterMessage message, Member member)
+ throws java.io.IOException {
+ long time = 0 ;
+ if(doTransmitterProcessingStats) {
+ time = System.currentTimeMillis();
+ }
+ try {
+ byte[] data = createMessageData(message);
+ String key = getKey(member);
+ IDataSender sender = (IDataSender) map.get(key);
+ sendMessageData(message.getUniqueId(), data, sender);
+ } finally {
+ if (doTransmitterProcessingStats) {
+ addProcessingStats(time);
+ }
+ }
}
/**
* send message to all senders (broadcast)
- *
- * @see
org.apache.catalina.cluster.ClusterSender#sendMessage(java.lang.String,
- * byte[])
+ * @see
org.apache.catalina.cluster.ClusterSender#sendMessage(org.apache.catalina.cluster.ClusterMessage)
*/
- public void sendMessage(String sessionId, byte[] indata)
+ public void sendMessage(ClusterMessage message)
throws java.io.IOException {
- IDataSender[] senders = getSenders();
- byte[] data = convertSenderData(indata);
- for (int i = 0; i < senders.length; i++) {
-
- IDataSender sender = senders[i];
- try {
- sendMessageData(sessionId, data, sender);
- } catch (Exception x) {
-
- if (!sender.getSuspect())
- log.warn("Unable to send replicated message to " + sender
- + ", is server down?", x);
- sender.setSuspect(true);
+ long time = 0;
+ if (doTransmitterProcessingStats) {
+ time = System.currentTimeMillis();
+ }
+ try {
+ byte[] data = createMessageData(message);
+ IDataSender[] senders = getSenders();
+ for (int i = 0; i < senders.length; i++) {
+
+ IDataSender sender = senders[i];
+ try {
+ sendMessageData(message.getUniqueId(), data, sender);
+ } catch (Exception x) {
+ if (!sender.getSuspect()) {
+ log.warn("Unable to send replicated message to "
+ + sender + ", is server down?", x);
+ sender.setSuspect(true);
+ }
+ }
+ }
+ } finally {
+ if (doTransmitterProcessingStats) {
+ addProcessingStats(time);
}
}
}
@@ -518,6 +602,9 @@
nrOfRequests = 0;
totalBytes = 0;
failureCounter = 0;
+ processingTime = 0;
+ minProcessingTime = Long.MAX_VALUE;
+ maxProcessingTime = 0;
}
/*
@@ -640,6 +727,7 @@
}
}
+
/**
* build sender ObjectName (
*
engine.domain:type=IDataSender,host="host",senderAddress="receiver.address",senderPort="port"
)
@@ -664,17 +752,32 @@
}
/**
- * compress data
- *
+ * Send Message create Timestamp and generate message bytes form msg
* @see XByteBuffer#createDataPackage(byte[])
- * @param indata
- * @return
+ * @param msg cluster message
+ * @return cluster message as byte array
* @throws IOException
- * FIXME get CompressMessageDate from cluster instanz
*/
- protected byte[] convertSenderData(byte[] data) throws IOException {
- return XByteBuffer.createDataPackage(data, isCompress());
+ protected byte[] createMessageData(ClusterMessage msg) throws
IOException {
+ msg.setTimestamp(System.currentTimeMillis());
+ ByteArrayOutputStream outs = new ByteArrayOutputStream();
+ ObjectOutputStream out;
+ GZIPOutputStream gout = null;
+ if (isCompress()) {
+ gout = new GZIPOutputStream(outs);
+ out = new ObjectOutputStream(gout);
+ } else {
+ out = new ObjectOutputStream(outs);
+ }
+ out.writeObject(msg);
+ // flush out the gzip stream to byte buffer
+ if(gout != null) {
+ gout.flush();
+ gout.close();
+ }
+ return outs.toByteArray();
}
+
/**
* Send message to concrete sender. If autoConnect is true, check is
@@ -723,5 +826,18 @@
}
}
-
+ /**
+ * Add processing stats times
+ * @param startTime
+ */
+ protected void addProcessingStats(long startTime) {
+ long time = System.currentTimeMillis() - startTime ;
+ if(time < minProcessingTime)
+ minProcessingTime = time ;
+ if( time > maxProcessingTime)
+ maxProcessingTime = time ;
+ processingTime += time ;
+ }
+
+
}
\ No newline at end of file
1.62 +20 -51
jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/SimpleTcpCluster.java
Index: SimpleTcpCluster.java
===================================================================
RCS file:
/home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/SimpleTcpCluster.java,v
retrieving revision 1.61
retrieving revision 1.62
diff -u -r1.61 -r1.62
--- SimpleTcpCluster.java 10 Apr 2005 16:20:46 -0000 1.61
+++ SimpleTcpCluster.java 12 Apr 2005 18:56:07 -0000 1.62
@@ -91,7 +91,7 @@
/**
* Descriptive information about this component implementation.
*/
- protected static final String info = "SimpleTcpCluster/1.2";
+ protected static final String info = "SimpleTcpCluster/2.0";
public static final String BEFORE_MEMBERREGISTER_EVENT =
"before_member_register";
@@ -598,8 +598,16 @@
}
/**
- * send a cluster message to one member
+ * send message to all cluster members
*
+ * @see
org.apache.catalina.cluster.CatalinaCluster#send(org.apache.catalina.cluster.ClusterMessage)
+ */
+ public void send(ClusterMessage msg) {
+ send(msg, null);
+ }
+
+ /**
+ * send a cluster message to one member
* @param msg message to transfer
* @param dest Receiver member
* @see
org.apache.catalina.cluster.CatalinaCluster#send(org.apache.catalina.cluster.ClusterMessage,
@@ -608,63 +616,24 @@
public void send(ClusterMessage msg, Member dest) {
try {
msg.setAddress(membershipService.getLocalMember());
- Member destination = dest;
-
- if (msg instanceof SessionMessage) {
- SessionMessage smsg = (SessionMessage) msg;
- //if we request session state, send to the oldest of members
- if ((destination == null)
- && (smsg.getEventType() ==
SessionMessage.EVT_GET_ALL_SESSIONS)
- && (membershipService.getMembers().length > 0)) {
- destination = membershipService.getMembers()[0];
- }
- }
- byte[] data = createMessageData(msg);
- if (destination != null) {
- Member tcpdest = dest;
- if ((tcpdest != null)
- &&
(!membershipService.getLocalMember().equals(tcpdest))) {
- clusterSender.sendMessage(msg.getUniqueId(), data,
tcpdest);
- }
+ if (dest != null) {
+ if (!membershipService.getLocalMember().equals(dest)) {
+ clusterSender.sendMessage(msg, dest);
+ } else
+ log.error("Unable to send message to local member " +
msg);
} else {
- clusterSender.sendMessage(msg.getUniqueId(), data);
+ clusterSender.sendMessage(msg);
}
} catch (Exception x) {
- if(notifyLifecycleListenerOnFailure) {
+ if (notifyLifecycleListenerOnFailure) {
// Notify our interested LifecycleListeners
- lifecycle.fireLifecycleEvent(SEND_MESSAGE_FAILURE_EVENT,
- new SendMessageData(msg,dest,x));
+ lifecycle.fireLifecycleEvent(SEND_MESSAGE_FAILURE_EVENT,
+ new SendMessageData(msg, dest, x));
}
log.error("Unable to send message through cluster sender.", x);
}
}
- /**
- * Send Message create Timestamp and generate message bytes form msg
- * @param msg cluster message
- * @return cluster message as byte array
- * @throws IOException
- */
- protected byte[] createMessageData(ClusterMessage msg) throws
IOException {
- msg.setTimestamp(System.currentTimeMillis());
- java.io.ByteArrayOutputStream outs = new
java.io.ByteArrayOutputStream();
- java.io.ObjectOutputStream out = new java.io.ObjectOutputStream(
- outs);
- out.writeObject(msg);
- byte[] data = outs.toByteArray();
- return data;
- }
-
- /**
- * send message to all cluster members
- *
- * @see
org.apache.catalina.cluster.CatalinaCluster#send(org.apache.catalina.cluster.ClusterMessage)
- */
- public void send(ClusterMessage msg) {
- send(msg, null);
- }
-
-
/* New cluster member is registered
* FIXME notify someone (JMX(Listener)
* @see
org.apache.catalina.cluster.MembershipListener#memberAdded(org.apache.catalina.cluster.Member)
1.17 +3 -3
jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/TcpReplicationThread.java
Index: TcpReplicationThread.java
===================================================================
RCS file:
/home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/TcpReplicationThread.java,v
retrieving revision 1.16
retrieving revision 1.17
diff -u -r1.16 -r1.17
--- TcpReplicationThread.java 10 Apr 2005 16:20:46 -0000 1.16
+++ TcpReplicationThread.java 12 Apr 2005 18:56:07 -0000 1.17
@@ -113,7 +113,7 @@
* re-enables OP_READ and calls wakeup() on the selector
* so the selector will resume watching this channel.
*/
- private void drainChannel (SelectionKey key)
+ protected void drainChannel (SelectionKey key)
throws Exception
{
boolean packetReceived=false;
@@ -162,7 +162,7 @@
* @param key
* @param channel
*/
- private void sendAck(SelectionKey key, SocketChannel channel) {
+ protected void sendAck(SelectionKey key, SocketChannel channel) {
try {
channel.write(ByteBuffer.wrap(ACK_COMMAND));
1.8 +45 -6
jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/mbeans-descriptors.xml
Index: mbeans-descriptors.xml
===================================================================
RCS file:
/home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/mbeans-descriptors.xml,v
retrieving revision 1.7
retrieving revision 1.8
diff -u -r1.7 -r1.8
--- mbeans-descriptors.xml 10 Apr 2005 16:20:46 -0000 1.7
+++ mbeans-descriptors.xml 12 Apr 2005 18:56:07 -0000 1.8
@@ -9,7 +9,10 @@
domain="Catalina"
group="Cluster"
type="org.apache.catalina.cluster.tcp.SimpleTcpCluster">
-
+ <attribute name="info"
+ description="Class version info"
+ type="java.lang.String"
+ writeable="false"/>
<attribute name="notifyListenersOnReplication"
description="notify session attribute listener at backups"
type="boolean"/>
@@ -71,7 +74,10 @@
domain="Catalina"
group="ClusterSender"
type="org.apache.catalina.cluster.tcp.ReplicationTransmitter">
-
+ <attribute name="info"
+ description="Class version info"
+ type="java.lang.String"
+ writeable="false"/>
<attribute name="replicationMode"
description="replication mode (synchnous,pooled.asynchnous)"
type="java.lang.String"/>
@@ -87,6 +93,26 @@
is="true"
type="boolean"
writeable="false" />
+ <attribute name="processingTime"
+ description="sending processing time"
+ type="long"
+ writeable="false"/>
+ <attribute name="minProcessingTime"
+ description="minimal sending processing time"
+ type="long"
+ writeable="false"/>
+ <attribute name="avgProcessingTime"
+ description="processing time / nrOfRequests"
+ type="double"
+ writeable="false"/>
+ <attribute name="maxProcessingTime"
+ description="maximal sending processing time"
+ type="long"
+ writeable="false"/>
+ <attribute name="doTransmitterProcessingStats"
+ description="create processing time stats"
+ is="true"
+ type="boolean" />
<attribute name="nrOfRequests"
description="number of send messages to other members"
type="long"
@@ -152,6 +178,10 @@
writeable="false"/>
<attribute name="ackTimeout"
description="acknowledge timeout"
+ type="long"/>
+ <attribute name="avgMessageSize"
+ writeable="false"
+ description="avg message size (totalbytes/nrOfRequests"
type="long"/>
<attribute name="queueSize"
writeable="false"
@@ -318,6 +348,10 @@
<attribute name="ackTimeout"
description="acknowledge timeout"
type="long"/>
+ <attribute name="avgMessageSize"
+ writeable="false"
+ description="avg message size (totalbytes/nrOfRequests"
+ type="long" />
<attribute name="queueSize"
writeable="false"
description="queue size"
@@ -458,7 +492,7 @@
description="queue remove wait time (queue thread waits)"
type="long"
writeable="false"/>
- <operation name="connect"
+ <operation name="connect"
description="connect to other replication node"
impact="ACTION"
returnType="void">
@@ -519,6 +553,10 @@
description="socket connected"
type="boolean"
writeable="false"/>
+ <attribute name="avgMessageSize"
+ writeable="false"
+ description="avg message size (totalbytes/nrOfRequests"
+ type="long"/>
<attribute name="nrOfRequests"
description="number of send messages to other members"
type="long"
@@ -577,9 +615,6 @@
is="true"
type="boolean"
writeable="false" />
- <attribute name="maxPoolSocketLimit"
- description="Max parallel sockets"
- type="int"/>
<attribute name="keepAliveTimeout"
description="active socket keep alive timeout"
type="long"/>
@@ -599,6 +634,10 @@
description="socket connected"
type="boolean"
writeable="false"/>
+ <attribute name="avgMessageSize"
+ writeable="false"
+ description="avg message size (totalbytes/nrOfRequests"
+ type="long"/>
<attribute name="nrOfRequests"
description="number of send messages to other members"
type="long"
1.11 +34 -8 jakarta-tomcat-catalina/modules/cluster/to-do.txt
Index: to-do.txt
===================================================================
RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/to-do.txt,v
retrieving revision 1.10
retrieving revision 1.11
diff -u -r1.10 -r1.11
--- to-do.txt 10 Apr 2005 16:20:46 -0000 1.10
+++ to-do.txt 12 Apr 2005 18:56:07 -0000 1.11
@@ -1,5 +1,16 @@
-Next Steps:
+==============================
+Next actions
+==============================
+- reduce cpu and memory consume (Receiver)
+ - set new compress sender flag at default=false ( < CPU usage)
+ - Make compact algo
+ currently message receive data is split at
XbyteBuffer#extractPackage and
+ SimpleTcpCluster#messageDataReceived
+- when a lot of messages expire it comes to burst of messages
+ - all 60 Sec when ManagerBase#processExpires is called!
+ - Better is to transfer a spezial Epxire Message with an array of
expired messages.
+ - This reduce message transfer and reduce waits for acks.
- Documentation
wrote a complete new how-to
add example configurations
@@ -18,12 +29,7 @@
detect long wait acks
- Implement fragmentation of large replication objects
Compress at message level
- s. FarmDeployer war handling
-- reduce memory consume
- - set new compress sender flag at default=false
- - don't copy the buffer to add message header
- transfer this from SimpleTcpCluster to DataSender pushMessage
- - make it possible that a subclass cryp the transferd messages
+ Splitting Messages ala FarmDeployer war handling
- add a message type to the message header.
- filtering at receiver that drop message before build Object
- add test cluster project
@@ -31,8 +37,11 @@
automated regression testing with some standard configs
- add support to dynamic property transfer from SimpleTcpCluster to the
Manager
like ReplicationTransmitter
-
+- better restart szenario after failure.
+
+==============================
Nice to have:
+==============================
- Implement a NonSerializable interface for session attributes that do not
wish to be replicated
Then we must have ClusterNonSerializable at common classloader
@@ -71,8 +80,24 @@
Fixed!
Last FileMessage fragment need longe ackTimeout
<Cluster ..> <Sender ... ackTimeout="60000"/> </Cluster>
+- ReplicationListener and SocketReplicationListener only accept data from
cluster member (low level ip restriction)
+- Change Message protocol (risk)
+ Currently 6 byte header, data.length 4 bytes , data, 6 byte end header
+ Optimized to 2 type header, data.length 4 bytes, data
+ change at DataSender.writeData and XByteBuffer
+==============================
COMPLETED
+==============================
+5.5.10
+- reduce memory and cpu consume (send message)
+ - set new compress sender flag at default=false ( < CPU usage)
+ - don't copy the buffer to add message header
+ transfer this from SimpleTcpCluster to DataSender pushMessage
+ successfull refactored
+ - make it possible that a subclass crypt the transfered messages
+ sub class ReplicationTransmitter and override createMessageData
+ - don't copy START and END Header for every message, instead send
dirctly and DataSender.writeData.
- Add a flag for replicated attribute events, to enable or disable them
Now can configued with notifyListenersOnReplication=false at
SimpleTCPCluster
Also can drop HttpSessionLsitener events
@@ -87,6 +112,7 @@
- Add new SocketReplicationListener
- Add Stats to DeltaManager
- Add single sign on support
+5.5.9
- Add Keep Alive and WaitForAck at async mode implementation.
Make this feature configurable to Sender element at server.xml
Is include with 5.5.8
1.1
jakarta-tomcat-catalina/modules/cluster/test/src/share/org/apache/catalina/cluster/tcp/ReplicationTransmitterTest.java
Index: ReplicationTransmitterTest.java
===================================================================
/*
* Copyright 1999,2005 The Apache Software Foundation.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy
of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
under
* the License.
*/
package org.apache.catalina.cluster.tcp;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.zip.GZIPInputStream;
import junit.framework.TestCase;
import org.apache.catalina.cluster.session.ReplicationStream;
import org.apache.catalina.cluster.session.SessionMessageImpl;
/**
* @author Peter Rossbach
*
* @version $Revision: 1.1 $ $Date: 2005/04/12 18:56:07 $
*/
public class ReplicationTransmitterTest extends TestCase {

    /**
     * Creates the message data for a session message with compression
     * enabled, then gunzips and deserializes the bytes again and checks
     * that the unique id survived the round trip.
     *
     * @throws Exception if creating or reading back the message data fails
     */
    public void testCreateMessageData() throws Exception {
        ReplicationTransmitter transmitter = new ReplicationTransmitter();
        transmitter.setCompress(true);
        SessionMessageImpl message = new SessionMessageImpl();
        message.setUniqueId("test");
        byte[] data = transmitter.createMessageData(message);
        // sanity check on the size of the generated package
        assertTrue(200 < data.length);
        Object myobj = getGZPObject(data);
        assertTrue(myobj instanceof SessionMessageImpl);
        assertEquals("test", ((SessionMessageImpl) myobj).getUniqueId());
    }

    /**
     * Gunzip the given bytes and deserialize the single object they contain.
     *
     * @param data gzip compressed, serialized object
     * @return the object read from the uncompressed stream
     * @throws IOException if the data cannot be uncompressed or read
     * @throws ClassNotFoundException if the class of the serialized object
     *             cannot be resolved
     */
    private Object getGZPObject(byte[] data) throws IOException,
            ClassNotFoundException {
        // Collect the uncompressed bytes in a growing buffer instead of
        // reallocating and copying the complete result for every chunk.
        java.io.ByteArrayOutputStream inflated =
            new java.io.ByteArrayOutputStream();
        GZIPInputStream gin =
            new GZIPInputStream(new ByteArrayInputStream(data));
        try {
            byte[] tmp = new byte[1024];
            int length = gin.read(tmp);
            while (length > 0) {
                inflated.write(tmp, 0, length);
                length = gin.read(tmp);
            }
        } finally {
            // release the inflater even when reading fails
            gin.close();
        }
        ReplicationStream stream = new ReplicationStream(
                new ByteArrayInputStream(inflated.toByteArray()),
                getClass().getClassLoader());
        return stream.readObject();
    }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]