User: dmaplesden Date: 01/11/13 17:53:40 Modified: src/main/org/jboss/mq SpyBytesMessage.java SpyEncapsulatedMessage.java SpyMapMessage.java SpyMessage.java SpyObjectMessage.java SpyQueueSender.java SpySession.java SpyStreamMessage.java SpyTextMessage.java SpyTopicPublisher.java SpyXAResourceManager.java Added: src/main/org/jboss/mq MessagePool.java Log: Added message object pool and changed file PM message log to use generic SpyMessage.writeMessage and readMessage methods. Revision Changes Path 1.5 +2 -2 jbossmq/src/main/org/jboss/mq/SpyBytesMessage.java Index: SpyBytesMessage.java =================================================================== RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/SpyBytesMessage.java,v retrieving revision 1.4 retrieving revision 1.5 diff -u -r1.4 -r1.5 --- SpyBytesMessage.java 2001/10/28 04:07:34 1.4 +++ SpyBytesMessage.java 2001/11/14 01:53:39 1.5 @@ -18,7 +18,7 @@ * * @author Norbert Lataille ([EMAIL PROTECTED]) * @created August 16, 2001 - * @version $Revision: 1.4 $ + * @version $Revision: 1.5 $ */ public class SpyBytesMessage extends SpyMessage @@ -402,7 +402,7 @@ public SpyMessage myClone() throws JMSException { - SpyBytesMessage result = new SpyBytesMessage(); + SpyBytesMessage result = MessagePool.getBytesMessage(); this.reset(); result.copyProps( this ); if ( this.InternalArray != null ) { 1.4 +2 -2 jbossmq/src/main/org/jboss/mq/SpyEncapsulatedMessage.java Index: SpyEncapsulatedMessage.java =================================================================== RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/SpyEncapsulatedMessage.java,v retrieving revision 1.3 retrieving revision 1.4 diff -u -r1.3 -r1.4 --- SpyEncapsulatedMessage.java 2001/10/28 04:07:34 1.3 +++ SpyEncapsulatedMessage.java 2001/11/14 01:53:39 1.4 @@ -15,7 +15,7 @@ * @author Norbert Lataille ([EMAIL PROTECTED]) * @author Hiram Chirino ([EMAIL PROTECTED]) * @created August 16, 2001 - * @version $Revision: 1.3 $ + * @version $Revision: 1.4 $ */ 
public class SpyEncapsulatedMessage extends SpyObjectMessage { @@ -54,7 +54,7 @@ public SpyMessage myClone() throws javax.jms.JMSException { - SpyEncapsulatedMessage result = new SpyEncapsulatedMessage(); + SpyEncapsulatedMessage result = MessagePool.getEncapsulatedMessage(); result.copyProps( this ); //HACK to get around read only problem boolean readOnly = result.header.msgReadOnly; 1.4 +2 -2 jbossmq/src/main/org/jboss/mq/SpyMapMessage.java Index: SpyMapMessage.java =================================================================== RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/SpyMapMessage.java,v retrieving revision 1.3 retrieving revision 1.4 diff -u -r1.3 -r1.4 --- SpyMapMessage.java 2001/10/28 04:07:34 1.3 +++ SpyMapMessage.java 2001/11/14 01:53:39 1.4 @@ -20,7 +20,7 @@ * * @author Norbert Lataille ([EMAIL PROTECTED]) * @created August 16, 2001 - * @version $Revision: 1.3 $ + * @version $Revision: 1.4 $ */ public class SpyMapMessage extends SpyMessage @@ -373,7 +373,7 @@ public SpyMessage myClone() throws JMSException { - SpyMapMessage result = new SpyMapMessage(); + SpyMapMessage result = MessagePool.getMapMessage(); result.copyProps( this ); result.content = ( Hashtable )this.content.clone(); return result; 1.10 +44 -11 jbossmq/src/main/org/jboss/mq/SpyMessage.java Index: SpyMessage.java =================================================================== RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/SpyMessage.java,v retrieving revision 1.9 retrieving revision 1.10 diff -u -r1.9 -r1.10 --- SpyMessage.java 2001/11/10 21:38:04 1.9 +++ SpyMessage.java 2001/11/14 01:53:40 1.10 @@ -25,7 +25,7 @@ * @author Hiram Chirino ([EMAIL PROTECTED]) * @author David Maplesden ([EMAIL PROTECTED]) * - * @version $Revision: 1.9 $ + * @version $Revision: 1.10 $ */ public class SpyMessage implements Serializable, Message, Comparable, Externalizable { @@ -68,7 +68,7 @@ //For ordering in the JMSServerQueue (set on the server side) public transient long 
messageId; } - + public Header header = new Header(); public transient AcknowledgementRequest ack; @@ -445,7 +445,7 @@ } public SpyMessage myClone() throws JMSException { - SpyMessage result = new SpyMessage(); + SpyMessage result = MessagePool.getMessage(); result.copyProps(this); return result; } @@ -541,7 +541,7 @@ protected static final byte ENCAP_MESS = 6; protected static final byte SPY_MESS = 7; protected static final int BYTE = 0; - + protected static final int SHORT = 1; protected static final int INT = 2; protected static final int LONG = 3; @@ -576,25 +576,25 @@ byte type = in.readByte(); switch (type) { case OBJECT_MESS : - message = new SpyObjectMessage(); + message = MessagePool.getObjectMessage(); break; case BYTES_MESS : - message = new SpyBytesMessage(); + message = MessagePool.getBytesMessage(); break; case MAP_MESS : - message = new SpyMapMessage(); + message = MessagePool.getMapMessage(); break; case STREAM_MESS : - message = new SpyStreamMessage(); + message = MessagePool.getStreamMessage(); break; case TEXT_MESS : - message = new SpyTextMessage(); + message = MessagePool.getTextMessage(); break; case ENCAP_MESS : - message = new SpyEncapsulatedMessage(); + message = MessagePool.getEncapsulatedMessage(); break; default : - message = new SpyMessage(); + message = MessagePool.getMessage(); } try { message.readExternal(in); @@ -742,6 +742,39 @@ } header.jmsProperties.put(key, value); } + } + + //clear for next use in pool + void reset() throws JMSException{ + clearBody(); + this.ack = null; + this.session = null; + //Set by send() method + this.header.jmsDestination = null; + this.header.jmsDeliveryMode = -1; + this.header.jmsExpiration = 0; + this.header.jmsPriority = -1; + this.header.jmsMessageID = null; + this.header.jmsTimeStamp = 0; + //Set by the client + this.header.jmsCorrelationID = true; + this.header.jmsCorrelationIDString = null; + this.header.jmsCorrelationIDbyte = null; + this.header.jmsReplyTo = null; + this.header.jmsType = null; 
+ //Set by the provider + this.header.jmsRedelivered = false; + //Properties + this.header.jmsProperties.clear(); + this.header.jmsPropertiesReadWrite=true; + //Message body + this.header.msgReadOnly = false; + //For noLocal to be able to tell if this was a locally produced message + this.header.producerClientId = null; + //For durable subscriptions + this.header.durableSubscriberID = null; + //For ordering in the JMSServerQueue (set on the server side) + this.header.messageId = 0; } } 1.7 +5 -5 jbossmq/src/main/org/jboss/mq/SpyObjectMessage.java Index: SpyObjectMessage.java =================================================================== RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/SpyObjectMessage.java,v retrieving revision 1.6 retrieving revision 1.7 diff -u -r1.6 -r1.7 --- SpyObjectMessage.java 2001/10/28 04:07:34 1.6 +++ SpyObjectMessage.java 2001/11/14 01:53:40 1.7 @@ -17,7 +17,7 @@ * * @author Norbert Lataille ([EMAIL PROTECTED]) * @created August 16, 2001 - * @version $Revision: 1.6 $ + * @version $Revision: 1.7 $ */ public class SpyObjectMessage extends SpyMessage @@ -67,9 +67,9 @@ } else { /** - * Default implementation ObjectInputStream does not work well - * when running an a micro kernal style app-server like JBoss. - * We need to look for the Class in the context class loader + * Default implementation ObjectInputStream does not work well + * when running an a micro kernal style app-server like JBoss. + * We need to look for the Class in the context class loader * and not in the System classloader. * * Would this be done better by using a MarshaedObject?? 
@@ -104,7 +104,7 @@ public SpyMessage myClone() throws JMSException { - SpyObjectMessage result = new SpyObjectMessage(); + SpyObjectMessage result = MessagePool.getObjectMessage(); result.copyProps( this ); result.isByteArray = this.isByteArray; if ( objectBytes != null ) { 1.3 +2 -2 jbossmq/src/main/org/jboss/mq/SpyQueueSender.java Index: SpyQueueSender.java =================================================================== RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/SpyQueueSender.java,v retrieving revision 1.2 retrieving revision 1.3 diff -u -r1.2 -r1.3 --- SpyQueueSender.java 2001/08/17 03:04:01 1.2 +++ SpyQueueSender.java 2001/11/14 01:53:40 1.3 @@ -20,7 +20,7 @@ * @author Norbert Lataille ([EMAIL PROTECTED]) * @author Hiram Chirino ([EMAIL PROTECTED]) * @created August 16, 2001 - * @version $Revision: 1.2 $ + * @version $Revision: 1.3 $ */ public class SpyQueueSender extends SpyMessageProducer @@ -94,7 +94,7 @@ // Encapsulate the message if not a SpyMessage if ( !( message instanceof SpyMessage ) ) { - SpyEncapsulatedMessage m = new SpyEncapsulatedMessage(); + SpyEncapsulatedMessage m = MessagePool.getEncapsulatedMessage(); m.setMessage( message ); message = m; } 1.7 +15 -15 jbossmq/src/main/org/jboss/mq/SpySession.java Index: SpySession.java =================================================================== RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/SpySession.java,v retrieving revision 1.6 retrieving revision 1.7 diff -u -r1.6 -r1.7 --- SpySession.java 2001/10/28 04:07:34 1.6 +++ SpySession.java 2001/11/14 01:53:40 1.7 @@ -33,7 +33,7 @@ * @author Norbert Lataille ([EMAIL PROTECTED]) * @author Hiram Chirino ([EMAIL PROTECTED]) * @created August 16, 2001 - * @version $Revision: 1.6 $ + * @version $Revision: 1.7 $ */ public abstract class SpySession implements Session, XASession { @@ -130,7 +130,7 @@ throw new IllegalStateException( "The session is closed" ); } - SpyBytesMessage message = new SpyBytesMessage(); + SpyBytesMessage 
message = MessagePool.getBytesMessage(); message.header.producerClientId = connection.getClientID(); return message; } @@ -141,7 +141,7 @@ throw new IllegalStateException( "The session is closed" ); } - SpyMapMessage message = new SpyMapMessage(); + SpyMapMessage message = MessagePool.getMapMessage(); message.header.producerClientId = connection.getClientID(); return message; } @@ -152,7 +152,7 @@ throw new IllegalStateException( "The session is closed" ); } - SpyMessage message = new SpyMessage(); + SpyMessage message = MessagePool.getMessage(); message.header.producerClientId = connection.getClientID(); return message; } @@ -163,7 +163,7 @@ throw new IllegalStateException( "The session is closed" ); } - SpyObjectMessage message = new SpyObjectMessage(); + SpyObjectMessage message = MessagePool.getObjectMessage(); message.header.producerClientId = connection.getClientID(); return message; } @@ -174,7 +174,7 @@ throw new IllegalStateException( "The session is closed" ); } - SpyObjectMessage message = new SpyObjectMessage(); + SpyObjectMessage message = MessagePool.getObjectMessage(); message.setObject( object ); message.header.producerClientId = connection.getClientID(); return message; @@ -186,7 +186,7 @@ throw new IllegalStateException( "The session is closed" ); } - SpyStreamMessage message = new SpyStreamMessage(); + SpyStreamMessage message = MessagePool.getStreamMessage(); message.header.producerClientId = connection.getClientID(); return message; } @@ -197,7 +197,7 @@ throw new IllegalStateException( "The session is closed" ); } - SpyTextMessage message = new SpyTextMessage(); + SpyTextMessage message = MessagePool.getTextMessage(); message.header.producerClientId = connection.getClientID(); return message; } @@ -236,25 +236,25 @@ Iterator i; synchronized ( consumers ) { - + //notify the sleeping synchronous listeners if ( sessionConsumer != null ) { sessionConsumer.close(); } - + i = consumers.iterator(); } - + while ( i.hasNext() ) { SpyMessageConsumer 
messageConsumer = ( SpyMessageConsumer )i.next(); messageConsumer.close(); } - + //deal with any unacked messages if ( transacted && spyXAResource == null ) { rollback(); } - + connection.sessionClosing( this ); closed = true; @@ -312,9 +312,9 @@ if ( !transacted ) { throw new IllegalStateException( "The session is not transacted" ); } - + cat.debug( "Session: rollback()" ); - + // rollback transaction try { connection.spyXAResourceManager.endTx( currentTransactionId, true ); 1.4 +2 -2 jbossmq/src/main/org/jboss/mq/SpyStreamMessage.java Index: SpyStreamMessage.java =================================================================== RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/SpyStreamMessage.java,v retrieving revision 1.3 retrieving revision 1.4 diff -u -r1.3 -r1.4 --- SpyStreamMessage.java 2001/10/28 04:07:34 1.3 +++ SpyStreamMessage.java 2001/11/14 01:53:40 1.4 @@ -20,7 +20,7 @@ * * @author Norbert Lataille ([EMAIL PROTECTED]) * @created August 16, 2001 - * @version $Revision: 1.3 $ + * @version $Revision: 1.4 $ */ public class SpyStreamMessage extends SpyMessage @@ -490,7 +490,7 @@ public SpyMessage myClone() throws JMSException { - SpyStreamMessage result = new SpyStreamMessage(); + SpyStreamMessage result = MessagePool.getStreamMessage(); result.copyProps( this ); result.content = ( Vector )this.content.clone(); result.position = this.position; 1.7 +4 -4 jbossmq/src/main/org/jboss/mq/SpyTextMessage.java Index: SpyTextMessage.java =================================================================== RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/SpyTextMessage.java,v retrieving revision 1.6 retrieving revision 1.7 diff -u -r1.6 -r1.7 --- SpyTextMessage.java 2001/11/04 06:53:31 1.6 +++ SpyTextMessage.java 2001/11/14 01:53:40 1.7 @@ -18,7 +18,7 @@ * * @author Norbert Lataille ([EMAIL PROTECTED]) * @created August 16, 2001 - * @version $Revision: 1.6 $ + * @version $Revision: 1.7 $ */ public class SpyTextMessage extends SpyMessage @@ -31,7 +31,7 
@@ private final static long serialVersionUID = 235726945332013953L; private final static int chunkSize = 16384; - + // Public -------------------------------------------------------- public void setText( String string ) @@ -60,7 +60,7 @@ public SpyMessage myClone() throws JMSException { - SpyTextMessage result = new SpyTextMessage(); + SpyTextMessage result = MessagePool.getTextMessage(); result.copyProps( this ); result.content = this.content; return result; @@ -76,7 +76,7 @@ content = null; } else - { + { // apply workaround for string > 64K bug in jdk's 1.3.* // Read the no. of chunks this message is split into, allocate 1.4 +2 -2 jbossmq/src/main/org/jboss/mq/SpyTopicPublisher.java Index: SpyTopicPublisher.java =================================================================== RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/SpyTopicPublisher.java,v retrieving revision 1.3 retrieving revision 1.4 diff -u -r1.3 -r1.4 --- SpyTopicPublisher.java 2001/10/04 23:25:11 1.3 +++ SpyTopicPublisher.java 2001/11/14 01:53:40 1.4 @@ -20,7 +20,7 @@ * @author Norbert Lataille ([EMAIL PROTECTED]) * @author Hiram Chirino ([EMAIL PROTECTED]) * @created August 16, 2001 - * @version $Revision: 1.3 $ + * @version $Revision: 1.4 $ */ public class SpyTopicPublisher extends SpyMessageProducer @@ -94,7 +94,7 @@ // Encapsulate the message if not a SpyMessage if ( !( message instanceof SpyMessage ) ) { - SpyEncapsulatedMessage m = new SpyEncapsulatedMessage(); + SpyEncapsulatedMessage m = MessagePool.getEncapsulatedMessage(); m.setMessage( message ); message = m; } 1.4 +11 -1 jbossmq/src/main/org/jboss/mq/SpyXAResourceManager.java Index: SpyXAResourceManager.java =================================================================== RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/SpyXAResourceManager.java,v retrieving revision 1.3 retrieving revision 1.4 diff -u -r1.3 -r1.4 --- SpyXAResourceManager.java 2001/08/28 21:38:24 1.3 +++ SpyXAResourceManager.java 2001/11/14 01:53:40 
1.4 @@ -20,7 +20,7 @@ * * @author Hiram Chirino ([EMAIL PROTECTED]) * @created August 16, 2001 - * @version $Revision: 1.3 $ + * @version $Revision: 1.4 $ */ public class SpyXAResourceManager implements java.io.Serializable { @@ -96,6 +96,11 @@ transaction.acks = job; } connection.send( transaction ); + //release messages back to pool now they are sent + if(transaction.messages != null){ + for(int i=0;i<transaction.messages.length;i++) + MessagePool.releaseMessage(transaction.messages[i]); + } } else { if ( state.txState != TX_PREPARED ) { throw new XAException( "The transaction had not been prepared" ); @@ -104,6 +109,11 @@ transaction.xid = xid; transaction.requestType = transaction.TWO_PHASE_COMMIT_COMMIT_REQUEST; connection.send( transaction ); + //release messages back to pool now they are sent + if(transaction.messages != null){ + for(int i=0;i<transaction.messages.length;i++) + MessagePool.releaseMessage(transaction.messages[i]); + } } state.txState = TX_COMMITED; } 1.1 jbossmq/src/main/org/jboss/mq/MessagePool.java Index: MessagePool.java ===================================================================
/*
 * JBossMQ, the OpenSource JMS implementation
 *
 * Distributable under LGPL license.
 * See terms of license at gnu.org.
 */
