Author: ritchiem
Date: Thu Oct 18 02:09:38 2007
New Revision: 585906
URL: http://svn.apache.org/viewvc?rev=585906&view=rev
Log:
QPID-647 : Async process start/stop is not regulated tightly enough. Added
additional synchronisation to ensure that a new subscriber can start the async
process if required, as currently the start request can be missed.
Modified:
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
Modified:
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java?rev=585906&r1=585905&r2=585906&view=diff
==============================================================================
---
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
(original)
+++
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
Thu Oct 18 02:09:38 2007
@@ -211,10 +211,7 @@
}
}
- /**
- *
- * @return the state of the async processor.
- */
+ /** @return the state of the async processor. */
public boolean isProcessingAsync()
{
return _processing.get();
@@ -830,14 +827,15 @@
{
addMessageToQueue(msg, deliverFirst);
+ //release lock now message is on queue.
+ _lock.unlock();
+
//if we have a non-filtering subscriber but queued
messages && we're not Async && we have other Active subs then something is
wrong!
if ((s != null && hasQueuedMessages()) &&
!isProcessingAsync() && _subscriptions.hasActiveSubscribers())
{
_queue.deliverAsync();
}
- //release lock now message is on queue.
- _lock.unlock();
//Pre Deliver to all subscriptions
if (debugEnabled)
@@ -984,7 +982,7 @@
return id;
}
- Runner asyncDelivery = new Runner();
+ final Runner _asyncDelivery = new Runner();
private class Runner implements Runnable
{
@@ -1000,11 +998,13 @@
//Check that messages have not been added since we did our
last peek();
// Synchronize with the thread that adds to the queue.
// If the queue is still empty then we can exit
-
- if (!(hasQueuedMessages() &&
_subscriptions.hasActiveSubscribers()))
+ synchronized (_asyncDelivery)
{
- running = false;
- _processing.set(false);
+ if (!(hasQueuedMessages() &&
_subscriptions.hasActiveSubscribers()))
+ {
+ running = false;
+ _processing.set(false);
+ }
}
}
Thread.currentThread().setName(startName);
@@ -1018,16 +1018,19 @@
_log.debug(debugIdentity() + "Processing Async." +
currentStatus());
}
- if (hasQueuedMessages() && _subscriptions.hasActiveSubscribers())
+ synchronized (_asyncDelivery)
{
- //are we already running? if so, don't re-run
- if (_processing.compareAndSet(false, true))
+ if (hasQueuedMessages() && _subscriptions.hasActiveSubscribers())
{
- if (_log.isDebugEnabled())
+ //are we already running? if so, don't re-run
+ if (_processing.compareAndSet(false, true))
{
- _log.debug(debugIdentity() + "Executing Async process.");
+ if (_log.isDebugEnabled())
+ {
+ _log.debug(debugIdentity() + "Executing Async
process.");
+ }
+ executor.execute(_asyncDelivery);
}
- executor.execute(asyncDelivery);
}
}
}