Author: ritchiem
Date: Wed Oct 17 09:59:42 2007
New Revision: 585575
URL: http://svn.apache.org/viewvc?rev=585575&view=rev
Log:
QPID-647 : Update to ConcurrentSelectorDeliveryManager to restart async process
if a msg is queued that has the potential to be delivered.
Modified:
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
Modified:
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java?rev=585575&r1=585574&r2=585575&view=diff
==============================================================================
---
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
(original)
+++
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
Wed Oct 17 09:59:42 2007
@@ -212,6 +212,15 @@
}
/**
+ *
+ * @return the state of the async processor.
+ */
+ public boolean isProcessingAsync()
+ {
+ return _processing.get();
+ }
+
+ /**
* Returns all the messages in the Queue
*
* @return List of messages
@@ -821,6 +830,12 @@
{
addMessageToQueue(msg, deliverFirst);
+ //if we have a non-filtering subscriber but queued
messages && we're not Async && we have other Active subs then something is
wrong!
+ if ((s != null && hasQueuedMessages()) &&
!isProcessingAsync() && _subscriptions.hasActiveSubscribers())
+ {
+ _queue.deliverAsync();
+ }
+
//release lock now message is on queue.
_lock.unlock();
@@ -975,6 +990,8 @@
{
public void run()
{
+ String startName = Thread.currentThread().getName();
+ Thread.currentThread().setName("CSDM-AsyncDelivery:" + startName);
boolean running = true;
while (running && !_movingMessages.get())
{
@@ -990,6 +1007,7 @@
_processing.set(false);
}
}
+ Thread.currentThread().setName(startName);
}
}
@@ -1016,8 +1034,9 @@
private String currentStatus()
{
- return " Queued:" + (_messages.isEmpty() ? "Empty " : "Contains(M:H)")
+
- "(" + _messages.size() + ":" +
((ConcurrentLinkedMessageQueueAtomicSize) _messages).headSize() + ") " +
+ return " Queued:" + (_messages.isEmpty() ? "Empty " : "Contains(H:M)")
+
+ "(" + ((ConcurrentLinkedMessageQueueAtomicSize)
_messages).headSize() +
+ ":" + (_messages.size() -
((ConcurrentLinkedMessageQueueAtomicSize) _messages).headSize()) + ") " +
" Extra: " + (_hasContent.isEmpty() ? "Empty " : "Contains") +
"(" + _hasContent.size() + ":" + _extraMessages.get() + ") " +
" Active:" + _subscriptions.hasActiveSubscribers() +