package org.jboss.mq;

/**
 * A deliberately simple object pool for SpyMessage instances.
 *
 * One free list is kept per concrete message type. The getXxxMessage()
 * methods hand out the most recently released instance of that type, or a
 * freshly constructed one when the free list is empty. releaseMessage()
 * resets a message and puts it back on the matching free list.
 *
 * Each free list is guarded by synchronizing on the list object itself.
 *
 * @author David Maplesden ([EMAIL PROTECTED])
 */
public class MessagePool {

    /** Static switch: when false, nothing is pooled and every get allocates. */
    public static final boolean POOL = true;

    /** Upper bound on the number of idle instances kept in any single free list. */
    public static final int MAX_POOL_SIZE = 10*1000;

    protected static java.util.ArrayList messagePool = new java.util.ArrayList();
    protected static java.util.ArrayList bytesPool = new java.util.ArrayList();
    protected static java.util.ArrayList mapPool = new java.util.ArrayList();
    protected static java.util.ArrayList streamPool = new java.util.ArrayList();
    protected static java.util.ArrayList objectPool = new java.util.ArrayList();
    protected static java.util.ArrayList textPool = new java.util.ArrayList();
    protected static java.util.ArrayList encapPool = new java.util.ArrayList();

    /**
     * Returns a plain SpyMessage, reusing a pooled instance when one is idle.
     */
    public static SpyMessage getMessage() {
        if (POOL) {
            synchronized (messagePool) {
                final int last = messagePool.size() - 1;
                if (last >= 0) {
                    return (SpyMessage) messagePool.remove(last);
                }
            }
        }
        return new SpyMessage();
    }

