gdamour 2004/06/24 16:52:12
Modified: sandbox/messaging/src/java/org/apache/geronimo/messaging/remotenode/admin JoinRequest.java sandbox/messaging/src/java/org/apache/geronimo/messaging/remotenode LogicalCompression.java sandbox/messaging/src/java/org/apache/geronimo/messaging RequestSender.java Log: o RequestSender flags Request Msgs with a byte instead of an int; o Slightly improve LogicalCompression so that it systematically compresses the headers which do not require the shared knowledge of a NodeTopology. Revision Changes Path 1.3 +5 -3 incubator-geronimo/sandbox/messaging/src/java/org/apache/geronimo/messaging/remotenode/admin/JoinRequest.java Index: JoinRequest.java =================================================================== RCS file: /home/cvs/incubator-geronimo/sandbox/messaging/src/java/org/apache/geronimo/messaging/remotenode/admin/JoinRequest.java,v retrieving revision 1.2 retrieving revision 1.3 diff -u -r1.2 -r1.3 --- JoinRequest.java 3 Jun 2004 14:51:16 -0000 1.2 +++ JoinRequest.java 24 Jun 2004 23:52:12 -0000 1.3 @@ -21,6 +21,7 @@ import org.apache.geronimo.messaging.CommunicationException; import org.apache.geronimo.messaging.Msg; +import org.apache.geronimo.messaging.MsgBody; import org.apache.geronimo.messaging.MsgHeader; import org.apache.geronimo.messaging.MsgHeaderConstants; import org.apache.geronimo.messaging.NodeInfo; @@ -74,8 +75,9 @@ header.addHeader(MsgHeaderConstants.DEST_NODES, joined); header.addHeader(MsgHeaderConstants.SRC_ENDPOINT, ""); header.addHeader(MsgHeaderConstants.CORRELATION_ID, - new RequestSender.RequestID(new Integer(0))); - + new RequestSender.RequestID((byte) 0)); + header.addHeader(MsgHeaderConstants.BODY_TYPE, MsgBody.Type.REQUEST); + msg.getBody().setContent(joiner); final FutureResult result = new FutureResult(); 1.2 +40 -43 incubator-geronimo/sandbox/messaging/src/java/org/apache/geronimo/messaging/remotenode/LogicalCompression.java Index: LogicalCompression.java =================================================================== RCS file: 
/home/cvs/incubator-geronimo/sandbox/messaging/src/java/org/apache/geronimo/messaging/remotenode/LogicalCompression.java,v retrieving revision 1.1 retrieving revision 1.2 diff -u -r1.1 -r1.2 --- LogicalCompression.java 11 May 2004 12:06:42 -0000 1.1 +++ LogicalCompression.java 24 Jun 2004 23:52:12 -0000 1.2 @@ -18,6 +18,8 @@ package org.apache.geronimo.messaging.remotenode; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; import org.apache.geronimo.messaging.Msg; import org.apache.geronimo.messaging.MsgBody; @@ -51,22 +53,22 @@ /** * No logical compression. */ - private final static byte NULL = 0x00; + private final static byte NULL = (byte) 0; /** * Compression based on the Topology shared knowledge. */ - private final static byte TOPOLOGY = 0x01; + private final static byte TOPOLOGY = (byte) 1; /** * Identifies a request. */ - private final static byte REQUEST = 0x01; + private final static byte REQUEST = (byte) 0; /** * Identifies a response. */ - private final static byte RESPONSE = 0x02; + private final static byte RESPONSE = (byte) 1; public NodeTopology getTopology() { return topology; @@ -78,60 +80,68 @@ public Object beforePop(StreamInputStream anIn) throws IOException { + List result = new ArrayList(5); + int bodyType = anIn.readByte(); + if ( REQUEST == bodyType ) { + result.add(MsgBody.Type.REQUEST); + } else { + result.add(MsgBody.Type.RESPONSE); + } + byte reqID = anIn.readByte(); + result.add(new RequestSender.RequestID(reqID)); byte type = anIn.readByte(); if ( type == NULL ) { - return null; + return result; } if ( null == topology ) { throw new IllegalArgumentException("No topology is defined."); } - Object[] result = new Object[5]; int id = anIn.readInt(); NodeInfo nodeInfo = topology.getNodeById(id); - result[0] = nodeInfo; + result.add(nodeInfo); id = anIn.readInt(); nodeInfo = topology.getNodeById(id); - result[1] = nodeInfo; + result.add(nodeInfo); id = anIn.readInt(); nodeInfo = topology.getNodeById(id); - 
result[2] = nodeInfo; - - int bodyType = anIn.read(); - if ( REQUEST == bodyType ) { - result[3] = MsgBody.Type.REQUEST; - } else { - result[3] = MsgBody.Type.RESPONSE; - } - - int reqID = anIn.readInt(); - result[4] = new RequestSender.RequestID(new Integer(reqID)); - return result; + result.add(nodeInfo); + return result.toArray(); } public void afterPop(StreamInputStream anIn, Msg aMsg, Object anOpaque) throws IOException { - if ( null == anOpaque ) { + List prePop = (List) anOpaque; + MsgHeader header = aMsg.getHeader(); + header.addHeader(MsgHeaderConstants.BODY_TYPE, prePop.get(0)); + header.addHeader(MsgHeaderConstants.CORRELATION_ID, prePop.get(1)); + if ( 5 != prePop.size() ) { return; } - Object[] prePop = (Object[]) anOpaque; - MsgHeader header = aMsg.getHeader(); - header.addHeader(MsgHeaderConstants.SRC_NODE, prePop[0]); - header.addHeader(MsgHeaderConstants.DEST_NODE, prePop[1]); - header.addHeader(MsgHeaderConstants.DEST_NODES, prePop[2]); - header.addHeader(MsgHeaderConstants.BODY_TYPE, prePop[3]); - header.addHeader(MsgHeaderConstants.CORRELATION_ID, prePop[4]); + header.addHeader(MsgHeaderConstants.SRC_NODE, prePop.get(2)); + header.addHeader(MsgHeaderConstants.DEST_NODE, prePop.get(3)); + header.addHeader(MsgHeaderConstants.DEST_NODES, prePop.get(4)); } public Object beforePush(StreamOutputStream anOut, Msg aMsg) throws IOException { + MsgHeader header = aMsg.getHeader(); + MsgBody.Type type = (MsgBody.Type) + header.resetHeader(MsgHeaderConstants.BODY_TYPE); + if ( type == MsgBody.Type.REQUEST ) { + anOut.writeByte(REQUEST); + } else { + anOut.writeByte(RESPONSE); + } + RequestSender.RequestID reqID = (RequestSender.RequestID) + header.resetHeader(MsgHeaderConstants.CORRELATION_ID); + anOut.writeByte(reqID.getID()); if ( null == topology ) { anOut.writeByte(NULL); return null; } anOut.writeByte(TOPOLOGY); - MsgHeader header = aMsg.getHeader(); NodeInfo info = (NodeInfo) header.resetHeader(MsgHeaderConstants.SRC_NODE); @@ -144,19 +154,6 @@ 
NodeInfo target = (NodeInfo) header.resetHeader(MsgHeaderConstants.DEST_NODES); anOut.writeInt(topology.getIDOfNode(target)); - - MsgBody.Type type = (MsgBody.Type) - header.resetHeader(MsgHeaderConstants.BODY_TYPE); - if ( type == MsgBody.Type.REQUEST ) { - anOut.write(REQUEST); - } else { - anOut.write(RESPONSE); - } - - RequestSender.RequestID reqID = (RequestSender.RequestID) - header.resetHeader(MsgHeaderConstants.CORRELATION_ID); - anOut.writeInt(reqID.getID()); - return null; } 1.4 +66 -35 incubator-geronimo/sandbox/messaging/src/java/org/apache/geronimo/messaging/RequestSender.java Index: RequestSender.java =================================================================== RCS file: /home/cvs/incubator-geronimo/sandbox/messaging/src/java/org/apache/geronimo/messaging/RequestSender.java,v retrieving revision 1.3 retrieving revision 1.4 diff -u -r1.3 -r1.4 --- RequestSender.java 10 Jun 2004 23:08:07 -0000 1.3 +++ RequestSender.java 24 Jun 2004 23:52:12 -0000 1.4 @@ -22,8 +22,6 @@ import java.io.ObjectInput; import java.io.ObjectOutput; import java.lang.reflect.InvocationTargetException; -import java.util.HashMap; -import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -45,17 +43,27 @@ /** * Number of milliseconds to wait for a response. */ - private static final long WAIT_RESPONSE = 5000; + private static final long WAIT_RESPONSE = 2000; + + /** + * Maximum number of requests that this instance can performed concurrently. + */ + private static final int MAX_CONCURRENT_REQUEST = 255; + + /** + * Memory barrier for seqID counter. + */ + private final Object seqMemBarrier = new Object(); /** * Used to generate request identifiers. */ - private static volatile int seqID = 0; + private int seqID = 0; /** - * Request id to FuturResult map. + * Request id to FuturResult[]. */ - private final Map responses; + private final Object[] responses; /** @@ -66,7 +74,7 @@ * request. 
*/ public RequestSender() { - responses = new HashMap(); + responses = new Object[MAX_CONCURRENT_REQUEST + 1]; } /** @@ -101,8 +109,8 @@ Msg msg = new Msg(); MsgHeader header = msg.getHeader(); - RequestID id = createID(aTargetNodes); - header.addHeader(MsgHeaderConstants.CORRELATION_ID, id); + IDTOFutureResult futurResult = createID(aTargetNodes); + header.addHeader(MsgHeaderConstants.CORRELATION_ID, futurResult.id); header.addHeader(MsgHeaderConstants.DEST_NODES, aTargetNodes); header.addHeader(MsgHeaderConstants.BODY_TYPE, MsgBody.Type.REQUEST); header.addHeader(MsgHeaderConstants.DEST_ENDPOINT, aTargetID); @@ -112,7 +120,7 @@ anOut.push(msg); - Result result = waitResponse(id, WAIT_RESPONSE); + Result result = waitResponse(futurResult.futurResults, WAIT_RESPONSE); if ( !result.isSuccess() ) { throw new CommunicationException(result.getThrowable()); } @@ -124,16 +132,27 @@ * identifier for this slot. * * @param aTargetNodes Nodes to which the request is to be sent. - * @return Request identifier. + * @return Request identifier and FutureResults. */ - private RequestID createID(NodeInfo[] aTargetNodes) { + private IDTOFutureResult createID(NodeInfo[] aTargetNodes) { FutureResult[] results = new FutureResult[aTargetNodes.length]; for (int i = 0; i < results.length; i++) { results[i] = new FutureResult(); } - RequestID id = new RequestID(new Integer(seqID++)); - responses.put(id, results); - return id; + int idAsInt; + synchronized (seqMemBarrier) { + // Implementation note: it is unlikely to have more than + // MAX_CONCURRENT_REQUEST Threads sending requests concurrently; + // This implementation assumes this unlikelihood. 
+ if ( MAX_CONCURRENT_REQUEST == ++seqID ) seqID = 1; + responses[seqID] = results; + idAsInt = seqID; + } + RequestID id = new RequestID((byte)idAsInt); + IDTOFutureResult result = new IDTOFutureResult(); + result.id = id; + result.futurResults = results; + return result; } /** @@ -143,15 +162,13 @@ * @param aWaitTime number of milliseconds to wait for a response. * @return Result of the request. */ - private Result waitResponse(RequestID anID, long aWaitTime) { - FutureResult[] results = (FutureResult[]) responses.get(anID); + private Result waitResponse(FutureResult[] aResults, long aWaitTime) { Exception ex; try { Result returned = null; - for (int i = 0; i < results.length; i++) { - returned = (Result) results[i].timedGet(aWaitTime); + for (int i = 0; i < aResults.length; i++) { + returned = (Result) aResults[i].timedGet(aWaitTime); } - responses.remove(anID); return returned; } catch (TimeoutException e) { log.error(e); @@ -176,46 +193,60 @@ if ( false == anID instanceof RequestID ) { throw new IllegalArgumentException("ID is of the wrong type."); } - FutureResult[] results = (FutureResult[]) responses.get(anID); - for (int i = 0; i < results.length; i++) { - FutureResult result = results[i]; - if ( null == result.peek() ) { - result.set(aResult); - break; + RequestID id = (RequestID) anID; + int index = id.id <= 0 ? id.id & 127 + 128 : id.id; + FutureResult[] results; + results = (FutureResult[]) responses[index]; + if ( null == results ) { + log.error("Invalid request ID {" + anID + "}"); + return; + } + synchronized (results) { + for (int i = 0; i < results.length; i++) { + FutureResult result = results[i]; + if ( null == result.peek() ) { + result.set(aResult); + break; + } } } } + private static class IDTOFutureResult { + private RequestID id; + private FutureResult[] futurResults; + } + /** * Request identifier. */ public static class RequestID implements Externalizable { - protected Integer id; + protected byte id; /** * Required for Externalization. 
*/ public RequestID() {} - public RequestID(Integer anID) { + public RequestID(byte anID) { id = anID; } - public int getID() { - return id.intValue(); + public byte getID() { + return id; } public void writeExternal(ObjectOutput out) throws IOException { - out.writeInt(id.intValue()); + out.write(id); } public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - id = new Integer(in.readInt()); + id = (byte) in.read(); } public int hashCode() { - return id.hashCode(); + return id; } public boolean equals(Object obj) { if ( false == obj instanceof RequestID ) { return false; } RequestID otherID = (RequestID) obj; - return id.equals(otherID.id); + return id == otherID.id; } public String toString() { return "ID=" + id;