User: dmaplesden
  Date: 01/11/13 17:53:40

  Modified:    src/main/org/jboss/mq/server BasicQueue.java
                        MessageCache.java MessageReference.java
  Log:
  Added a message object pool and changed the file PM message log to use the
  generic SpyMessage.writeMessage and readMessage methods.
  
  Revision  Changes    Path
  1.9       +5 -3      jbossmq/src/main/org/jboss/mq/server/BasicQueue.java
  
  Index: BasicQueue.java
  ===================================================================
  RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/server/BasicQueue.java,v
  retrieving revision 1.8
  retrieving revision 1.9
  diff -u -r1.8 -r1.9
  --- BasicQueue.java   2001/10/28 04:07:35     1.8
  +++ BasicQueue.java   2001/11/14 01:53:40     1.9
  @@ -31,7 +31,7 @@
    * @author     Norbert Lataille ([EMAIL PROTECTED])
    * @author     David Maplesden ([EMAIL PROTECTED])
    * @created    August 16, 2001
  - * @version    $Revision: 1.8 $
  + * @version    $Revision: 1.9 $
    */
   //abstract public class BasicQueue implements Runnable {
   public class BasicQueue {
  @@ -211,7 +211,7 @@
               restoreMessage(message);
            }
         }
  -      
  +
         class RemoveMessageTask implements Runnable {
            MessageReference message;
            RemoveMessageTask(MessageReference m) {
  @@ -220,6 +220,8 @@
            public void run() {
                try {
                server.getMessageCache().remove(message);
  +               //we are finally done with message so we can release it back to pool
  +               org.jboss.mq.MessagePool.releaseMessage(message.hardReference);
                } catch ( JMSException e ) {
                        cat.error("Could not remove an acknowleged message from the message cache: ", e);
                }
  @@ -257,7 +259,7 @@
   
            Runnable task = new RestoreMessageTask(m, item.subscriberId);
            server.getPersistenceManager().getTxManager().addPostRollbackTask(txId, task);
  -         
  +
            task = new RemoveMessageTask(m);
            server.getPersistenceManager().getTxManager().addPostCommitTask(txId, task);
         }
  
  
  
  1.6       +76 -52    jbossmq/src/main/org/jboss/mq/server/MessageCache.java
  
  Index: MessageCache.java
  ===================================================================
  RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/server/MessageCache.java,v
  retrieving revision 1.5
  retrieving revision 1.6
  diff -u -r1.5 -r1.6
  --- MessageCache.java 2001/11/12 06:36:49     1.5
  +++ MessageCache.java 2001/11/14 01:53:40     1.6
  @@ -23,38 +23,61 @@
    * later.
    *
    * @author <a href="mailto:[EMAIL PROTECTED]";>Hiram Chirino</a>
  - * @version    $Revision: 1.5 $
  + * @version    $Revision: 1.6 $
    */
   public class MessageCache extends ServiceMBeanSupport implements MessageCacheMBean, MBeanRegistration, Runnable
   {
  +   //pool of message ref objects
  +   public static final int MAX_POOL_SIZE = 100*1000;
  +   protected ArrayList pool = new ArrayList(MAX_POOL_SIZE);
  +
      // The cached messages are orded in a LRU linked list
      private LinkedList lruCache = new LinkedList();
  -   
  +
      // Provides a Unique ID to MessageHanles
      private long messageCounter = 0;
      int cacheHits = 0;
      int cacheMisses = 0;
  -   
  +
      private File dataFile;
      private String dataDirectory;
      private Thread referenceSoftner;
  -   
  +
      private long highMemoryMark = 1024L * 1000 * 16;
      private long maxMemoryMark = 1024L * 1000 * 32;
      public static final long ONE_MEGABYTE = 1024L * 1000;
  -   
  +
      int softRefCacheSize = 0;
      int totalCacheSize = 0;
  -   
  +
      // Used to get notified when message are being deleted by GC
      ReferenceQueue referenceQueue = new ReferenceQueue();
  -   
  +
   
      public MessageCache getInstance()
      {
         return this;
      }
   
  +   //this method is only called from within synchronized block, so don't need to sync
  +   protected MessageReference getMessageReference(MessageCache messageCache, Long referenceId, SpyMessage message) {
  +      MessageReference ref = null;
  +      if(!pool.isEmpty())
  +         ref = (MessageReference)pool.remove(pool.size()-1);
  +      else
  +         ref = new MessageReference();
  +      ref.init(messageCache,referenceId,message);
  +      return ref;
  +   }
  +
  +   //this method is only called from within synchronized block, so don't need to sync
  +   protected void releaseMessageReference(MessageReference ref){
  +      if(pool.size() < MAX_POOL_SIZE){
  +         ref.reset();
  +         pool.add(ref);
  +      }
  +   }
  +
      /**
       * Adds a message to the cache
       */
  @@ -63,16 +86,16 @@
         log.trace("add lock aquire");
         synchronized (this)
         {
  -         MessageReference mh = new MessageReference(this, new Long(messageCounter++), message);
  +         MessageReference mh = getMessageReference(this, new Long(messageCounter++), message);
            lruCache.addLast(mh);
            totalCacheSize++;
            validateSoftReferenceDepth();
  -         
  +
            log.trace("add lock release");
            return mh;
         }
      }
  -   
  +
      /**
       * removes a message from the cache
       */
  @@ -83,11 +106,12 @@
         {
            mr.clear();
            lruCache.remove(mr);
  +         releaseMessageReference(mr);
            totalCacheSize--;
            log.trace("remove lock release");
         }
      }
  -   
  +
      /**
       * The strategy is that we keep the most recently used messages as
       * Hard references.  Then we make the older ones soft references.  Making
  @@ -108,7 +132,7 @@
            {
               log.trace("Waiting for a reference to get GCed.");
               // Get the next soft reference that was canned by the GC
  -            
  +
               Reference r = referenceQueue.remove(1000);
               if (r != null)
               {
  @@ -132,7 +156,7 @@
         }
         log.debug("Thread exiting.");
      }
  -   
  +
      /**
       * This method is in charge of determining if it time to convert some
       * hard references over to soft references.
  @@ -142,10 +166,10 @@
         log.trace("run lock aquire");
         synchronized (this)
         {
  -         
  +
            // howmany to change over to soft refs
            int chnageCount = 0;
  -         
  +
            long currentMem = Runtime.getRuntime().totalMemory() - 
Runtime.getRuntime().freeMemory();
            if (currentMem > highMemoryMark)
            {
  @@ -158,7 +182,7 @@
               int howManyShouldBeSoft = (int) ((totoalMessageInMem) * severity);
               chnageCount = howManyShouldBeSoft - getSoftRefCacheSize();
            }
  -         
  +
            // Ignore change counts of 1 since this will happen too often even
            // if the serverity is low since it will round up.
            if (chnageCount > 1)
  @@ -174,9 +198,9 @@
            }
            log.trace("run lock release");
         }
  -      
  +
      }
  -   
  +
      /**
       * This gets called when a MessageReference is de-referenced.
       * We will pop it to the top of the RLU
  @@ -192,7 +216,7 @@
            log.trace("messageReferenceUsedEvent lock released");
         }
      }
  -   
  +
      
//////////////////////////////////////////////////////////////////////////////////
      // Perisitence methods used by the MessageReference.
      // TODO: delegate this work to a PM.
  @@ -205,7 +229,7 @@
         is.close();
         return (SpyMessage) rc;
      }
  -   
  +
      void saveToStorage(MessageReference mh, SpyMessage message) throws IOException
      {
         File f = new File(dataFile, "Message-" + mh.referenceId);
  @@ -213,20 +237,20 @@
         os.writeObject(message);
         os.close();
      }
  -   
  +
      void removeFromStorage(MessageReference mh) throws IOException
      {
         File f = new File(dataFile, "Message-" + mh.referenceId);
         f.delete();
      }
  -   
  +
      
//////////////////////////////////////////////////////////////////////////////////
      //
      // The following section deals the the JMX interface to manage the Cache
      //
      
//////////////////////////////////////////////////////////////////////////////////
  -   
  -   
  +
  +
      /**
       * This gets called to start the cache service. Synch. by start
       */
  @@ -235,11 +259,11 @@
         File jbossHome = new File(System.getProperty("jboss.system.home"));
         dataFile = new File(jbossHome, dataDirectory);
         log.debug("Data directory set to: " + dataFile.getCanonicalPath());
  -      
  +
         dataFile.mkdirs();
         if (!dataFile.isDirectory())
            throw new Exception("The configured data directory is not valid: " + 
dataDirectory);
  -      
  +
         // Clean out the directory of any previous files.
         File files[] = dataFile.listFiles();
         log.debug("Removing " + files.length + " file(s) from: " + 
dataFile.getCanonicalPath());
  @@ -247,16 +271,16 @@
         {
            files[i].delete();
         }
  -      
  +
         if (getState() == ServiceMBeanSupport.STARTED)
            throw new Exception("Cannot be initialized from the current state");
  -      
  +
         referenceSoftner = new Thread(this, "JBossMQ Cache Reference Softner");
         referenceSoftner.setDaemon(true);
         referenceSoftner.start();
  -      
  +
      }
  -   
  +
      /**
       * This gets called to stop the cache service.
       */
  @@ -264,11 +288,11 @@
      {
         if (!(getState() == ServiceMBeanSupport.STARTED))
            return;
  -      
  +
         referenceSoftner.interrupt();
         referenceSoftner = null;
      }
  -   
  +
      /**
       * Gets the dataDirectory
       * @return Returns a String
  @@ -285,7 +309,7 @@
      {
         this.dataDirectory = dataDirectory;
      }
  -   
  +
      /**
       * Gets the hardRefCacheSize
       * @return Returns a int
  @@ -294,7 +318,7 @@
      {
         return lruCache.size();
      }
  -   
  +
      /**
       * Gets the softRefCacheSize
       * @return Returns a int
  @@ -303,7 +327,7 @@
      {
         return softRefCacheSize;
      }
  -   
  +
      /**
       * Gets the totalCacheSize
       * @return Returns a int
  @@ -312,7 +336,7 @@
      {
         return totalCacheSize;
      }
  -   
  +
      /**
       * Gets the cacheMisses
       * @return Returns a int
  @@ -321,7 +345,7 @@
      {
         return cacheMisses;
      }
  -   
  +
      /**
       * Gets the cacheHits
       * @return Returns a int
  @@ -330,7 +354,7 @@
      {
         return cacheHits;
      }
  -   
  +
      /**
       * Gets the highMemoryMark
       * @return Returns a long
  @@ -347,7 +371,7 @@
      {
         this.highMemoryMark = highMemoryMark * ONE_MEGABYTE;
      }
  -   
  +
      /**
       * Gets the maxMemoryMark
       * @return Returns a long
  @@ -356,7 +380,7 @@
      {
         return maxMemoryMark / ONE_MEGABYTE;
      }
  -   
  +
      /**
       * Gets the CurrentMemoryUsage
       * @return Returns a long
  @@ -365,7 +389,7 @@
      {
         return (Runtime.getRuntime().totalMemory() - 
Runtime.getRuntime().freeMemory()) / ONE_MEGABYTE;
      }
  -   
  +
      /**
       * Sets the maxMemoryMark
       * @param maxMemoryMark The maxMemoryMark to set
  @@ -374,7 +398,7 @@
      {
         this.maxMemoryMark = maxMemoryMark * ONE_MEGABYTE;
      }
  -   
  +
      /**
       * @see ServiceMBeanSupport#getName()
       */
  @@ -382,7 +406,7 @@
      {
         return "MessageCache";
      }
  -   
  +
      /**
       * This test creates 5000 x 100K messages and places them
       * in the MessageCache.  With out a cache this would be
  @@ -391,7 +415,7 @@
       */
      public void testBigLoad() throws Exception
      {
  -      
  +
         MessageCache cache = new MessageCache();
         File tempDir = new File("Temp-" + System.currentTimeMillis());
         tempDir.mkdirs();
  @@ -399,9 +423,9 @@
         cache.setHighMemoryMark(40);
         cache.setMaxMemoryMark(60);
         cache.start();
  -      
  +
         LinkedList ll = new LinkedList();
  -      
  +
         int TEST_SIZE = 5000;
         // Create a whole bunch of messages.
         java.util.Random rand = new java.util.Random(System.currentTimeMillis());
  @@ -413,18 +437,18 @@
            bm.writeBytes(new byte[1024 * 100]); // 100K messages
            MessageReference mr = cache.add(bm);
            ll.add(mr);
  -         
  +
            // Randomly pickout messages out of the cache..
            int pick = rand.nextInt(i + 1);
            mr = (MessageReference) ll.get(pick);
            mr.getMessage();
         }
  -      
  +
         log.info("Used Mem=" + (Runtime.getRuntime().totalMemory() - 
Runtime.getRuntime().freeMemory()));
         //assertTrue("getTotalCacheSize check", cache.getTotalCacheSize() == 