    /**
     * Returns a SpyBytesMessage, reusing a pooled instance when one is idle.
     */
    public static SpyBytesMessage getBytesMessage() {
        if (POOL) {
            synchronized (bytesPool) {
                final int last = bytesPool.size() - 1;
                if (last >= 0) {
                    return (SpyBytesMessage) bytesPool.remove(last);
                }
            }
        }
        return new SpyBytesMessage();
    }

    /**
     * Returns a SpyMapMessage, reusing a pooled instance when one is idle.
     */
    public static SpyMapMessage getMapMessage() {
        if (POOL) {
            synchronized (mapPool) {
                final int last = mapPool.size() - 1;
                if (last >= 0) {
                    return (SpyMapMessage) mapPool.remove(last);
                }
            }
        }
        return new SpyMapMessage();
    }

    /**
     * Returns a SpyStreamMessage, reusing a pooled instance when one is idle.
     */
    public static SpyStreamMessage getStreamMessage() {
        if (POOL) {
            synchronized (streamPool) {
                final int last = streamPool.size() - 1;
                if (last >= 0) {
                    return (SpyStreamMessage) streamPool.remove(last);
                }
            }
        }
        return new SpyStreamMessage();
    }

    /**
     * Returns a SpyObjectMessage, reusing a pooled instance when one is idle.
     */
    public static SpyObjectMessage getObjectMessage() {
        if (POOL) {
            synchronized (objectPool) {
                final int last = objectPool.size() - 1;
                if (last >= 0) {
                    return (SpyObjectMessage) objectPool.remove(last);
                }
            }
        }
        return new SpyObjectMessage();
    }

