Author: ritchiem
Date: Wed Apr 30 07:40:18 2008
New Revision: 652388
URL: http://svn.apache.org/viewvc?rev=652388&view=rev
Log:
QPID-889 : Removed _reapingStoreContext from CSDM replaced with local
StoreContext()s so they are not reused by different threads.
Modified:
incubator/qpid/branches/M2.x/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
Modified:
incubator/qpid/branches/M2.x/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.x/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java?rev=652388&r1=652387&r2=652388&view=diff
==============================================================================
---
incubator/qpid/branches/M2.x/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
(original)
+++
incubator/qpid/branches/M2.x/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
Wed Apr 30 07:40:18 2008
@@ -87,10 +87,6 @@
private final Object _queueHeadLock = new Object();
private String _processingThreadName = "";
-
- /** Used by any reaping thread to purge messages */
- private StoreContext _reapingStoreContext = new StoreContext();
-
ConcurrentSelectorDeliveryManager(SubscriptionManager subscriptions,
AMQQueue queue)
{
@@ -220,14 +216,22 @@
_lock.lock();
- for(Iterator<QueueEntry> iter = _messages.iterator();
iter.hasNext();)
+ // New Context to for dealing with the MessageStore.
+ StoreContext context = new StoreContext();
+
+ for(Iterator<QueueEntry> iter = _messages.iterator(); iter.hasNext();)
{
QueueEntry entry = iter.next();
if(entry.expired())
{
// fixme: Currently we have to update the total byte size here
for the data in the queue
_totalMessageSize.addAndGet(-entry.getSize());
- _queue.dequeue(_reapingStoreContext,entry);
+
+ // Remove the message from the queue in the MessageStore
+ _queue.dequeue(context,entry);
+
+ // This queue nolonger needs a reference to this message
+ entry.getMessage().decrementReference(context);
iter.remove();
}
}
@@ -469,14 +473,20 @@
synchronized (_queueHeadLock)
{
QueueEntry entry = getNextMessage();
+
+ // todo: note: why do we need this? Why not reuse the passed
'storeContext'
+ //Create a new StoreContext for decrementing the References
+ StoreContext context = new StoreContext();
+
while (entry != null)
{
//and remove it
_messages.poll();
+ // todo: NOTE: Why is this a different context to the new
local 'context'?
_queue.dequeue(storeContext, entry);
- entry.getMessage().decrementReference(_reapingStoreContext);
+ entry.getMessage().decrementReference(context);
entry = getNextMessage();
count++;
@@ -518,10 +528,13 @@
{
_totalMessageSize.addAndGet(-entry.getSize());
+ // New Store Context for removing expired messages
+ StoreContext storeContext = new StoreContext();
+
// Use the reapingStoreContext as any sub(if we have one) may
be in a tx.
- _queue.dequeue(_reapingStoreContext, entry);
+ _queue.dequeue(storeContext, entry);
- message.decrementReference(_reapingStoreContext);
+ message.decrementReference(storeContext);
if (_log.isInfoEnabled())
{