Author: ritchiem
Date: Thu Apr 19 08:07:54 2007
New Revision: 530441
URL: http://svn.apache.org/viewvc?view=rev&rev=530441
Log:
QPID-459 - NoLocal broken when messages already exist on queue from consumer.
With test.
ConcurrentSelectorDeliveryManager - method changes from hasFilter to
filtersMessages.
Forgot to include the file in the commit.
Modified:
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
Modified:
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java?view=diff&rev=530441&r1=530440&r2=530441
==============================================================================
---
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
(original)
+++
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
Thu Apr 19 08:07:54 2007
@@ -28,7 +28,6 @@
import java.util.Collections;
import java.util.HashSet;
import java.util.concurrent.Executor;
-import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicInteger;
@@ -372,7 +371,7 @@
{
for (Subscription sub : _subscriptions.getSubscriptions())
{
- if (!sub.isSuspended() && sub.hasFilters())
+ if (!sub.isSuspended() && sub.filtersMessages())
{
Queue<AMQMessage> preDeliveryQueue =
sub.getPreDeliveryQueue();
for (AMQMessage msg : messageList)
@@ -613,6 +612,11 @@
_processingThreadName = Thread.currentThread().getName();
}
+ if (_log.isDebugEnabled())
+ {
+ _log.debug(debugIdentity() + "Running process Queue." +
currentStatus());
+ }
+
// Continue to process delivery while we haveSubscribers and messages
boolean hasSubscribers = _subscriptions.hasActiveSubscribers();
@@ -633,11 +637,17 @@
}
}
}
+
+ if (_log.isDebugEnabled())
+ {
+ _log.debug(debugIdentity() + "Done process Queue." +
currentStatus());
+ }
+
}
// private void sendNextMessage(Subscription sub)
// {
-// if (sub.hasFilters())
+// if (sub.filtersMessages())
// {
// sendNextMessage(sub, sub.getPreDeliveryQueue());
// if (sub.isAutoClose())
@@ -817,6 +827,10 @@
//are we already running? if so, don't re-run
if (_processing.compareAndSet(false, true))
{
+ if (_log.isDebugEnabled())
+ {
+ _log.debug(debugIdentity() + "Executing Async process.");
+ }
executor.execute(asyncDelivery);
}
}