TEST_SIZE);
         log.info("Messages with Hard Refs=" + cache.getHardRefCacheSize());
         log.info("Messages with Soft Refs=" + cache.getSoftRefCacheSize());
  -      
  +
         log.info("Removing the messages");
         Iterator iter = ll.iterator();
         for (int i = 0; i < TEST_SIZE; i++)
  @@ -433,15 +457,15 @@
            iter.remove();
            cache.remove(mr);
         }
  -      
  +
         log.info("Stopping");
         //assertTrue("getTotalCacheSize check", cache.getTotalCacheSize() == 0);
         cache.stop();
         //assertTrue("Data directory clean up check", tempDir.listFiles().length == 
0);
         tempDir.delete();
  -      
  +
         log.info("Cache Hits=" + cache.getCacheHits());
         log.info("Cache Misses=" + cache.getCacheMisses());
      }
  -   
  +
   }
  
  
  
  1.4       +29 -9     jbossmq/src/main/org/jboss/mq/server/MessageReference.java
  
  Index: MessageReference.java
  ===================================================================
  RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/server/MessageReference.java,v
  retrieving revision 1.3
  retrieving revision 1.4
  diff -u -r1.3 -r1.4
  --- MessageReference.java     2001/11/06 06:14:31     1.3
  +++ MessageReference.java     2001/11/14 01:53:40     1.4
  @@ -10,16 +10,16 @@
   import org.jboss.mq.SpyJMSException;
   import javax.jms.JMSException;
   /**
  - * This class holds a reference to an actual Message.  Where it is actually 
  + * This class holds a reference to an actual Message.  Where it is actually
    * at may vary.  The reference it holds may be a:
    * <ul>
    * <li>Hard Reference - The message is consider recently used and should not be paged out
    * <li>Soft Reference - The message is consider old and CAN be removed from memory by the GC
    * <li>No Reference - The message was removed from memory by the GC, but we can load it from a file.
    * </ul>
  - * 
  + *
    * @author <a href="mailto:[EMAIL PROTECTED]";>Hiram Chirino</a>
  - * @version    $Revision: 1.3 $
  + * @version    $Revision: 1.4 $
    */
   public class MessageReference implements Comparable {
      org.apache.log4j.Category cat = 
org.apache.log4j.Category.getInstance(MessageReference.class);
  @@ -36,23 +36,37 @@
      MessageCache messageCache;
      SpyMessage hardReference;
      MessageSoftReference softReference;
  -   boolean isStored = false;
  +   boolean isStored;
   
      // These fields are copied over from the messae itself..
      // they are used too often to not have them handy.
      byte jmsPriority;
      long messageId;
      // This object could be used by the PM to associate some info
  -   public transient Object persistData;
  +   public transient Object persistData = null;
   
  -   MessageReference(MessageCache messageCache, Long referenceId, SpyMessage message) {
  +   MessageReference(){
  +   }
  +
  +   //init and reset methods for use by object pool
  +   void init(MessageCache messageCache, Long referenceId, SpyMessage message) {
         this.messageCache = messageCache;
         this.hardReference = message;
         this.referenceId = referenceId;
         this.jmsPriority = (byte) message.getJMSPriority();
         this.messageId = message.header.messageId;
  +      this.isStored = false;
      }
   
  +   void reset(){
  +      //clear refs so gc can collect unused objects
  +      this.messageCache = null;
  +      this.hardReference = null;
  +
  +      this.softReference = null;
  +      this.persistData = null;
  +   }
  +
      public SpyMessage getMessage() throws JMSException {
         cat.debug("getMessage lock aquire");
         synchronized (this) {
  @@ -71,9 +85,9 @@
   
      /**
       * We could optimize caching by keeping the headers but not the body.
  -    * The server will uses the headers more often than the body and the 
  +    * The server will uses the headers more often than the body and the
       * headers take up much message memory than the body
  -    * 
  +    *
       * For now just return the message.
       */
      public SpyMessage.Header getHeaders() throws javax.jms.JMSException {
  @@ -113,8 +127,14 @@
   
               softReference = new MessageSoftReference(this, hardReference, 
messageCache.referenceQueue);
   
  +            //I am undecided about whether to release this message back to the message pool or not
  +            //the whole point of this caching business is to release the memory this message is using up
  +            //if we are going to do it then this is the place...
  +            //MessagePool.releaseMessage(hardReference);
  +
               // We don't need the hard ref anymore..
               hardReference = null;
  +
               messageCache.softRefCacheSize++;
            } catch (java.io.IOException e) {
               throw new SpyJMSException("Message Cache IO error: ", e);
  @@ -167,7 +187,7 @@
      /**
       * This method allows message to be order on the server queues
       * by priority and the order that they came in on.
  -    * 
  +    *
       * @see Comparable#compareTo(Object)
       */
      public int compareTo(Object o) {
  
  
  

_______________________________________________
Jboss-development mailing list
[EMAIL PROTECTED]
https://lists.sourceforge.net/lists/listinfo/jboss-development

Reply via email to