Author: ritchiem
Date: Wed Oct 17 09:59:42 2007
New Revision: 585575

URL: http://svn.apache.org/viewvc?rev=585575&view=rev
Log:
QPID-647 : Update to ConcurrentSelectorDeliveryManager to restart async process 
if a msg is queued that has the potential to be delivered.

Modified:
    incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java

Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java?rev=585575&r1=585574&r2=585575&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java Wed Oct 17 09:59:42 2007
@@ -212,6 +212,15 @@
     }
 
     /**
+     *
+     * @return the state of the async processor.
+     */
+    public boolean isProcessingAsync()
+    {
+        return _processing.get();
+    }
+
+    /**
      * Returns all the messages in the Queue
      *
      * @return List of messages
@@ -821,6 +830,12 @@
                 {
                     addMessageToQueue(msg, deliverFirst);
 
+                    //if  we have a non-filtering subscriber but queued messages && we're not Async && we have other Active subs then something is wrong!
+                    if ((s != null && hasQueuedMessages()) && !isProcessingAsync() && _subscriptions.hasActiveSubscribers())
+                    {
+                        _queue.deliverAsync();
+                    }
+
                     //release lock now message is on queue.
                     _lock.unlock();
 
@@ -975,6 +990,8 @@
     {
         public void run()
         {
+            String startName = Thread.currentThread().getName();
+            Thread.currentThread().setName("CSDM-AsyncDelivery:" + startName);
             boolean running = true;
             while (running && !_movingMessages.get())
             {
@@ -990,6 +1007,7 @@
                     _processing.set(false);
                 }
             }
+            Thread.currentThread().setName(startName);
         }
     }
 
@@ -1016,8 +1034,9 @@
 
     private String currentStatus()
     {
-        return " Queued:" + (_messages.isEmpty() ? "Empty " : "Contains(M:H)") +
-               "(" + _messages.size() + ":" + ((ConcurrentLinkedMessageQueueAtomicSize) _messages).headSize() + ") " +
+        return " Queued:" + (_messages.isEmpty() ? "Empty " : "Contains(H:M)") +
+               "(" + ((ConcurrentLinkedMessageQueueAtomicSize) _messages).headSize() +
+               ":" + (_messages.size() - ((ConcurrentLinkedMessageQueueAtomicSize) _messages).headSize()) + ") " +
                " Extra: " + (_hasContent.isEmpty() ? "Empty " : "Contains") +
                "(" + _hasContent.size() + ":" + _extraMessages.get() + ") " +
                " Active:" + _subscriptions.hasActiveSubscribers() +


Reply via email to