Author: ritchiem
Date: Thu Apr 19 07:42:53 2007
New Revision: 530432
URL: http://svn.apache.org/viewvc?view=rev&rev=530432
Log:
QPID-459 - NoLocal broken when messages already exist on queue from consumer.
With test.
AMQChannel - removed the TODO comment above setPublisher; setPublisher is used
by the noLocal implementation.
Subscription - rename of hasFilters to filtersMessages
AMQQueue/RemoteSubscriptionImpl/SubscriptionTestHelper/SubscriptionSet - rename
of hasFilters to filtersMessages
SubscriptionImpl - rename of hasFilters to filtersMessages and changes to
include noLocal in that check.
TopicSessionTest - Additional testing for NoLocal to ensure it is honoured when
messages already exist on the queue.
Modified:
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
incubator/qpid/branches/M2/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
Modified:
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?view=diff&rev=530432&r1=530431&r2=530432
==============================================================================
---
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
(original)
+++
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
Thu Apr 19 07:42:53 2007
@@ -197,7 +197,6 @@
_currentMessage = new AMQMessage(_messageStore.getNewMessageId(), info,
_txnContext);
- // TODO: used in clustering only I think (RG)
_currentMessage.setPublisher(publisher);
}
Modified:
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?view=diff&rev=530432&r1=530431&r2=530432
==============================================================================
---
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
(original)
+++
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
Thu Apr 19 07:42:53 2007
@@ -21,7 +21,6 @@
package org.apache.qpid.server.queue;
import java.text.MessageFormat;
-import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
@@ -237,8 +236,10 @@
/**
* Returns messages within the given range of message Ids
+ *
* @param fromMessageId
* @param toMessageId
+ *
* @return List of messages
*/
public List<AMQMessage> getMessagesOnTheQueue(long fromMessageId, long
toMessageId)
@@ -253,6 +254,7 @@
/**
* @param messageId
+ *
* @return AMQMessage with give id if exists. null if AMQMessage with
given id doesn't exist.
*/
public AMQMessage getMessageOnTheQueue(long messageId)
@@ -267,10 +269,10 @@
/**
* moves messages from this queue to another queue. to do this the
approach is following- - setup the queue for
- * moving messages (stop the async delivery) - get all the messages
available in the given message
- * id range - setup the other queue for moving messages (stop the async
delivery) - send these
- * available messages to the other queue (enqueue in other queue) - Once
sending to other Queue is successful,
- * remove messages from this queue - remove locks from both queues and
start async delivery
+ * moving messages (stop the async delivery) - get all the messages
available in the given message id range - setup
+ * the other queue for moving messages (stop the async delivery) - send
these available messages to the other queue
+ * (enqueue in other queue) - Once sending to other Queue is successful,
remove messages from this queue - remove
+ * locks from both queues and start async delivery
*
* @param fromMessageId
* @param toMessageId
@@ -442,7 +444,7 @@
Subscription subscription =
_subscriptionFactory.createSubscription(channel, ps, consumerTag, acks,
filters, noLocal, this);
- if (subscription.hasFilters())
+ if (subscription.filtersMessages())
{
if (_deliveryMgr.hasQueuedMessages())
{
@@ -641,7 +643,7 @@
{
_totalMessagesReceived.incrementAndGet();
}
-
+
try
{
_managedObject.checkForNotification(msg);
Modified:
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java?view=diff&rev=530432&r1=530431&r2=530432
==============================================================================
---
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
(original)
+++
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
Thu Apr 19 07:42:53 2007
@@ -32,7 +32,7 @@
void queueDeleted(AMQQueue queue) throws AMQException;
- boolean hasFilters();
+ boolean filtersMessages();
boolean hasInterest(AMQMessage msg);
Modified:
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java?view=diff&rev=530432&r1=530431&r2=530432
==============================================================================
---
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
(original)
+++
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
Thu Apr 19 07:42:53 2007
@@ -158,7 +158,7 @@
}
- if (_filters != null)
+ if (filtersMessages())
{
_messages = new
ConcurrentLinkedMessageQueueAtomicSize<AMQMessage>();
}
@@ -346,9 +346,9 @@
channel.queueDeleted(queue);
}
- public boolean hasFilters()
+ public boolean filtersMessages()
{
- return _filters != null;
+ return _filters != null || _noLocal;
}
public boolean hasInterest(AMQMessage msg)
@@ -363,7 +363,10 @@
// return false;
}
- if (_noLocal)
+ final AMQProtocolSession publisher = msg.getPublisher();
+
+ //todo - client id should be recoreded and this test removed but
handled below
+ if (_noLocal && publisher != null)
{
// We don't want local messages so check to see if message is one
we sent
Object localInstance;
@@ -372,8 +375,9 @@
if ((protocolSession.getClientProperties() != null) &&
(localInstance =
protocolSession.getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE)) !=
null)
{
- if ((msg.getPublisher().getClientProperties() != null) &&
- (msgInstance =
msg.getPublisher().getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE))
!= null)
+
+ if ((publisher.getClientProperties() != null) &&
+ (msgInstance =
publisher.getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE)) != null)
{
if (localInstance == msgInstance ||
localInstance.equals(msgInstance))
{
@@ -388,8 +392,11 @@
}
else
{
+
localInstance = protocolSession.getClientIdentifier();
- msgInstance = msg.getPublisher().getClientIdentifier();
+ //todo - client id should be recoreded and this test removed
but handled here
+
+ msgInstance = publisher.getClientIdentifier();
if (localInstance == msgInstance || ((localInstance != null)
&& localInstance.equals(msgInstance)))
{
if (_logger.isTraceEnabled())
@@ -399,7 +406,6 @@
}
return false;
}
-
}
@@ -623,7 +629,7 @@
return _resendQueue;
}
- if (_filters != null)
+ if (filtersMessages())
{
if (isAutoClose())
{
Modified:
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java?view=diff&rev=530432&r1=530431&r2=530432
==============================================================================
---
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
(original)
+++
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
Thu Apr 19 07:42:53 2007
@@ -157,7 +157,7 @@
//FIXME the queue could be full of sent messages.
// Either need to clean all PDQs after sending a message
// OR have a clean up thread that runs the PDQs expunging
the messages.
- if (!subscription.hasFilters() ||
subscription.getPreDeliveryQueue().isEmpty())
+ if (!subscription.filtersMessages() ||
subscription.getPreDeliveryQueue().isEmpty())
{
return subscription;
}
Modified:
incubator/qpid/branches/M2/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java?view=diff&rev=530432&r1=530431&r2=530432
==============================================================================
---
incubator/qpid/branches/M2/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java
(original)
+++
incubator/qpid/branches/M2/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java
Thu Apr 19 07:42:53 2007
@@ -102,7 +102,7 @@
}
}
- public boolean hasFilters()
+ public boolean filtersMessages()
{
return false;
}
Modified:
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java?view=diff&rev=530432&r1=530431&r2=530432
==============================================================================
---
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
(original)
+++
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
Thu Apr 19 07:42:53 2007
@@ -87,7 +87,7 @@
{
}
- public boolean hasFilters()
+ public boolean filtersMessages()
{
return false;
}