    /**
     * Returns a SpyTextMessage, reusing a pooled instance when one is idle.
     */
    public static SpyTextMessage getTextMessage() {
        if (POOL) {
            synchronized (textPool) {
                final int last = textPool.size() - 1;
                if (last >= 0) {
                    return (SpyTextMessage) textPool.remove(last);
                }
            }
        }
        return new SpyTextMessage();
    }

    /**
     * Returns a SpyEncapsulatedMessage, reusing a pooled instance when one is idle.
     */
    public static SpyEncapsulatedMessage getEncapsulatedMessage() {
        if (POOL) {
            synchronized (encapPool) {
                final int last = encapPool.size() - 1;
                if (last >= 0) {
                    return (SpyEncapsulatedMessage) encapPool.remove(last);
                }
            }
        }
        return new SpyEncapsulatedMessage();
    }

    /**
     * Hands a message back for later reuse.
     *
     * Does nothing when pooling is switched off or when message is null.
     * Otherwise the message is reset and appended to the free list for its
     * type, unless that list already holds MAX_POOL_SIZE entries, in which
     * case the message is simply dropped.
     *
     * @param message the message to recycle; may be null
     */
    public static void releaseMessage(SpyMessage message) {
        if (!POOL || message == null) {
            return;
        }

        // NOTE(review): SpyBytesMessage.myClone() already called this.reset()
        // before SpyMessage.reset() existed, so that subclass appears to have
        // its own reset() - confirm this call clears pooled state for it.
        try {
            message.reset();
        } catch (javax.jms.JMSException e) {
            //unable to re-use message
            return;
        }

        final java.util.ArrayList target = poolFor(message);
        synchronized (target) {
            if (target.size() < MAX_POOL_SIZE) {
                target.add(message);
            }
        }
    }

    /**
     * Picks the free list that matches the runtime type of the given message.
     */
    private static java.util.ArrayList poolFor(SpyMessage message) {
        if (message instanceof SpyTextMessage) {
            return textPool;
        }
        // encapsulated must be tested before object: the former extends the latter
        if (message instanceof SpyEncapsulatedMessage) {
            return encapPool;
        }
        if (message instanceof SpyObjectMessage) {
            return objectPool;
        }
        if (message instanceof SpyBytesMessage) {
            return bytesPool;
        }
        if (message instanceof SpyMapMessage) {
            return mapPool;
        }
        if (message instanceof SpyStreamMessage) {
            return streamPool;
        }
        // plain old SpyMessage
        return messagePool;
    }
}
_______________________________________________ Jboss-development mailing list [EMAIL PROTECTED] https://lists.sourceforge.net/lists/listinfo/jboss-development