User: dmaplesden Date: 01/11/13 17:53:40 Modified: src/main/org/jboss/mq/server BasicQueue.java MessageCache.java MessageReference.java Log: Added message object pool and changed file PM message log to use generic SpyMessage.writeMessage and readMessage methods. Revision Changes Path 1.9 +5 -3 jbossmq/src/main/org/jboss/mq/server/BasicQueue.java Index: BasicQueue.java =================================================================== RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/server/BasicQueue.java,v retrieving revision 1.8 retrieving revision 1.9 diff -u -r1.8 -r1.9 --- BasicQueue.java 2001/10/28 04:07:35 1.8 +++ BasicQueue.java 2001/11/14 01:53:40 1.9 @@ -31,7 +31,7 @@ * @author Norbert Lataille ([EMAIL PROTECTED]) * @author David Maplesden ([EMAIL PROTECTED]) * @created August 16, 2001 - * @version $Revision: 1.8 $ + * @version $Revision: 1.9 $ */ //abstract public class BasicQueue implements Runnable { public class BasicQueue { @@ -211,7 +211,7 @@ restoreMessage(message); } } - + class RemoveMessageTask implements Runnable { MessageReference message; RemoveMessageTask(MessageReference m) { @@ -220,6 +220,8 @@ public void run() { try { server.getMessageCache().remove(message); + //we are finally done with message so we can release it back to pool + org.jboss.mq.MessagePool.releaseMessage(message.hardReference); } catch ( JMSException e ) { cat.error("Could not remove an acknowleged message from the message cache: ", e); } @@ -257,7 +259,7 @@ Runnable task = new RestoreMessageTask(m, item.subscriberId); server.getPersistenceManager().getTxManager().addPostRollbackTask(txId, task); - + task = new RemoveMessageTask(m); server.getPersistenceManager().getTxManager().addPostCommitTask(txId, task); } 1.6 +76 -52 jbossmq/src/main/org/jboss/mq/server/MessageCache.java Index: MessageCache.java =================================================================== RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/server/MessageCache.java,v retrieving revision 1.5 retrieving revision 1.6 diff -u -r1.5 -r1.6 --- MessageCache.java 2001/11/12 06:36:49 1.5 +++ MessageCache.java 2001/11/14 01:53:40 1.6 @@ -23,38 +23,61 @@ * later. * * @author <a href="mailto:[EMAIL PROTECTED]">Hiram Chirino</a> - * @version $Revision: 1.5 $ + * @version $Revision: 1.6 $ */ public class MessageCache extends ServiceMBeanSupport implements MessageCacheMBean, MBeanRegistration, Runnable { + //pool of message ref objects + public static final int MAX_POOL_SIZE = 100*1000; + protected ArrayList pool = new ArrayList(MAX_POOL_SIZE); + // The cached messages are orded in a LRU linked list private LinkedList lruCache = new LinkedList(); - + // Provides a Unique ID to MessageHanles private long messageCounter = 0; int cacheHits = 0; int cacheMisses = 0; - + private File dataFile; private String dataDirectory; private Thread referenceSoftner; - + private long highMemoryMark = 1024L * 1000 * 16; private long maxMemoryMark = 1024L * 1000 * 32; public static final long ONE_MEGABYTE = 1024L * 1000; - + int softRefCacheSize = 0; int totalCacheSize = 0; - + // Used to get notified when message are being deleted by GC ReferenceQueue referenceQueue = new ReferenceQueue(); - + public MessageCache getInstance() { return this; } + //this method is only called from within synchronized block, so don't need to sync + protected MessageReference getMessageReference(MessageCache messageCache, Long referenceId, SpyMessage message) { + MessageReference ref = null; + if(!pool.isEmpty()) + ref = (MessageReference)pool.remove(pool.size()-1); + else + ref = new MessageReference(); + ref.init(messageCache,referenceId,message); + return ref; + } + + //this method is only called from within synchronized block, so don't need to sync + protected void releaseMessageReference(MessageReference ref){ + if(pool.size() < MAX_POOL_SIZE){ + ref.reset(); + pool.add(ref); + } + } + /** * Adds a message to the cache */ @@ -63,16 +86,16 @@ log.trace("add lock aquire"); synchronized (this) { - MessageReference mh = new MessageReference(this, new Long(messageCounter++), message); + MessageReference mh = getMessageReference(this, new Long(messageCounter++), message); lruCache.addLast(mh); totalCacheSize++; validateSoftReferenceDepth(); - + log.trace("add lock release"); return mh; } } - + /** * removes a message from the cache */ @@ -83,11 +106,12 @@ { mr.clear(); lruCache.remove(mr); + releaseMessageReference(mr); totalCacheSize--; log.trace("remove lock release"); } } - + /** * The strategy is that we keep the most recently used messages as * Hard references. Then we make the older ones soft references. Making @@ -108,7 +132,7 @@ { log.trace("Waiting for a reference to get GCed."); // Get the next soft reference that was canned by the GC - + Reference r = referenceQueue.remove(1000); if (r != null) { @@ -132,7 +156,7 @@ } log.debug("Thread exiting."); } - + /** * This method is in charge of determining if it time to convert some * hard references over to soft references. @@ -142,10 +166,10 @@ log.trace("run lock aquire"); synchronized (this) { - + // howmany to change over to soft refs int chnageCount = 0; - + long currentMem = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory(); if (currentMem > highMemoryMark) { @@ -158,7 +182,7 @@ int howManyShouldBeSoft = (int) ((totoalMessageInMem) * severity); chnageCount = howManyShouldBeSoft - getSoftRefCacheSize(); } - + // Ignore change counts of 1 since this will happen too often even // if the serverity is low since it will round up. if (chnageCount > 1) @@ -174,9 +198,9 @@ } log.trace("run lock release"); } - + } - + /** * This gets called when a MessageReference is de-referenced. * We will pop it to the top of the RLU @@ -192,7 +216,7 @@ log.trace("messageReferenceUsedEvent lock released"); } } - + ////////////////////////////////////////////////////////////////////////////////// // Perisitence methods used by the MessageReference. // TODO: delegate this work to a PM. @@ -205,7 +229,7 @@ is.close(); return (SpyMessage) rc; } - + void saveToStorage(MessageReference mh, SpyMessage message) throws IOException { File f = new File(dataFile, "Message-" + mh.referenceId); @@ -213,20 +237,20 @@ os.writeObject(message); os.close(); } - + void removeFromStorage(MessageReference mh) throws IOException { File f = new File(dataFile, "Message-" + mh.referenceId); f.delete(); } - + ////////////////////////////////////////////////////////////////////////////////// // // The following section deals the the JMX interface to manage the Cache // ////////////////////////////////////////////////////////////////////////////////// - - + + /** * This gets called to start the cache service. Synch. by start */ @@ -235,11 +259,11 @@ File jbossHome = new File(System.getProperty("jboss.system.home")); dataFile = new File(jbossHome, dataDirectory); log.debug("Data directory set to: " + dataFile.getCanonicalPath()); - + dataFile.mkdirs(); if (!dataFile.isDirectory()) throw new Exception("The configured data directory is not valid: " + dataDirectory); - + // Clean out the directory of any previous files. File files[] = dataFile.listFiles(); log.debug("Removing " + files.length + " file(s) from: " + dataFile.getCanonicalPath()); @@ -247,16 +271,16 @@ { files[i].delete(); } - + if (getState() == ServiceMBeanSupport.STARTED) throw new Exception("Cannot be initialized from the current state"); - + referenceSoftner = new Thread(this, "JBossMQ Cache Reference Softner"); referenceSoftner.setDaemon(true); referenceSoftner.start(); - + } - + /** * This gets called to stop the cache service. */ @@ -264,11 +288,11 @@ { if (!(getState() == ServiceMBeanSupport.STARTED)) return; - + referenceSoftner.interrupt(); referenceSoftner = null; } - + /** * Gets the dataDirectory * @return Returns a String @@ -285,7 +309,7 @@ { this.dataDirectory = dataDirectory; } - + /** * Gets the hardRefCacheSize * @return Returns a int @@ -294,7 +318,7 @@ { return lruCache.size(); } - + /** * Gets the softRefCacheSize * @return Returns a int @@ -303,7 +327,7 @@ { return softRefCacheSize; } - + /** * Gets the totalCacheSize * @return Returns a int @@ -312,7 +336,7 @@ { return totalCacheSize; } - + /** * Gets the cacheMisses * @return Returns a int @@ -321,7 +345,7 @@ { return cacheMisses; } - + /** * Gets the cacheHits * @return Returns a int @@ -330,7 +354,7 @@ { return cacheHits; } - + /** * Gets the highMemoryMark * @return Returns a long @@ -347,7 +371,7 @@ { this.highMemoryMark = highMemoryMark * ONE_MEGABYTE; } - + /** * Gets the maxMemoryMark * @return Returns a long @@ -356,7 +380,7 @@ { return maxMemoryMark / ONE_MEGABYTE; } - + /** * Gets the CurrentMemoryUsage * @return Returns a long @@ -365,7 +389,7 @@ { return (Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory()) / ONE_MEGABYTE; } - + /** * Sets the maxMemoryMark * @param maxMemoryMark The maxMemoryMark to set @@ -374,7 +398,7 @@ { this.maxMemoryMark = maxMemoryMark * ONE_MEGABYTE; } - + /** * @see ServiceMBeanSupport#getName() */ @@ -382,7 +406,7 @@ { return "MessageCache"; } - + /** * This test creates 5000 x 100K messages and places them * in the MessageCache. With out a cache this would be @@ -391,7 +415,7 @@ */ public void testBigLoad() throws Exception { - + MessageCache cache = new MessageCache(); File tempDir = new File("Temp-" + System.currentTimeMillis()); tempDir.mkdirs(); @@ -399,9 +423,9 @@ cache.setHighMemoryMark(40); cache.setMaxMemoryMark(60); cache.start(); - + LinkedList ll = new LinkedList(); - + int TEST_SIZE = 5000; // Create a whole bunch of messages. java.util.Random rand = new java.util.Random(System.currentTimeMillis()); @@ -413,18 +437,18 @@ bm.writeBytes(new byte[1024 * 100]); // 100K messages MessageReference mr = cache.add(bm); ll.add(mr); - + // Randomly pickout messages out of the cache.. int pick = rand.nextInt(i + 1); mr = (MessageReference) ll.get(pick); mr.getMessage(); } - + log.info("Used Mem=" + (Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory())); //assertTrue("getTotalCacheSize check", cache.getTotalCacheSize() == TEST_SIZE); log.info("Messages with Hard Refs=" + cache.getHardRefCacheSize()); log.info("Messages with Soft Refs=" + cache.getSoftRefCacheSize()); - + log.info("Removing the messages"); Iterator iter = ll.iterator(); for (int i = 0; i < TEST_SIZE; i++) @@ -433,15 +457,15 @@ iter.remove(); cache.remove(mr); } - + log.info("Stopping"); //assertTrue("getTotalCacheSize check", cache.getTotalCacheSize() == 0); cache.stop(); //assertTrue("Data directory clean up check", tempDir.listFiles().length == 0); tempDir.delete(); - + log.info("Cache Hits=" + cache.getCacheHits()); log.info("Cache Misses=" + cache.getCacheMisses()); } - + } 1.4 +29 -9 jbossmq/src/main/org/jboss/mq/server/MessageReference.java Index: MessageReference.java =================================================================== RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/server/MessageReference.java,v retrieving revision 1.3 retrieving revision 1.4 diff -u -r1.3 -r1.4 --- MessageReference.java 2001/11/06 06:14:31 1.3 +++ MessageReference.java 2001/11/14 01:53:40 1.4 @@ -10,16 +10,16 @@ import org.jboss.mq.SpyJMSException; import javax.jms.JMSException; /** - * This class holds a reference to an actual Message. Where it is actually + * This class holds a reference to an actual Message. Where it is actually * at may vary. The reference it holds may be a: * <ul> * <li>Hard Reference - The message is consider recently used and should not be paged out * <li>Soft Reference - The message is consider old and CAN be removed from memory by the GC * <li>No Reference - The message was removed from memory by the GC, but we can load it from a file. * </ul> - * + * * @author <a href="mailto:[EMAIL PROTECTED]">Hiram Chirino</a> - * @version $Revision: 1.3 $ + * @version $Revision: 1.4 $ */ public class MessageReference implements Comparable { org.apache.log4j.Category cat = org.apache.log4j.Category.getInstance(MessageReference.class); @@ -36,23 +36,37 @@ MessageCache messageCache; SpyMessage hardReference; MessageSoftReference softReference; - boolean isStored = false; + boolean isStored; // These fields are copied over from the messae itself.. // they are used too often to not have them handy. byte jmsPriority; long messageId; // This object could be used by the PM to associate some info - public transient Object persistData; + public transient Object persistData = null; - MessageReference(MessageCache messageCache, Long referenceId, SpyMessage message) { + MessageReference(){ + } + + //init and reset methods for use by object pool + void init(MessageCache messageCache, Long referenceId, SpyMessage message) { this.messageCache = messageCache; this.hardReference = message; this.referenceId = referenceId; this.jmsPriority = (byte) message.getJMSPriority(); this.messageId = message.header.messageId; + this.isStored = false; } + void reset(){ + //clear refs so gc can collect unused objects + this.messageCache = null; + this.hardReference = null; + + this.softReference = null; + this.persistData = null; + } + public SpyMessage getMessage() throws JMSException { cat.debug("getMessage lock aquire"); synchronized (this) { @@ -71,9 +85,9 @@ /** * We could optimize caching by keeping the headers but not the body. - * The server will uses the headers more often than the body and the + * The server will uses the headers more often than the body and the * headers take up much message memory than the body - * + * * For now just return the message. */ public SpyMessage.Header getHeaders() throws javax.jms.JMSException { @@ -113,8 +127,14 @@ softReference = new MessageSoftReference(this, hardReference, messageCache.referenceQueue); + //I am undecided about whether to release this message back to the message pool or not + //the whole point of this caching business is to release the memory this message is using up + //if we are going to do it then this is the place... + //MessagePool.releaseMessage(hardReference); + // We don't need the hard ref anymore.. hardReference = null; + messageCache.softRefCacheSize++; } catch (java.io.IOException e) { throw new SpyJMSException("Message Cache IO error: ", e); @@ -167,7 +187,7 @@ /** * This method allows message to be order on the server queues * by priority and the order that they came in on. - * + * * @see Comparable#compareTo(Object) */ public int compareTo(Object o) {
_______________________________________________ Jboss-development mailing list [EMAIL PROTECTED] https://lists.sourceforge.net/lists/listinfo/jboss-development