Author: ritchiem
Date: Tue Oct 31 01:08:42 2006
New Revision: 469422

URL: http://svn.apache.org/viewvc?view=rev&rev=469422
Log:
QPID-56
Also resolves a race condition where messages could be sent out of order.
This change needs to be benchmarked against original DeliveryManager.java with 
the race condition fixed.

Modified:
    
incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/DeliveryManager.java

Modified: 
incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/DeliveryManager.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/DeliveryManager.java?view=diff&rev=469422&r1=469421&r2=469422
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/DeliveryManager.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/DeliveryManager.java
 Tue Oct 31 01:08:42 2006
@@ -19,10 +19,10 @@
 
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
+import org.apache.qpid.util.ConcurrentLinkedQueueAtomicSize;
 import org.apache.qpid.configuration.Configured;
 import org.apache.qpid.framing.ContentBody;
 import org.apache.qpid.server.configuration.Configurator;
-import org.apache.qpid.server.util.ConcurrentLinkedQueueNoSize;
 
 import java.util.ArrayList;
 import java.util.Iterator;
@@ -31,6 +31,7 @@
 import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+
 /**
  * Manages delivery of messages on behalf of a queue
  */
@@ -44,7 +45,7 @@
     /**
      * Holds any queued messages
      */
-    private final Queue<AMQMessage> _messages = new 
ConcurrentLinkedQueueNoSize<AMQMessage>();
+    private final Queue<AMQMessage> _messages = new 
ConcurrentLinkedQueueAtomicSize<AMQMessage>();
     //private int _messageCount;
     /**
      * Ensures that only one asynchronous task is running for this manager at
@@ -63,8 +64,6 @@
     private final AMQQueue _queue;
 
 
-    private volatile int _queueSize = 0;
-
     DeliveryManager(SubscriptionManager subscriptions, AMQQueue queue)
     {
         //Set values from configuration
@@ -84,7 +83,7 @@
      */
     private boolean queueing()
     {
-        return getMessageCount() != 0;
+        return hasQueuedMessages();
     }
 
 
@@ -121,28 +120,19 @@
 
     private boolean addMessageToQueue(AMQMessage msg)
     {
-        // Shrink the ContentBodies to their actual size to save memory.
-        // synchronize to ensure this msg is the next one to get added.
+        // Shrink the ContentBodies to their actual size to save memory.       
         if (compressBufferOnQueue)
         {
-            synchronized(_messages)
+            Iterator it = msg.getContentBodies().iterator();
+            while (it.hasNext())
             {
-                Iterator it = msg.getContentBodies().iterator();
-                while (it.hasNext())
-                {
-                    ContentBody cb = (ContentBody) it.next();
-                    cb.reduceBufferToFit();
-                }
-
-                _messages.offer(msg);
-                _queueSize++;
+                ContentBody cb = (ContentBody) it.next();
+                cb.reduceBufferToFit();
             }
         }
-        else
-        {
-            _messages.offer(msg);
-            _queueSize++;
-        }
+
+        _messages.offer(msg);
+
         return true;
     }
 
@@ -153,9 +143,9 @@
      *
      * @return true if there are queued messages
      */
-    private boolean hasQueuedMessages()
+    public boolean hasQueuedMessages()
     {
-        return getMessageCount() != 0;
+        return !_messages.isEmpty();
     }
 
     public int getQueueMessageCount()
@@ -216,7 +206,8 @@
                 //We don't synchronize access to subscribers so need to 
re-check
                 if (next != null)
                 {
-                    next.send(poll(), _queue);
+                    next.send(peek(), _queue);
+                    poll();
                 }
                 else
                 {
@@ -230,7 +221,7 @@
         }
         finally
         {
-            _log.debug("End of processQueue: (" + _queueSize + ")" + " 
subscribers:" + _subscriptions.hasActiveSubscribers());
+            _log.debug("End of processQueue: (" + getQueueMessageCount() + ")" 
+ " subscribers:" + _subscriptions.hasActiveSubscribers());
             _processing.set(false);
         }
     }
@@ -242,7 +233,6 @@
 
     private AMQMessage poll()
     {
-        _queueSize--;
         return _messages.poll();
     }
 
@@ -262,7 +252,7 @@
      */
     void processAsync(Executor executor)
     {
-        _log.debug("Processing Async. Queued:" + hasQueuedMessages() + "(" + 
_queueSize + ")" +
+        _log.debug("Processing Async. Queued:" + hasQueuedMessages() + "(" + 
getQueueMessageCount() + ")" +
                    " Active:" + _subscriptions.hasActiveSubscribers() +
                    " Processing:" + _processing.get());
 


Reply via email to